1. 程式人生 > 實用技巧 >2. kafka核心概念以及簡單操作

2. kafka核心概念以及簡單操作

kafka消費模式

我們知道訊息佇列傳輸的是訊息,那麼這個訊息如何傳遞也是很重要的一環。一般訊息佇列支援兩種傳遞模式:

  • 點對點模式:

生產者將生產的訊息傳送到queue中,然後消費者再從queue中取出訊息進行消費。訊息一旦被消費,那麼queue中不再有儲存,所以消費者不可能消費到已經被消費的資訊。queue支援多個消費者同時消費,但是一個訊息只能被一個消費者消費,不存在說多個消費者同時消費一個訊息。日常生活中就好電話客服服務,同一個客戶呼入電話,只能被一位客服人員處理,第二個客服人員不能為該客戶服務。

  • 釋出訂閱模式

和點對點模型不同,它有一個主題(Topic)的概念。該模型也有傳送方和接收方,只不過叫法不一樣。傳送方也被稱為釋出者(publisher),接收方被稱為訂閱者(subscriber)。和點對點模型不一樣,這個模型可以存在多個釋出者和多個訂閱者,它們都能接收到相同主題的訊息。好比微信公眾號,一個公眾號可以有多個訂閱者,一個訂閱者也可以訂閱多個公眾號。

kafka核心概念

在kafka的世界中有很多概念和術語是需要我們提前理解並且熟練掌握的,下面來盤點一下。

之前我們提到過,kafka屬於分散式的訊息引擎系統,主要功能是提供一套完善的訊息釋出與訂閱方案。在kafka中,釋出訂閱的物件是主題(topic),可以為每個業務、每個應用、甚至是每一類資料都建立專屬的主題。

向主題釋出訊息的客戶端應用程式稱為生產者(producer),生產者通常持續不斷地向一個或多個主題傳送訊息,而訂閱這些主題獲取訊息的客戶端應用程式就被稱之為消費者(consumer)。和生產者類似,消費者也能同時訂閱多個主題。我們把生產者和消費者統稱為客戶端(clients)。你可以同時執行多個生產者和消費者例項,這些例項不斷地向kafka叢集中的多個主題生產和消費訊息。有客戶端自然也就有服務端,kafka的伺服器端由被稱為broker的服務程序構成,即一個kafka叢集由多個broker組成,broker負責接收和處理客戶端發來的請求,以及對訊息進行持久化。雖然多個broker程序能夠執行在同一臺機器上,但更常見的做法是將不同的broker分散執行在不同的機器上。這樣即便叢集中的某一臺機器宕機,執行在其之上的broker程序掛掉了,其他機器上的broker也依舊能對外提供服務,這其實就是kafka提供高可用的手段之一。

在實現高可用的另一個手段就是備份機制(replication)。備份的思想很簡單,就是把相同的資料拷貝到多臺機器上,而這些相同的資料拷貝就叫做副本(replica)。副本的數量是可以配置的,這些副本儲存著相同的資料,但卻有不同的角色和作用。kafka定義了兩種副本,領導者副本(leader replica)和追隨者副本(follower replica)。前者對外提供服務,這裡的對外指的是與客戶端進行互動;而後者只是被動地追隨領導者副本而已,不與外界進行互動。當然了,很多其他系統中追隨者副本是可以對外提供服務的,比如mysql,從庫是可以處理讀操作的,也就是所謂的"主寫從讀",但是在kafka中追隨者副本不會對外提供服務,至於為什麼我們作為思考題解答。對了,關於領導者--追隨者,之前其實是叫做主(master)--從(slave),但是不建議使用了,因為slave有奴隸的意思,政治上有點不合適,所以目前大部分的系統都改成leader-follower了。

副本的工作機制很簡單:生產者向主題寫的訊息總是往領導者那裡,消費者向主題獲取訊息也都是來自於領導者。也就是無論是讀還是寫,針對的都是領導者副本,至於追隨者副本,它只做一件事情,那就是向領導者副本傳送請求,請求領導者副本把最新生產的訊息傳送給它,這樣便能夠保持和領導者的同步。

雖然有了副本機制可以保證資料的持久化或者資料不丟失,但沒有解決伸縮性的問題。伸縮性即所謂的scalability,是分散式系統中非常重要且必須謹慎對待的問題。什麼是伸縮性呢?我們拿副本來說,雖然現在有了領導者副本和追隨者副本,但倘若領導者副本積累了太多的資料以至於單臺broker都無法容納了,此時應該怎麼辦?有個很自然的想法就是,能否把資料分割成多分儲存在不同的broker上?沒錯,kafka就是這麼設計的。

這種機制就是所謂的分割槽(partition)。如果瞭解其他的分散式系統,那麼可能聽說過分片、分割槽域等提法,比如MongoDB和ElasticSearch中的sharding、Hbase中的region,其實它們都是相同的原理,只是partition是最標準的名稱。

kafka中的分割槽機制指定的是將每個主題劃分為多個分割槽,每個分割槽都是一組有序的訊息日誌。生產者生產的每一條訊息只會被髮到一個分割槽中,也就說如果向有兩個分割槽的主題傳送一條訊息,那麼這條訊息要麼在第一個分割槽中,要麼在第二條分割槽中。而kafka的分割槽編號是從0開始的,如果某個topic有100個分割槽,那麼它們的分割槽編號就是從0到99。

到這裡可能會有疑問,那就是剛才提到的副本如何與這裡的分割槽聯絡在一起呢?實際上,副本是在分割槽這個層級定義的。每個分割槽下可以配置若干個副本,其中只能有1個領導者副本和N-1個追隨者副本。生產者向分割槽寫入訊息,每條訊息在分割槽中的位置由一個叫位移(offset)的資料來表徵。分割槽位移總是從0開始,假設一個生產者向一個空分割槽寫入了10條訊息,那麼這10條訊息的位移依次是0、1、2、...、9。


至此我們能完整地串聯起kafka的三層訊息架構:

  • 第一層是主題層,每個主題可以配置M個分割槽,每個分割槽又可以配置N個副本
  • 第二層是分割槽層,每個分割槽的N個副本中只能有一個副本來充當領導者角色,對外提供服務;其他的N-1個副本只是追隨者副本,用來提供資料冗餘之用。
  • 第三層是訊息層,分割槽中包含若干條訊息,每條訊息的位移從0開始,依次遞增。
  • 最後客戶端程式只能與分割槽的領導者副本進行互動

那麼kafka是如何持久化資料的呢?總的來說,kafka使用訊息日誌(log)來儲存資料,一個日誌就是磁碟上一個只能追加寫(append-only)訊息的物理檔案。因為只能追加寫入,故避免了緩慢的隨機I/O操作,改為效能較好的順序I/O操作,這也是實現kafka高吞吐量特性的一個重要手段。不過如果不停地向一個日誌寫入訊息,最終也會耗盡所有的磁碟空間,因此kafka必然要定期地刪除訊息以回收磁碟。怎麼刪除?簡單來說就是通過日誌段(log segment)機制。在kafka底層,一個日誌又進一步細分成多個日誌段,訊息被追加寫到當前最新的日誌段中,當寫滿了一個日誌段後,kafka會自動切分出一個新的日誌段,並將老的日誌段封存起來。kafka在後臺還有定時任務會定期地檢查老的日誌段是否能夠被刪除,從而實現回收磁碟的目的。

這裡再重點說一下消費者,之前說過有兩種訊息模型,即點對點模型(peer to peer,p2p)和分佈訂閱模型。這裡面的點對點指的是同一條訊息只能被下游的一個消費者消費,其他消費者不能染指。在kafka中實現這種p2p模型的方法就是引入了消費者組(consumer group)。所謂的消費者組,指的是多個消費者例項共同組成一個組來消費一個主題。這個主題中的每個分割槽都只會被消費者組裡面的一個消費者例項消費,其他消費者例項不能消費它。為什麼要引入消費者組呢?主要是為了提升消費者端的吞吐量,多個消費者例項同時消費,加速了整個消費端的吞吐量(TPS)。關於消費者組的機制,後面會詳細介紹,現在只需要知道消費者組就是多個消費者組成一個組來消費主題裡面的訊息、並且訊息只會被組裡面的一個消費者消費即可。此外,這裡的消費者例項可以是執行消費者應用的程序,也可以是一個執行緒,它們都稱為一個消費者例項(consumer instance)。

消費者組裡面的消費者不僅瓜分訂閱主題的資料,而且更酷的是它們還能彼此協助。假設組內某個例項掛掉了,kafka能夠自動檢測,然後把這個Failed例項之前負責的分割槽轉移給其他活著的消費者。這個過程就是大名鼎鼎的"重平衡(rebalance)"。嗯,其實即是大名鼎鼎,也是臭名昭著,因為由重平衡引發的消費者問題比比皆是。事實上,目前很多重平衡的bug,整個社群都無力解決。

每個消費者在消費訊息的過程中,必然需要有個欄位記錄它當前消費到了分割槽的哪個位置上,這個欄位就是消費者位移(consumer offset)。注意,我們之前說一個主題可以有多個分割槽、每個分割槽也是用位移來表示訊息的位置。但是這兩個位移完全不是一個概念,分割槽位移表示的是分割槽內的訊息位置,它是不變的,一旦訊息被成功寫入一個分割槽上,那麼它的位置就是固定了的。而消費者位移則不同,它可能是隨時變化的,畢竟它是消費者消費進度的指示器嘛。另外每個消費者都有著自己的消費者位移,因此一定要區分這兩類位移的區別。一個是分割槽位移,另一個是消費者位移。

小結:

  • 生產者,producer向主題釋出新訊息的應用程式
  • 消費者,consumer從主題訂閱新訊息的應用程式
  • 訊息,recordkafka是訊息引擎,這裡的訊息就是指kafka處理的主要物件
  • 主題,topic主題是承載訊息的邏輯容器,在實際使用中多用來區分具體的業務,即不同的業務對應不同的主題。
  • 分割槽,partition一個有序不變的訊息序列,每個主題下可以有多個分割槽。分割槽編號從0開始,分佈在不同的broker上面,實現釋出於訂閱的負載均衡。生產者將訊息傳送到主題下的某個分割槽中,以分割槽偏移(offset)來標識一條訊息在一個分割槽當中的位置(唯一性)
  • 分割槽位移,offset表示分割槽中每條訊息的位置資訊,是一個單調遞增且不變的值
  • 副本,replicakafka中同一條資料能夠被拷貝到多個地方以提供資料冗餘,這便是所謂的副本。副本還分為領導者副本和追隨者副本,各自有各自的功能職責。讀寫都是針對領導者副本來的,追隨者副本只是用來和領導者副本進行資料同步、保證資料冗餘、實現高可用。
  • 消費者位移,consumer offset表示消費者消費進度,每個消費者都有自己的消費者位移
  • 消費者組,consumer group多個消費者例項共同組成的一個組,同時消費多個分割槽以實現高吞吐。
  • 重平衡,rebalance消費者組內某個消費者例項掛掉之後,其它消費者例項自動重新分配訂閱主題分割槽的過程。重平衡是kafka消費者端實現高可用的重要手段

思考:為什麼kafka不像mysql那樣支援主寫從讀呢?

因為kafka的主題已經被分為多個分割槽,分佈在不同的broker上,而不同的broker又分佈在不同的機器上,因此從某種角度來說,kafka已經實現了負載均衡的效果。不像mysql,壓力都在主上面,所以才要從讀;另外,kafka儲存的資料和資料庫的資料有著實質性的差別,kafka儲存的資料是流資料,具有消費的概念,而且需要消費者位移。所以如果支援從讀,那麼消費端控制offset會更復雜,而且領導者副本同步到追隨者副本需要時間的,會造成資料不一致的問題;另外對於生產者來說,kafka是可以通過配置來控制是否等待follower對訊息確認的,如果支援從讀的話,那麼也需要所有的follower都確認了才可以回覆生產者,造成效能下降,而且follower出現了問題也不好處理。

命令列操作topic增刪查

還記得嗎?我們之前說bin目錄裡面有5個sh檔案非常常用,再來回憶一下:

  • kafka-server-start.sh: 用於啟動kafka叢集, 比如在bin目錄下可以通過./kafka-server-start.sh -daemon ../config/server.properities啟動kafka叢集
  • kafka-server-stop.sh: 用於關閉kafka叢集, 比如在bin目錄下可以通過./kafka-server-stop.sh ../config/server.properities關閉kafka叢集
  • kafka-topics.sh: 用於對主題進行操作, 比如建立、刪除、查詢主題
  • kafka-console-consumer.sh: 控制檯啟動一個消費者
  • kafka-console-producer.sh: 控制檯啟動一個生產者

下面我們就在bin目錄中啟動kafka叢集,當然我們這裡是單機,不過也可以叫叢集,雖然只有一臺,但我說它是叢集它就是,只不過機器少了點。

我們看到此時就啟動成功了,下面我們看看如何來操作主題,畢竟它是訊息的載體。

檢視所有的topic:

kafka-topics.sh --list --zookeeper localhost:2181

可以看到,由於我們還沒有建立,所以此時還沒有主題。這裡要指定--zookeeper,因為主題、分割槽資訊,以及broker分佈情況都儲存在zookeeper中。

建立topic:

kafka-topics.sh --create --zookeeper localhost:2181 --topic 主題名 --partitions 分割槽數 --replication-factor 副本數

注意:副本數不能超過你broker的數量,因為我們只有一臺機器,所以副本數是1,但是分割槽在一臺broker上是可以有多個的。

如果建立一個已存在的主題會報錯:

提示我們主題"matsuri"已存在。

刪除topic:

kafka-topics.sh --delete --zookeeper localhost:2181 --topic 主題名

這裡我們看到分割槽"matsuri"被標記為刪除,但如果沒有將delete.topic.enable設定為true,那麼這個分割槽是不會被真正刪除的。那麼我們到底有沒有真正刪除呢?我們看看配置檔案就知道了,或者說再建立一個satori,如果存在會報錯的。

可以看到,刪除的時候是真的將分割槽給刪除了。

檢視topic資訊:

kafka-topics.sh --describe --zookeeper localhost:2181 --topic 主題名

Python連線kafka操作主題

下面我們來看看如何使用Python連線kafka,Python連線kafka使用一個叫做pykafka的第三方模組,直接pip install pykafka即可。

from pykafka.cluster import Cluster
try:
    # 如果本地沒有gevent的話, 那麼匯入這個庫就會出現錯誤
    from pykafka.handlers import GEventHandler as hander
except ImportError:
    from pykafka.handlers import ThreadingHandler as hander

# 建立叢集連線
kafka_cluster = Cluster("47.94.174.89:9092", hander())

# 檢視已存在的主題
print(kafka_cluster.topics)  # {b'matsuri': <pykafka.topic.Topic at 0x29f0e792220 (name=b'matsuri')>}

# 但是注意, 雖然返回的結果是一個字典, 但它實際上是一個TopicDict物件
print(type(kafka_cluster.topics))  # pykafka.cluster.TopicDict

# 裡面的value是一個Topic物件
topic_matsuri = kafka_cluster.topics[b"matsuri"]
print(topic_matsuri)  # <pykafka.topic.Topic at 0x29f0e8529a0 (name=b'matsuri')>
print(type(topic_matsuri))  # <class 'pykafka.topic.Topic'>

當然建立叢集連線還有一個簡單的辦法:

from pykafka import KafkaClient

# 在內部會自動建立pykafka.cluster.Cluster物件
kafka_client = KafkaClient("47.94.174.89:9092")
print(kafka_client.topics is kafka_client.cluster.topics)  # True

關於叢集的建立、刪除,pykafka沒有提供特別好的介面,我們使用kafka_client.topics["xxx"]或者kafka_cluster["xxx"]的時候,如果主題"xxx"不存在,那麼會自動使用預設配置建立一個分割槽,但是生產環境我們肯定不會這麼做。我們在使用pykafka的時候,更多是往已存在的主題裡面傳送、接收資料。

控制檯生產者消費者測試

啟動生產者:

kafka-console-producer.sh --topic 主題 --broker-list ip:9092,注意這裡是--broker-list,也就是broker的地址。但還是那句話,不可以寫localhost。

啟動消費者:

kafka-console-consumer.sh --topic 主題 --bootstrap-server ip:9092


然後生產者生產資料,我們隨便寫幾句話吧。

然後看看消費者有什麼反應

我們看到消費者全部都接收到了。


資料預設保留7天,超過7天就會刪除。但是還有一個問題,要是消費者啟動之前,生產生產訊息了,怎麼辦?顯然此時的消費者是接收不到的,因此我們可以加上一個--from-beginning引數,這樣的話就可以把訊息全部消費掉。

關閉消費者之後,生產者又生產了兩條訊息,然後啟動消費者。

Python啟動生產者和消費者

下面問題來了,我們如何使用Python啟動生產者和消費者呢?

>>> from pykafka import KafkaClient
>>> 
>>> kafka_client = KafkaClient("47.94.174.89:9092")
>>> 
>>> topic_matsuri = kafka_client.topics["matsuri"]
>>> 
>>> consumer = topic_matsuri.get_simple_consumer()  # 啟動消費者,從topic_matsuri從獲取訊息
>>> for msg in consumer:
...     print(f"{msg.offset}, {msg.value.decode('utf-8')}")
... 
0, やっぱりるるちゃんが可愛いよ
0, 言いたいことがあるんだよ
1, 俺と一緒に人生を歩もう
1, すきすき大好き
2, 手コキ
2, 見て見て帕裡桑

這裡我們啟動了一個消費者,但是我們看到自動把過往的訊息全給收到了,然後我們再啟動生產者:

from pykafka import KafkaClient

kafka_client = KafkaClient("47.94.174.89:9092")

topic_matsuri = kafka_client.topics["matsuri"]

with topic_matsuri.get_sync_producer() as producer:  # 啟動生產者
    while True:
	val = input(">>>: ")
    	producer.produce(f"send message: {val}".encode("utf-8"))

隨便傳送幾條訊息,然後再看看消費者。

我們看到消費者已經接收到訊息了。