1. 程式人生 > 其它 >Java的Kafka生產者如何管理TCP連線

Java的Kafka生產者如何管理TCP連線

1.什麼時候建立TCP連線?

在建立 KafkaProducer 例項時,生產者應用會在後臺建立並啟動一個名為 Sender 的執行緒,該 Sender 執行緒開始執行時首先會建立與 Broker 的連線。

而且由於沒呼叫send,其實不知道給哪個broker傳送,所以是連線 bootstrap.servers 引數指定的所有 Broker 。

所以實際環境中,不建議把所有broker都配置到 bootstrap.servers ,指定 3~4 臺即可,因為 Producer 一旦連線到叢集中的任一臺 Broker,就能拿到整個叢集的 Broker 資訊。 Producer 向某一臺 Broker 傳送了 METADATA 請求,嘗試獲取叢集的元資料資訊。

但Sender 的執行緒在例項啟動後傳送,是有風險的,雖然KafkaProducer是執行緒安全,但在物件構造器中啟動執行緒會造成 this 指標的逃逸。理論上,Sender 執行緒完全能夠觀測到一個尚未構造完成的 KafkaProducer 例項。當然,在構造物件時建立執行緒沒有任何問題,但最好是不要同時啟動它。

實際上,TCP 連線還可能在兩個地方被建立:一個是在更新元資料後,另一個是在訊息傳送時。

場景一:當 Producer 嘗試給一個不存在的主題傳送訊息時,Broker 會告訴 Producer 說這個主題不存在。此時 Producer 會發送 METADATA 請求給 Kafka 叢集,去嘗試獲取最新的元資料資訊。

場景二:Producer 通過 metadata.max.age.ms 引數定期地去更新元資料資訊。該引數的預設值是 300000,即 5 分鐘,也就是說不管叢集那邊是否有變化,Producer 每 5 分鐘都會強制重新整理一次元資料以保證它是最及時的資料。

2.TCP連線何時關閉?

Producer 端關閉 TCP 連線的方式有兩種:一種是使用者主動關閉;一種是 Kafka 自動關閉。

推薦關閉使用producer.close() 方法來關閉,kill -9也是使用者主動關閉。

自動關閉是設定Producer 端引數 connections.max.idle.ms 的值,預設是9min,如果在 9 分鐘內沒有任何請求“流過”某個 TCP 連線,那麼 Kafka 會主動幫你把該 TCP 連線關閉。

使用者可以在 Producer 端設定 connections.max.idle.ms=-1 禁掉這種機制。一旦被設定成 -1,TCP 連線將成為永久長連線。當然這只是軟體層面的“長連線”機制,由於 Kafka 建立的這些 Socket 連線都開啟了 keepalive,因此 keepalive 探活機制還是會遵守的。

值得注意的是,在第二種方式中,TCP 連線是在 Broker 端被關閉的,但其實這個 TCP 連線的發起方是客戶端,因此在 TCP 看來,這屬於被動關閉的場景,即 passive close。被動關閉的後果就是會產生大量的 CLOSE_WAIT 連線,因此 Producer 端或 Client 端沒有機會顯式地觀測到此連線已被中斷。