1. 程式人生 > 資料庫 >MLSQL Stack如何讓流除錯更加簡單詳解

MLSQL Stack如何讓流除錯更加簡單詳解

前言

有一位同學正在調研MLSQL Stack對流的支援。然後說了流除錯其實挺困難的。經過實踐,希望實現如下三點:

  • 能隨時檢視最新固定條數的Kafka資料
  • 除錯結果(sink)能列印在web控制檯
  • 流程式能自動推測json schema(現在spark是不行的)

實現這三個點之後,我發現除錯確實就變得簡單很多了。

流程

首先我新建了一個kaf_write.mlsql,裡面方便我往Kafka裡寫資料:

set abc='''
{ "x": 100,"y": 200,"z": 200,"dataType":"A group"}
{ "x": 120,"y": 100,"z": 260,"dataType":"B group"}
{ "x": 120,"dataType":"B group"}
''';
load jsonStr.`abc` as table1;

select to_json(struct(*)) as value from table1 as table2;
save append table2 as kafka.`wow` where 
kafka.bootstrap.servers="127.0.0.1:9092";

這樣我每次執行,資料就能寫入到Kafka.

接著,我寫完後,需要看看資料是不是真的都寫進去了,寫成了什麼樣子:

!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;

這句話表示,我要取樣Kafka 10條Kafka資料,該Kafka的地址為127.0.0.1:9092,主題為wow.執行結果如下:

沒有什麼問題。接著我寫一個非常簡單的流式程式:

-- the stream name,should be uniq.
set streamName="streamExample";

-- use kafkaTool to infer schema from kafka
!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;


load kafka.`wow` options 
kafka.bootstrap.servers="127.0.0.1:9092"
as newkafkatable1;


select * from newkafkatable1
as table21;


-- print in webConsole instead of terminal console.
save append table21 
as webConsole.`` 
options mode="Append"
and duration="15"
and checkpointLocation="/tmp/s-cpl4";

執行結果如下:

在終端我們也可以看到實時效果了。

補充

當然,MLSQL Stack 還有對流還有兩個特別好地方,第一個是你可以對流的事件設定http協議的callback,以及對流的處理結果再使用批SQL進行處理,最後入庫。參看如下指令碼:

-- the stream name,should be uniq.
set streamName="streamExample";


-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","offset":1,"timestamp":"2008-01-24 18:01:01.002","offset":2,"timestamp":"2008-01-24 18:01:01.003","offset":3,"offset":4,"offset":5,"timestampType":0}
''';

-- load data as table
load jsonStr.`data` as datasource;

-- convert table as stream source
load mockStream.`datasource` options 
stepSizeRange="0-3"
as newkafkatable1;

-- aggregation 
select cast(value as string) as k from newkafkatable1
as table21;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- output the the result to console.


save append table21 
as custom.`` 
options mode="append"
and duration="15"
and sourceTable="jack"
and code='''
select count(*) as c from jack as newjack;
save append newjack as parquet.`/tmp/jack`; 
'''
and checkpointLocation="/tmp/cpl15";

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對我們的支援。