1. 程式人生 > >rdkafka 儲存offset到本地檔案

rdkafka 儲存offset到本地檔案

               

為了支援斷點續傳功能,需要將offset儲存在一個地方,下次從這個offset開始。librdkafka提供了本地檔案儲存的方式。

下面的程式碼演示了

1. 要用topic config物件設定 offset.store.path和offset.store.method

2. start函式接受引數OFFSET_STORED

  std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(global_conf_.get(), err_));  if (!consumer) {   throw KafkaError("Failed to create consumer"
);  }  stringstream stream;  stream << topic_name_ << "-" << partition_idx << ".txt";  string file_path = stream.str();  string lasterr;  topic_conf_->set("offset.store.path", file_path, lasterr);  topic_conf_->set("offset.store.method", "file", lasterr);  std::unique_ptr
<RdKafka::Topic> topic(RdKafka::Topic::create(consumer.get(), topic_name_, topic_conf_.get(), err_));  //  RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_BEGINNING);  RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_STORED);  if
(resp != RdKafka::ERR_NO_ERROR) {   throw KafkaError(RdKafka::err2str(resp));  }

注意:

1. 當前程序目錄下會出現$topic-$partiton-idx.txt檔案

2. 執行後正常讀取資料後需要過一會兒才會寫入數值

這個寫入時間可以設定:offset.store.sync.interval.ms

3. 程式啟動時,可以先手動將offset寫入檔案,然後再啟動

詳細配置參考github

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md