【Kafka】《Kafka權威指南》——寫資料
不管是把 Kafka 作為訊息佇列、訊息、匯流排還是資料儲存平臺來使用 ,總是需要有一個可以往 Kafka 寫入資料的生產者和一個可以從 Kafka讀取資料的消費者,或者一個兼具兩種角 色的應用程式。
例如,在一個信用卡事務處理系統裡,有一個客戶端應用程式,它可能是一個線上商店, 每當有支付行為發生時,它負責把事務傳送到 Kafka上。另一個應用程式根據規則引擎檢 查這個事務,決定是批准還是拒絕。 批准或拒絕的響應訊息被寫回 Kafka,然後傳送給發起事務的線上商店。第三個應用程式從 Kafka上讀取事務和稽核狀態,把它們儲存到資料 庫, 隨後分析師可以對這些結果進行分析,或許還能借此改進規則引擎 。
開發者們可以使用 Kafka 內建的客戶端 API開發 Kafka應用程式。
在這一章,我們將從 Kafra生產者的設計和元件講起,學習如何使用 Kafka生產者。我們將展示如何建立 KafkaProducer和 ProducerRecords物件、如何將記錄傳送給 Kafka,以及如何處理從 Kafka 返回的錯誤,然後介紹用幹控制生產者行為的重要配置選項,最後深入 探討如何使用不同的分割槽方法和序列化器,以及如何自定義序列化器和分割槽器 。
在下一章,我們將會介紹 Kafra的悄費者客戶端,以及如何從 Kafka讀取訊息。
生產者概覽
一個應用程式在很多情況下需要往 Kafka 寫入訊息 : 記錄使用者的活動(用於審計和分析 )、 記錄度量指標、儲存日誌、訊息、記錄智慧家電的資訊、與其他應用程式進行非同步通訊、 緩衝即將寫入到資料庫的資料,等等。
多樣的使用場景意味著多樣的需求:是否每個訊息都很重要?是否允許丟失 一 小部分訊息?偶爾出現重複訊息是否可以接受?是否有嚴格的延遲和吞吐量要求?
在之前提到的信用卡事務處理系統裡,訊息丟失或訊息重複是不允許的,可以接受的延遲最大為 500ms,對吞吐量要求較高,我們希望每秒鐘可以處理一百萬個訊息。
儲存網站的點選資訊是另 一種使用場景。在這個場景裡,允許丟失少量的訊息或出現少量 的訊息重複,延遲可以高一些,只要不影響使用者體驗就行。換句話說,只要使用者點選連結 後可以馬上載入頁面,那麼我們並不介意訊息要在幾秒鐘之後才能到達 Kafka 伺服器。 吞 吐量則取決於網站使用者使用網站的頻度。
不同的使用場景對生產者 API 的使用和配置會有直接的影響。
儘管生產者 API 使用起來很簡單 ,但訊息的傳送過程還是有點複雜的。下圖展示 了向Kafka 傳送訊息的主要步驟。
Kafka 生產者元件圖
我們從建立 一個 ProducerRecord 物件開始, ProducerRecord 物件需要包含目標主題和要傳送的內容。我們還可以指定鍵或分割槽。在傳送 ProducerRecord物件時,生產者要先把鍵和 值物件序列化成位元組陣列,這樣它們才能夠在網路上傳輸 。
接下來,資料被傳給分割槽器。如果之前在 ProducerRecord物件裡指定了分割槽,那麼分割槽器就不會再做任何事情,直接把指定的分割槽返回。如果沒有指定分割槽 ,那麼分割槽器會根據 ProducerRecord物件的鍵來選擇一個分割槽 。選好分割槽以後 ,生產者就知道該往哪個主題和分割槽傳送這條記錄了。緊接著,這條記錄被新增到一個記錄批次裡,這個批次裡的所有訊息會被髮送到相同的主題和分割槽上。有一個獨立的執行緒負責把這些記錄批次傳送到相應的 broker 上。
伺服器在收到這些訊息時會返回一個響應。如果訊息成功寫入 Kafka,就返回 一 個 RecordMetaData 物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量。如果寫入 失敗, 就會返回 一個錯誤 。生產者在收到錯誤之後會嘗試重新發送訊息,幾次之後如果還是失敗,就返回錯誤資訊。
建立Kafka生產者
要往 Kafka寫入訊息,首先要建立一個生產者物件,井設定一些屬性。
下面的程式碼片段展示瞭如何建立一個新的生產者,這裡只指定了必要的屬性,其他使用預設設定。
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Kafka生產者有 3個必選的屬性
bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port。清單裡不需要包含所有的broker地址,生產者會從給定的 broker裡查詢到其他 broker的資訊。不過建議至少要提供兩個 broker的資訊, 一旦其中一個宕機,生產者仍然能夠連線到叢集上。
key.serializer
broker希望接收到的訊息的鍵和值都是位元組陣列。生產者介面允許使用引數化型別,因此可以把 Java物件作為鍵和值傳送給 broker。這樣的程式碼具有良好的可讀性,不過生產者需要知道如何把這些 Java物件轉換成位元組陣列。 key.serializer必須被設定為一個實現了org.apache.kafka.common.serialization.Serializer介面的類,生產者會使用這個類把鍵物件序列化成位元組陣列。 Kafka 客戶端預設提供了ByteArraySerializer(這個只做很少的事情)、 StringSerializer和 IntegerSerializer,因此,如果你只使用常見的幾種 Java物件型別,那麼就沒必要實現自己的序列化器 。要注意, key.serializer是必須設定的,就算你打算只發送值內容。
value.serializer
與 key.serializer一樣, value.serializer指定的類會將值序列化。如果鍵和值都是字串,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數型別而值是字元扇 , 那麼需要使用不同的序列化器。
傳送訊息主要有3種方式:
1、傳送並忘記( fire-and-forget):我們把訊息傳送給伺服器,但井不關心它是否正常到達。大多數情況下,訊息會正常到達,因為 Kafka是高可用的,而且生產者會自動嘗試重發。不過,使用這種方式有時候也會丟失一些訊息。
2、同步傳送:我們使用send()方怯傳送訊息, 它會返回一個Future物件,呼叫get()方法進行等待, 就可以知道悄息是否傳送成功。
3、非同步傳送:我們呼叫 send() 方怯,並指定一個回撥函式, 伺服器在返回響應時呼叫該函式。
在下面的幾個例子中 , 我們會介紹如何使用上述幾種方式來發送訊息,以及如何處理可能 發生的異常情況。
本章的所有例子都使用單執行緒,但其實生產者是可以使用多執行緒來發送訊息的。剛開始的 時候可以使用單個消費者和單個執行緒。如果需要更高的吞吐量,可以在生產者數量不變的 前提下增加執行緒數量。如果這樣做還不夠 , 可以增加生產者數量。
傳送訊息到Kafka
最簡單的同步傳送訊息方式如下所示 :
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
生產者的 send() 方住將 ProducerRecord物件作為引數,它需要目標主題的名字和要傳送的鍵和值物件,它們都是字串。鍵和值物件的型別必須與序列化器和生產者物件相匹配。
我們使用生產者的 send() 方越發送 ProducerRecord物件。從生產者的架構圖裡可以看到,訊息先是被放進緩衝區,然後使用單獨的執行緒傳送到伺服器端。 send() 方法會返回一個包含 RecordMetadata 的 Future物件,不過因為我們會忽略返回值,所以無法知道訊息是否傳送成功。如果不關心傳送結果,那麼可以使用這種傳送方式。比如,記錄 Twitter 訊息日誌,或記錄不太重要的應用程式日誌。
我們可以忽略傳送訊息時可能發生的錯誤或在伺服器端可能發生的錯誤,但在傳送訊息之前,生產者還是有可能發生其他的異常。這些異常有可能是 SerializationException (說明序列化訊息失敗)、 BufferExhaustedException 或 TimeoutException (說明緩衝區已滿),又或者是 InterruptException (說明發送執行緒被中斷)。
同步傳送訊息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
在這裡, producer.send() 方住先返回一個 Future物件,然後呼叫 Future物件的 get() 方法等待 Kafka 響應。如果伺服器返回錯誤, get()方怯會丟擲異常。如果沒有發生錯誤,我們會得到一個 RecordMetadata物件,可以用它獲取訊息的偏移量。如果在傳送資料之前或者在傳送過程中發生了任何錯誤 ,比如 broker返回 了一個不允許重發訊息的異常或者已經超過了重發的次數 ,那麼就會丟擲異常。我們只是簡單地把異常資訊打印出來。
如何處理從Kafka生產者返回的錯誤
KafkaProducer一般會發生兩類錯誤。其中一類是可重試錯誤 ,這類錯誤可以通過重發訊息來解決。比如對於連線錯誤,可以通過再次建立連線來解決,“無主(noleader)” 錯誤則可 以通過重新為分割槽選舉首領來解決。 KafkaProducer可以被配置成自動重試,如果在多次重試後仍無能解決問題,應用程式會收到一個重試異常。另一類錯誤無出通過重試解決 ,比如“訊息太大”異常。對於這類錯誤, KafkaProducer不會進行任何重試,直接丟擲異常。
非同步傳送訊息
假設訊息在應用程式和 Kafka叢集之間一個來回需要 10ms。如果在傳送完每個訊息後都等待迴應,那麼傳送 100個訊息需要 1秒。但如果只發送訊息而不等待響應,那麼傳送100個訊息所需要的時間會少很多。大多數時候,我們並不需要等待響應——儘管 Kafka 會把目標主題、分割槽資訊和訊息的偏移量傳送回來,但對於傳送端的應用程式來說不是必需的。不過在遇到訊息傳送失敗時,我們需要丟擲異常、記錄錯誤日誌,或者把訊息寫入 “錯誤訊息”檔案以便日後分析。
為了在非同步傳送訊息的同時能夠對異常情況進行處理,生產者提供了回撥支援 。下面是使用非同步傳送訊息、回撥的一個例子。
生產者的配置
到目前為止 , 我們只介紹了生產者的幾個必要配置引數——bootstrap.servers API 以及序列化器。
生產者還有很多可配置的引數,在 Kafka文件裡都有說明,它們大部分都有合理的預設值 , 所以沒有必要去修改它們 。不過有幾個引數在記憶體使用、效能和可靠性方面對生產者影響比較大,接下來我們會一一說明。
1. acks
acks 引數指定了必須要有多少個分割槽副本收到訊息,生產者才會認為訊息寫入是成功的。
這個引數對訊息丟失的可能性有重要影響。 該引數有如下選項。 • 如果 acks=0, 生產者在成功寫入悄息之前不會等待任何來自伺服器的響應。也就是說, 如果當中出現了問題 , 導致伺服器沒有收到訊息,那麼生產者就無從得知,訊息也就丟 失了。不過,因為生產者不需要等待伺服器的響應,所以它可以以網路能夠支援的最大 速度傳送訊息,從而達到很高的吞吐量。
• 如果 acks=1,只要叢集的首領節點收到訊息,生產者就會收到 一個來自伺服器的成功 響應。如果訊息無撞到達首領節點(比如首領節點崩憤,新的首領還沒有被選舉出來), 生產者會收到一個錯誤響應,為了避免資料丟失,生產者會重發訊息。不過,如果一個 沒有收到訊息的節點成為新首領,訊息還是會丟失。這個時候的吞吐量取決於使用的是 同步傳送還是非同步傳送。如果讓傳送客戶端等待伺服器的響應(通過呼叫 Future物件 的 get()方法),顯然會增加延遲(在網路上傳輸一個來回的延遲)。如果客戶端使用非同步回撥,延遲問題就可以得到緩解,不過吞吐量還是會受傳送中訊息數量的限制(比如,生 產者在收到伺服器響應之前可以傳送多少個訊息)。
• 如果 acks=all,只有當所有參與複製的節點全部收到訊息時,生產者才會收到一個來自伺服器的成功響應。這種模式是最安全的,它可以保證不止一個伺服器收到訊息,就算有伺服器發生崩潰,整個叢集仍然可以執行(第 5 章將討論更多的細節)。不過,它的延遲比 acks=1時更高,因為我們要等待不只一個伺服器節點接收訊息。
2. buffer.memory
該引數用來設定生產者記憶體緩衝區的大小,生產者用它緩衝要傳送到伺服器的訊息。如果 應用程式傳送訊息的速度超過傳送到伺服器的速度,會導致生產者空間不足。這個時候, send()方法呼叫要麼被阻塞,要麼丟擲異常,取決於如何設定 block.on.buffe.full 引數 (在0.9.0.0版本里被替換成了max.block.ms,表示在丟擲異常之前可以阻塞一段時間)。
3. compression.type
預設情況下,訊息傳送時不會被壓縮。該引數可以設定為 snappy、 gzip 或 lz4,它指定了訊息被髮送給 broker之前使用哪一種壓縮演算法進行壓縮。 snappy 壓縮算怯由 Google巳發明, 它佔用較少 的 CPU,卻能提供較好的效能和相當可觀的壓縮比,如果比較關注效能和網路頻寬,可以使用這種演算法。 gzip壓縮演算法一般會佔用較多的 CPU,但會提供更高的壓縮比,所以如果網路頻寬比較有限,可以使用這種演算法。使用壓縮可以降低網路傳輸開銷和儲存開銷,而這往往是向 Kafka傳送訊息的瓶頸所在。
4. retries
生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分割槽找不到首領)。在這種情況下, retries引數的值決定了生產者可以重發訊息的次數,如果達到這個次數,生產者會放棄重試並返回錯誤。預設情況下,生產者會在每次重試之間等待 1OOms,不過可以通過 retries.backoff.ms 引數來改變這個時間間隔。建議在設定重試次數和重試時間間隔之前, 先測試一下恢復一個崩潰節點需要多少時間(比如所有分割槽選舉出首領需要多長時間), 讓總的重試時間比 Kafka叢集從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦怯通過重試來解決(比如“悄息太大”錯誤)。一般情 況下,因為生產者會自動進行重試,所以就沒必要在程式碼邏輯裡處理那些可重試的錯誤。 你只需要處理那些不可重試的錯誤或重試次數超出上限的情況。
5. batch.size
當有多個訊息需要被髮送到同一個分割槽時,生產者會把它們放在放一個批次裡。該引數指定了一個批次可以使用的記憶體大小,按照位元組數計算(而不是訊息個數)。當批次被填滿,批次裡的所有訊息會被髮送出去。不過生產者井不一定都會等到批次被填滿才傳送,半捕 的批次,甚至只包含一個訊息的批次也有可能被髮送。所以就算把批次大小設定得很大, 也不會造成延遲,只是會佔用更多的記憶體而已。但如果設定得太小,因為生產者需要更頻繁地傳送訊息,會增加一些額外的開銷。
6. linger.ms
該引數指定了生產者在傳送批次之前等待更多訊息加入批次的時間。 KafkaProducer 會在批次填滿或 linger.ms達到上限時把批次傳送出去。預設情況下,只要有可用的執行緒, 生產者就會把訊息傳送出去,就算批次裡只有一個訊息。把 linger.ms設定成比0大的數, 讓生產者在傳送批次之前等待一會兒,使更多的訊息加入到這個批次 。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發送更多的訊息,每個訊息的開銷就變小了)。
7. client.id
該引數可以是任意的字串,伺服器會用它來識別訊息的來源,還可以用在日誌和配額指標裡。
8. max.in.flight.requests.per.connection
該引數指定了生產者在收到伺服器晌應之前可以傳送多少個訊息。它的值越高,就會佔用越多的記憶體,不過也會提升吞吐量。 把它設為 1 可以保證訊息是按照發送的順序寫入伺服器的,即使發生了重試。
9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生產者在傳送資料時等待伺服器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元資料(比如目標分割槽的首領是誰)時等待伺服器返回響應的時間。如果等待響應超時,那麼生產者要麼重試傳送資料,要麼返回 一個錯誤 (丟擲異常或執行回撥)。timeout.ms 指定了 broker 等待同步副本返回訊息確認的時間,與 asks 的配置相匹配一一如果在指定時間內沒有收到同步副本的確認,那麼 broker就會返回 一個錯誤 。
10. max.block.ms
該引數指定了在呼叫 send() 方法或使用 parttitionFor() 方能獲取元資料時生產者的阻塞 時間。當生產者的傳送緩衝區已捕,或者沒有可用的元資料時,這些方屈就會阻塞。在阻塞時間達到 max.block.ms時,生產者會丟擲超時異常。
11 . max.request.size
該引數用於控制生產者傳送的請求大小。它可以指能傳送的單個訊息的最大值,也可以指單個請求裡所有訊息總的大小。例如,假設這個值為 1MB,那麼可以傳送的單個最大訊息為 1MB,或者生產者可以在單個請求裡傳送一個批次,該批次包含了 1000個訊息,每個訊息大小為 1KB 。另外, broker對可接收的訊息最大值也有自己的限制( message.max.bytes),所以兩邊的配置最好可以匹配,避免生產者傳送的訊息被 broker拒絕 。
12. receive.buffer.bytes 和 send.buffer.bytes
這兩個引數分別指定了 TCP socket接收和傳送資料包的緩衝區大小 。 如果它們被設為 -1 , 就使用作業系統的預設值。如果生產者或消費者與 broker處於不同的資料中心,那麼可以適當增大這些值,因為跨資料中心的網路一般都有比較高的延遲和比較低的頻寬。
順序保證
Kafka可以保證同一個分割槽裡的訊息是有序的。也就是說,如果生產者按照一定的順序傳送訊息, broker就會按照這個順序把它們寫入分割槽,消費者也會按照同樣的順序讀取它們。在某些情況下 , 順序是非常重要的。如果把retries 設為非零整數,同時把 max.in.flight.requests.per.connection 設為比 1大的數,那麼,如果第一個批次訊息寫入失敗,而第二個批次寫入成功, broker會重試寫入第一個批次。如果此時第一個批次也寫入成功,那 麼兩個批次的順序就反過來了。
一般來說,如果某些場景要求訊息是有序的,那麼訊息是否寫入成功也是 很關鍵的,所以不建議把順序是非常重要的。如果把retries 設為 0。可以把 max.in.flight.requests.per.connection設為 1,這樣在生產者嘗試傳送第一批訊息時,就不會有其他的訊息傳送給 broker。不過這樣會嚴重影響生產者的吞吐量 ,所以只有在 對訊息的順序有嚴格要求的情況