
因为在划分子轨迹中,主要使用的字段是mmsi号、位置、速度、时间,以及划分的特征点、子轨迹段,所以只需要这几个属性即可,重写toString方法,重写序列化和反序列化方法
// bean类
class SubTrajectorBean implements Writable{
private String MMSI;
private Double Lat_d;
private Double Lon_d;
private Long unixTime;
private Integer label = -1;
private String subTrajector = null;
public String toString(){
return MMSI + "," + Lat_d + "," + Lon_d + "," + unixTime + "," + label + "," + subTrajector;
}
重写序列化和反序列化方法
// 序列化方法
@override
public void write(DataOutput dataOutput) throw IOException{
dataOutput.writeUTF(MMSI);
dataOutput.writeDouble(Lat_d);
dataOutput.writeDouble(Lon_d);
dataOutput.writeLong(unixTime);
dataOutput.writeInt(label);
dataOutput.writeUTF(subTrajector);
}
// 反序列化方法
@override
public void readFields(DataInput dataInput) throw IOException{
this.MMSI = dataInput.readUTF();
this.Lat_d = dataInput.readDouble();
this.Lon_d = dataInput.readDouble();
this.unixTime = dataInput.readLong();
this.label = dataInput.readInt();
this.subTrajector = dataInput.readUTF();
}
Map阶段
map阶段主要是过滤速度阈值,将速度小于3kn的数据点看作抛锚点过滤
// Mapper public class SubTrajectorMapper extend MapperReduce阶段{ // 输出key、value private Text outK = new Text(); private SubTrajectorBean outV = new SubTrajectorBean(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ // 转为字符串 String[] comments = value.toString(); // 判断速度是否大于3Kn,大于输出,否则过滤 if(comments[5] > 3){ String MMSI = comments[1]; Double Lat_d = Double.parseDouble(comment[7]); Double Lon_d = Double.parseDouble(comment[8]); Double unixTime = Long.parseLong(comments[9]); // 封装bean对象 outV.setMMMSI(MMSI); outV.setLat_d(Lat_d); outV.setLon_d(Lon_d); outV.setUnixTime(unixTime); // 封装Text对象 outK.set(MMSI); // 写出context context.write(outK, outV); } } }
reduce阶段需要将过滤的数据按照mmsi排序,标记特征点,按照特征点划分子轨迹段
// Reducer public class SubTrajectorReducer extends ReducerUtils工具类{ // 重写reduce方法 public void reduce(Text key, Iterable values, Context context){ // 将同一MMSI数据放到一个新列表中 List trajectorList = new ArrayList<>(1000); for(SubTrajectorBean value: values){ trajectorList.add(Utils.getNewBean(value)); } // 进行处理 // 二维数组 List[] result = new List()[]; // 二维数组索引和子轨迹索引和特征点 Integer index = 0, sub = 1, trait = 0; for(int i = 0; i < trajectorList.length()-3; i++){ // 如果等于一,则是第一个,将其label设置为1 if(trait == 0) { trajectorList.get(i).setLabel(1); trajectorList.get(i).setLabel(1); } // 判断是否时间超限 // 如果超限,将该点的label改为1 Double = time = Math.abs(trajectorList.get(i).getUnixTime() - trajectorList.get(i+1).getUnixTime()); if(Double > 360){ trajectorList.get(i).setLabel(1); // 判断子轨迹的个数是否大于10 if (trait > 10){ index += 1; sub += 1; trait = 0; }else { result[index].clear(); } } // 如果不超限,判断TF特征点 else{ SubTrajectorBean stb1 = trajectorList.get(i); SubTrajectorBean stb2 = trajectorList.get(i+1); SubTrajectorBean stb3 = trajectorList.get(i+2); SubTrajectorBean stb4 = trajectorList.get(i+3); Double T12 = Utils.getT(stb1, stb2, stb3); Double T23 = Utils.getT(stb2, stb3, stb4); if((T12 * T23) < 0){ // 为i + 2赋值label trajectorList.get(i+2).setLabel(1); // 将i+1,i+2加入result // 拼接子轨迹编号 String st = value.getMMSI() + sub; trajectorList.get(i+1).setSubTrajector(st); trajectorList.get(i+2).setSubTrajector(st); result[index].append(trajectorList.get(i+1)); result[index].append(trajectorList.get(i+2)); // 判断子轨迹的个数是否大于10个数 if (trait > 10){ index += 1; sub += 1; trait = 0; }else { result[index].clear(); } } // 拼接子轨迹编号 String st = value.getMMSI() + sub; trajectorList.get(i).setSubTrajector(st); // 添加到二维列表中 result[index].append(trajectorList.get(i)); // 写出数据 for(List values: result){ for(SubTrajectorBean value: values){ context.write(NullWritable, value); } } } } }
在处理的过程中,为了解耦,所以将个别方法单独拿出来设置成了工具类
// 工具类Utils
public class Utils{
// 得到一个新的bean对象
public static SubTrajectorBean getNewBean(SubTrajectorBean stb){
SubTrajectorBean bean = new SubTrajectorBean();
bean.setMMSI(stb.getMMSI());
bean.setLat_d(stb.getLat_d());
bean.setLon_d(stb.getLon_d());
bean.setUnixTime(stb.getUnixTime());
bean.setLabel(stb.getLabel());
bean.setSubTrajector(stb.getSubTrajector());
}
// 曲线边缘法
public static Double getT(SubTrajectorBean stb1,SubTrajectorBean stb2, SubTrajectorBean stb3){
// 计算Tmn
Double T = (stb2.getLat_d - stb1.getLat_d)(stb3.getLon_d - stb1.getLon_d) + (stb3.getLat_d - stb1.getLat_d)(stb2.getLon_d - stb1.getLon_d);
return T;
}
}
Driver类
Driver类就是典型的八股文形式,关联map和redece,设置key、value,设置路径,提交作业
// Driver类
public class SubTrajectorDriver{
public static void main(String[] args){
// 1.获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.关联Driver类
job.setJarByClass(SubTrajectorDriver.class);
// 3.关联Mapper和Reducer类
job.setMapperClass(SubTrajectorMapper.class);
job.setReducerClass(SubTrajectorReducer.class);
// 4.设置Map的输出key/value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(SubTrajectorBean.class);
// 5.设置最终的输出key/value
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(SubTrajectorBean.class);
// 6.设置输入输出路径
FileInputFormat.setInputPath(job, new Path("inputPath"));
FileOutputFormat.setOutputPath(job, new Path("outputPath"));
// 7.提交job
boolean result = job.waitForCompletion(true);
System.exit(result? 0: 1);
}
}
上述代码就是大概的子轨迹提取过程的MapReduce实现,因为疫情原因,本人封闭不让去实验室,所以机器的限制并不能真是运行该代码,代码编写也是在xp系统的文本中靠感觉编写,但是具体思路完全符合研究逻辑,代码虽不能保证完全正确统一,但是也在编写中也十分注意格式和语法,如有代码错误之处,还请指出。
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)