1. 程式人生 > >Kafka學習(一)kafka指南(about雲翻譯)

Kafka學習(一)kafka指南(about雲翻譯)

begin 網絡 play 好的 align leader ensemble 驗證 實踐

kafka 權威指南中文版

問題導讀

1. 為什麽數據管道是數據驅動企業的一個關鍵組成部分?
2. 發布/訂閱消息的概念及其重要性是什麽?

第一章

初識 kafka

企業是由數據驅動的。我們獲取信息,分析它,處理它,並創造更多的產出。每一個應用程序都會產生數據,無論是日誌消息、指標、用戶行為、輸出報文或者其他類型。每一個字節的數據都有它的作用,傳入的數據會告訴接下來需要做什麽。為了知道數據的意義,我們需要把數據從它產生的地方,傳輸到它能夠被分析的地方。然後把分析的結果返回到它們能夠被執行的地方。 我們越快地做到這一點,我們的系統就能更敏捷,具有更快的響應。我們在移動數據花費的精力越少越少,我們就越能集中處理核心業務。這就是為什麽數據管道是數據驅動企業的一個關鍵組成部分。我們如何移動數據幾乎變的和數據本身一樣重要。


任何時候科學家們有分歧,那是因為我們沒有足夠的數據。我們可以在獲取怎樣的數據上產生統一的觀點。我們獲取了數據,然後數據解決了問題,要麽你是對的,要麽我是對的。然後我們就可以進行接下來的工作。

Publish / Subscribe Messaging 發布/訂閱消息

在討論 kafka 的特性之前,需要理解發布訂閱消息的概念及其重要性。發布-訂閱消息隊列的特征是消息的 sender(publisher)並不直接將 data(message)發送給receiver,publisher 以某種方法對消息進行分類,而 receiver (subscriber) 會訂閱接收特定類別的消息。 Pub/Sub 系統通常會有 broker(消息被發布到的中心點)來進行實現,消息將被發送至 broker。

How It Starts 由來


許多發布訂閱用例都始於相同的方式: 利用一個簡單的消息隊列或者進程間的通信。比如,你編寫的應用程序需要向某處發送監控信息,你可以從應用程序直接連接到顯示監控數據的 dashboard,並通過該連接發送監控數據,如圖 1-1 所示

技術分享圖片

圖 1-1 單個直連指標發布者


不久之後,你決定要分析長期的數據,但在 dashboard 中展銷效果不是很好。於是你建立了一項新服務,可以接收監控、存儲指標並對其進行分析。為了支持此功能,你可以修改應用程序以將指標寫入這兩個系統(實時監控和長期監控)。到目前為止,有多於三的個應用程序生成指標信息,並且它們都利用相同的連接與這兩個服務建立連接。你的同事認為, 對服務器進行輪詢以用於產生報警也是個不錯的想法, 因此你在每個應用服務器添加根據請求提供監控指標的服務。過一段時間後,你將會有更多應用程序需要使用這些服務器去獲取各個指標並將其用於各種目的。現在的架構看起來可能如圖 1-2 所示,連接變得更加復雜難以跟蹤調試

技術分享圖片

圖 1-2 多直連指標發布者


很明顯到這裏已經欠了很多技術債,你決定償還一些回來。你使用一個應用程序接收來自所有應用程序的指標,並提供一個服務器來查詢任何它們需要的系統的指標。這樣就降低了架構的復雜性,類似於圖 1-3。恭喜,你已經構建了一個發布訂閱消息系統!

技術分享圖片

圖 1-3 單指標發布訂閱系統

Individual Queue Systems 單隊列系統

在你與監控這場戰爭中,你的一個同事也一直在對日誌消息進行類似的工作。另一個則是在跟蹤前端頁面上的用戶行為,並向正在從事機器學習的開發人員提供該信息,為管理層創建一些報表。您已經遵循了類似的方法來構建將信息的發布者與該信息的訂閱者解耦的系統。圖 1-4 顯示了這種結構,具有三個獨立的發布/訂閱系統。

技術分享圖片

圖 1-4 多指標發布訂閱系統

這肯定比之前點對點的連接好多了(圖 1-2 所示),但是有很多重復工作。你的公司需要維護多個隊列系統來存儲數據,所有這些都有各自的 bug 和使用限制。你肯定也知道未來將會有更多需要消息隊列的情況。你想要的是一個單一的集中式系統,允許發布通用類型的數據,並且能隨著業務增長而增長。

Kafka 權威指南 第一章第 2 節 初識 Kafka

問題導讀:

1 kafka 中的消息單元: Message 和 batch

2 kafka 中的消息格式: schema

3 kafka 中的存儲模式: Topic 與 partition

4 kafka 中的兩種客戶端: producer 和 consumer

5 kafka 中的服務核心: broker 與 cluster

6 多集群組成的災備

翻譯內容來自《Kafka 權威指南》

什麽是 Kafka

Apache Kafka 是一個基於分布式日誌提交機制設計的發布訂閱系統。數據在 kafka 中持久化,用戶可以隨時按需讀取。另外數據以分布式的方式存儲,提高容錯性,易於擴展。

Message 和 Batches

Kafka 中最基本的數據單元是消息 message,如果使用過數據庫,那麽可以把 Kafka 中的消息理解成數據庫裏的一條行或者一條記錄。消息是由字符數組組成的, kafka 並不關系它內部是什麽,索引消息的具體格式與 Kafka 無關。消息可以有一個可選的 key,這個 key 也是個字符數組,與消息一樣,對於 kafka 也是透明的。 key 用來確定消息寫入分區時,進入哪一個分區。最簡單的處理方式,就是把 key 作為 hash 串,擁有相同 key 的消息,肯定會進入同一個分區。

為了提高效率, Kafka 以批量的方式寫入。一個 batch 就是一組消息的集合, 這一組的數據都會進入同一個 topic 和 partition(這個是根據 producer 的配置來定的) 。每一個消息都進行一次網絡傳輸會很消耗性能,因此把消息收集到一起,再同時處理就高效的多了。當然,這樣會引入更高的延遲以及吞吐量:batch 越大,同一時間處理的消息就越多。 batch 通常都會進行壓縮,這樣在傳輸以及存儲的時候效率都更高一些。

Schemas

對於 Kafa 來說,消息本身是不透明的,這樣就能對消息定義更多容易理解的內容。根據個人的需求不同,消息也會有不同的 schema。比如 JSON 或者XML 都是對人來說很容易閱讀的格式。然後他們在不同的模式版本中間缺乏一些處理的魯棒性和可擴展性。一些 Kafka 的開發者也傾向於使用 Apache Avro(最開始是用來為 Hadoop 做序列化的),提供了緊湊的序列化格式,在發生變化時,也不需要重新生成代碼,具有很強的數據類型和模式,具有很好的 向前擴展與向後兼容的能力。


Kafka 中數據是連續的,在數據在寫入到同時也可能被讀取消費,這樣數據的格式就很重要了。如果數據的格式發生變化,消費的應用也需要做出適當的調整。 如果事先定義好了數據存儲的格式,那麽讀取數據的時候就不需要做特殊的處理了。

Topics 和 Partitions


消息都是以主題 Topic 的方式組織在一起, Topic 也可以理解成傳統數據庫裏的表,或者文件系統裏的一個目錄。一個主題由 broker 上的一個或者多個 Partition 分區組成。在 Kafka 中數據是以 Log 的方式存儲,一個 partition 就是一個單獨的 Log。消息通過追加的方式寫入日誌文件,讀取的時候則是從頭開始按照順序讀取。註意,一個主題通常都是由多個分區組成的,每個分區內部保證消息的順序行,分區之間是不保證順序的。如果你想要 kafka 中的數據按照時間的先後順序進行存儲,那麽可以設置分區數為 1。如下圖所示,一個主題由 4 個分區組成,數據都以追加的方式寫入這四個文件。分區的方式為 Kafka 提供了良好的擴展性,每個分區都可以放在獨立的服務器上,這樣就相當於主題可以在多個機器間水平擴展,相對於單獨的服務器,性能更好。

技術分享圖片

在 Kafka 這種數據系統中經常會提起 stream 流這個詞,通常流被認為是一個主題中的數據,而忽略分區的概念。這就意味著數據流就是從producer 到 consumer。在很多框架中,比如 kafka stream,apache samza,storm 在操作實時數據的時候,都是這樣理解數據流的。這種操作的模式跟離線系統處理數據的方式不同,如 hadoop,是在某一個固定的時間處理一批的數據。

Producer 和 Consumer


Kafka 中主要有兩種使用者: Producer 和 consumer。
Producer 用來創建消息。在發布訂閱系統中,他們也被叫做 Publisher 發布者或 writer 寫作者。通常情況下,消息都會進入特定的主題。默認情況下,生產者不關系消息到底進入哪個分區,它會自動在多個分區間負載均衡。也有的時候,消息會進入特定的一個分區中。一般都是通過消息的 key 使用哈希的方式確定它進入哪一個分區。這就意味著如果所有的消息都給定相同的 key,那麽他們最終會進入同一個分區。生產者也可以使用自定義的分區器,這樣消息可以進入特定的分區。

Consumer 讀取消息。在發布訂閱系統中,也叫做 subscriber 訂閱者或者 reader 閱讀者。消費者訂閱一個或者多個主題,然後按照順序讀取主題中的數據。消費者需要記錄已經讀取到消息的位置,這個位置也被叫做 offset。每個消息在給定的分區中只有唯一固定的 offset。通過存儲最後消費的 Offset,消費者應用在重啟或者停止之後,還可以繼續從之前的位置讀取。保存的機制可以是 zookeeper,或者 kafka 自己。


消費者是以 consumer group 消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個 topic。每個分區在同一時間只能由 group 中的一個消費者讀取,在下圖中,有一個由三個消費者組成的 grouop,有一個消費者讀取主題中的兩個分區,另外兩個分別讀取一個分區。某個消費者讀取某個分區,也可以叫做某個消費者是某個分區的擁有者。


在這種情況下,消費者可以通過水平擴展的方式同時讀取大量的消息。另外,如果一個消費者失敗了,那麽其他的 group 成員會自動負載均衡讀取之前失敗的消費者讀取的分區。

技術分享圖片


Brokers 和 Clusters

單獨的 kafka 服務器也叫做 broker, Broker 從生產者那裏獲取消息,分配 offset,然後提交存儲到磁盤年。他也會提供消費者,讓消費者讀取分區上的消息,並把存儲的消息傳給消費者。依賴於一些精簡資源,單獨的 broker 也可以輕松的支持每秒數千個分區和百萬級的消息。


Kafka 的 broker 支持集群模式,在 Broker 組成的集群中,有一個節點也被叫做控制器(是在活躍的節點中自動選擇的)。這個 controller 控制器負責管理整個集群的操作,包括分區的分配、失敗節點的檢測等。一個 partition 只能出現在一個 broker 節點上,並且這個 Broker 也被叫做分區的 leader。一個分區可以分配多個 Broker,這樣可以做到多個機器之間備份的效果。這種多機備份在其中一個 broker 失敗的時候,可以自動選舉出其他的 broker 提供服務。然而,producer 和 consumer 都必須連接 leader 才能正常工作。

技術分享圖片

Kafka 的一個重要特性就是支持數據的過期刪除,數據可以在 Broker 上保留一段時間。 Kafka 的 broker 支持針對 topic 設置保存的機制,可以按照大小配置也可以按照時間配置。一旦達到其中的一個限制,可能是時間過期也可能是大小超過配置的數值,那麽這部分的數據都會被清除掉。每個 topic 都可以配置它自己的過期配置,因此消息可以按照業務的需要進行持久化保留。比如,一個數據追蹤分析的 topic 可以保留幾天時間,一些應用的指標信息則只需要保存幾個小時。 topic 支持日誌數據的壓縮,這樣 kafka 僅僅會保留最後一條日誌生成的 key。這在修改日誌類型的時候會非常有用。

Multiple Cluster

隨著 Kafka 部署環境的演變,有時候需要莉利用多集群的優勢。使用多集群的原因如下:
1 不同類型數據的分離
2 安全隔離
3 多數據中心(災備)
在使用多數據中心的時候,需要很清楚的理解消息是如何在她們之間傳遞的。一般情況下,用戶可以在多個對外提供服務的網址,產生一些前端數據,然後利用 kafka 把他們統一的匯總到一起,進行分析監控告警。這種備份的機制一般都是應用於單個集群,而不是多集群。
Kafka 項目提供了一個叫做 mirro maker 的工具,它支持多機房的數據傳輸。它其實就是一個基於 queu 連接的 consumer 和 producer。消息從 kafka 中消費,然後傳輸給另一個集群的 kafka。如下圖所示,就是使用 mirror maker 的一個例子,消息在兩個集群的本地聚合,然後再傳輸給另一個集群進行分析。由於kafka 設計的精簡,可以在多機房中實現這種簡單的管道處理機制。

技術分享圖片

kafka 權威指南 第一章第 3 節 為什麽選擇 kafka


為什麽選擇 Kafka

對於發布-訂閱消息系統有很多選擇,是什麽促使 Apache Kafka 是一個很好的選擇呢?

多個生產者

無論 kafka 多個生產者的客戶端正在使用很多 topic 還是同一個 topic , Kafka 都能夠無縫處理好這些生產者。這使得 kafka 成為一個從多個前端系統聚合數據,然後提供一致的數據格式的理想系統。例如,一個通過多個微服務向用戶提供內容的站點, 可以為統計 page view 而只設立一個 topic, 所有的服務將 page view 以統一的格式寫入這個 topic. 消費程序能夠以統一的數據格式來接收 page view 數據, 而不需要去協調多個生產者流.

多個消費者

除了多個生產者之外, kafka 也被設計為多個消費者去讀取任意的單個消息流而不相互影響;而其他的很多消息隊列系統,一旦一個消息被一個客戶端消費,那麽這個消息就不能被其他客戶端消費,這是 kafka 與其他隊列不同的地方;同時多個 kafka 消費者也可以選擇作為一個組的一部分,來分擔一個消息流,確保這整個組,這個消息只被消費一次。

基於硬盤的消息保存


Kafka 不僅能夠處理多個消費者,而且能夠持久的保存消息這也意味著消費者不一定需要實時的處理數據。消息將按照持久化配置規則存儲在硬盤上。這個可以根據每個 topic 進行設置,允許根據不同的消費者的需求不同 設置不同消息流的保存時間不同, 持久化保存意味著一旦消費者來不及消費或者突然出現流量高峰, 而不會有丟失數據的風險.同樣也意味著消息可以由 consumer 來負責管理, 比如消費消息掉線了一段時間,不需要擔心消息會在 producer 上累積或者消息丟失, consumer 能夠從上次停止的地方繼續消費。

可擴展性

Kafka 最開始設計的時候就把靈活擴展考慮到裏面,使其能夠處理任意數量的數據;用戶剛開始可以用一臺進行驗證其相關的理念,然後將其擴展成小的三臺 broker 的開發集群,隨著數據的增加,甚至擴展為數十臺,上百臺規模的大集群。擴展可以在集群正常運行的時候進行,對於整個系統的運作沒有影響;這也就意味著,對於很多臺 broker 的集群,如果一臺 broker 有故障,不影響為 client 提供服務.集群如果要同時容忍更多的故障的話, 可以配置更高的 replication factors. Replication 在第 6 章中會詳細探討.

高性能


上面的這些特性使得 Apache Kafka 成為一個能夠在高負載的情況下表現出優越性能的發布-訂閱消息系統。 Producer, consumer 和 broker 都能在大數據流的情況下輕松的擴展. 擴展過程能夠在依然提供從生產到消費亞秒級服務的情況下完成.

kafka 權威指南 第一章第 4 節 【中文版】


問題導讀
1.kafka 在大數據生態系統中的角色是什麽?
2.kafka 有哪些使用場景?

相關內容:
kafka 權威指南 第一章第 1 節 【中文版】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21648about
Kafka 權威指南 —— 第一章第 2 節 初識 Kafka
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21652
kafka 權威指南 第一章第 4 節 【中文版】
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21681
kafka 權威指南- 第一章第 5 節 開始入門 kafka
http://www.aboutyun.com/forum.php?mod=viewthread&tid=21683

The Data Ecosystem 數據生態系統

許多的應用參與到我們所構建的處理數據的環境中。 我們定義了輸入–產生數據或者引入數據到系統應用。 我們定義了輸出–統計指標, 報告, 或者其他數據產品。 我們創造了回路, 一些組件從系統中讀取數據,進行一些處理,然後將其回寫到數據基礎設施中, 以便後續其他地方處理。無數的內容,大小,用途不同的數據通過這套流程進行處理。
如圖 1-9 所示, Apache Kafka 為這個數據生態系統提供了循環系統。 Kafka 傳輸來自各個基礎設施的消息, 為所有的客戶端提供一致的接口。 因為引入了消息隊列模式, producer 和 consumer 就不再耦合, 之間也沒有任何的直接連接。 組件能夠隨著業務的增加和融合而添加或移除, producer 不需要關心誰在使用數據, 有多少 consumer 在使用數據。

技術分享圖片 圖 1-9 大數據生態系統


Use Cases 用例 Activity Tracking 行為追蹤

Kafka 最初的用途是用戶行為追蹤。 無論用戶在前端進行什麽交互操作都會產生消息以及行為記錄。 這寫信息可以是被動信息, 比如瀏覽頁面和點擊的追蹤,也可以是更加復雜的行為, 比如為用戶資料添加信息。 這些 message 被發布到一個或多個 topic, 隨後被後端的消費者消費。以這種方式, 我們生成報表, 為機器學習系統提供數據源, 更新搜索結果, 以及無數其他的用途。

Messaging 消息傳輸

kafka 的另外一個基本用途就是消息傳輸。 當一個應用把需要發送給用戶的通知(例如郵件消息)發送給用戶。 這些組件可以生產消息, 而不需要關註消息格式和消息將如何被發送。 一個通用的應用能夠讀取所有需要被發送的消息, 並且執行格式化, 選擇如何發送它們。 通過使用一個通用組件, 不僅僅是減少了不同系統中的重復功能, 而且還能做一些有意思的轉換功能, 比如聚集多條消息一次發送。

Metrics and Logging 指標和日誌

Kafka 同樣是理想的處理應用和系統指標、日誌的工具。可以用於多個生產者生產同一種類型的消息。應用定期發送它們的操作指標數據到 kafka 的 topic 中,這些數據可以被用於系統監控和報警。 同樣也可以被用於像 hadoop 這樣的系統進行長周期的分析, 比如項目的年增長。日誌消息能夠以同樣的方式進行發送,能夠被路由至專門的日誌分析系統,比如 Elasticsearch 或者安全分析應用。 kafka 提供了便利, 當目標系統需要改變時( 比我們要升級日誌存儲系統), 無需修改前端應用以及日誌的聚合方式。

Commit Log 提交日誌

Kafka 是基於提交日誌的思想來構建的, 自然可以以這種方式來使用 kafka。 數據庫更改可以發送到 kafka, 應用可以監控這個流來接收實時更新。 這個 changelog 流同樣也可以用於復制數據庫更新到遠程系統。或者合並多個應用的更新到單一的數據庫視圖。持久化的保存也為 changelog 提供了緩存, 意味著如果消費應用失敗的話, changelog 可以進行事件重放。另外,日誌壓縮的 topic 還能為日誌提供更長的保留時間, 如果對於每個 key 只保留最新一個更新的話。

Stream Processing 流處理

另一方面,它提供了許多類型的應用程序流處理。可以認為是提供了 Hadoop 的 map/reduce 處理類似的功能,但是 Hadoop 通常依賴於一個大的時間窗口的聚集數據, 幾小時或幾天, 然後以批處理的方式處理數據, 而流處理以實時的方式處理流。流處理框架允許用戶寫一個小應用來對 kafka message 進行操作, 執行諸如計數, 對 message 進行分區讓其他應用更有效的處理, 或者轉換來自多個源的數據。 Stream 處理學習在第 10 章中單獨講解。

kafka 權威指南 第一章第 5 節 Kafka 的起源

問題導讀
1 為什麽要創造 Kafka 這個項目?
2 為什麽起 Kafka 這個名字?

Kafka 最初是 LinkedIn 用來解決數據傳輸問題的,它可以以高性能的方式處理大量來自不同數據類型的數據,並提供實時處理用戶活動、系統指標等數據分析的工作。
Data really powers everything that we do. —— jeff weiner,LinkedIn 的 CEO

LinkedIn 遇到的問題

在本章開頭就說過, LInkedIn 最初需要收集各個系統和應用的指標數據進行數據分析,剛開始采用的方式是使用自定義的收集器以及開源框架來存儲和提供數據服務。收集的信息數量龐大、內容復雜,除了一些傳統的指標,比如 CPU 的使用情況、應用的性能分析等等;還需要收集很多與業務相關聯的復雜的監控信息以及提供給某個用戶請求的信息。這個系統也遇到了很多的問題,比如指標的采集采用輪詢的模式,內部傳輸大量的指標信息,沒有提供系統內部的自服務。系統內部高度耦合,需要人工來設置很多簡單的任務,並且系統間各種指標的命名也大不相同。

同時,有一個系統在用戶創建活動的時候會產生一些信息,這些信息通過一個前置的 http 服務,以 XML 的數據格式進行傳輸。這些數據需要先經過解析處理,然後用來做離線分析。這個系統本身很不穩定,經常會崩潰出錯。 XML 的格式還不是很通用,解析起來非常復雜。如果改變了數據的格式,那麽從數據的傳輸到後面的數據解析處理都需要進行調整。系統經常會因為數據格式的調整而引發崩潰。定位這個問題,需要很長時間,因此根本無法用來做實時服務。

監控和用戶活動追蹤都無法直接使用該服務,監控的服務太脆弱,數據的格式也不自由,並且輪詢的這種機制效果也不好。追蹤服務系統對於用戶的指標來說太容易崩潰、這種批量的定時任務也無法做到實時處理和告警。並且數據之間的關聯也不容易做,比如用戶活動與系統應用性能之間的關聯分析,但是這種分析還是很有用的。比如用戶的操作發生了錯誤,如果采用定時的批處理,那麽有可能需要幾個小時之後才能定位到問題。

最開始, LinkedIn 想要在現有的開源解決方案中,尋找一種可以支持大數據的實時服務以及支持水平擴展的方案。最初系統使用的是 ActiveMQ,但是它不支持水平擴展。並且它內部有很多 BUG, LinkedIn 為此也付出了很多的心血來解決這些問題。如果服務器發生崩潰,也會阻塞客戶端的請求,從而影響應用服務器的服務性能。這種處理方式主要是為了推動管道處理方式結構。


Birth of Kafka Kafka 的誕生

LinkedIn 由 Jay Kreps 主導,他是 Voldemort(鍵值存儲系統)的主要開發者。最初的團隊還包括 Neha Narkhede 以及 Jun Rao。他們一起參與開發了這個即需要實時處理又需要支持水平擴展的消息系統。
這個系統主要的目標是:

  • 基於 Push-pull 推拉模式解耦生產者與消費者的關系
  • 提供消息的持久化,並且支持多個消費者消費消息
  • 消息的吞吐量性能優化
  • 允許系統在數據增長的情況下進行水平擴展

最終就設計出了這個具有經典的消息系統接口的發布訂閱系統,但是在存儲上又很像一款日誌數據聚合系統。結合 Apache Avro 提供的消息序列化的特性,系統支持每天處理十億級的系統指標和用戶行為數據。在 2015 年 8 月份的時候, LinkedIn 就有萬億級的消息數據,沒天都要處理 1PB 的數據量。

Open Source 開源


Kafka 在 2010 年開源貢獻給 GitHub,從一開始它就吸引了大量的使用這的註意,並且在 2011 年成為 Apache 的一個項目,並且 2012 年成為 Apache 的孵化項目。從那時起,大量來自 linkedIn 的員工以及社區的貢獻者共同不斷改善 Kafka.目前 Kafka 在很多公司都有很多的應用場景。在 2014 年, Jay Kreps, Neha Narkhede, Jun Rap 離開了 LinkedIn,並成立了 Confluent 公司,專門提供 kafka 相關的技術支持。這兩家公司以及不斷加入的貢獻者,持續不斷的改進與支撐 kafka,讓它成為大數據管道處理的首選。

The Name 命名


Kafka 經常提及的一個問題就是,為什麽要取這個名字? Jay Kreps 的回答是這樣的:

“ 我覺得既然 Kafka 是一個支持讀寫的消息系統,那不如直接用一個作家的名字來命名。我在大學學習過很多的文學課程,特別喜歡 Franz Kafka,因此就給這個開源項目起了這個很響亮的名字。但其實他們本身並沒有很直接的關系。 ”

kafka 權威指南- 第一章第 6 節 開始入門 kafka

我們知道 kafka 通用術語,下面我們繼續,設置 kafka,和構建數據管道。下一章,探索 kafka 的安裝和配置。包括:運行在合適的硬件及生產時需要考慮的事情。


kafka 權威指南 第二章第 1 節:安裝 Kafka 【中文版】


問題導讀
1.安裝 Kafka 的步驟有哪些,需要哪些準備?
2.kafka 配置完成後 如何驗證 Kafka 是否正常運行?

第二章 Installing Kafka


本章介紹怎樣運行 Apache Kafka,包括存儲 kafka 源數據的 zookeeper 的安裝。
同樣的,也會覆蓋 kakfa 部署所需的基本配置項以及選擇運行 broker 的硬件的標準。最後,介紹怎樣部署 kafka 集群以及在生成環境上的一些註意點。

First Things First


Choosing an Operating System


Apache Kafka 是一個 Java 程序,可以運行在多種操作系統上,包括: Windows, OS X, Linux 等。本章安裝使用 kafka 的步驟都是在使用最廣泛的 Linux 系統上執行的。 Linux 也是 Kafka 部署建議的操作系統。 Kafka 在 Windows 和 OS X 上的安裝請點擊附錄 A。

Installing Java


無論是安裝 zookeeper 還是 kafka,都需要一個 Java 運行環境。 Java 版本可能是 Java8,也可以是系統提供的版本,或者是直接從 java.com 下載。雖然zookeeper 和 kafka 在有 java runtime 的版本上就可以運行,但是有完整的 jdk 在開發工具和應用時會更便利。因此,以下的安裝步驟假設你已經在
/usr/java/jdk1.8.0_51 下安裝了 jdk8。

Installing Zookeeper

Apache Kafka 利用 zookeeper 存儲關於 kafka 集群的元數據信息以及消費者信息。因為分布式版 kakfka 自帶啟動 zookeeper 服務器的腳本,因此並不需要安裝分布式版 zookeeper。

技術分享圖片
圖 2-1 Kafka 和 Zookeeper

Kafka 和 3.4.6 版 zookeeper 穩定結合已廣泛測試,從http://mirror.cc.columbia.edu/pu ... keeper-3.4.6.tar.gz下載 3.4.6 版 zookeeper。

Standalone Server


下例是 zookeeper 安裝的基本配置,路徑為/usr/local/
Zookeeper,數據存儲在/var/lib/zookeeper

? #
  tar -zxf zookeeper-3.4.6.tar.gz
  # mv zookeeper-3.4.6 /usr/local/zookeeper
  # mkdir -p /var/lib/zookeeper
  # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
 > tickTime=2000
 > dataDir=/var/lib/zookeeper
 > clientPort=2181
 > EOF
  # export JAVA_HOME=/usr/java/jdk1.8.0_51
  # /usr/local/zookeeper/bin/zkServer.sh start
  JMX enabled by default
  Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
  Starting zookeeper ... STARTED
  #

現在你可以驗證 zookeeper 是否在單機模式下能夠連接到客戶端端口,發送四個字符‘srvr’about 雲大數據雲技術學習分享平臺
# telnet localhost 2181
  Trying ::1...
  Connected to localhost.
  Escape character is ^].
  srvr
  Zookeeper version: 3.4.6-1569965, built on 02/20/2014
  09:09 GMT
  Latency min/avg/max: 0/0/0
  Received: 1
  Sent: 0
  Connections: 1
  Outstanding: 0
  Zxid: 0x0
  Mode: standalone
  Node count: 4
  Connection closed by foreign host.
  #

Zookeeper Ensemble

一個 zookeeper 集群被叫做"ensemble”,因為一致性協議的使用,建議一個 zookeeper 集群使用奇數個服務器(例如: 3,5,等),每個都作為集群的一個 quorum 用來返回發往 zookeeper 的請求。這意味著 3 節點的集群,當一個節點失效後仍可以運行。 5 個節點的集群,允許 2 個節點失效。
Sizing Your Zookeeper Ensemble
假設 zookeeper 運行在 5 個節點上,為了使配置變成包括節點轉換的套件。如果你的套件不能夠容許多用於 1 個節點的宕機,這會給做維護工作引入額外的風險。同樣不建議一個 zookeeper 套件運行在多余 7 個的節點上,因為集群性能會因一致性的特征而降低。
一個 zookeeper 套件中服務器的配置必須含有一項共通的配置:服務器列表,並且每個服務器都需要在數據目錄下有一個 myid 文件用來標識那個服務器的的id。如果服務器的主機名如下例: zoo1.example.com, zoo2.example.com, and zoo3.example.com,則配置文件可配置為:

tickTime=2000
  dataDir=/var/lib/zookeeper
  clientPort=2181
  initLimit=20
  syncLimit=5
  server.1=zoo1.example.com:2888:3888
  server.2=zoo2.example.com:2888:3888
  server.3=zoo3.example.com:2888:3888

在這個配置中, initLimit 表示 follower 經過多長時間可以連接上 leader。 syncLimit 表示 follower 與 leader 不同步的時間有多長。所有的配置都是以
tickTime 為單位的整數倍,例如這裏 initLimit 為 20 * 2000 ms 或 40 s。上邊的配置也列出了在套件中的每個服務器。服務器是以 server.X=hostname:peer
Port:leader Port 這樣格式話的方式定義的,個各參數釋義如下:
X:服務器 Id。這個值必須是整數,但不必從 0 開始或順序的。
Hostname:服務器的主機名或服務器 IP 地址。
peerPort:服務器間進行交互的 TCP 端口。
leaderPort:leader 選舉時的 TCP 端口。about 雲大數據雲技術學習分享平臺
客戶端只要能夠通過 clientPort 連接到套件即可,但是套件中的成員必須兩兩之間能夠通過以上 3 個端口互通。
除了分享的配置文件外,每個服務器在 data Dir 目錄下必須有一個 myid 文件。這個文件必須包含有與配置文件一致的服務器 id 號。以上步驟完成後,套件中
的服務器就可以啟動並兩兩互通。
Installing a Kafka Broker
Java 和 Zookeeper 配置好後,你就可以準備安裝 Apache Kafka。當前版本 kafka 可在http://kafka.apache.org/downloads.html下載。到發稿為止, Kafka 版本
是運行在 Scala2.11.0 上的 0.9.0.1。
下例是 kafka 安裝在/usr/local/kafka,利用之前配置好的 zookeeper,消息的日誌切片會存儲在/tmp/kafka-logs:
# tar -zxf kafka_2.11-0.9.0.1.tgz
  # mv kafka_2.11-0.9.0.1 /usr/local/kafka
  # mkdir /tmp/kafka-logs
  # export JAVA_HOME=/usr/java/jdk1.8.0_51
  # /usr/local/kafka/bin/kafka-server-start.sh -daemon
  /usr/local/kafka/config/server.properties
  #

Kafka 啟動後,我們可以通過一下簡單的操作:創建 test topic,生產一些消息,消費一些消息來檢測 Kafka 是否正常運行。
創建和驗證一個 topic:
# /usr/local/kafka/bin/kafka-topics.sh --create --
zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test
 Created topic "test".
 # /usr/local/kafka/bin/kafka-topics.sh --zookeeper
 localhost:2181
--describe --topic test
 Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
  Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  #

生產消息到測試 topic:
# /usr/local/kafka/bin/kafka-console-producer.sh --
broker-list
  localhost:9092 --topic test
  Test Message 1
  Test Message 2
  ^D
  #

從測試 topic 消費消息:
# /usr/local/kafka/bin/kafka-console-consumer.sh --
zookeeper
 localhost:2181 --topic test --from-beginningabout 雲大數據雲技術學習分享平臺
Test Message 1
 Test Message 2
 ^C
 Consumed 2 messages
  #

kafka 權威指南 第二章第 2 節:安裝 kafka Broker


問題導讀
1.kafka 安裝需要做哪些操作?
2.如何驗證 kafka 是否安裝成功?
3./tmp/kafka-logs 的作用是什麽?
Java 和 zookeeper 配置完畢,你可以安裝 kafka.軟件下載http://kafka.apache.org/downloads.html。這裏下載的版本為 0.9.0.1, Scala 版本 2.11.0.下面的例
子安裝 kafka 在 /usr/local/kafka,開始之前配置使用 zookeeper server 和在/tmp/kafka-logs 路徑下存儲消息日誌.

? 1 2 3 4 5 6 7
 # tar -zxf kafka_2.11-0.9.0.1.tgz
 # mv kafka_2.11-0.9.0.1 /usr/local/kafka
 # mkdir /tmp/kafka-logs
 # export JAVA_HOME=/usr/java/jdk1.8.0_51
 # /usr/local/kafka/bin/kafka-server-start.sh -daemon
  /usr/local/kafka/config/server.properties
  #

一旦 Kafka broker 啟動,我們可以創建 test topic,來驗證是否工作。
創建驗證 topic:
[Bash shell] 純文本查看 復制代碼
? 1 2
# /usr/local/kafka/bin/kafka-topics.sh --create --
zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic test
創建 topic "test".
[Bash shell] 純文本查看 復制代碼
?about 雲大數據雲技術學習分享平臺
1 2 3 4 5
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper
localhost:2181
--describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1
Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
#
在 topic 下面,生產者產生消息
[Bash shell] 純文本查看 復制代碼
? 1 2 3 4 5 6
# /usr/local/kafka/bin/kafka-console-producer.sh --
broker-list
localhost:9092 --topic test
Test Message 1
Test Message 2
^D
#
在 test topic 下面消費消息
[Bash shell] 純文本查看 復制代碼
? 1 2 3 4 5 6 7
# /usr/local/kafka/bin/kafka-console-consumer.sh --
zookeeper
localhost:2181 --topic test --from-beginning
Test Message 1
Test Message 2
^C
Consumed 2 messages
#

kafka 權威指南 第二章第 3 節 Broker 的配置

問題導讀:
1 Broker 有哪些基本的配置?about 雲大數據雲技術學習分享平臺
2 Broker 有哪些需要了解的配置?


Borker 配置


前面章節中的例子,用來作為單個節點的服務器示例是足夠的,但是如果想要把它應用到生產環境,就遠遠不夠了。在 Kafka 中有很多參數可以控制它的運行和工作。大部分的選項都可以忽略直接使用默認值就好,遇到一些特殊的情況你可以再考慮使用它們。

Broker 的一般配置

有很多參數在部署集群模式時需要引起重視,這些參數都是 broker 最基本的配置,很多參數都需要依據集群的 broker 情況而變化。

broker.id

每個 kafka 的 broker 都需要有一個整型的唯一標識,這個標識通過 broker.id 來設置。默認的情況下,這個數字是 0,但是它可以設置成任何值。需要註意的是,需要保證集群中這個 id 是唯一的。這個值是可以任意填寫的,並且可以在必要的時候從 broker 集群中刪除。比較好的做法是使用主機名相關的標識來做為 id,比如,你的主機名當中有數字相關的信息,如 hosts1.example.com, host2.example.com,那麽這個數字就可以用來作為 broker.id 的值。

port

默認啟動 kafka 時,監聽的是 TCP 的 9092 端口,端口號可以被任意修改。如果端口號設置為小於 1024,那麽 kafka 需要以 root 身份啟動。但是並不推薦以 root 身份啟動。

zookeeper.connect

這個參數指定了 Zookeeper 所在的地址,它存儲了 broker 的元信息。在前一章節的例子 中, Zookeeper 是運行在本機的 2181 端口上,因此這個值被設置成 localhost:2181。這個值可以通過分號設置多個值,每個值的格式都是 hostname:port/path,其中每個部分的含義如下:
? hostname 是 zookeeper 服務器的主機名或者 ip 地址
? port 是服務器監聽連接的端口號
? /path 是 kafka 在 zookeeper 上的根目錄。如果缺省,會使用根目錄。
如果設置了 chroot,但是它又不存在,那麽 broker 會在啟動的時候直接創建。
PS:為什麽使用 Chroot 路徑
一個普遍認同的最佳實踐就是 kafka 集群使用 chroot 路徑,這樣 zookeeper 可以與其他應用共享使用,而不會有任何沖突。指定多個 zookeeper 服務器的地址也是比較好的做法,這樣當 zookeeper 集群中有節點失敗的時候,還可以正常連接其他的節點。

log.dirs


這個參數用於配置 Kafka 保存數據的位置, Kafka 中所有的消息都會存在這個目錄下。可以通過逗號來指定多個目錄, kafka 會根據最少被使用的原則選擇目錄分配新的 parition。註意 kafka 在分配 parition 的時候選擇的規則不是按照磁盤的空間大小來定的,而是分配的 parition 的個數多小。

num.recovery.thread.per.data.dir


kafka 可以配置一個線程池,線程池的使用場景如下:
? 當正常啟動的時候,開啟每個 parition 的文檔塊 segment
? 當失敗後重啟時,檢查 parition 的文檔塊
? 當關閉 kafka 的時候,清除關閉文檔塊
默認,每個目錄只有一個線程。最好是設置多個線程數,這樣在服務器啟動或者關閉的時候,都可以並行的進行操作。尤其是當非正常停機後,重啟時,如果有大量的分區數,那麽啟動 broker 將會花費大量的時間。註意,這個參數是針對每個目錄的。比如, num.recovery.threads.per.data.dir 設置為 8,如果有 3 個 log.dirs 路徑,那麽一共會有 24 個線程。

auto.create.topics.enable


在下面場景中,按照默認的配置,如果還沒有創建 topic, kafka 會在 broker 上自動創建 topic:
? 當 producer 向一個 topic 中寫入消息時
? 當 cosumer 開始從某個 topic 中讀取數據時
? 當任何的客戶端請求某個 topic 的信息時
在很多場景下,這都會引發莫名其妙的問題。尤其是沒有什麽辦法判斷某個 topic 是否存在,因為任何請求都會創建該 topic。如果你想嚴格的控制 topic 的創建,那麽可以設置 auto.create.topics.enable 為 false。

默認的主題配置 Topic Defaults


kafka 集群在創建 topic 的時候會設置一些默認的配置,這些參數包括分區的個數、消息的容錯機制,這些信息都可以通過管理員工具以 topic 為單位進行配置。
kafka 為我們提供的默認配置,基本也能滿足大多數的應用場景了。
PS:使用 per.topic 進行參數的覆蓋
在之前的版本中,可以通過 log.retention.hours.per.topic, log.retention.bytes.per.topic, log.segment.bytes.per.topic 等覆蓋默認的配置。現在的版本不能這麽用了,必須通過管理員工具進行設置。

num.partitions


這個參數用於配置新創建的 topic 有多少個分區,默認是 1 個。註意 partition 的個數只可以被增加,不能被減少。這就意味著如果想要減少主題的分區數,那麽就需要重新創建 topic。
在第一章中介紹過, kafka 通過分區來對 topic 進行擴展,因此需要使用分區的個數來做負載均衡,如果新增了 broker,那麽就會引發重新負載分配。這並不意味著所有的主題的分區數都需要大於 broker 的數量,因為 kafka 是支持多個主題的,其他的主題會使用其余的 broker。需要註意的是,如果消息的吞吐量很高,那麽可以通過設置一個比較大的分區數,來分攤壓力。

log.retention.ms


這個參數用於配置 kafka 中消息保存的時間,也可以使用 log.retention.hours,默認這個參數是 168 個小時,即一周。另外,還支持 log.retention.minutes 和 log.retention.ms。這三個參數都會控制刪除過期數據的時間,推薦還是使用 log.retention.ms。如果多個同時設置,那麽會選擇最小的那個。
PS:過期時間和最後修改時間
過期時間是通過每個 log 文件的最後修改時間來定的。在正常的集群操作中,這個時間其實就是 log 段文件關閉的時間,它代表了最後一條消息進入這個文件的時間。然而,如果通過管理員工具,在 brokers 之間移動了分區,那麽這個時候會被刷新,就不準確了。這就會導致本該過期刪除的文件,被繼續保留了。

log.retention.bytes


這個參數也是用來配置消息過期的,它會應用到每個分區,比如,你有一個主題,有 8 個分區,並且設置了 log.retention.bytes 為 1G,那麽這個主題總共可以保留 8G 的數據。註意,所有的過期配置都會應用到 patition 粒度,而不是主題粒度。這也意味著,如果增加了主題的分區數,那麽主題所能保留的數據也就隨之增加了。
PS:通過大小和時間配置數據過期
如果設置了 log.retention.bytes 和 log.retention.ms(或者其他過期時間的配置),只要滿足一個條件,消息就會被刪除。比如,設置 log.retention.ms 是 86400000(一天),並且 log.retention.bytes 是 1000000000(1G),那麽只要修改時間大於一天或者數據量大於 1G,這部分數據都會被刪除。

log.segment.bytes


這個參數用來控制 log 段文件的大小,而不是消息的大小。在 kafka 中,所有的消息都會進入 broker,然後以追加的方式追加到分區當前最新的 segment 段文
件中。一旦這個段文件到達了 log.segment.bytes 設置的大小,比如默認的 1G,這個段文件就會被關閉,然後創建一個新的。一旦這個文件被關閉,就可以理
解成這個文件已經過期了。這個參數設置的越小,那麽關閉文件創建文件的操作就會越頻繁,這樣也會造成大量的磁盤讀寫的開銷。
通過生產者發送過來的消息的情況可以判斷這個值的大小。比如,主題每天接收 100M 的消息,並且 log.segment.bytes 為默認設置,那麽 10 天後,這個段文
件才會被填滿。由於段文件在沒有關閉的時候,是不能刪除的, log.retention.ms 又是默認的設置,那麽這個消息將會在 17 天後,才過期刪除。因為 10 天後,段文件才關閉。再過 7 天,這個文件才算真正過期,才能被清除。
PS:根據時間戳追蹤 offset
段文件的大小也會影響到消息消費 offset 的操作,因為讀取某一時間的 offset 時, kafka 會尋找關閉時間晚於 offset 時間的那個段文件。然後返回 offset 所在的段文件的第一個消息的 offset,然後按照偏移值查詢目標的消息。因此越小的段文件,通過時間戳消費 offset 的時候就會越精確。

log.segment.ms

這個參數也可以控制段文件關閉的時間,它定義了經過多長時間段文件會被關閉。跟 log.retention.bytes 和 log.retention.ms 類似, log.segment.ms 和 log.segment.bytes 也不是互斥的。 kafka 會在任何一個條件滿足時,關閉段文件。默認情況下,是不會設置 Log.segment.ms 的,也就意味著只會通過段文件的大小來關閉文件。
PS:基於時間關閉段文件的磁盤性能需求。當時使用基於時間的段文件限制,對磁盤的要求會很高。這是因為,一般情況下如果段文件大小這個條件不滿足,會按照時間限制來關閉文件,此時如果分區數很多,主題很多,將會有大量的段文件同時關閉,同時創建。

message.max.bytes


這個參數用於限制生產者消息的大小,默認是 1000000,也就是 1M。生產者在發送消息給 broker 的時候,如果出錯,會嘗試重發;但是如果是因為大小的原因,那生產者是不會重發的。另外, broker 上的消息可以進行壓縮,這個參數可以使壓縮後的大小,這樣能多存儲很多消息。
需要註意的是,允許發送更大的消息會對性能有很大影響。更大的消息,就意味著 broker 在處理網絡連接的時候需要更長的時間,它也會增加磁盤的寫操作壓力,影響 IO 吞吐量。
PS:配合消息大小的設置
消息大小的參數 message.max.bytes 一般都和消費者的參數 fetch.message.max.bytes 搭配使用。如果 fetch.message.max.bytes 小於 message.max.bytes,那麽當消費者遇到很大的消息時,將會無法消費這些消息。同理,在配置 cluster 時 replica.fetch.max.bytes 也是一樣的道理。

kafka 權威指南 第二章第 4 節 硬件選擇

問題導讀:
1 Kafka 的對於硬件環境有什麽要求?
2 Kafka 如何選擇硬件以調整 Kafka 的性能?about 雲大數據雲技術學習分享平臺

硬件選擇


kafka 的 broker 的硬件選擇可以說是一門藝術。 Kafka 本身不需要太特殊的硬件,它可以運行在任何機器上。但是一旦考慮到性能,就有很多因素需要考慮了,比如:磁盤的吞吐和容量,內存,網絡以及 CPU。一旦你知道你的環境哪種性能是瓶頸,那麽你就可以進行相應的硬件升級了。

磁盤吞吐


Producer 客戶端生產消息的性能主要取決於 broker 磁盤寫入的速度。 Kafka 的消息在產生的時候在本地存儲,並且大多數的客戶端都會一直等到

broker 確認


寫入成功後,才認為消息提交成功。這就意味著越快的磁盤寫入就會帶來越小的延遲。
所以在磁盤的選擇上,可以選擇 HDD 硬盤或者是 SSD 固態硬盤。固態硬盤在讀寫的時候性能都更高,但是普通的機械硬盤也更經濟實惠。你也可以通過設置
多個存儲路徑或者 RAID,提高硬盤的使用率。另外,驅動也會影響消息的吞吐,比如 SAS 或者 SATA。

磁盤容量

容量是存儲的另一關鍵指標,你想要保留多長時間或者多少的數據,就決定了你的磁盤容量至少要滿足多少。如果 broker 每天都有 1T 的數據量,消息的保存時間為 7 天,那麽你至少需要 7T 的磁盤空間。當然也需要考慮 10%的空間用來存儲其他的文件,以及用戶緩存的空間。存儲容量也是衡量 Kafka 集群規模的標誌,集群的總容量可以通過主題切分多個分區來進行就擴展,這樣多個 broker 可以共同分擔存儲壓力。

內存

除了磁盤性能,內存也是一個很重要的指標。磁盤的性能會影響生產者產生消息的效率,內存效率也會有很大的影響。因為正常情況下 kafka 消費者的操作都是讀取分區的末尾消息,它不會跟生產者產生消息的時間差距很大。在這種情況下,消息一般都是直接緩存在內容,消費者直接從內存中消費數據,這樣效率更快一些。
Kafka 本身不需要為 JVM 分配很大的內容。對於 5G 的堆內存每秒處理 XM 的數據也是沒問題的。 kafka 使用頁緩存也是很有好處的,這也是為什麽推薦 kafka 不要與其他的應用部署在一個服務器上,因為這樣會幹擾頁緩存的效率,會產生很多臟頁。

網絡


Kafka 會占用很大的網絡帶寬,這也是一個很重要的指標。在多個消費者的時候,也需要考慮 kafka 在傳輸和讀取上的網絡壓力。一個生產者每秒鐘向一個主題寫入 1M 的數據,但是所有的消費者都會讀取這部分的數據。其他的操作,比如幾區你的備份、鏡像等等都會增加網絡壓力。 在網絡情況不好的時候,集群的備份會有延遲,此時整個集群也會變得脆弱。

CPU

CPU 計算的能力,相對於磁盤內存來說,不是那麽重要,但是它也會影響 Broker 的性能。因為一般來說生產者的消息都會在 broker 進行壓縮存儲,以此來節省網絡帶寬以及磁盤空間。這需要 kafka 先解壓縮,然後按照一定的規則再組織消息,最終再壓縮存儲到磁盤。這也是 kafka 大部分需要計算性能工作的地方,但是這不是選擇硬件標準的主要因素。

Kafka學習(一)kafka指南(about雲翻譯)