rdkafka 儲存offset到本地檔案
阿新 • • 發佈:2018-12-22
為了支援斷點續傳功能,需要將offset儲存在一個地方,下次從這個offset開始。librdkafka提供了本地檔案儲存的方式。
下面的程式碼演示了
1. 要用topic config物件設定 offset.store.path和offset.store.method
2. start函式接受引數OFFSET_STORED
std::unique_ptr<RdKafka::Consumer> consumer(RdKafka::Consumer::create(global_conf_.get(), err_)); if (!consumer) { throw KafkaError("Failed to create consumer" ); } stringstream stream; stream << topic_name_ << "-" << partition_idx << ".txt"; string file_path = stream.str(); string lasterr; topic_conf_->set("offset.store.path", file_path, lasterr); topic_conf_->set("offset.store.method", "file", lasterr); std::unique_ptr <RdKafka::Topic> topic(RdKafka::Topic::create(consumer.get(), topic_name_, topic_conf_.get(), err_)); // RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_BEGINNING); RdKafka::ErrorCode resp = consumer->start(topic.get(), partition_idx, RdKafka::Topic::OFFSET_STORED); if (resp != RdKafka::ERR_NO_ERROR) { throw KafkaError(RdKafka::err2str(resp)); }
注意:
1. 當前程序目錄下會出現$topic-$partiton-idx.txt檔案
2. 執行後正常讀取資料後需要過一會兒才會寫入數值
這個寫入時間可以設定:offset.store.sync.interval.ms
3. 程式啟動時,可以先手動將offset寫入檔案,然後再啟動
詳細配置參考github
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md