1. 程式人生 > >transwarp Slipstream 簡介之事件驅動流處理

transwarp Slipstream 簡介之事件驅動流處理

1. 從流表導資料到普通表

SET streamsql.use.eventmode=true;
CREATE STREAM s1(score INT, name STRING) TBLPROPERTIES("topic"="tps1","kafka.zookeeper"="tw-
     node127:2181", "kafka.broker.list"="tw-node127:9092");
     CREATE TABLE t1(score INT, name STRING);
     INSERT INTO t1 SELECT * FROM s1;
     --注: 事件驅動模式下提交的StreamJob是一個常駐的Active Job。

2. 在Window Stream上做聚合後插到普通表

SET streamsql.use.eventmode=true;
SET streamsql.use.eventtime=true; --使用事件時間
     CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1"
     ,"kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts"
     ,"timeformat"
="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");
CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4SECOND SLIDE ‘2SECOND); -- 建立Window Stream CREATE TABLE t1(score INT, name STRING); INSERT INTO t1 SELECT SUM(score), name FROM s1win GROUP BY NAME; --在事件驅動的流處理模式下,如果StreamJob使用事件時間處理,那在建Stream表時 需要額外新增"use.lowlevel.consumer"="true"這個表屬性。

3. 兩個Window Stream關聯後插入普通表

SET streamsql.use.eventmode=true;
SET streamsql.use.eventtime=true; --使用事件時間
CREATE STREAM s1(score INT, name STRING, ts STRING) TBLPROPERTIES("topic"="tps1" ,"kafka.zookeeper"="tw-node127:2181", "kafka.broker.list"="tw-node127:9092","timefield"="ts" ,"timeformat"="yyyy-MM-dd HH:mm:ss","use.lowlevel.consumer"="true");
CREATE STREAM s1win AS SELECCT * FROM s1 STREAMWINDOW (LENGTH ‘4SECOND SLIDE ‘2SECOND); -- 建立Window Stream
CREATE STREAM s2(class INT, name STRING, ts STRING) TBLPROPERTIES(“topic”=”tps2” ,"kafka.zookeeper"="tw-node127:2181", “kafka.broker.list”=”tw-node127:9092”,”timefield”=”ts”, ”timeformat”=”yyyy-MM-dd HH:mm:ss”,"use.lowlevel.consumer"="true");
CREATE STREAM s2win AS SELECCT * FROM s2 STREAMWINDOW (LENGTH ‘4SECOND SLIDE ‘2SECOND); -- 建立Window Stream
     CREATE TABLE t1(score INT, class INT, name STRING);
     INSERT INTO t1 SELECT score, class, s1win.name FROM s1win JOIN s2win ON s1win.name=s2win.name;
     --事件驅動的流處理模式下,如果StreamJob使用事件時間處理,那在建Stream表時 需要額外新增"use.lowlevel.consumer"="true"這個表屬性。