1. 程式人生 > >最簡單流處理引擎——Kafka Streams簡介

最簡單流處理引擎——Kafka Streams簡介


Kafka在0.10.0.0版本以前的定位是分散式,分割槽化的,帶備份機制的日誌提交服務。而kafka在這之前也沒有提供資料處理的顧服務。大家的流處理計算主要是還是依賴於Storm,Spark Streaming,Flink等流式處理框架。

Storm,Spark Streaming,Flink流處理的三駕馬車各有各的優勢.

Storm低延遲,並且在市場中佔有一定的地位,目前很多公司仍在使用。

Spark Streaming藉助Spark的體系優勢,活躍的社群,也佔有一定的份額。

而Flink在設計上更貼近流處理,並且有便捷的API,未來一定很有發展。

但是他們都離不開Kafka的訊息中轉,所以Kafka於0.10.0.0版本推出了自己的流處理框架,Kafka Streams。Kafka的定位也正式成為Apache Kafka® is a distributed streaming platform,

分散式流處理平臺。

實時流式計算

近幾年來實時流式計算髮展迅速,主要原因是實時資料的價值和對於資料處理架構體系的影響。實時流式計算包含了 無界資料 近實時 一致性 可重複結果 等等特徵。a type of data processing engine that is designed with infinite data sets in mind 一種考慮了無線資料集的資料處理引擎。

1、無限資料:一種不斷增長的,基本上無限的資料集。這些通常被稱為“流式資料”。無限的流式資料集可以稱為無界資料,相對而言有限的批量資料就是有界資料。

2、無界資料處理:一種持續的資料處理模式,應用於上面的無界資料。批量處理資料(離線計算)也可以重複執行來處理資料,但是會有效能的瓶頸。

3、低延遲,近實時的結果:相對於離線計算而言,離線計算並沒有考慮延遲的問題。

解決了兩個問題,流處理可以提代批處理系統:

1、正確性:有了這個,就和批量計算等價了。

Streaming需要能隨著時間的推移依然能計算一定時間視窗的資料。Spark Streaming通過微批的思想解決了這個問題,實時與離線系統進行了一致性的儲存,這一點在未來的實時計算系統中都應該滿足。

2、推理時間的工具:這可以讓我們超越批量計算。

好的時間推理工具對於處理不同事件的無界無序資料至關重要。

而時間又分為事件時間和處理時間。

還有很多實時流式計算的相關概念,這裡不做贅述。

Kafka Streams簡介

Kafka Streams被認為是開發實時應用程式的最簡單方法。它是一個Kafka的客戶端API庫,編寫簡單的java和scala程式碼就可以實現流式處理。

優勢:

  • 彈性,高度可擴充套件,容錯

  • 部署到容器,VM,裸機,雲

  • 同樣適用於小型,中型和大型用例

  • 與Kafka安全性完全整合
  • 編寫標準Java和Scala應用程式
  • 在Mac,Linux,Windows上開發

  • Exactly-once 語義

用例:

紐約時報使用Apache Kafka和Kafka Streams將釋出的內容實時儲存和分發到各種應用程式和系統,以供讀者使用。

Pinterest大規模使用Apache Kafka和Kafka Streams來支援其廣告基礎架構的實時預測預算系統。使用Kafka Streams,預測比以往更準確。

作為歐洲領先的線上時尚零售商,Zalando使用Kafka作為ESB(企業服務匯流排),幫助我們從單一服務架構轉變為微服務架構。使用Kafka處理 事件流使我們的技術團隊能夠實現近乎實時的商業智慧。

荷蘭合作銀行是荷蘭三大銀行之一。它的數字神經系統Business Event Bus由Apache Kafka提供支援。它被越來越多的財務流程和服務所使用,其中之一就是Rabo Alerts。此服務會在財務事件時實時向客戶發出警報,並使用Kafka Streams構建。

LINE使用Apache Kafka作為我們服務的中央資料庫,以便彼此通訊。每天產生數億億條訊息,用於執行各種業務邏輯,威脅檢測,搜尋索引和資料分析。LINE利用Kafka Streams可靠地轉換和過濾主題,使消費者可以有效消費的子主題,同時由於其複雜而簡單的程式碼庫,保持易於維護性。

Topology

Kafka Streams通過一個或多個拓撲定義其計算邏輯,其中拓撲是通過流(邊緣)和流處理器(節點)構成的圖。

拓撲中有兩種特殊的處理器

  • 源處理器:源處理器是一種特殊型別的流處理器,沒有任何上游處理器。它通過使用來自這些主題的記錄並將它們轉發到其下游處理器,從一個或多個Kafka主題為其拓撲生成輸入流。
  • 接收器處理器:接收器處理器是一種特殊型別的流處理器,沒有下游處理器。它將從其上游處理器接收的任何記錄傳送到指定的Kafka主題。

在正常處理器節點中,還可以把資料發給遠端系統。因此,處理後的結果可以流式傳輸回Kafka或寫入外部系統。

Kafka在這當中提供了最常用的資料轉換操作,例如mapfilterjoinaggregations等,簡單易用。

當然還有一些關於時間,視窗,聚合,亂序處理等。未來再一一做詳細介紹,下面我們進行簡單的入門案例開發。

快速入門

首先提供WordCount的java版和scala版本。

java8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}

scala:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

如果kafka已經啟動了,可以跳過前兩步。

1、下載

下載 2.3.0版本並解壓縮它。請注意,有多個可下載的Scala版本,我們選擇使用推薦的版本(2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2、啟動

Kafka使用ZooKeeper,因此如果您還沒有ZooKeeper伺服器,則需要先啟動它。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

啟動Kafka伺服器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3、建立topic 啟動生產者

我們建立名為streams-plaintext-input的輸入主題和名為streams-wordcount-output的輸出主題:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

檢視:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4、啟動WordCount

以下命令啟動WordCount演示應用程式:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程式將從輸入主題stream-plaintext-input讀取,對每個讀取訊息執行WordCount演算法的計算,並連續將其當前結果寫入輸出主題streams-wordcount-output。因此,除了日誌條目之外不會有任何STDOUT輸出,因為結果會寫回Kafka。

現在我們可以在一個單獨的終端中啟動控制檯生成器,為這個主題寫一些輸入資料:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

並通過在單獨的終端中使用控制檯使用者讀取其輸出主題來檢查WordCount演示應用程式的輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5、處理資料

我們在生產者端輸入一些資料。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

輸出端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

繼續輸入:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

我們看到隨著資料實時輸入,wordcount的結果實時的輸出了。

6、停止程式

您現在可以通過Ctrl-C按順序停止控制檯使用者,控制檯生產者,Wordcount應用程式,Kafka代理和ZooKeeper伺服器。

什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer

替代Flume——Kafka Connect簡介

更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算

相關推薦

簡單處理引擎——Kafka Streams簡介

Kafka在0.10.0.0版本以前的定位是分散式,分割槽化的,帶備份機制的日誌提交服務。而kafka在這之前也沒有提供資料處理的顧服務。大家的流處理計算主要是還是依賴於Storm,Spark Streaming,Flink等流式處理框架。 Storm,Spark Streaming,Flink流處理

[翻譯]Kafka Streams簡介: 讓處理變得更簡單

看到一篇不錯的譯文,再推送一撥 Introducing Kafka Streams: Stream Processing Made Simple 這是Jay Kreps在三月寫的一篇文章,用來介紹Kafka Streams。當時Kafka Streams

Kafka Streams:它是適合您的處理引擎嗎?

在這篇文章中,我們將詳細討論流媒體訪問模式以及HDF 3.3和即將釋出的HDP 3.1版本中Kafka Streams支援的增加。 在新增Kafka Streams支援之前,HDP和HDF支援兩個流處理引擎:帶有Storm的Spark Structured Streaming和Stream

EasyNVR無外掛直播伺服器軟體介面呼叫返回“Unauthorized”簡單處理方式

背景需求 對於EasyNVR的受眾群體十分的廣泛,不僅僅有將EasyNVR作為視訊直播平臺直接使用的,更多的是使用EasyNVR的對應功能整合到自身系統。對於前者,只需要將軟體的使用功能搞清楚即可,對於整合軟體功能的就需要自身具有一定的開發能力了。需要通過呼叫介面來滿足自身的需求

EasyNVR介面呼叫返回“Unauthorized”簡單處理方式

背景需求 對於EasyNVR的受眾群體十分的廣泛,不僅僅有將EasyNVR作為視訊直播平臺直接使用的,更多的是使用EasyNVR的對應功能整合到自身系統。對於前者,只需要將軟體的使用功能搞清楚即可,對於整合軟體功能的就需要自身具有一定的開發能力了。需要通過呼叫

簡單處理MVC中預設的Json方法返回時間的問題

利用 Json方法返回 資料時,如果有時間格式,會變成 “\/Date(1369419656217)\/” 這個樣子,問了同事找到個解決方法 using Newtonsoft.Json; using Newtonsoft.Json.Converters;

簡單的物理引擎

1 物理引擎 **維基百科 物理引擎指一個用來模擬物理系統的電腦軟體,用來模擬一些物理系統,如剛體,流體動力學等,主要用在視訊遊戲,電影等領域。 **開發人員 物理引擎設定一些引數,輸出一些結果

NiFi處理引擎

AttributeRollingWindow 1.3.0 AttributesToJSON 1.3.0 Base64EncodeContent 1.3.0 CaptureChangeMySQL 1.3.0 CompareFuzzyHash 1.3.0 CompressContent 1.3.0 Connec

Unity3D筆記——Socket粘包分包的理解和簡單處理方式

前言:Unity3D筆記是我平時做一些好玩的測試和研究,記錄的筆記。會比較詳細也可能隨口一提就過了。 所以大家見諒了,內容一般都會是原創的(非原創我會註明轉載)。由於很多內容其他的朋友也肯定研究發表過,大家請指出錯誤。 之前做的答題遊戲,在向伺服器申請題目列表時遇到

Esper複雜事件處理引擎 --告警關聯分析

Esper 釋出了2.2.0準備分析一下,以便應用到告警關聯分析 --------------------------------------------------- Esper是一個事件流處理(Event Stream Processing,ESP)和複雜事件處理(Co

Spark Streaming,Flink,Storm,Kafka Streams,Samza:如何選擇處理框架

![](https://img2020.cnblogs.com/blog/1089984/202006/1089984-20200610080225004-690722209.png) 根據最新的統計顯示,僅在過去的兩年中,當今世界上90%的資料都是在新產生的,每天建立2.5萬億位元組的資料,並且隨著新裝

規則引擎是什麽?簡單的解釋

規則引擎 drools ilog odm 在軟件行業裏面,規則引擎作為基礎軟件的一種,也是屬於比較神秘的一種存在,這不是說出來大家都能了然的一款產品,甚至有很多的IT人,在聽到這個詞的時候,表現出來的也是一頭霧水,“什麽是規則引擎?”通常都會聽到這樣的問題。我記得在一次展會上,一個

spring boot Thymeleaf模板引擎 簡單輸出例子

test leaf att map 控制器 輸出 span blog hello spring boot Thymeleaf模板引擎 最簡單輸出例子 控制器代碼如下: @GetMapping(value = "/test")public String test(Mo

基於qml創建簡單的圖像處理程序(1)-基於qml創建界面

cep font mes quit vid www 習慣 image ble 為什麽使用QT,包括進一步使用QML?兩個主要原因,一是因為我是一個c++程序員,有語言使用慣性;二是我主要做圖像處理方面工作,使用什麽平臺對於我來說不重要,我只需要在不同平臺上面能

基於qml創建簡單的圖像處理程序(2)-使用c++&qml進行圖像處理

.cn turn isnull 按鈕 編寫 可能 finish height 通過 《基於qml創建最簡單的圖像處理程序》系列課程及配套代碼基於qml創建最簡單的圖像處理程序(1)-基於qml創建界面http://www.cnblogs.com/jsxyhelu/p/83

基於qml創建簡單的圖像處理程序(3)-使用opencv&qml進行圖像處理

結果 tar isempty reat features eabi qt quick resources 也會 《基於qml創建最簡單的圖像處理程序》系列課程及配套代碼基於qml創建最簡單的圖像處理程序(1)-基於qml創建界面http://www.cnblogs.com/

基於Flume+Kafka+Spark Streaming打造實時處理項目實戰課程

大數據本課程從實時數據產生和流向的各個環節出發,通過集成主流的分布式日誌收集框架Flume、分布式消息隊列Kafka、分布式列式數據庫HBase、及當前最火爆的Spark Streaming打造實時流處理項目實戰,讓你掌握實時處理的整套處理流程,達到大數據中級研發工程師的水平!下載地址:百度網盤下載

Oracle教程(1)-Oracle簡介簡單的語句

Oracle11g資料庫的安裝swap建議要求不小於1G,也就是實體記憶體不小於2G 一,最簡單的查詢語句 【*】代表所有的列,它與select之後列出的所有的列名是一樣的 如果不用*,select後面加特定列,就可以查詢出特定列的資料 slect後面加多個特定名

大資料實時處理引擎比較

從流處理的核心概念,到功能的完備性,再到周邊的生態環境,全方位對比了目前比較熱門的流處理框架:Spark,Flink,Storm和 Gearpump。結合不同的框架的設計,為大家進行深入的剖析。與此同時,從吞吐量和延時兩個方面,對各個框架進行效能評估。 主要技術點:流失資料處理,Spark,

Spark Streaming實時處理筆記(6)—— Kafka 和 Flume的整合

1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se