1. 程式人生 > >KSQL介紹:面向Apache Kafka的開源Streaming SQL引擎

KSQL介紹:面向Apache Kafka的開源Streaming SQL引擎

我非常高興地宣佈KSQL,這是面向Apache Kafka的一種資料流SQL引擎。KSQL降低了資料流處理這個領域的准入門檻,為使用Kafka處理資料提供了一種簡單的、完全互動的SQL介面。你不再需要用Java或Python之類的程式語言編寫程式碼了!KSQL具有這些特點:開源(採用Apache 2.0許可證)、分散式、可擴充套件、可靠、實時。它支援眾多功能強大的資料流處理操作,包括聚合、連線、加窗(windowing)和sessionization(捕獲單一訪問者的網站會話時間範圍內所有的點選流事件)等等。

文章目錄

一個簡單的例子

Introducing KSQL: Open Source Streaming SQL for Apache Kafka
如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

查詢流式資料意味著什麼?這與SQL資料庫相比怎樣?

可以說它實際上與SQL資料庫大不一樣。大多數資料庫用於對儲存的資料執行按需查詢和改動。KSQL還無法執行查詢,它所做的是連續轉換――也就是說,資料流處理。比如設想一下:我有來自使用者的點選流和帳戶資訊表(這些資訊關於不斷更新的那些使用者)。KSQL讓我可以對該點選流和使用者表進行建模,並將兩者連線起來,儘管那兩者當中的一個是無限的。

所以,KSQL對Kafka話題中的資料流執行連續查詢――隨著新資料不斷流入,轉換在連續進行。相比之下,針對關係資料庫的查詢是一次性查詢――對資料集執行一次即可完成,就像針對資料庫中有限行的SELECT語句。

KSQL適用於什麼?

很好,現在你可以連續查詢無限資料流。那有什麼好處呢?

1、實時監控遇上實時分析

CREATE TABLE error_counts AS

SELECT error_code, count(*)FROM monitoring_stream

WINDOW TUMBLING (SIZE 1 MINUTE)

WHERE type = 'ERROR'

這方面的一個用途是,定義實時計算的自定義業務級別度量指標,你可以監控併發出警報,就像監控CPU負載那樣。另一個用途是,在KSQL中為應用程式定義正確性概念,並核實它在生產環境中執行時滿足這個概念。我們一提到監控,常常想到跟蹤低級別效能統計數字的計數器(counter)和計量器(gauge)。這些種類的計量器常常可以告訴你CPU負載很高,但其實無法告訴你應用程式是否在做它應該做的事情。KSQL允許針對應用程式生成的原始事件流定義自定義度量指標,無論它們是日誌事件、資料庫更新還是其他任何型別的事件。

比如說,一個Web應用程式可能需要核實:每當新客戶註冊,就傳送歡迎電子郵件,建立新的使用者記錄,並對其信用卡計費。這些功能可能分散在不同的服務或應用程式中,你需要監控,確保對每個新客戶而言,每個操作都在某個服務級別協議(SLA)裡面(比如30秒)。

2、安全和異常檢測

CREATE STREAM possible_fraud AS

SELECT card_number, count(*)

FROM authorization_attempts

WINDOW TUMBLING (SIZE 5 SECONDS)

GROUP BY card_number

HAVING count(*) > 3;

這方面的簡單版本是你在上面演示中看到的:KSQL查詢將事件流轉換成數值時間序列聚合,這些聚合使用Kafka-Elastic連線件輸入到Elastic,並在Grafana UI中加以視覺化。安全用例常常酷似監控和分析。你尋找欺詐、濫用、垃圾郵件、入侵或其他不良行為方面的模式,而不是監控應用程式行為或業務行為。KSQL為定義這些模式並查詢實時資料流提供了一種簡單、先進、實時的方法。

3、聯機資料整合

CREATE STREAM vip_users AS

SELECT userid, page, action

FROM clickstream c

LEFT JOIN users u ON c.userid = u.user_id

WHERE u.level = 'Platinum';

許多公司進行的資料處理大部分屬於資料豐富(data enrichment)這個範疇:拿來來自幾個資料庫的資料,將其轉換,連線起來,並將資料儲存到鍵值儲存庫、搜尋索引、快取或其他資料服務系統。長期以來,用於資料整合的ETL(提取、轉換和載入)作為定期的批處理作業來加以執行。比如說,實時轉儲原始資料,然後每隔幾小時進行轉換,以實現高效查詢。對於許多用例而言,這種延遲是不可接受的。如果結合Kafka連線件使用,KSQL能夠實現由批量資料整合轉變為聯機資料整合。你可以使用資料流-表連線,藉助儲存在表中的元資料來豐富資料流,或者將資料流載入到另一個系統之前,對PII(個人身份資訊)資料執行簡單的過濾。

4、應用程式開發

許多應用程式將輸入資料流轉換成輸出資料流。比如說,負責為線上商店重新排序庫存少的產品的程序可能需要銷售和發貨方面的資料流,才能計算訂單資料流。

至於用Java編寫的更復雜的應用程式,Kafka的原生資料流API可能正是我們所需要的。不過針對簡單的應用程式,或者對Java程式設計不感興趣的團隊,簡單的SQL介面也許正是它們所尋找的。

KSQL中的核心抽象

KSQL在內部使用Kafka的Streams API(https://kafka.apache.org/documentation/streams/),它們使用同樣的核心抽象來用於Kafka端的資料流處理。KSQL中有兩個核心抽象,它們對應於Kafka Streams中的兩個核心抽象,讓你可以處理Kafka主題:

1、STREAM:資料流是無限序列的結構化資料(“事實”,fact)。比如說,我們可能有一個財務交易資料流,比如“Alice向Bob打款100美元,然後Charlie向Bob打款50美元”。資料流中的事實是不可變的,這意味著新的事實可以插入到資料流中,但現有的事實根本無法被更新或刪除。資料流可以由Kafka主題來建立,或由現有的資料流和表來生成。

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)

WITH (kafka_topic='pageviews', value_format=’JSON’);

2.、TABLE:表是STREAM或另一個TABLE的檢視,它表示不斷變化的事實的集合。比如說,我們可能有一個表,含有最新的財務資訊,比如“Bob當前的帳戶餘額是150美元”。它相當於傳統的資料庫表,但是由資料加窗之類的資料流語義加以豐富。表中的事實是不可變的,這意味著新的事實可以插入到表中,但現有的事實根本無法被更新或刪除。表可以由Kafka主題來建立,或由現有的資料流和表來生成。

CREATE TABLE users (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid  VARCHAR)

WITH (kafka_topic='users', value_format='DELIMITED');

KSQL簡化了資料流應用程式,因為它完全整合了表和資料流這兩個概念,允許將代表事實現狀的資料流與代表當前發生的事件的表連線起來。Apache Kafka中的主題可以表示為KSQL中的STREAM或TABLE,具體取決於主題處理的預期語義。比如說,如果你想把主題中的資料作為一系列獨立值來讀取,可以使用CREATE STREAM。這種資料流的一個示例是獲取頁面檢視事件的主題,其中每個頁面檢視事件是不相關的,彼此獨立。另一方面,如果你想把主題中的資料作為可更新值的不斷變化的集合來讀取,可以使用CREATE TABLE。說到應該在KSQL中作為TABLE來讀取的話題,這方面的一個例子是獲取使用者元資料的主題,其中每個事件表示特定使用者ID的最新元資料,無論是使用者的姓名、地址還是喜好選擇。

KSQL實戰:實時點選流分析和異常檢測

不妨看一個實際的演示。此演示顯示了你如何將KSQL用於實時監控、異常檢測和警報。針對點選流資料的實時日誌分析有好幾種形式。在本文例子中,我們標記出了在測試Web伺服器上佔用太多頻寬的惡意使用者會話。監控惡意使用者會話是sessionization的許多應用之一。不過籠統地說,會話是使用者行為分析的基本模組。一旦你按照特定的會話識別符號將使用者和事件關聯起來,就可以構建許多型別的分析機制,從簡單的度量指標(比如訪問次數),到更復雜的度量指標(比如客戶轉換漏斗和事件流),不一而足。我們在演示的最後環節顯示瞭如何在Elastic支援的Grafana儀表板上實時顯示KSQL查詢的輸出結果。

視訊播放器

 

00:00

05:09

 

 

內部結構

Introducing KSQL: Open Source Streaming SQL for Apache Kafka
如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

有一個KSQL伺服器程序執行查詢。一組KSQL程序作為一個叢集來執行。可以通過啟動KSQL伺服器的更多例項來動態新增更多的處理能力。這些例項具有容錯性:如果一個例項失效,另外幾個會接過它處理的工作。使用互動式KSQL命令列客戶軟體來啟動查詢,客戶軟體通過REST API向叢集傳送命令。命令列讓你可以檢查可用的資料流和表,執行新的查詢,檢查執行中查詢的狀態,並終止執行中查詢。在內部,KSQL是使用Kafka的Streams API構建的;它繼承了Kafka的彈性可擴充套件性、先進的狀態管理及容錯功能,還支援Kafka最近推出的只處理一次(exactly-once proecessing)語義。KSQL伺服器嵌入這個機制,另外添加了分散式SQL引擎(包括一些新穎的功能,比如提升查詢效能的位元組碼自動生成)以及用於查詢和控制的REST API。

Kafka + KSQL顛覆資料庫

過去我們談論了顛覆資料庫(https://www.confluent.io/blog/turning-the-database-inside-out-with-apache-samza/),現在我們通過為由內到外發生變化的資料庫新增SQL層來實現顛覆。

在關係資料庫中,表是核心抽象,日誌是實現細節。而在以事件為中心的世界,資料庫已被顛覆,核心抽象不是表,而是日誌。這些表只是來源於日誌,隨著新資料進入到日誌,表不斷更新。中央日誌是Kafka,KSQL是引擎,讓你可以建立所需的物化檢視,並將它們表示為持續更新的表。然後,你可以針對這類流式表執行時間點查詢(KSQL即將釋出該功能),為日誌中的每個鍵獲得最新值,採取持續不斷的方式。

Introducing KSQL: Open Source Streaming SQL for Apache Kafka
如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

使用Kafka和KSQL徹底顛覆資料庫,這對公司中可以用資料流方式來表示和處理的所有資料派什麼用場帶來了很大的影響。Kafka日誌是資料流的核心儲存抽象,允許進入到離線資料倉庫的相同資料現在可用於資料流處理。其他一切資料是基於日誌的流式物化檢視,無論是各種資料庫、搜尋索引還是公司中的其他資料服務系統。現在可以使用KSQL,以資料流的方式,執行建立這些派生檢視所需的所有資料豐富和ETL。監控、安全、異常及威脅檢測、分析以及故障應對可以實時執行,而不是為時太晚才執行。所有這些可供任何人使用,只要藉助一種對你的所有Kafka資料而言簡單又熟悉的SQL介面:KSQL。

Introducing KSQL: Open Source Streaming SQL for Apache Kafka
如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop

KSQL的下一站是什麼?

我們在釋出開發者預覽版的KSQL,開始圍繞它構建社群,並徵集反饋意見。我們計劃與開源社群合作,增添另外幾項功能,從而將它變成一種可準備部署到生產環境的系統:從KSQL的質量、穩定性和可操作性,直到支援更豐富的SQL語法(包括進一步的聚合功能和針對連續表的時間點SELECT),對迄今為止計算的資料執行快速查詢。

如何獲取KSQL?

你可以試一試KSQL快速入門和上述演示來實際體驗一下。歡迎你反饋缺少什麼功能,或者哪些方面可以改進:歡迎到Confluent Community Slack上的#KSQL頻道發表任何想法或反饋,如果你發現了錯誤,歡迎提交GitHub問題單;我們很樂意與早期採用者密切合作,所以請踴躍參與。我們期待與開源社群的其餘人合作,讓KSQL變成一項出色的技術。

本文英文原文:https://www.confluent.io/blog/ksql-open-source-streaming-sql-