Kafka效能優化---1
阿新 • • 發佈:2019-01-29
吐槽語:
17年初的時候聽說了Kafka這個東西,傳聞甚巨,傳其有神鬼莫測之效能,於是抱著站在巨人肩膀上的態度開始虛心鑽研,從此就踏上了一條踩巨人肩膀的坑之不歸路。。。
初時,花了兩天時間查閱了Kafka相關資料,胸有成竹之際從官網下載了最新的Kafka和最新的zookeeper,想搭建一個環境來測試一二。按照網上的教程一步一步,摩擦摩擦,結果biu的一下就搭建好了,so easy?! 遂開始著手寫程式碼,千辛萬苦終於找到了librdkafka這個用c語言封裝的對Kafka客戶端的使用庫,OK,原始碼下載下來開始看,哎喲不錯喲,竟然有對c++的封裝,照著示例程式碼稀裡糊塗一頓封裝,終於封裝出兩個自己的c++類,一個Producer類,一個Consumer類。然後竟然就這麼很簡單很順利的用起來了!what?這比我以前用的那些訊息中介軟體都要容易百倍啊有木有!
但是,俗話說,凡事不要高興的太早。。太早。。早。。。。
一.Kafka基礎知識普及
研究kafka首先要搞清楚它到底是什麼,能做什麼事。官方定義如下:
Apache Kafka® is a distributed streaming platform. What exactly does that mean?
翻譯過來就是:Apache Kafka®是一個分散式流媒體平臺。這到底是個什麼鬼呢?
具體是什麼鬼,官方及非官方原理概念講解一大堆,一抄二,二抄三,三抄萬物,萬物生太極。。咳咳,言歸正傳。經過我潛心研究,給出如下定義:kafka是一個訊息佇列。是為了分離業務,解耦合,面向微服務吹牛比的分散式訊息佇列。有了上面的概念之後,那zookeeper又是搗什麼亂的呢,想一想,我們線上程之間資料互動需要加一把鎖還有其他互斥量訊號量之類的東西來保證資料的執行緒安全性,分散式系統裡沒有鎖,訊號量,互斥量怎麼辦,牛逼的人們於是就搞出了zookeeper這個東西來充當這些角色。zookeeper的功能(配置管理,名字服務,提供分散式同步以及叢集管理)。所以就一句話,kafka這個分散式叢集管理亂七八糟的資料客戶端,zookeeper負責管理kafka這個分散式叢集。
二.kafka的生產和消費機制
幼時懵懵懂懂之際,小心翼翼的按照套路呼叫生產消費,一個標點符號都不敢馬虎。我們都知道各種客戶端庫提供生產和消費介面,呼叫producer介面傳送一個"hello consumer"到某個主題上,然後很容易的就能通過consumer介面收到這個訊息。但是我們要的是可以實際應用的程式,而不是隻能hello world的測試程式。那麼問題來了: 我每秒鐘能傳送多少條hello world? 每秒鐘能生產多少條1K, 100K, 1M大小的資料? 發出去的資料會不會丟,會不會重,怎麼確定成功還是失敗? 我每秒鐘能收到多少條hello world? 每秒鐘能消費多少條1K,100K,1M大小的資料? 收到的訊息會不會丟,會不會重?怎麼保證不丟而且不重? 進而更大的疑問產生了,影響生產和消費效能的因素到底有哪些,受哪些硬體或者環境的制約? 怎麼解決這些問題?先來說說生產吧。我一直納悶producer為什麼只提供了非同步方式生產而沒有提供同步方式(配置裡提供的同步方式生產,經測,並不是真正意義上的同步),直到我自己實現了一個同步生產的介面。。。當然如果你能忍受每秒鐘只發20條hello world的效能,我還是介意使用同步方式,簡單啊,起碼每條訊息都能通過返回值確定成功或者失敗,也就保證了訊息不會丟也不會重。官網不提供同步介面,那怎麼實現同步方式生產呢,也很簡單,在呼叫非同步生產之後阻住,直到生產的回撥裡返回成功或失敗,結果賦給sync_key_, 這樣while條件監測到sync_key_的值被改變之後,就會跳出迴圈。
static thread_local int sync_key_;
/*同步生產*/
thread_local int iCount = 0, nBlock = 10;
while (sync_key_ == SYNC_PRODUCE_KEY)
{
producer_->poll(lTimeOut / nBlock);
if (++iCount > nBlock)
{
iCount = 0; break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if (sync_key_ != RdKafka::ERR_NO_ERROR)
{
if (sync_key_ == SYNC_PRODUCE_KEY)
{
CLogEvent log_event(CLogEvent::Type::LOG_KAFKA, (KFKErrorCode)sync_key_, "生產超時");
m_errorCallback(log_event, m_pErrUser);
}
return -1;
}
resp = producer_->flush(lTimeOut);
if (resp != RdKafka::ERR_NO_ERROR)
{
CLogEvent log_event(CLogEvent::Type::LOG_PRODUCER, RdKafka::err2str(resp).c_str());
m_errorCallback(log_event, m_pErrUser);
return -1;
}
/*設定生產者型別為同步方式*/
conf_->set("producer.type", "sync", errstr_);
conf_->set("queue.buffering.max.ms", "1", errstr_);
conf_->set("socket.blocking.max.ms", "10", errstr_);
//conf_->set("socket.nagle.disable", "true", errstr_);//此處設為true可以提升同步生產效能,副作用未知
非同步方式的生產就不做詳解了,要注意的就是幾個配置項的配置。下面是我非同步生產的配置:
/*設定生產者型別為非同步方式*/
conf_->set("producer.type", "async", errstr_);
conf_->set("batch.num.messages", "100", errstr_);
conf_->set("queue.buffering.max.kbytes", "1000000", errstr_);
conf_->set("queue.buffering.max.ms", "1000", errstr_);
conf_->set("queue.buffering.max.messages", "600", errstr_);
conf_->set("queue.enqueue.timeout.ms", "0", errstr_);
conf_->set("socket.blocking.max.ms", "10", errstr_);
/*設定訊息處理報告回撥*/
conf_->set("dr_cb", (RdKafka::DeliveryReportCb*)this, errstr_);
conf_->set("event_cb", (RdKafka::EventCb*)this, errstr_);
/*關閉broker日誌輸出*/
conf_->set("log.connection.close", "false", errstr_);
/*kafka叢集生產的資料至少一個節點需要寫成功。建議設定為1*/
tconf_->set("request.required.acks", "1", errstr_);
再來說說消費的坑吧,我們主要關注的是如何保證消費時訊息不丟不重,以及我們能不能指定某個區間範圍內的資料進行消費。kafka本身提供兩種方式消費,一種低階API,優點是很靈活,能滿足各種場景的需求,缺點就是繁瑣,還提供一種高階API,優點就是簡單,那是相當好用,缺點就是不同的業務需求需要自己做處理。低階API就不說了,如果有1000個分割槽,還要挨個分割槽指定,程式碼層面來說,太囉嗦,能不能用高階API來保證訊息不丟不重,而且還可以指定某個區間範圍內的資料進行消費呢?經過我複雜大腦的反覆思考,終於敲出了一套程式碼來是實現這種變態的需求:
a. 利用RdKafka::KafkaConsumer這個高階API裡的rebalance_cb我們可以做很多犀利的操作,來達到我們的目的。首先在rebalance_cb這個回撥裡,我們很輕鬆的就可以拿到std::vector<RdKafka::TopicPartition*>& partitions這麼一個玩意。這玩意是幹啥的呢,這東西就是儲存了觸發rebalance負載均衡時,每個主題+分割槽+offset的資訊,然後我們可以把這個vector裡的資訊都修改成我們想要起始位置,這樣就可以控制KafkaConsumer的起始消費位置了,同樣的道理,起始的位置能控制,那結束的位置也沒啥好控制的了,只要給一個結束位置,每消費出一條資料,跟結束位置做一個比較,到結束位置了,就直接停止消費就OK了,通過這種思路就實現了指定某個區間範圍內消費的效果。 b. 至於僅且只消費一次的語義,實際上沒有完美實現的方式。對資料的業務處理,和對消費出來的訊息進行提交不可能真正意義上達到事務處理或者原子處理的效果。當然還是存在一種業務場景可以滿足真正意義上的事務處理,那就是我們消費出來的資料是存到資料庫,然後把這條資料對應的offset也存到相同的資料庫,這個整體作為一個事務去處理,要麼同時提交,要麼同時回滾,這才是真正意義上的消費與業務處理的only one語義。
扯了這麼多,雲裡霧裡,貼點程式碼出來提提神吧,至於能不能看懂的話,我只能說:隨緣吧。目前只有我一個人能看懂我自己寫的這份程式碼,估計過段時間就果斷看不懂了。。。先看看配置項:
/*使用負載均衡*//*這個配置是必須的,不然我們啥都控制不了*/
res = conf_->set("rebalance_cb", (RdKafka::RebalanceCb*)this, errstr_);
if (res != RdKafka::Conf::CONF_OK) {
//std::cerr << errstr_ << std::endl;
m_errorCallback(res, errstr_, m_pErrUser);
}
/*設定快取佇列大小*//*這個不宜過大,否則會造成記憶體洩漏的假象*/
res = conf_->set("queued.max.messages.kbytes", "100000", errstr_);
if (res != RdKafka::Conf::CONF_OK)
{
//std::cerr << errstr_ << std::endl;
m_errorCallback(res, errstr_, m_pErrUser);
return false;
}
/*事件回撥*/
res = conf_->set("event_cb", (RdKafka::EventCb*)this, errstr_);
if (res != RdKafka::Conf::CONF_OK)
{
//std::cerr << errstr_ << std::endl;
m_errorCallback(res, errstr_, m_pErrUser);
return false;
}
/*關閉broker日誌輸出*/
conf_->set("log.connection.close", "false", errstr_);
/*debug除錯*/
res = conf_->set("debug", "all", errstr_);
conf_->set("log.queue", "true", errstr_);
if (res != RdKafka::Conf::CONF_OK)
{
//std::cerr << errstr_ << std::endl;
m_errorCallback(res, errstr_, m_pErrUser);
return false;
}
/*預設還要來這麼一句*/
conf_->set("default_topic_conf", tconf_.get(), errstr_);
僅且只消費一次這個梗上面已經提過了,就是要關閉自動提交,改為手動提交,在將這條消費出來的資料處理完畢之後,呼叫一下commit就好了。主要看看如何用高階API來實現控制區間消費吧,首先定義了兩個結構,用於設定每個主題+分割槽(TopicPartition)上的消費區間[起始offset, 結束offset]:
//資料消費的偏移量資訊
typedef struct tagCommitOffset
{
std::string group_id; //offset所在消費組
std::string topic_name; //offset所在主題名字
long topic_partition; //offset所在的分割槽
long long partition_offset; //offset值
long long offset_begin; //起始消費的offset值
long long offset_end; //終止消費的offset值
int finished; //處理完成標誌
std::string data_id; //資料id
int cosume_status; //offset消費狀態 0-失敗,1-成功
int commit_status; //offset提交狀態
tagCommitOffset()
{
finished = 0;
}
}COMMIT_OFFSET, OFFSET;
typedef struct RetransOffsets
{
string str_begin_time;
string str_end_time;
map<long, vector<OFFSET>> map_part_offsets;
RetransOffsets()
{
str_begin_time = "";
str_end_time = "";
map_part_offsets.clear();
}
~RetransOffsets()
{
str_begin_time = "";
str_end_time = "";
map_part_offsets.clear();
}
}CONSUME_CONDITION, OFFSETS_GROUP;
接下來就是開啟一個消費執行緒,然後線上程裡做控制了,注意劃重點了,用pause--->seek--->resume來進行消費位置的定位,要比unassign--->assign這種方式的效能高很多:
/*如果消費區間們不為空,且第一次進執行緒函式*/
if (condition_.map_part_offsets.size() > 0 && condition_flag_ == 0)
{
/*offsets_是通過rebalance_cb拿到的std::vector<RdKafka::TopicPartition*>& partitions*/
consumer_->pause(offsets_); //暫停消費
part_list_ressign(offsets_); //設定消費起始位置
for (auto it : offsets_)
{
consumer_->seek(*it, 1000); //定位到起始位置
}
consumer_->resume(offsets_); //重新開始消費
condition_flag_ = 1; //這個變數只是為了紀念第一次操作不一樣,用完賦個非0就不用了
}
/*ressign_flag_這個變數也是為了區分第一次,嘿嘿*/
if (ressign_flag_ > 0)
{
bool all_finished = true;
for (auto it_part : condition_.map_part_offsets)
{
for (auto it_offset : it_part.second)
{
/*如果有一個分割槽上的offset區間拖後腿了沒完事,那就不算完*/
if (it_offset.finished == 0) all_finished = false;
}
}
/*如果全都完事了,那就game over*/
if (all_finished)
{
ressign_flag_ = -1;
m_errorCallback(0, "重傳結束", m_pErrUser);
m_bExit = true;
continue;
}
else
{
consumer_->pause(offsets_);
part_list_ressign(offsets_);
for (auto it : offsets_)
{
consumer_->seek(*it, 1000);
}
consumer_->resume(offsets_);
}
}
ressign_flag_ = 0;
RdKafka::Message *msg = consumer_->consume(1000);
if ( msg != nullptr)
{
manual_consume(msg, NULL);
delete msg;
}
這裡面有兩個關鍵的函式part_list_ressign(消費位置再定位)和manual_consume(手動標記消費狀態),這就是核心所在了:
void CConsumer::part_list_ressign(const std::vector<RdKafka::TopicPartition*>& partitions)
{
stringstream os;
if (rebalance_flag_ == 0)
{
RdKafka::TopicPartition::destroy(offsets_);
}
for (unsigned int i = 0; i < partitions.size(); i++)
{
os << partitions[i]->topic() <<
"[" << partitions[i]->partition() << "], ";
int64_t offset_begin;
if (condition_flag_ == 0) offset_begin = 0;
else offset_begin = partitions[i]->offset();
/*設定儲存的offset*/
if (condition_.map_part_offsets.size() == 0)
{
partitions[i]->set_offset(RdKafka::Topic::OFFSET_END);
}
else
{
long partition = partitions[i]->partition();
auto it = condition_.map_part_offsets.find(partition);
if (it != condition_.map_part_offsets.end())
{
auto& offsets_vec = condition_.map_part_offsets[partition];
unsigned int ncount = 0;
for ( unsigned int j = 0; j < offsets_vec.size();j++)
{
if (offsets_vec[j].finished == 0)
{
if (offsets_vec[j].offset_begin >= offset_begin)
{
offset_begin = offsets_vec[j].offset_begin;
break;
}
}
else {
ncount++;
}
}
if (ncount < offsets_vec.size()) {
partitions[i]->set_offset(offset_begin);
}
else {
partitions[i]->set_offset(RdKafka::Topic::OFFSET_END);
}
}
else
{
partitions[i]->set_offset(RdKafka::Topic::OFFSET_END);
}
}
if (rebalance_flag_ == 0)
{
offsets_.push_back(RdKafka::TopicPartition::create(partitions[i]->topic(), partitions[i]->partition()));
}
}
//m_errorCallback(0, os.str(), m_pErrUser);
}
void CConsumer::manual_consume(RdKafka::Message * message, void * opaque)
{
stringstream ostring;
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
{
long partition = message->partition();
auto it = condition_.map_part_offsets.find(partition);
if (it != condition_.map_part_offsets.end())
{
auto& offsets_vec = condition_.map_part_offsets[partition];
for (auto& it_offset : offsets_vec)
{
if (message->offset() >= it_offset.offset_begin
&& message->offset() <= it_offset.offset_end)
{
char chtemp[256] = { 0 };
mysprintf(chtemp, "Read Message at %s[%d], offset:%lld, Key:%s ",
message->topic_name().c_str(), message->partition(), message->offset(),
(*(message->key())).c_str());
m_errorCallback(0, chtemp, m_pErrUser);
/* 正確訊息體 */
if (message != NULL)
{
MESSAGE_INFO a_message;
RdKafka::MessageTimestamp ts;
ts = message->timestamp();
a_message.time_stamp = ts.timestamp;
a_message.msg_len = message->len();
a_message.msg_payload = message->payload();
a_message.msg_topic_name = message->topic_name();
a_message.msg_partition = message->partition();
a_message.msg_offset = message->offset();
a_message.err_code = message->err();
a_message.err_str = message->errstr();
a_message.key_str = message->key();
a_message.key_len = message->key_len();
if (*a_message.key_str >= condition_.str_begin_time
&& *a_message.key_str <= condition_.str_end_time)
{
//回撥給業務處理
m_dispathCallback(m_eType, (void*)&a_message, m_pDispathUser);
this->m_nConsumNum++;
//非同步提交
this->consumer_->commitAsync();
}
}
if (message->offset() == it_offset.offset_end)
{
this->ressign_flag_++;
it_offset.finished = 1;
}
}
}
}
/*設定儲存的offset*/
for (auto& it : offsets_)
{
if ((it->topic() == message->topic_name())
&& (it->partition() == message->partition()))
{
it->set_offset(message->offset() + 1);
}
}
}
break;
case RdKafka::ERR__PARTITION_EOF:
/* 最後一條訊息 */
{
char chtemp[256] = { 0 };
mysprintf(chtemp, "TopicPartion %s[%d]----message at end.",
message->topic_name().c_str(), message->partition());
m_errorCallback(message->err(), chtemp, m_pErrUser);
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
m_errorCallback(message->err(), message->errstr(), m_pErrUser);
break;
default:
/* 錯誤資訊 */
m_errorCallback(message->err(), message->errstr(), m_pErrUser);
break;
}
}
各位看官看懂了麼,嘿嘿,歡迎討論。這篇太長了,再分一篇吧,下一篇會有一些奇思妙想,以及在實際應用中如何一步一步提高吞吐量的血淚史。。。
下一篇:http://blog.csdn.net/zhu_0416/article/details/79113049