計算某個人在某個區域停留時長
阿新 • • 發佈:2018-12-15
在工作中遇到了這樣一個場景:統計一群人在指定時間段內於某個區域的停留時長。原本很想寫個 SQL 來處理這個問題,但認真想了一下,這個邏輯用 SQL 並不好寫,於是參考了別人的方案,改用 MapReduce(MR)來解決。
說明:這個 MR 任務是在阿里雲 ODPS 環境下執行的。
資料準備
這裡模擬了一些資料:
# 資料結構 project=example_project table=wc_in columns=msisdn:string,geohash:string,start_time:bigint,end_time:bigint,first_time:bigint,last_time:bigint,dual_time:bigint
19999999999,zzaq45ww,1539300000,1539323011,1539320733,1539320833,100 19999999999,zzaq45ww,1539300000,1539323011,1539311706,1539311806,100 19999999999,zzaq45ww,1539300000,1539323011,1539309422,1539309522,100 19999999999,zzaq45ww,1539300000,1539323011,1539317713,1539317813,100 19999999999,zzaq45ww,1539300000,1539323011,1539311974,1539312074,100 19999999999,zzaq45ww,1539300000,1539323011,1539318027,1539318127,100 19999999999,zzaq45ww,1539300000,1539323011,1539320733,1539328517,896 19999999999,zzaq45ww,1539300000,1539323011,1539311706,1539312311,247 19999999999,zzaq45ww,1539300000,1539323011,1539309422,1539312981,721 19999999999,zzaq45ww,1539300000,1539323011,1539317713,1539324718,228 19999999999,zzaq45ww,1539300000,1539323011,1539311974,1539318866,677 19999999999,zzaq44ww,1539300000,1539323011,1539318027,1539326938,732 19999999999,zzaq44ww,1539300000,1539323011,1539314486,1539317942,291 19999999999,zzaq44ww,1539300000,1539323011,1539314359,1539320011,409 19999999999,zzaq44ww,1539300000,1539323011,1539314232,1539316337,707 19999999999,zzaq44ww,1539300000,1539323011,1539314106,1539319163,853 19999999999,zzaq44ww,1539300000,1539323011,1539313979,1539320948,267 19999999999,zzaq44ww,1539300000,1539323011,1539313852,1539316317,542 19999999999,zzaq44ww,1539300000,1539323011,1539313725,1539314073,983 19999999999,zzaq44ww,1539300000,1539323011,1539313599,1539318407,886
Map階段
package myaliyun.mr;
import java.io.IOException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
/**
 * Map stage of the stay-time job.
 *
 * <p>Each input record is re-emitted as a (key, value) pair: the key carries
 * {@code msisdn} and {@code geohash}, the value carries the five bigint
 * timing columns. No filtering or computation happens here.
 */
public class CalStayTimeMapper extends MapperBase {
    /** String columns copied from the input record into the output key. */
    private static final String[] KEY_COLUMNS = {"msisdn", "geohash"};

    /** Bigint columns copied from the input record into the output value. */
    private static final String[] VALUE_COLUMNS = {
        "start_time", "end_time", "first_time", "last_time", "dual_time"
    };

    // Output records are created once in setup() and reused for every map() call.
    private Record outKey;
    private Record outValue;

    /**
     * Creates the reusable output records and prints the task id to stdout.
     *
     * @param context task context supplied by the framework
     * @throws IOException declared by the overridden method
     */
    @Override
    public void setup(TaskContext context) throws IOException {
        outKey = context.createMapOutputKeyRecord();
        outValue = context.createMapOutputValueRecord();
        String taskId = context.getTaskID().toString();
        System.out.println("TaskID:" + taskId);
    }

    /**
     * Copies the key and value columns of one input record and emits the pair.
     *
     * @param recordNum sequence number of the record (unused)
     * @param record    input record to read the columns from
     * @param context   task context used to emit the pair
     * @throws IOException if writing the pair fails
     */
    @Override
    public void map(long recordNum, Record record, TaskContext context)
            throws IOException {
        for (String column : KEY_COLUMNS) {
            outKey.set(column, record.getString(column));
        }
        for (String column : VALUE_COLUMNS) {
            outValue.set(column, record.getBigint(column));
        }
        context.write(outKey, outValue);
    }

    /** Nothing to release; present only to mirror the mapper lifecycle. */
    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
Reducer階段
package myaliyun.mr;
import java.io.IOException;
import java.util.Iterator;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
/**
 * Reduce stage of the stay-time job.
 *
 * <p>For every (msisdn, geohash) group it sums how long the visits recorded in
 * the group's values fall inside the query window
 * [{@code start_time}, {@code end_time}], and writes one output row
 * (msisdn, geohash, staytime) when the total is positive.
 */
public class CalStayTimeReducer extends ReducerBase {
    // Output record, created once in setup() and reused for every group.
    private Record result;

    /**
     * Creates the reusable output record.
     *
     * @param context task context supplied by the framework
     * @throws IOException declared by the overridden method
     */
    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
    }

    /**
     * Accumulates the in-window stay time of one (msisdn, geohash) group.
     *
     * <p>Per value record:
     * <ul>
     *   <li>visit fully inside the window: add the record's {@code dual_time};</li>
     *   <li>visit partially overlapping the window (runs past the end, starts
     *       before the start, or covers the whole window): add the length of
     *       the overlap;</li>
     *   <li>no overlap: add nothing.</li>
     * </ul>
     *
     * <p>The four timestamp columns are unboxed unconditionally, so a null in
     * any of them raises {@link NullPointerException}; {@code dual_time} is
     * only read for fully contained visits.
     *
     * @param key     group key holding {@code msisdn} and {@code geohash}
     * @param values  value records of the group
     * @param context task context used to emit the result row
     * @throws IOException if writing the result fails
     */
    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context)
            throws IOException {
        long staytime = 0L;
        while (values.hasNext()) {
            Record val = values.next();
            long startTime = val.getBigint("start_time");
            long endTime = val.getBigint("end_time");
            long firstTime = val.getBigint("first_time");
            long lastTime = val.getBigint("last_time");

            if (firstTime >= startTime && lastTime <= endTime) {
                // Visit lies entirely inside the window: trust the recorded duration.
                staytime += val.getBigint("dual_time");
            } else {
                staytime += clippedOverlap(startTime, endTime, firstTime, lastTime);
            }
        }

        if (staytime > 0) {
            result.set(0, key.getString("msisdn"));
            result.set(1, key.getString("geohash"));
            result.set(2, staytime);
            context.write(result);
        }
    }

    /**
     * Length of the intersection of the visit [firstTime, lastTime] with the
     * window [startTime, endTime], or 0 when they do not overlap.
     *
     * <p>Covers the three partial-overlap shapes: visit runs past the window
     * end ({@code endTime - firstTime}), visit begins before the window start
     * ({@code lastTime - startTime}), and visit spans the whole window
     * ({@code endTime - startTime}).
     */
    private static long clippedOverlap(long startTime, long endTime,
                                       long firstTime, long lastTime) {
        long overlapStart = Math.max(firstTime, startTime);
        long overlapEnd = Math.min(lastTime, endTime);
        return Math.max(0L, overlapEnd - overlapStart);
    }

    /** Nothing to release; present only to mirror the reducer lifecycle. */
    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
任務配置
package myaliyun.mr;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/**
 * Driver that configures and launches the stay-time MapReduce job.
 *
 * <p>Usage: {@code CalStayTimRun <table_in> <table_out>}
 */
public class CalStayTimRun {
    /**
     * Builds the job configuration and runs the job.
     *
     * @param args {@code args[0]} is the input table name, {@code args[1]} the
     *             output table name; with fewer than two arguments a usage
     *             message is printed to stderr and the JVM exits with status 2
     * @throws OdpsException if the job cannot be run
     */
    public static void main(String[] args) throws OdpsException {
        if (args.length < 2) {
            System.err.println("Parameter [table_in table_out]");
            System.exit(2);
        }
        // Use the tables named on the command line rather than fixed names,
        // so the arguments demanded above actually take effect.
        String inputTable = args[0];
        String outputTable = args[1];

        JobConf job = new JobConf();

        // Intermediate schemas; they must match the columns the mapper sets.
        job.setMapOutputKeySchema(SchemaUtils.fromString("msisdn:string,geohash:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("start_time:bigint,end_time:bigint,first_time:bigint,last_time:bigint,dual_time:bigint"));

        InputUtils.addTable(TableInfo.builder().tableName(inputTable).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(outputTable).build(), job);

        job.setMapperClass(myaliyun.mr.CalStayTimeMapper.class);
        job.setReducerClass(myaliyun.mr.CalStayTimeReducer.class);

        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}