1. 程式人生 > >.Net(c#)使用 Kafka 小結

.Net(c#)使用 Kafka 小結

# .Net(c#)使用 Kafka 小結 ## 1.開篇 由於專案中必須使用 kafka 來作為訊息元件,所以使用 kafka 有一段時間了。不得不感嘆 kafka 是一個相當優秀的訊息系統。下面直接對使用過程做一總結,希望對大家有用。 ### 1.1.kafka 部署 kafka 的簡單搭建我們使用 docker 進行,方便快捷單節點。生產環境不推薦這樣的單節點 kafka 部署。 #### 1.1.1.確保安裝了 docker 和 docker-compose 網上很多教程,安裝也簡單,不作為重點贅述。 #### 1.1.2.編寫 docker-compose.yml 將以下內容直接複製到新建空檔案`docker-compose.yml`中。 ```yml version: "3" services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: [zookeeper] ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_CREATE_TOPICS: "test" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock ``` #### 1.1.3.容器構建提交 在`docker-compose.yml`檔案的目錄下執行以下命令: ```sh docker-compose build # 打包 docker-compose up # 啟動, 新增 -d 可以後臺啟動。 ``` 看到日誌輸出: ```sh Creating network "desktop_default" with the default driver Creating desktop_zookeeper_1 ... done Creating desktop_kafka_1 ... done Attaching to desktop_zookeeper_1, desktop_kafka_1 zookeeper_1 | ZooKeeper JMX enabled by default zookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg zookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg ... zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000 ... kafka_1 | Excluding KAFKA_VERSION from broker config ``` 沒有錯誤輸出說明部署成功。 ## 2.kafka 客戶端選擇 在 github 上能夠找到好幾個 c#可以使用的 kafka 客戶端。大家可以去搜一下,本文就只說明[rdkafka-dotnet](https://github.com/ah-/rdkafka-dotnet)和[confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet)。 ### 2.1.rdkafka-dotnet 我們生產環境中就使用的該客戶端。在該專案 github 首頁中可以看到: ```csharp var config = new Config() { GroupId = "example-csharp-consumer" }; using (var consumer = new EventConsumer(config, "127.0.0.1:9092")) { consumer.OnMessage += (obj, msg) => { //... }; } ``` 沒錯,使用它的原因就是它提供了**EventConsumer**,可以直接非同步訂閱訊息。整體上來說該客戶端非常的穩定,效能優良。使用過程中比較難纏的就是它的配置,比較不直觀。它基於[librdkafka(C/C++)](https://github.com/edenhill/librdkafka)實現,配置 Config 類中顯式配置比較少,大多數是通過字典配置的,比如: ```csharp var config = new Config(); config["auto.offset.reset"] = "earliest";//配置首次訊息偏移位置為最早 ``` 這對於新手來說並不是很友好,很難想到去這樣配置。當然如果有 **librdkafka** 的使用經驗會好很多。大多數配置在 **librdkafka** 專案的[CONFIGURATION](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。 還有一個需要注意的是 Broker 的版本支援[Broker version support: >=0.8](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility),也在 librdkafka 專案中可以找到。 ### 2.2 confluent-kafka-dotnet confluent-kafka-dotnet 是 rdkafka-dotnet(好幾年沒有維護了)的官方後續版本。推薦使用 confluent-kafka-dotnet,因為配置相對友好,更加全面。比如: ```csharp var conf = new ConsumerConfig { AutoOffsetReset = AutoOffsetReset.Earliest//顯式強型別賦值配置 }; ``` 對於 EventConsumer 怎麼辦呢?在專案[變更記錄](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md)中已經明確提出移除了 OnMessage 多播委託,而 EventConsumer,也就不存在了。但這不難,我們可以參照基專案寫一個: ```csharp public class Event