transwarp Slipstream 簡介之事件驅動流處理
阿新 • • 發佈:2019-01-08
1. 從流表導資料到普通表
SET streamsql.use.eventmode=true;
CREATE STREAM s1(score INT, name STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-
node127:2181", "kafka.broker.list"="tw-node127:9092");
CREATE TABLE t1(score INT, name STRING);
INSERT INTO t1 SELECT * FROM s1;
--注: 事件驅動模式下提交的StreamJob是一個常駐的Active Job。
2. 在Window Stream上做聚合後插到普通表
SET streamsql.use.eventmode=true;
SET streamsql.use.eventtime=true; --使用事件時間
CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1"
,"kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts"
,"timeformat" ="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");
CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); -- 建立Window Stream
CREATE TABLE t1(score INT, name STRING);
INSERT INTO t1 SELECT SUM(score), name FROM s1win GROUP BY NAME;
--在事件驅動的流處理模式下,如果StreamJob使用事件時間處理,那在建Stream表時 需要額外新增"use.lowlevel.consumer"="true"這個表屬性。
3. 兩個Window Stream關聯後插入普通表
SET streamsql.use.eventmode=true;
SET streamsql.use.eventtime=true; --使用事件時間
CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1" ,"kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts" ,"timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");
CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); -- 建立Window Stream
CREATE STREAM s2(class INT, name STRING, ts STRING) TBLPROPERTIES(“topic”=”tps2” ,"kafka.zookeeper"="tw-node127:2181", “kafka.broker.list”=”tw-node127:9092”,”timefield”=”ts”, ”timeformat”=”yyyy-MM-dd HH:mm:ss”,"use.lowlevel.consumer"="true");
CREATE STREAM s2win AS SELECCT * FROM s2 STREAMWINDOW (LENGTH ‘4’ SECOND SLIDE ‘2’ SECOND); -- 建立Window Stream
CREATE TABLE t1(score INT, class INT, name STRING);
INSERT INTO t1 SELECT score, class, s1win.name FROM s1win JOIN s2win ON s1win.name=s2win.name;
--事件驅動的流處理模式下,如果StreamJob使用事件時間處理,那在建Stream表時 需要額外新增"use.lowlevel.consumer"="true"這個表屬性。