
Converting a Flink dynamic table into a stream, in practice

A dynamic table is a logical concept in Flink; it is what lets streaming data support the Table API and SQL. Streams and dynamic tables convert into each other: a stream is first converted into a dynamic table, SQL queries over that dynamic table produce new dynamic tables, and the result is finally converted back into a stream. This article focuses on the three ways a dynamic table can be converted into a stream.

Note: all code in this article is based on Flink 1.9.0 and flink-table-planner-blink_2.11.

Append-only stream

The official definition:

A dynamic table that is only modified by INSERT changes can be converted into a stream by emitting the inserted rows.

In other words, if a dynamic table is only ever modified by INSERT changes, it can be converted into an append-only stream: every new row is simply appended to the stream.

Sample code:

public class AppendOnlyExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./prod?id=3")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");
		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("SELECT user,url FROM clicks WHERE user='Mary'");

		DataStream ds = tEnv.toAppendStream(rTable, TypeInformation.of(new TypeHint<Tuple2<String, String>>(){}));
		ds.print();
		env.execute();
	}
}

Result:

(Mary,./home)
(Mary,./prod?id=1)

Retract stream

Official definition:

A retract stream is a stream with two types of messages, add messages and retract messages. A dynamic table is converted into a retract stream by encoding an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row.

Sample code:

public class RetractExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./prod?id=3")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");
		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("SELECT user,COUNT(url) FROM clicks GROUP BY user");

		DataStream ds = tEnv.toRetractStream(rTable, TypeInformation.of(new TypeHint<Tuple2<String, Long>>(){}));
		ds.print();
		env.execute();
	}
}

Result:

(true,(Mary,1))
(true,(Bob,1))
(false,(Mary,1))
(true,(Mary,2))
(true,(Liz,1))

toRetractStream returns records of type Tuple2<Boolean,T>. When the first element is true, the record is a new row to insert; when it is false, it is an old row to delete. In other words, ***updating a row in the table*** is decomposed into deleting the old row and then inserting the new one.
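To make that contract concrete, here is a minimal plain-Java sketch with no Flink dependency (the class and method names are invented for illustration) of how a downstream consumer could rebuild the table from the (true, row)/(false, row) messages shown above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RetractReplay {
	// Apply a sequence of (isAdd, row) retract messages in order:
	// true adds the row, false removes the previously added version.
	static List<String> replay(List<Map.Entry<Boolean, String>> messages) {
		List<String> table = new ArrayList<>();
		for (Map.Entry<Boolean, String> msg : messages) {
			if (msg.getKey()) {
				table.add(msg.getValue());    // add message
			} else {
				table.remove(msg.getValue()); // retract message
			}
		}
		return table;
	}

	public static void main(String[] args) {
		// Mary's second click updates her count: the old row is
		// retracted before the new row is added.
		List<Map.Entry<Boolean, String>> messages = List.of(
				Map.entry(true, "(Mary,1)"),
				Map.entry(true, "(Bob,1)"),
				Map.entry(false, "(Mary,1)"),
				Map.entry(true, "(Mary,2)"));
		System.out.println(replay(messages)); // [(Bob,1), (Mary,2)]
	}
}
```

After the retract/add pair is applied, the state holds exactly one row per user, which is why a retract stream can express arbitrary updates with only two message types.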

Upsert stream

Official definition:

An upsert stream is a stream with two types of messages, upsert messages and delete messages. A dynamic table that is converted into an upsert stream requires a (possibly composite) unique key. A dynamic table with a unique key is converted into a stream by encoding INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly. The main difference to a retract stream is that UPDATE changes are encoded with a single message and hence more efficient.

Unlike the previous two modes, converting a dynamic table into an upsert stream requires implementing an UpsertStreamTableSink; it cannot be done directly through StreamTableEnvironment.

Sample code:

public class UpsertExample {
	public static void main(String[] args) throws Exception {
		EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
		env.setParallelism(1);

		DataStream<Tuple2<String, String>> data = env.fromElements(
				new Tuple2<>("Mary", "./home"),
				new Tuple2<>("Bob", "./cart"),
				new Tuple2<>("Mary", "./prod?id=1"),
				new Tuple2<>("Liz", "./prod?id=3"),
				new Tuple2<>("Mary", "./prod?id=7")
		);

		Table clicksTable = tEnv.fromDataStream(data, "user,url");
		tEnv.registerTable("clicks", clicksTable);
		Table rTable = tEnv.sqlQuery("SELECT user,COUNT(url) FROM clicks GROUP BY user");
		tEnv.registerTableSink("MemoryUpsertSink", new MemoryUpsertSink(rTable.getSchema()));
		rTable.insertInto("MemoryUpsertSink");

		env.execute();
	}

	private static class MemoryUpsertSink implements UpsertStreamTableSink<Tuple2<String,Long>> {
		private TableSchema schema;
		private String[] keyFields;
		private boolean isAppendOnly;

		private String[] fieldNames;
		private TypeInformation<?>[] fieldTypes;

		public MemoryUpsertSink() {

		}

		public MemoryUpsertSink(TableSchema schema) {
			this.schema = schema;
		}

		@Override
		public void setKeyFields(String[] keys) {
			this.keyFields = keys;
		}

		@Override
		public void setIsAppendOnly(Boolean isAppendOnly) {
			this.isAppendOnly = isAppendOnly;
		}

		@Override
		public TypeInformation<Tuple2<String,Long>> getRecordType() {
			return TypeInformation.of(new TypeHint<Tuple2<String,Long>>(){});
		}

		@Override
		public void emitDataStream(DataStream<Tuple2<Boolean,Tuple2<String,Long>>> dataStream) {
			consumeDataStream(dataStream);
		}

		@Override
		public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Tuple2<String, Long>>> dataStream) {
			return dataStream.addSink(new DataSink()).setParallelism(1);
		}

		@Override
		public TableSink<Tuple2<Boolean, Tuple2<String, Long>>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
			MemoryUpsertSink memoryUpsertSink = new MemoryUpsertSink();
			memoryUpsertSink.setFieldNames(fieldNames);
			memoryUpsertSink.setFieldTypes(fieldTypes);
			memoryUpsertSink.setKeyFields(keyFields);
			memoryUpsertSink.setIsAppendOnly(isAppendOnly);

			return memoryUpsertSink;
		}

		@Override
		public String[] getFieldNames() {
			return schema.getFieldNames();
		}

		public void setFieldNames(String[] fieldNames) {
			this.fieldNames = fieldNames;
		}

		@Override
		public TypeInformation<?>[] getFieldTypes() {
			return schema.getFieldTypes();
		}

		public void setFieldTypes(TypeInformation<?>[] fieldTypes) {
			this.fieldTypes = fieldTypes;
		}
	}

	private static class DataSink extends RichSinkFunction<Tuple2<Boolean,Tuple2<String,Long>>> {
		public DataSink() {
		}

		@Override
		public void invoke(Tuple2<Boolean, Tuple2<String, Long>> value, Context context) throws Exception {
			System.out.println("send message:" + value);
		}
	}
}

Result:

send message:(true,(Mary,1))
send message:(true,(Bob,1))
send message:(true,(Mary,2))
send message:(true,(Liz,1))
send message:(true,(Mary,3))

This mode also returns Tuple2<Boolean,T> records. The difference from the retract mode is that updating a row does not produce a delete of the old row plus an insert of the new one; the row really is updated in place by a single message.
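The single-message update is easy to see in another plain-Java sketch (again no Flink dependency; class name and message layout are invented for illustration). The consumer keys its state by the table's unique key and applies each message in place:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertReplay {
	// Apply (isUpsert, key, count) messages; the key is the unique key (user).
	static Map<String, Long> replay(List<Object[]> messages) {
		Map<String, Long> table = new LinkedHashMap<>();
		for (Object[] msg : messages) {
			boolean isUpsert = (Boolean) msg[0];
			String key = (String) msg[1];
			if (isUpsert) {
				table.put(key, (Long) msg[2]); // insert or update with ONE message
			} else {
				table.remove(key);             // delete message
			}
		}
		return table;
	}

	public static void main(String[] args) {
		// Mary's count changing from 1 to 2 is a single upsert,
		// not a retract/add pair as in the retract stream.
		List<Object[]> messages = List.of(
				new Object[]{true, "Mary", 1L},
				new Object[]{true, "Bob", 1L},
				new Object[]{true, "Mary", 2L});
		System.out.println(replay(messages)); // {Mary=2, Bob=1}
	}
}
```

Because the update is one message instead of two, an upsert stream halves the traffic for update-heavy queries, at the cost of requiring a unique key.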

Upsert stream: beyond the official docs

Everything above comes from the Flink documentation; the samples just make each mode's output tangible. There are plenty of translations of the official docs online, but almost no article or sample explains when, with an UpsertStreamTableSink, the first element of the returned Tuple2<Boolean,T> is false. Let the code speak: simply change the SQL in the example above to the following.

String sql = "SELECT user, cnt " +
             "FROM (" +
                    "SELECT user, COUNT(url) AS cnt FROM clicks GROUP BY user" +
                   ") " +
             "ORDER BY cnt LIMIT 2";

Result:

send message:(true,(Mary,1))
send message:(false,(Mary,2))
send message:(false,(Mary,2))

For the underlying mechanics, see the source of org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecRank and org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSortLimit. While the SQL is being planned, the method below derives the RankProcessStrategy, which determines whether previously emitted rows need to be deleted.

  def getStrategy(forceRecompute: Boolean = false): RankProcessStrategy = {
    if (strategy == null || forceRecompute) {
      strategy = RankProcessStrategy.analyzeRankProcessStrategy(
        inputRel, ImmutableBitSet.of(), sortCollation, cluster.getMetadataQuery)
    }
    strategy
  }

Knowing when records with a false flag are produced is a big help in understanding how to use JDBCUpsertTableSink and HBaseUpsertTableSink.
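Why a Top-N query produces delete messages can be sketched without Flink at all. The simulation below (class name and message format invented for illustration; this is not the actual StreamExecRank logic) keeps the two users with the smallest count, like ORDER BY cnt LIMIT 2, and emits a false message whenever a row falls out of the Top-N:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopNSimulation {
	// Replay (user, newCount) updates through a "2 smallest counts" view
	// and record the upsert/delete messages the sink would see.
	static List<String> run(String[][] updates) {
		Map<String, Long> counts = new HashMap<>();
		Map<String, Long> emittedTop = new LinkedHashMap<>(); // last Top-2 we emitted
		List<String> messages = new ArrayList<>();
		for (String[] u : updates) {
			counts.put(u[0], Long.parseLong(u[1]));
			// recompute the Top-2 by ascending count (ties broken by name)
			Map<String, Long> top = counts.entrySet().stream()
					.sorted(Comparator.comparingLong((Map.Entry<String, Long> e) -> e.getValue())
							.thenComparing(Map.Entry::getKey))
					.limit(2)
					.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
							(a, b) -> a, LinkedHashMap::new));
			// rows that fell out of the Top-N produce delete (false) messages
			for (Map.Entry<String, Long> old : emittedTop.entrySet()) {
				if (!top.containsKey(old.getKey())) {
					messages.add("(false,(" + old.getKey() + "," + old.getValue() + "))");
				}
			}
			// new or changed rows produce upsert (true) messages
			for (Map.Entry<String, Long> e : top.entrySet()) {
				if (!e.getValue().equals(emittedTop.get(e.getKey()))) {
					messages.add("(true,(" + e.getKey() + "," + e.getValue() + "))");
				}
			}
			emittedTop = top;
		}
		return messages;
	}

	public static void main(String[] args) {
		String[][] updates = {{"Mary", "1"}, {"Bob", "1"}, {"Mary", "2"}, {"Liz", "1"}, {"Mary", "3"}};
		run(updates).forEach(m -> System.out.println("send message:" + m));
	}
}
```

As soon as Liz's first click pushes Mary out of the two smallest counts, a (false,(Mary,2)) message goes out: the false flag is not tied to a DELETE in the source data, but to a row leaving the query's result set.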

Summary

When reading the Flink documentation I always want to verify the concepts through running code. Many articles translate the official docs into Chinese; this one "translates" them into code, which I hope helps readers grasp what the original text really means.