Kafka: ZK+Kafka+Spark Streaming cluster environment setup (25) Structured Streaming: a single topic contains the multiple parts of a data group; join them into one record by key (and problems encountered).
Requirement:
A batch of data currently sits on a Kafka topic, distributed across 9 partitions (published with key: {m1, m2, m3, m4, ..., m9} and value: {record items}). For each mx (m1, m2, ..., m9), the unique key of a record is int_id + start_time, where int_id and start_time are fields of the topic record. The 9 groups of data can be joined on this unique key (m1.primarykey1, m2.primarykey1, m3.primarykey1, ..., m9.primarykey1).
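For illustration (all values here are hypothetical), the join key is simply the concatenation of the two fields:

// hypothetical values: two records under different keys join because they
// share the same int_id and start_time
String int_id = "10001";
String start_time = "2018-07-31 10:00:00";
String primaryKey = int_id + "_" + start_time; // "10001_2018-07-31 10:00:00"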
Pseudocode:
The fields that make up each m record:
public class MS_PLRULQX {
    private String key;
    private String int_id;
    private String start_time;
    private long MS_PLRULQX_00;
    private long MS_PLRULQX_01;

    public String getPrimaryKey() {
        return this.int_id + "_" + this.start_time;
    }
}
Full MS_PLRULQX class definition:
import java.io.Serializable;

import org.apache.commons.lang3.math.NumberUtils; // assumed helper; the original NumberUtils may be a custom class
import org.apache.spark.sql.Row;

public class MS_PLRULQX implements Serializable, Comparable<MS_PLRULQX> {
    private static final long serialVersionUID = -2873721171908282946L;

    private String key;
    private String int_id;
    private String start_time;
    private long MS_PLRULQX_00;
    private long MS_PLRULQX_01;

    public MS_PLRULQX() {
    }

    public MS_PLRULQX(Row row) {
        this.key = row.getAs("key");
        this.int_id = row.getAs("int_id");
        this.start_time = row.getAs("start_time");
        this.MS_PLRULQX_00 = row.getAs("MS_PLRULQX_00");
        this.MS_PLRULQX_01 = row.getAs("MS_PLRULQX_01");
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getInt_id() { return int_id; }
    public void setInt_id(String int_id) { this.int_id = int_id; }
    public String getStart_time() { return start_time; }
    public void setStart_time(String start_time) { this.start_time = start_time; }
    public long getMS_PLRULQX_00() { return MS_PLRULQX_00; }
    public void setMS_PLRULQX_00(long MS_PLRULQX_00) { this.MS_PLRULQX_00 = MS_PLRULQX_00; }
    public long getMS_PLRULQX_01() { return MS_PLRULQX_01; }
    public void setMS_PLRULQX_01(long MS_PLRULQX_01) { this.MS_PLRULQX_01 = MS_PLRULQX_01; }

    public String getPrimaryKey() {
        return this.int_id + "_" + this.start_time;
    }

    @Override
    public int compareTo(MS_PLRULQX other) {
        // key format: MS_PLRULQX1, MS_PLRULQX2, ..., MS_PLRULQX9
        String prefix = "MS_PLRULQX".toLowerCase();
        if (this.getKey().toLowerCase().indexOf(prefix) != -1) {
            String thisKeyStr = this.getKey().toLowerCase().replace(prefix, "");
            String otherKeyStr = other.getKey().toLowerCase().replace(prefix, "");
            // compare the numeric suffixes as numbers, not as strings
            if (NumberUtils.isNumber(thisKeyStr) && NumberUtils.isNumber(otherKeyStr)) {
                return Integer.compare(Integer.valueOf(thisKeyStr), Integer.valueOf(otherKeyStr));
            }
        }
        return this.key.compareTo(other.key);
    }
}
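A minimal sketch of how the comparator orders the group keys (key values assumed to follow the MS_PLRULQX1..MS_PLRULQX9 format):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// keys arrive in arbitrary order; sorting restores MS_PLRULQX1..9 order
MS_PLRULQX a = new MS_PLRULQX(); a.setKey("MS_PLRULQX3");
MS_PLRULQX b = new MS_PLRULQX(); b.setKey("MS_PLRULQX1");
MS_PLRULQX c = new MS_PLRULQX(); c.setKey("MS_PLRULQX2");
List<MS_PLRULQX> items = new ArrayList<>(Arrays.asList(a, b, c));
Collections.sort(items);
// items is now ordered MS_PLRULQX1, MS_PLRULQX2, MS_PLRULQX3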
There is one MS_PLRULQX record under each of the 9 keys; they are joined together, and only records whose primaryKey values match can be joined. After joining, the combined entity keeps the following fields:
public class MS_PLRULQX_Combine implements Serializable {
    private String key;
    private String int_id;
    private String start_time;
    private long mr_packetlossrateulqci_1_00;
    private long mr_packetlossrateulqci_1_01;
    private long mr_packetlossrateulqci_2_00;
    private long mr_packetlossrateulqci_2_01;
    private long mr_packetlossrateulqci_3_00;
    private long mr_packetlossrateulqci_3_01;
    private long mr_packetlossrateulqci_4_00;
    private long mr_packetlossrateulqci_4_01;
    private long mr_packetlossrateulqci_5_00;
    private long mr_packetlossrateulqci_5_01;
    private long mr_packetlossrateulqci_6_00;
    private long mr_packetlossrateulqci_6_01;
    private long mr_packetlossrateulqci_7_00;
    private long mr_packetlossrateulqci_7_01;
    private long mr_packetlossrateulqci_8_00;
    private long mr_packetlossrateulqci_8_01;
    private long mr_packetlossrateulqci_9_00;
    private long mr_packetlossrateulqci_9_01;
}
Full MS_PLRULQX_Combine class definition:
import java.io.Serializable;
import java.util.List;

public class MS_PLRULQX_Combine implements Serializable {
    private static final long serialVersionUID = -944128402186054489L;

    private String key;
    private String int_id;
    private String start_time;
    private long mr_packetlossrateulqci_1_00;
    private long mr_packetlossrateulqci_1_01;
    private long mr_packetlossrateulqci_2_00;
    private long mr_packetlossrateulqci_2_01;
    private long mr_packetlossrateulqci_3_00;
    private long mr_packetlossrateulqci_3_01;
    private long mr_packetlossrateulqci_4_00;
    private long mr_packetlossrateulqci_4_01;
    private long mr_packetlossrateulqci_5_00;
    private long mr_packetlossrateulqci_5_01;
    private long mr_packetlossrateulqci_6_00;
    private long mr_packetlossrateulqci_6_01;
    private long mr_packetlossrateulqci_7_00;
    private long mr_packetlossrateulqci_7_01;
    private long mr_packetlossrateulqci_8_00;
    private long mr_packetlossrateulqci_8_01;
    private long mr_packetlossrateulqci_9_00;
    private long mr_packetlossrateulqci_9_01;

    public MS_PLRULQX_Combine() {
    }

    // assumes the list is sorted ascending by key, so that element i belongs
    // to group MS_PLRULQX(i+1); a missing middle group would shift the later
    // groups into the wrong slots
    public MS_PLRULQX_Combine(List<MS_PLRULQX> list) {
        int sizeOfList = list.size();
        if (sizeOfList > 9) {
            throw new RuntimeException("the measurement group items' length (" + list.size() + ") exceeds 9");
        }
        if (sizeOfList >= 1) { setItem1(list.get(0)); }
        if (sizeOfList >= 2) { setItem2(list.get(1)); }
        if (sizeOfList >= 3) { setItem3(list.get(2)); }
        if (sizeOfList >= 4) { setItem4(list.get(3)); }
        if (sizeOfList >= 5) { setItem5(list.get(4)); }
        if (sizeOfList >= 6) { setItem6(list.get(5)); }
        if (sizeOfList >= 7) { setItem7(list.get(6)); }
        if (sizeOfList >= 8) { setItem8(list.get(7)); }
        if (sizeOfList >= 9) { setItem9(list.get(8)); }
    }

    // item1 also carries the shared key/int_id/start_time columns
    private void setItem1(MS_PLRULQX item1) {
        if (item1 != null) {
            this.key = item1.getKey();
            this.int_id = item1.getInt_id();
            this.start_time = item1.getStart_time();
            this.mr_packetlossrateulqci_1_00 = item1.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_1_01 = item1.getMS_PLRULQX_01();
        }
    }

    private void setItem2(MS_PLRULQX item2) {
        if (item2 != null) {
            this.mr_packetlossrateulqci_2_00 = item2.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_2_01 = item2.getMS_PLRULQX_01();
        }
    }

    private void setItem3(MS_PLRULQX item3) {
        if (item3 != null) {
            this.mr_packetlossrateulqci_3_00 = item3.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_3_01 = item3.getMS_PLRULQX_01();
        }
    }

    private void setItem4(MS_PLRULQX item4) {
        if (item4 != null) {
            this.mr_packetlossrateulqci_4_00 = item4.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_4_01 = item4.getMS_PLRULQX_01();
        }
    }

    private void setItem5(MS_PLRULQX item5) {
        if (item5 != null) {
            this.mr_packetlossrateulqci_5_00 = item5.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_5_01 = item5.getMS_PLRULQX_01();
        }
    }

    private void setItem6(MS_PLRULQX item6) {
        if (item6 != null) {
            this.mr_packetlossrateulqci_6_00 = item6.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_6_01 = item6.getMS_PLRULQX_01();
        }
    }

    private void setItem7(MS_PLRULQX item7) {
        if (item7 != null) {
            this.mr_packetlossrateulqci_7_00 = item7.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_7_01 = item7.getMS_PLRULQX_01();
        }
    }

    private void setItem8(MS_PLRULQX item8) {
        if (item8 != null) {
            this.mr_packetlossrateulqci_8_00 = item8.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_8_01 = item8.getMS_PLRULQX_01();
        }
    }

    private void setItem9(MS_PLRULQX item9) {
        if (item9 != null) {
            this.mr_packetlossrateulqci_9_00 = item9.getMS_PLRULQX_00();
            this.mr_packetlossrateulqci_9_01 = item9.getMS_PLRULQX_01();
        }
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getInt_id() { return int_id; }
    public void setInt_id(String int_id) { this.int_id = int_id; }
    public String getStart_time() { return start_time; }
    public void setStart_time(String start_time) { this.start_time = start_time; }
    public long getMr_packetlossrateulqci_1_00() { return mr_packetlossrateulqci_1_00; }
    public void setMr_packetlossrateulqci_1_00(long mr_packetlossrateulqci_1_00) { this.mr_packetlossrateulqci_1_00 = mr_packetlossrateulqci_1_00; }
    public long getMr_packetlossrateulqci_1_01() { return mr_packetlossrateulqci_1_01; }
    public void setMr_packetlossrateulqci_1_01(long mr_packetlossrateulqci_1_01) { this.mr_packetlossrateulqci_1_01 = mr_packetlossrateulqci_1_01; }
    public long getMr_packetlossrateulqci_2_00() { return mr_packetlossrateulqci_2_00; }
    public void setMr_packetlossrateulqci_2_00(long mr_packetlossrateulqci_2_00) { this.mr_packetlossrateulqci_2_00 = mr_packetlossrateulqci_2_00; }
    public long getMr_packetlossrateulqci_2_01() { return mr_packetlossrateulqci_2_01; }
    public void setMr_packetlossrateulqci_2_01(long mr_packetlossrateulqci_2_01) { this.mr_packetlossrateulqci_2_01 = mr_packetlossrateulqci_2_01; }
    public long getMr_packetlossrateulqci_3_00() { return mr_packetlossrateulqci_3_00; }
    public void setMr_packetlossrateulqci_3_00(long mr_packetlossrateulqci_3_00) { this.mr_packetlossrateulqci_3_00 = mr_packetlossrateulqci_3_00; }
    public long getMr_packetlossrateulqci_3_01() { return mr_packetlossrateulqci_3_01; }
    public void setMr_packetlossrateulqci_3_01(long mr_packetlossrateulqci_3_01) { this.mr_packetlossrateulqci_3_01 = mr_packetlossrateulqci_3_01; }
    public long getMr_packetlossrateulqci_4_00() { return mr_packetlossrateulqci_4_00; }
    public void setMr_packetlossrateulqci_4_00(long mr_packetlossrateulqci_4_00) { this.mr_packetlossrateulqci_4_00 = mr_packetlossrateulqci_4_00; }
    public long getMr_packetlossrateulqci_4_01() { return mr_packetlossrateulqci_4_01; }
    public void setMr_packetlossrateulqci_4_01(long mr_packetlossrateulqci_4_01) { this.mr_packetlossrateulqci_4_01 = mr_packetlossrateulqci_4_01; }
    public long getMr_packetlossrateulqci_5_00() { return mr_packetlossrateulqci_5_00; }
    public void setMr_packetlossrateulqci_5_00(long mr_packetlossrateulqci_5_00) { this.mr_packetlossrateulqci_5_00 = mr_packetlossrateulqci_5_00; }
    public long getMr_packetlossrateulqci_5_01() { return mr_packetlossrateulqci_5_01; }
    public void setMr_packetlossrateulqci_5_01(long mr_packetlossrateulqci_5_01) { this.mr_packetlossrateulqci_5_01 = mr_packetlossrateulqci_5_01; }
    public long getMr_packetlossrateulqci_6_00() { return mr_packetlossrateulqci_6_00; }
    public void setMr_packetlossrateulqci_6_00(long mr_packetlossrateulqci_6_00) { this.mr_packetlossrateulqci_6_00 = mr_packetlossrateulqci_6_00; }
    public long getMr_packetlossrateulqci_6_01() { return mr_packetlossrateulqci_6_01; }
    public void setMr_packetlossrateulqci_6_01(long mr_packetlossrateulqci_6_01) { this.mr_packetlossrateulqci_6_01 = mr_packetlossrateulqci_6_01; }
    public long getMr_packetlossrateulqci_7_00() { return mr_packetlossrateulqci_7_00; }
    public void setMr_packetlossrateulqci_7_00(long mr_packetlossrateulqci_7_00) { this.mr_packetlossrateulqci_7_00 = mr_packetlossrateulqci_7_00; }
    public long getMr_packetlossrateulqci_7_01() { return mr_packetlossrateulqci_7_01; }
    public void setMr_packetlossrateulqci_7_01(long mr_packetlossrateulqci_7_01) { this.mr_packetlossrateulqci_7_01 = mr_packetlossrateulqci_7_01; }
    public long getMr_packetlossrateulqci_8_00() { return mr_packetlossrateulqci_8_00; }
    public void setMr_packetlossrateulqci_8_00(long mr_packetlossrateulqci_8_00) { this.mr_packetlossrateulqci_8_00 = mr_packetlossrateulqci_8_00; }
    public long getMr_packetlossrateulqci_8_01() { return mr_packetlossrateulqci_8_01; }
    public void setMr_packetlossrateulqci_8_01(long mr_packetlossrateulqci_8_01) { this.mr_packetlossrateulqci_8_01 = mr_packetlossrateulqci_8_01; }
    public long getMr_packetlossrateulqci_9_00() { return mr_packetlossrateulqci_9_00; }
    public void setMr_packetlossrateulqci_9_00(long mr_packetlossrateulqci_9_00) { this.mr_packetlossrateulqci_9_00 = mr_packetlossrateulqci_9_00; }
    public long getMr_packetlossrateulqci_9_01() { return mr_packetlossrateulqci_9_01; }
    public void setMr_packetlossrateulqci_9_01(long mr_packetlossrateulqci_9_01) { this.mr_packetlossrateulqci_9_01 = mr_packetlossrateulqci_9_01; }
}
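A minimal usage sketch (all values hypothetical): given the sorted per-group records, the constructor spreads each group's counters into the numbered fields:

import java.util.Arrays;

MS_PLRULQX m1 = new MS_PLRULQX();
m1.setKey("MS_PLRULQX1");
m1.setInt_id("10001");
m1.setStart_time("2018-07-31 10:00:00");
m1.setMS_PLRULQX_00(5L);
m1.setMS_PLRULQX_01(7L);

MS_PLRULQX m2 = new MS_PLRULQX();
m2.setKey("MS_PLRULQX2");
m2.setMS_PLRULQX_00(3L);
m2.setMS_PLRULQX_01(2L);

// item1 fills key/int_id/start_time and the _1_ counters, item2 fills the
// _2_ counters; the unfilled groups 3..9 keep their default value 0
MS_PLRULQX_Combine combine = new MS_PLRULQX_Combine(Arrays.asList(m1, m2));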
Read the data stream from the topic:
Dataset<Row> dsParsed = this.sparkSession.readStream()
        .format("kafka")
        .options(this.kafkaOptions)
        .option("subscribe", topicName)
        .option("startingOffsets", "earliest")
        .load();

String waterMarkName = "query" + this.getTopicEncodeName(topicName) + "Agg";
int windowDuration = 2 * 60; // seconds, used when building the aggregation SQL below
int slideDuration = 60;

try {
    dsParsed.withWatermark("timestamp", "2 hours").createTempView(waterMarkName);
} catch (AnalysisException e1) {
    e1.printStackTrace();
    throw new RuntimeException(e1);
}

String aggSQL = "xxx";
Dataset<Row> dsSQL1 = sparkSession.sql(aggSQL);
dsSQL1.printSchema();
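For reference, a minimal sketch of what this.kafkaOptions might contain (broker addresses are hypothetical); kafka.bootstrap.servers is the one option the Spark Kafka source always requires:

import java.util.HashMap;
import java.util.Map;

Map<String, String> kafkaOptions = new HashMap<String, String>();
kafkaOptions.put("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092");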
Join the records of the stream by key:
The correct approach: group the records by primary key, sort the records within each group by their record key, then merge each group into one record and print the merged result to the console.
KeyValueGroupedDataset<String, Row> tuple2Dataset = dsSQL1.groupByKey(
        (MapFunction<Row, String>) row -> {
            String int_id = row.getAs("int_id");
            String start_time = row.getAs("start_time");
            return int_id + "_" + start_time;
        }, Encoders.STRING());

Dataset<MS_PLRULQX_Combine> tuple2FlatMapDataset = tuple2Dataset.flatMapGroups(
        new FlatMapGroupsFunction<String, Row, MS_PLRULQX_Combine>() {
            private static final long serialVersionUID = 1400167811199763836L;

            @Override
            public Iterator<MS_PLRULQX_Combine> call(String key, Iterator<Row> values) throws Exception {
                List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
                while (values.hasNext()) {
                    list.add(new MS_PLRULQX(values.next()));
                }
                // sort ascending by key so that MS_PLRULQX1 lands in item1,
                // MS_PLRULQX2 in item2, and so on
                Collections.sort(list);
                return Arrays.asList(new MS_PLRULQX_Combine(list)).iterator();
            }
        }, Encoders.bean(MS_PLRULQX_Combine.class));

Dataset<Row> rows = tuple2FlatMapDataset.toDF();
rows.writeStream()
        .format("console")
        .outputMode("complete")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();
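Note that Encoders.bean requires MS_PLRULQX_Combine to be a proper Java bean (a public no-arg constructor plus getters and setters), which is why the class above carries all of them. A quick sanity check that the encoder resolves:

Encoder<MS_PLRULQX_Combine> encoder = Encoders.bean(MS_PLRULQX_Combine.class);
System.out.println(encoder.schema().treeString()); // one column per bean property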
Another approach to joining the stream by key, and the problem it runs into:
This approach uses JavaRDD to group, sort, and merge.
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// this line throws: converting a streaming Dataset to an RDD forces it to be
// executed like a batch query, which streaming sources do not allow
JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(
        new PairFunction<Row, String, MS_PLRULQX>() {
            private static final long serialVersionUID = -5203498264050492910L;

            @Override
            public Tuple2<String, MS_PLRULQX> call(Row row) throws Exception {
                MS_PLRULQX value = new MS_PLRULQX(row);
                return new Tuple2<String, MS_PLRULQX>(value.getPrimaryKey(), value);
            }
        });

JavaPairRDD<String, Iterable<MS_PLRULQX>> group = pairs.groupByKey();

JavaPairRDD<String, MS_PLRULQX_Combine> keyVsValuePairRDD = group.mapToPair(tuple -> {
    List<MS_PLRULQX> list = new ArrayList<MS_PLRULQX>();
    Iterator<MS_PLRULQX> it = tuple._2.iterator();
    while (it.hasNext()) {
        list.add(it.next());
    }
    Collections.sort(list); // ascending by key, as in the first approach
    return new Tuple2<String, MS_PLRULQX_Combine>(tuple._1, new MS_PLRULQX_Combine(list));
});

JavaRDD<MS_PLRULQX_Combine> javaRDD = keyVsValuePairRDD.map(
        new Function<Tuple2<String, MS_PLRULQX_Combine>, MS_PLRULQX_Combine>() {
            private static final long serialVersionUID = -3031600976005716506L;

            @Override
            public MS_PLRULQX_Combine call(Tuple2<String, MS_PLRULQX_Combine> v1) throws Exception {
                return v1._2;
            }
        });

Dataset<Row> rows = this.sparkSession.createDataFrame(javaRDD, MS_PLRULQX_Combine.class);
rows.writeStream()
        .format("console")
        .outputMode("complete")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();
sparkSession.streams().awaitAnyTermination();
The error that is thrown:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
at com.xx.xx.streaming.drivers.XXXDriver.run(xxxxDriver.java:85)
The failing line is "JavaPairRDD<String, MS_PLRULQX> pairs = dsSQL1.toJavaRDD().mapToPair(new PairFunction<Row, String, MS_PLRULQX>() {".
This error makes sense: calling .toJavaRDD() on a streaming Dataset behaves just like calling dsSQL1.show() or dsSQL1.collect().foreach(println(_)). Each of these tries to execute a query over a streaming source eagerly, which Structured Streaming forbids; such a query may only be started through writeStream...start().
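If RDD-style processing is really needed, one known workaround (available from Spark 2.4 on, so it may not apply to older clusters) is foreachBatch, which hands each micro-batch to a callback as a static Dataset on which .toJavaRDD() is legal. A hedged sketch:

dsSQL1.writeStream()
        .foreachBatch((Dataset<Row> batchDF, Long batchId) -> {
            // batchDF is a bounded, batch Dataset here, so RDD conversions
            // and actions are allowed
            JavaRDD<Row> rdd = batchDF.toJavaRDD();
            // ... group/sort/merge as in the RDD version above ...
        })
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .start();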