Spark Streaming + Redis: Real-Time Daily Registration Rate Statistics
阿新 · Published: 2018-12-20
The overall flow for using Spark Streaming to count newly registered users in real time is described below, step by step, with the code for each step.
1. Add the Maven dependencies
<!-- Hive dependencies (MySQL connector is used by the Hive metastore) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.17</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Spark SQL dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Spark Streaming dependencies -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<!-- Redis dependency -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
2. Start the Spark Streaming computation
SparkConf conf = new SparkConf();
conf.setAppName("kafka");
conf.setMaster("local[3]");

// Create the SparkSession first, with Hive support enabled
final SparkSession spark = SparkSession.builder()
        .config(conf)
        .enableHiveSupport()
        .getOrCreate();

// Create the Java streaming context with a 2-second batch interval
JavaStreamingContext ssc = new JavaStreamingContext(
        new JavaSparkContext(spark.sparkContext()), Durations.seconds(2));

// Kafka parameters
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("group.id", "raw_logs");
kafkaParams.put("enable.auto.commit", "true");

// Location strategy: controls on which hosts the consumers are scheduled
LocationStrategy ls = LocationStrategies.PreferConsistent();

// Consumer strategy: controls which topic, partition and offset are consumed
List<TopicPartition> tps = new ArrayList<TopicPartition>();
tps.add(new TopicPartition("raw_log_handleTopic", 0));
ConsumerStrategy<String, String> cs = ConsumerStrategies.Assign(tps, kafkaParams);

// Kafka message stream
JavaDStream<ConsumerRecord<String, String>> ds1 = KafkaUtils.createDirectStream(ssc, ls, cs);

// 3. Filter the raw logs and extract the startup-table records (see below)
// 4. Query the table and write the results to Redis (see below)

ssc.start();
ssc.awaitTermination();
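For local testing it can help to push a sample record into the topic by hand. The consumer above expects each Kafka message value to be a '#'-delimited string whose field order (server time in ms, server time string, client IP, client time in ms, status code, log payload) is inferred from the parsing code in step 3 below. The standalone producer that follows is a hypothetical sketch, not part of the original project; the broker address and sample values are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawLogTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Field order matches the split("#") in step 3:
        // servertimems # servertimestr # clientip # clienttimems # status # log
        // (all values here are made-up sample data)
        String value = "1545283200000.0#2018-12-20 12:00:00#192.168.1.10#1545283199000#200#{\"appId\":\"demo\"}";
        producer.send(new ProducerRecord<String, String>("raw_log_handleTopic", 0, null, value));

        producer.flush();
        producer.close();
    }
}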
3. Filter the raw logs and extract the startup-table records
// Extract the '#'-delimited log string from each Kafka record and turn it into a Row
JavaDStream<Row> ds2 = ds1.map(new Function<ConsumerRecord<String, String>, Row>() {
    public Row call(ConsumerRecord<String, String> v1) throws Exception {
        String topic = v1.topic();
        int par = v1.partition();
        long offset = v1.offset();
        String value = v1.value();
        String mesg = "topic= " + topic + ", partition= " + par + ", offset= " + offset + ", value= " + value;
        System.out.println("mesg===> " + mesg);
        String[] arr = value.split("#");
        return RowFactory.create(
                Float.parseFloat(arr[0]),
                arr[1],
                arr[2],
                Long.parseLong(arr[3]),
                Integer.parseInt(arr[4]),
                arr[5]);
    }
});
ds2.print();

ds2.foreachRDD(new VoidFunction<JavaRDD<Row>>() {
    public void call(JavaRDD<Row> rdd) throws Exception {
        SparkSession spark = SparkSession.builder()
                .config(rdd.context().getConf())
                .enableHiveSupport()
                .getOrCreate();

        StructField[] fields = new StructField[6];
        fields[0] = new StructField("servertimems", DataTypes.FloatType, false, Metadata.empty());
        fields[1] = new StructField("servertimestr", DataTypes.StringType, false, Metadata.empty());
        fields[2] = new StructField("clientip", DataTypes.StringType, false, Metadata.empty());
        fields[3] = new StructField("clienttimems", DataTypes.LongType, false, Metadata.empty());
        fields[4] = new StructField("status", DataTypes.IntegerType, false, Metadata.empty());
        fields[5] = new StructField("log", DataTypes.StringType, false, Metadata.empty());
        StructType type = new StructType(fields);

        // Filter out invalid data and extract the startup logs via the forkstartuplogs function
        Dataset<Row> df1 = spark.createDataFrame(rdd, type);
        df1.createOrReplaceTempView("_temp");
        Dataset<Row> df2 = spark.sql("select forkstartuplogs(servertimestr , clienttimems , clientip , log) from _temp");
        df2.createOrReplaceTempView("_temp2");
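Before moving on to step 4, note that the SQL above calls forkstartuplogs, a user-defined function from the author's project that is not shown here; judging from the query in step 4, it expands each raw log row into startup-log columns such as appid, appversion, brand, appplatform, devicestyle, ostype, deviceid and createdatms. Because the session is created with enableHiveSupport, one plausible way to make such a function visible is to register it as a Hive temporary function right after the SparkSession is created in step 2. The class name below is only a placeholder, not the author's actual implementation.

// Hypothetical registration of the UDTF used by the query above;
// 'com.example.udf.ForkStartupLogs' is a placeholder class name.
spark.sql("CREATE TEMPORARY FUNCTION forkstartuplogs AS 'com.example.udf.ForkStartupLogs'");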
4. Query the table and store the results in Redis
        String aggSql = "select concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid) key," +
                "min(createdatms) mn," +
                "max(createdatms) mx from _temp2 group by " +
                "concat(appid,'#',appversion,'#',brand,'#',appplatform,'#',devicestyle,'#',ostype,'#',deviceid)";

        // Aggregate the min/max timestamps per device key inside the SQL statement
        spark.sql(aggSql).foreachPartition(new ForeachPartitionFunction<Row>() {
            public void call(Iterator<Row> t) throws Exception {
                // Create a Redis client instance
                Jedis redis = new Jedis("s101", 6379);
                redis.select(1);
                while (t.hasNext()) {
                    Row row = t.next();
                    String key = row.getAs("key");
                    long mn = row.getAs("mn");
                    long mx = row.getAs("mx");
                    String oldvalue = redis.get(key);
                    if (oldvalue == null) {
                        // First time this device key is seen: store "min,max" directly
                        redis.set(key, mn + "," + mx);
                    } else {
                        // Merge with the range already stored in Redis
                        String[] arr = oldvalue.split(",");
                        long oldMin = Long.parseLong(arr[0]);
                        long oldMax = Long.parseLong(arr[1]);
                        redis.set(key, Math.min(mn, oldMin) + "," + Math.max(mx, oldMax));
                    }
                }
                redis.close();
            }
        });
    }   // end of the foreachRDD call() method started in step 3
});
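The streaming job only maintains, per device key, the earliest and latest startup timestamps in Redis database 1; deriving the daily registration figure from that data is left to a separate reader. Below is a minimal sketch of one way to do it, assuming the "min,max" value format written above and treating a device as newly registered on the day of its minimum timestamp. The full-key scan and date handling are illustrative only, not the author's code.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import redis.clients.jedis.Jedis;

public class DailyRegistrationCount {
    public static void main(String[] args) {
        Jedis redis = new Jedis("s101", 6379);
        redis.select(1);                        // same DB index the streaming job writes to

        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
        String today = fmt.format(new Date());

        long registeredToday = 0;
        Set<String> keys = redis.keys("*");     // fine for small key sets; prefer SCAN in production
        for (String key : keys) {
            String value = redis.get(key);
            if (value == null) {
                continue;
            }
            // value is "min,max"; the min timestamp marks the first startup (registration)
            long firstSeenMs = Long.parseLong(value.split(",")[0]);
            if (today.equals(fmt.format(new Date(firstSeenMs)))) {
                registeredToday++;
            }
        }
        System.out.println("new registrations on " + today + ": " + registeredToday);
        redis.close();
    }
}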