Transwarp Slipstream in Practice
Resource Isolation Between Applications
For example, when a user works with a Stream under a given App, they can only view the StreamJobs under that App; after switching out of the App, StreamJobs under other Apps remain invisible and only those of the current App can be viewed.
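This isolation is configured through Application DDL. A minimal sketch, reusing the CREATE APPLICATION syntax shown later in this document; the statement for switching the session into an App is an assumption and may differ in your Slipstream release:

```sql
-- Create two isolated applications (CREATE APPLICATION syntax as used
-- later in this document; APPPROPERTIES omitted for brevity).
CREATE APPLICATION app_a;
CREATE APPLICATION app_b;

-- Hypothetical: switch the current session into app_a. StreamJobs created
-- from here on belong to app_a and are not visible from app_b.
USE APPLICATION app_a;
```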
Statistics on Streams
Emily receives her first task from her boss: count the number of visits to the website. Suppose the source data looks like this:
27.0.1.125,www.transwarp.io/home.html,2016-8-14 20:12:31.132
54.231.66.16,www.transwarp.io/product.html,2016-8-14 20:43:31.213
72.21.203.5,www.transwarp.io/case.html,2016-8-14 20:45:31.132
207.241.224.2,www.transwarp.io/product.html,2016-8-14 20:46:15.540
12.129.206.133,www.transwarp.io/product.html,2016-8-14 20:47:21.332
208.111.148.7,www.transwarp.io/home.html,2016-8-14 20:50:31.876
- Emily creates a stream and a table:
CREATE STREAM accesslog(ip STRING, url STRING, time TIMESTAMP) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' TBLPROPERTIES("topic"="accesslog","kafka.zookeeper"="172.16.1.128:2181"
,"kafka.broker.list"="172.16.1.128:9092");
- Count the number of visits per URL every 10 seconds by system time:
CREATE TABLE result(url STRING, count INT);
CREATE STREAM waccesslog AS SELECT * FROM accesslog STREAMWINDOW (LENGTH '10' SECOND SLIDE '10'
SECOND);
INSERT INTO result SELECT url, COUNT(*) FROM waccesslog GROUP BY url;
- Count the number of visits per URL every 10 seconds by message (event) time:
CREATE TABLE result2(url STRING, count INT);
CREATE STREAM accesslog2(ip STRING, url STRING, time TIMESTAMP)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
TBLPROPERTIES ("topic"="accesslog", "kafka.zookeeper"="172.16.1.128:2181","kafka.broker.list"
="172.16.1.128:9092","timefield"="time", "timeformat"="yyyy-MM-dd HH:mm:ss.SSS");
CREATE STREAM waccesslog2 AS SELECT * FROM accesslog2 STREAMWINDOW sw AS (SEPARATED BY time LENGTH
'10' SECOND SLIDE '10' SECOND);
INSERT INTO result2 SELECT url, COUNT(*) FROM waccesslog2 GROUP BY url;
- Multi-stream join windowed by a time field
Since implementation on Slipstream is quick, Emily receives a new task: migrate the existing SQL, joining two streams and one dimension table, windowed by the time field in the data. There are two streams, futures prices (qihuo) and spot prices (xianhuo), each with three fields: time, ID, and price. For example:
qihuo (futures):
2016-8-14 20:50:00,1,50.20
2016-8-14 20:50:00,2,65.40
2016-8-14 20:50:00,3,31.30
2016-8-14 20:50:01,1,50.80
2016-8-14 20:50:01,2,65.10
2016-8-14 20:50:01,3,29.90
…
xianhuo (spot):
2016-8-14 20:50:00,1,55.10
2016-8-14 20:50:00,2,67.20
2016-8-14 20:50:00,3,33.10
2016-8-14 20:50:01,1,55.20
2016-8-14 20:50:01,2,66.70
2016-8-14 20:50:01,3,30.30
Emily first creates the two streams:
CREATE STREAM qihuo2(timestamp STRING, id STRING, price DOUBLE) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' TBLPROPERTIES ('topic'='qihuo', 'timefield'='timestamp',
'timeformat'='yyyy-MM-dd HH:mm:ss', 'kafka.zookeeper'='tw-node127:2181',
'kafka.broker.list'='172.16.1.128:9092');
CREATE STREAM xianhuo2(timestamp STRING, id STRING, price DOUBLE) ROW FORMAT DELIMITED FIELDS
TERMINATED BY ',' TBLPROPERTIES ('topic'='xianhuo', 'timefield'='timestamp',
'timeformat'='yyyy-MM-dd HH:mm:ss', 'kafka.zookeeper'='tw-node127:2181',
'kafka.broker.list'='172.16.1.128:9092');
Join the two streams by message time and compute the difference between the spot and futures prices, producing a stream with the fields (time, ID, spot-minus-futures difference):
CREATE STREAM transform_stream2 AS
SELECT
qihuo.timestamp,
qihuo.id,
(xianhuo.price - qihuo.price) AS diff
FROM
qihuo2 qihuo JOIN xianhuo2 xianhuo
ON
qihuo.timestamp=xianhuo.timestamp
AND
qihuo.id=xianhuo.id;
On the stream above, take a 3-minute window sliding every 1 second, producing a "last 3 minutes" stream:
CREATE STREAM window_diff_stream AS
SELECT * FROM transform_stream2
STREAMWINDOW w1 AS (LENGTH '3' MINUTE SLIDE '1' SECOND);
Finally, within the last 3 minutes, compute for each ID the maximum and minimum of the spot-futures difference, and the gap between them:
CREATE VIEW max_min_diff_window_stream AS SELECT maxTable.id, maxTable.diff AS maxDiff,
maxTable.timestamp AS maxTime, minTable.diff AS minDiff, minTable.timestamp AS minTime,
(maxTable.diff - minTable.diff) AS maxMinDiff, minTable.maxtime AS calTime FROM
(SELECT id, timestamp, diff, maxtime FROM (
    SELECT id,
        timestamp,
        diff,
        row_number() OVER (PARTITION BY id ORDER BY diff) AS minRank,
        max(timestamp) OVER (PARTITION BY id) AS maxtime
    FROM window_diff_stream) t
 WHERE minRank = 1) minTable
JOIN
(SELECT id, timestamp, diff FROM (
    SELECT id,
        timestamp,
        diff,
        row_number() OVER (PARTITION BY id ORDER BY diff DESC) AS maxRank
    FROM window_diff_stream) t
 WHERE maxRank = 1) maxTable
ON minTable.id = maxTable.id;
Emily triggers the computation:
CREATE TABLE result3(maxTableId STRING, maxDiff DOUBLE, maxTime STRING, minDiff DOUBLE, minTime
STRING, maxMinDiff DOUBLE, calTime STRING);
SET streamsql.use.eventtime=true;
INSERT INTO result3 SELECT * FROM max_min_diff_window_stream;
PL/SQL on Streams
When her boss learns that Slipstream also supports PL/SQL, he asks her to investigate migrating the existing PL/SQL code to Slipstream. Besides the futures and spot streams above, there is now a transaction table containing ID, time, and transaction volume, and Emily must accumulate, for each ID, the transaction volume from market open to the current time. A second requirement: whenever maxmindiff in the max_min_diff_window_stream stream reaches 50 or more, trigger an alert by selecting the top 20 records by accumulated value from day_sum and inserting them into the warning table:
CREATE STREAM transaction_stream(timestamp STRING, id STRING, transaction DOUBLE) ROW FORMAT
DELIMITED FIELDS TERMINATED BY ',' TBLPROPERTIES ('topic'='transaction', "kafka.zookeeper"="tw-
node127:2181");
CREATE TABLE day_sum(id STRING , sd STRING, total DOUBLE) ROW FORMAT DELIMITED FIELDS TERMINATED BY
',';
CREATE TABLE warm_transaction (id STRING, timestamp STRING, total DOUBLE);
CREATE VIEW transaction_sum AS SELECT id, timestamp, sum(s.transaction) total FROM
transaction_stream s GROUP BY id, timestamp;
SET plsql.show.sqlresults=true;
SET stream.enabled=true;
SET stream.tables=qihuo2,xianhuo2,transaction_stream;
DECLARE
    threshold_count INT := 0;
BEGIN
    INSERT INTO day_sum SELECT id, sd,
        CASE WHEN isnull(total2) THEN total1 ELSE (total1 + total2) END total
    FROM (SELECT t1.id id, t1.timestamp sd, t1.total total1, t2.total total2
          FROM transaction_sum t1 LEFT JOIN day_sum t2
          ON t1.id = t2.id AND (to_unix_timestamp(t2.sd, 'yyyy-MM-dd HH:mm:ss') + 1)
             = unix_timestamp(t1.timestamp, 'yyyy-MM-dd HH:mm:ss')) t;
    SELECT count(*) INTO threshold_count FROM max_min_diff_window_stream WHERE maxmindiff >= 50;
    IF threshold_count > 0 THEN
        INSERT INTO warm_transaction SELECT id, sd, total FROM day_sum ORDER BY total DESC LIMIT 20;
    END IF;
END;
StreamJob Persistence
By now Emily can use Slipstream fluently, but one day, after she had configured a number of settings, the application terminated unexpectedly, so she starts using StreamJobs for persistence:
CREATE STREAMJOB access_log_streamjob AS ("INSERT INTO result SELECT url, COUNT(*) FROM waccesslog
GROUP BY url");
START STREAMJOB access_log_streamjob;
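A persisted StreamJob survives server restarts and can be managed by name. The statements below are a sketch; SHOW and STOP are assumptions patterned on the START STREAMJOB syntax above, so verify them against your Slipstream release:

```sql
SHOW STREAMJOBS;                       -- hypothetical: list jobs in the current application
STOP STREAMJOB access_log_streamjob;   -- hypothetical: stop the job; its definition persists
START STREAMJOB access_log_streamjob;  -- restart it, e.g. after a server restart
```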
Email Alerts
Configure /etc/inceptor1/conf/alert4j.properties on every machine:
alert4j.service=email
email.server.host=smtp.exmail.qq.com
email.server.port=465
email.server.ssl=true
email.validate=true
email.sender.username=[email protected]
email.sender.password=password
email.from.address=[email protected]
email.to.addresses=[email protected]
When the system runs into a problem, it sends an email to the configured address ([email protected]) containing the key information about the issue.
High Availability
Through the Transwarp Manager UI, configure an additional InceptorServer and set its role to Standby; once configured, start it.
Use checkpointing and the WAL to guarantee that no data is lost:
Step 1: through the Transwarp Manager UI, modify the InceptorServer configuration, add the "stream.driver.checkpoint.dir" property, and set it to a directory on HDFS. After configuring, restart Inceptor.
Step 2: when creating an application, specify the application's checkpoint directory, also a directory on HDFS:
CREATE APPLICATION app1 WITH APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/",
"application.enable.wal"="true");
Or, for an existing application, set the two properties:
ALTER APPLICATION app1 SET APPPROPERTIES("application.checkpoint.dir"="/tmp/app1/");
ALTER APPLICATION app1 SET APPPROPERTIES("application.enable.wal"="true");
Holodesk Configuration
Emily's boss tells her that the site-visit statistics are very useful and that several other teams are consuming the results. However, because the data currently lives in a Hive table, queries are too slow, and he asks her to find a way to speed them up. Emily recalls that the Holodesk component is designed for fast interactive analytics, so she stores the result table as Holodesk and sets the relevant parameters:
CREATE TABLE holo(url STRING, count INT) STORED AS HOLODESK TBLPROPERTIES("holodesk.window.length"
="100000","holodesk.window.timeout"="100000");
CREATE STREAM w_accesslog AS SELECT * FROM accesslog STREAMWINDOW sw AS (LENGTH '10' SECOND SLIDE
'10' SECOND) TBLPROPERTIES("kafka.zookeeper"="tw-node127:2181");
INSERT INTO holo SELECT url, COUNT(*) FROM w_accesslog GROUP BY url;
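The other teams can then query the Holodesk table directly instead of the Hive table; for example, a sketch of a top-URLs query (table and column names as defined above):

```sql
-- Ad-hoc query served from Holodesk rather than the slower Hive table.
SELECT url, count FROM holo ORDER BY count DESC LIMIT 10;
```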
Slipstream Parameter Summary
Slipstream parameters are set per session with the general form:
SET <key>=<value>;
For example, the parameters used in this document include streamsql.use.eventtime (enable event-time processing), stream.enabled and stream.tables (run PL/SQL against streams), and plsql.show.sqlresults (show intermediate PL/SQL results).