京東金融資料分析案例(二)
阿新 • • 發佈:2018-12-30
任務 5
利用 spark streaming 實時分析每個頁面點選次數和不同年齡段消費總金額
步驟:編寫 Kafka producer 程式讀取hdfs上的檔案每隔一段時間產生資料,然後使用spark streaming讀取kafka中的資料進行分析,分析結果寫入到redis中。
(1)將 t_click 資料依次寫入 kafka 中的 t_click 主題中,每條資料寫入間隔為10 毫秒,其中 uid 為 key,click_time+” ,” +pid 為 value
public class ClickProducer {
    private static KafkaProducer<String, String> producer;

    /**
     * Replays the t_click data set from HDFS into the Kafka topic
     * {@code KafkaRedisConfig.KAFKA_CLICK_TOPIC}.
     *
     * Per the task spec, each record is keyed by uid and carries a JSON value
     * of the form {"uid":"click_time,pid"}. The value layout must NOT change:
     * the downstream streaming job parses exactly this shape.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.ClickProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read source rows from HDFS.
        // NOTE(review): column layout assumed to be "uid,click_time,pid" from the
        // indices used below — confirm against the warehouse schema.
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_click");
        // Kafka broker address (e.g. localhost:9092), shared constant.
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        // Replay every row with a short pause between sends.
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                event.put("uid", split[1] + "," + split[2]);
                // BUGFIX: the spec requires uid (column 0) as the record key;
                // the original sent key-less records. Value format unchanged.
                ProducerRecord<String, String> msg =
                        new ProducerRecord<String, String>(
                                KafkaRedisConfig.KAFKA_CLICK_TOPIC, split[0], event.toString());
                producer.send(msg);
                System.out.println("Message sent: " + event);
                // BUGFIX: spec says 10 ms between records; original slept 1000 ms
                // (and its comment claimed "every ten seconds").
                Thread.sleep(10);
            }
        });
        // BUGFIX: flush pending records and release resources; the original
        // leaked both the producer and the Spark context.
        producer.close();
        sc.stop();
    }
}
執行結果
```
{"uid":"2016-10-04 12:22:30,1"}
{"uid":"2016-08-09 11:24:13,9"}
{"uid":"2016-09-27 14:40:37,10"}
{"uid":"2016-10-04 12:18:42,6"}
......
```
(2)將 t_order 資料依次寫入 kafka 中的 t_order 主題中,每條資料寫入間隔為10 毫秒,其中 uid 為 key,uid+” ,” +price + “,” + discount 為value
public class OrderProducer {
    private static KafkaProducer<String, String> producer;

    /**
     * Replays the t_order data set from HDFS into the Kafka topic
     * {@code KafkaRedisConfig.KAFKA_ORDER_TOPIC}.
     *
     * Per the task spec, each record is keyed by uid and carries a JSON value
     * of the form {"uid":"uid,price,discount"}. The value layout must NOT
     * change: the downstream streaming job parses exactly this shape.
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("producer.OrderProducer");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read source rows from HDFS.
        // NOTE(review): columns assumed to be uid(0), price(2), discount(5)
        // from the indices used below — confirm against the warehouse schema.
        JavaRDD<String> input = sc.textFile("hdfs://master:9000/warehouse/t_order");
        // Destination topic, defined as a shared constant.
        final String topic = KafkaRedisConfig.KAFKA_ORDER_TOPIC;
        // Kafka broker address (e.g. localhost:9092), shared constant.
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, Object> props = new HashMap<String, Object>();
        props.put("bootstrap.servers", brokers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        // Replay every row with a short pause between sends.
        input.foreach(new VoidFunction<String>() {
            public void call(String s) throws Exception {
                String[] split = s.split(",");
                JSONObject event = new JSONObject();
                // Skip the CSV header row (its first column contains "uid").
                if (!split[0].contains("uid")) {
                    event.put("uid", split[0] + "," + split[2] + "," + split[5]);
                    // BUGFIX: the spec requires uid as the record key; the
                    // original sent key-less records. Value format unchanged.
                    ProducerRecord<String, String> msg =
                            new ProducerRecord<String, String>(topic, split[0], event.toString());
                    producer.send(msg);
                    System.out.println("Message sent: " + event);
                }
                // BUGFIX: spec says 10 ms between records; original slept 2000 ms.
                Thread.sleep(10);
            }
        });
        // BUGFIX: flush pending records and release resources; the original
        // leaked both the producer and the Spark context.
        producer.close();
        sc.stop();
    }
}
執行結果
{"uid":"55792,1.4898961024,0"}
{"uid":"45370,3.9950093311,0"}
{"uid":"85278,0.6658123361,0"}
......
(3)編寫 spark streaming 程式,依次讀取 kafka 中 t_click 主題資料,並統計: 每個頁面累計點選次數,並存入 redis,其中 key 為” click+pid” ,value 為累計的次數
public class ClickStreamingAnalysis {
    /**
     * Consumes the t_click Kafka topic and accumulates, in a Redis hash
     * ("pid::click"), the total click count per page. Each hash field is
     * "click" + pid and the value is the running count.
     */
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("ClickStreamingAnalysis");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("WARN");
        // Kafka configuration (spark-streaming-kafka-0-8 direct API).
        String[] topics = KafkaRedisConfig.KAFKA_CLICK_TOPIC.split("\\,");
        System.out.println("Topics: " + Arrays.toString(topics));
        String brokers = KafkaRedisConfig.KAFKA_ADDR;
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
        // Create a direct stream (KafkaUtils from spark-streaming-kafka-0-8_2.11).
        JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                new HashSet<String>(Arrays.asList(topics)));
        // Incoming values look like {"uid":"2016-10-04 12:22:30,1"}; parse to JSON.
        JavaDStream<JSONObject> events = kafkaStream.map(new Function<Tuple2<String, String>, JSONObject>() {
            public JSONObject call(Tuple2<String, String> line) throws Exception {
                System.out.println("line:" + line._2());
                JSONObject data = JSON.parseObject(line._2());
                return data;
            }
        });
        // Extract pid (second field of the value), map to (pid, 1), then reduce
        // to obtain this batch's click count per page.
        JavaPairDStream<String, Long> clickDs = events.mapToPair(new PairFunction<JSONObject, String, Long>() {
            public Tuple2<String, Long> call(JSONObject json) throws Exception {
                return new Tuple2<String, Long>(json.getString("uid").split(",")[1], 1L);
            }
        }).reduceByKey(new Function2<Long, Long, Long>() {
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
        // Redis hash that accumulates the totals across batches.
        final String clickHashKey = "pid::click";
        clickDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
            public void call(JavaPairRDD<String, Long> rdd) throws Exception {
                rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Long>>>() {
                    // One Jedis connection per partition keeps the connection count low.
                    public void call(Iterator<Tuple2<String, Long>> tuple2Iterator) throws Exception {
                        Jedis jedis = JavaRedisClient.get().getResource();
                        try {
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, Long> next = tuple2Iterator.next();
                                String pid = "click" + next._1();
                                Long clickCount = next._2();
                                jedis.hincrBy(clickHashKey, pid, clickCount);
                                System.out.println(pid + ":" + clickCount);
                            }
                        } catch (Exception e) {
                            System.out.println("error:" + e);
                        } finally {
                            // BUGFIX: close in finally so the pooled connection is
                            // always returned; leaking it would eventually stall
                            // the job once the pool is exhausted.
                            jedis.close();
                        }
                    }
                });
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}
執行結果redis(注:任務(3)寫入的是 pid::click 雜湊,field 為 "click"+pid,value 為累計點選次數;原文此處誤貼了任務(4)的 age::money 輸出)
127.0.0.1:6379> HGETALL pid::click
1) "click1"
2) "32"
3) "click9"
4) "17"
5) "click10"
......
(4)編寫 spark streaming 程式,依次讀取 kafka 中 t_order 主題資料,並統計:不同年齡段消費總金額,並存入 redis,其中 key 為” age” ,value 為累計的消費金額
public class OrderStreamingAnalysis {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("OrderStreamingAnalysis");
// final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(2));
//kafka中接收到的資料格式{"uid":"55792,1.4898961024,0"} 裡面沒有年齡,所以需要跟user表做join,
//因為一個程式中只能有一個sparkcontext所以通過JavaStreamingContext 獲得
final SQLContext sqlcontext = new SQLContext(ssc.sparkContext());
ssc.sparkContext().setLogLevel("WARN");
final Dataset<Row> userDs = sqlcontext.read().csv("hdfs://master:9000/warehouse/t_user");
//設定schema資訊
StructType userSchema = new StructType()
.add("uid", "string", false)
.add("age", "string", false)
.add("six", "string", true)
.add("active_date", "string", false)
.add("limit", "string", false);
final Dataset<Row> userDf = sqlcontext.createDataFrame(userDs.toJavaRDD(), userSchema);
// Kafka configurations
String[] topics = KafkaRedisConfig.KAFKA_ORDER_TOPIC.split("\\,");
System.out.println("Topics: " + Arrays.toString(topics));
String brokers = KafkaRedisConfig.KAFKA_ADDR;
Set<String> topicsSet = new HashSet<String>(Arrays.asList(topics));
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
//初始化dstream 這裡使用spark-streaming-kafka-0-10_2.11中的kafkautil
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
);
//讀取到的string轉化為json物件
JavaDStream<JSONObject> events = kafkaStream.map(new Function<ConsumerRecord<String, String>, JSONObject>() {
public JSONObject call(ConsumerRecord<String, String> line) throws Exception {
System.out.println("line:" + line.value());
return JSON.parseObject(line.value());
}
});
//取出uid和金額
JavaPairDStream<String, Double> orderDs = events.mapToPair(new PairFunction<JSONObject, String, Double>() {
public Tuple2<String, Double> call(JSONObject json) throws Exception {
String[] strs = json.getString("uid").split(",");
return new Tuple2<String, Double>(strs[0], Double.parseDouble(strs[1]) - Double.parseDouble(strs[2]));
}
});
orderDs.foreachRDD(new VoidFunction<JavaPairRDD<String, Double>>() {
public void call(JavaPairRDD<String, Double> rdd) throws Exception {
JavaRDD<Row> mapRow = rdd.map(new Function<Tuple2<String, Double>, Row>() {
public Row call(Tuple2<String, Double> v1) throws Exception {
String uid = v1._1();
Double money = v1._2();
// System.out.println("orderUID:" + uid+":"+money);
return RowFactory.create(uid, money);
}
});
StructType orderSchema = new StructType()
.add("uid", "string", false)
.add("money", "Double", false);
Dataset<Row> orderDf = sqlcontext.createDataFrame(mapRow, orderSchema);
//定義一個redis的hashkey
final String moneyHashKey = "age::money";
//查詢
Dataset<Row> count = orderDf.join(userDf, orderDf.col("uid").equalTo(userDf.col("uid")))
.select("age", "money")
.groupBy("age")
.sum("money");
count.printSchema();
count.repartition(3).foreachPartition(new ForeachPartitionFunction<Row>() {
public void call(Iterator<Row> t) throws Exception {
Jedis jedis = JavaRedisClient.get().getResource();
try {
if(t.hasNext()){
Row row = t.next();
String age = row.getString(0);
Double money = row.getDouble(1);
System.out.println(age+"::::"+money);
jedis.hincrByFloat(moneyHashKey, age, money);
}
}catch (Exception e){
System.out.println("error"+e);
}
jedis.close();
}
});
}
});
ssc.start();
ssc.awaitTermination();
}
執行結果redis
127.0.0.1:6379> HGETALL age::money
1) "30"
2) "107.51128448799999422"
3) "40"
4) "39.40304406300000115"
5) "35"
6) "83.02971674589999735"
......
OK完成