.Net(c#)使用 Kafka 小結
阿新 • • 發佈:2020-05-18
# .Net(c#)使用 Kafka 小結
## 1.開篇
由於專案中必須使用 kafka 來作為訊息元件,所以使用 kafka 有一段時間了。不得不感嘆 kafka 是一個相當優秀的訊息系統。下面直接對使用過程做一總結,希望對大家有用。
### 1.1.kafka 部署
kafka 的簡單搭建我們使用 docker 進行,方便快捷單節點。生產環境不推薦這樣的單節點 kafka 部署。
#### 1.1.1.確保安裝了 docker 和 docker-compose
網上很多教程,安裝也簡單,不作為重點贅述。
#### 1.1.2.編寫 docker-compose.yml
將以下內容直接複製到新建空檔案`docker-compose.yml`中。
```yml
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS: "test"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
```
#### 1.1.3.容器構建提交
在`docker-compose.yml`檔案的目錄下執行以下命令:
```sh
docker-compose build # 打包
docker-compose up # 啟動, 新增 -d 可以後臺啟動。
```
看到日誌輸出:
```sh
Creating network "desktop_default" with the default driver
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1 ... done
Attaching to desktop_zookeeper_1, desktop_kafka_1
zookeeper_1 | ZooKeeper JMX enabled by default
zookeeper_1 | Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper_1 | 2020-05-17 03:34:31,794 [myid:] - INFO [main:QuorumPeerConfig@136] - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
...
zookeeper_1 | 2020-05-17 03:34:31,872 [myid:] - INFO [main:ZooKeeperServer@836] - tickTime set to 2000
...
kafka_1 | Excluding KAFKA_VERSION from broker config
```
沒有錯誤輸出說明部署成功。
## 2.kafka 客戶端選擇
在 github 上能夠找到好幾個 c#可以使用的 kafka 客戶端。大家可以去搜一下,本文就只說明[rdkafka-dotnet](https://github.com/ah-/rdkafka-dotnet)和[confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet)。
### 2.1.rdkafka-dotnet
我們生產環境中就使用的該客戶端。在該專案 github 首頁中可以看到:
```csharp
var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{
consumer.OnMessage += (obj, msg) =>
{
//...
};
}
```
沒錯,使用它的原因就是它提供了**EventConsumer**,可以直接非同步訂閱訊息。整體上來說該客戶端非常的穩定,效能優良。使用過程中比較難纏的就是它的配置,比較不直觀。它基於[librdkafka(C/C++)](https://github.com/edenhill/librdkafka)實現,配置 Config 類中顯式配置比較少,大多數是通過字典配置的,比如:
```csharp
var config = new Config();
config["auto.offset.reset"] = "earliest";//配置首次訊息偏移位置為最早
```
這對於新手來說並不是很友好,很難想到去這樣配置。當然如果有 **librdkafka** 的使用經驗會好很多。大多數配置在 **librdkafka** 專案的[CONFIGURATION](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)。
還有一個需要注意的是 Broker 的版本支援[Broker version support: >=0.8](https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility),也在 librdkafka 專案中可以找到。
### 2.2 confluent-kafka-dotnet
confluent-kafka-dotnet 是 rdkafka-dotnet(好幾年沒有維護了)的官方後續版本。推薦使用 confluent-kafka-dotnet,因為配置相對友好,更加全面。比如:
```csharp
var conf = new ConsumerConfig
{
AutoOffsetReset = AutoOffsetReset.Earliest//顯式強型別賦值配置
};
```
對於 EventConsumer 怎麼辦呢?在專案[變更記錄](https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/CHANGELOG.md)中已經明確提出移除了 OnMessage 多播委託,而 EventConsumer,也就不存在了。但這不難,我們可以參照基專案寫一個:
```csharp
public class Event