c語言使用librdkafka庫實現kafka的生產和消費例項
關於librdkafka庫的介紹,可以參考kafka的c/c++高效能客戶端librdkafka簡介,本文使用librdkafka庫來進行kafka的簡單的生產、消費
一、producer librdkafka進行kafka生產操作的大致步驟如下:
1、建立kafka配置 rd_kafka_conf_t *rd_kafka_conf_new (void)
2、配置kafka各項引數 rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
3、設定傳送回撥函式 void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf, void (*dr_msg_cb) (rd_kafka_t *rk, const rd_kafka_message_t * rkmessage, void *opaque))
4、建立producer例項 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)
5、例項化topic rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
6、非同步呼叫將訊息傳送到指定的topic int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition, int msgflags, void *payload, size_t len, const void *key, size_t keylen, void *msg_opaque)
7、輪詢事件佇列以觸發傳送報告回撥函式(timeout_ms為最大阻塞時間,設為0則為非阻塞呼叫) int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)
8、等待所有未完成的produce請求完成 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)
9、銷燬topic void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)
10、銷燬producer例項 void rd_kafka_destroy (rd_kafka_t *rk)
完整程式碼如下my_producer.c: #include <stdio.h> #include <signal.h> #include <string.h> #include "../src/rdkafka.h" static int run = 1; static void stop(int sig){ run = 0; fclose(stdin); } /* 每條訊息呼叫一次該回調函式,說明訊息是傳遞成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 還是傳遞失敗(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 該回調函式由rd_kafka_poll()觸發,在應用程式的執行緒上執行 */ static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque){ if(rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %"PRId32")\n", rkmessage->len, rkmessage->partition); /* rkmessage被librdkafka自動銷燬*/ } int main(int argc, char **argv){ rd_kafka_t *rk; /*Producer instance handle*/ rd_kafka_topic_t *rkt; /*topic物件*/ rd_kafka_conf_t *conf; /*臨時配置物件*/ char errstr[512]; char buf[512]; const char *brokers; const char *topic; if(argc != 3){ fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); return 1; } brokers = argv[1]; topic = argv[2]; /* 建立一個kafka配置佔位 */ conf = rd_kafka_conf_new(); /*建立broker叢集*/ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK){ fprintf(stderr, "%s\n", errstr); return 1; } /*設定傳送報告回撥函式,rd_kafka_produce()接收的每條訊息都會呼叫一次該回調函式 *應用程式需要定期呼叫rd_kafka_poll()來服務排隊的傳送報告回撥函式*/ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); /*建立producer例項 rd_kafka_new()獲取conf物件的所有權,應用程式在此呼叫之後不得再次引用它*/ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if(!rk){ fprintf(stderr, "%% Failed to create new producer:%s\n", errstr); return 1; } /*例項化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic 物件儲存topic特定的配置,並在內部填充所有可用分割槽和leader brokers,*/ rkt = rd_kafka_topic_new(rk, topic, NULL); if (!rkt){ fprintf(stderr, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_destroy(rk); return 1; } /*用於中斷的訊號*/ signal(SIGINT, stop); fprintf(stderr, "%% Type some text and hit enter to produce 
message\n" "%% Or just hit enter to only serve delivery reports\n" "%% Press Ctrl-C or Ctrl-D to exit\n"); while(run && fgets(buf, sizeof(buf), stdin)){ size_t len = strlen(buf); if(buf[len-1] == '\n') buf[--len] = '\0'; if(len == 0){ /*輪詢用於事件的kafka handle, 事件將導致應用程式提供的回撥函式被呼叫 第二個引數是最大阻塞時間,如果設為0,將會是非阻塞的呼叫*/ rd_kafka_poll(rk, 0); continue; } retry: /*Send/Produce message. 這是一個非同步呼叫,在成功的情況下,只會將訊息排入內部producer佇列, 對broker的實際傳遞嘗試由後臺執行緒處理,之前註冊的傳遞迴調函式(dr_msg_cb) 用於在訊息傳遞成功或失敗時嚮應用程式發回訊號*/ if (rd_kafka_produce( /* Topic object */ rkt, /*使用內建的分割槽來選擇分割槽*/ RD_KAFKA_PARTITION_UA, /*生成payload的副本*/ RD_KAFKA_MSG_F_COPY, /*訊息體和長度*/ buf, len, /*可選鍵及其長度*/ NULL, 0, NULL) == -1){ fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){ /*如果內部佇列滿,等待訊息傳輸完成並retry, 內部隊列表示要傳送的訊息和已傳送或失敗的訊息, 內部佇列受限於queue.buffering.max.messages配置項*/ rd_kafka_poll(rk, 1000); goto retry; } }else{ fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n", len, rd_kafka_topic_name(rkt)); } /*producer應用程式應不斷地通過以頻繁的間隔呼叫rd_kafka_poll()來為 傳送報告佇列提供服務。在沒有生成訊息以確定先前生成的訊息已傳送了其 傳送報告回撥函式(和其他註冊過的回撥函式)期間,要確保rd_kafka_poll() 仍然被呼叫*/ rd_kafka_poll(rk, 0); } fprintf(stderr, "%% Flushing final message.. \n"); /*rd_kafka_flush是rd_kafka_poll()的抽象化, 等待所有未完成的produce請求完成,通常在銷燬producer例項前完成 以確保所有排列中和正在傳輸的produce請求在銷燬前完成*/ rd_kafka_flush(rk, 10*1000); /* Destroy topic object */ rd_kafka_topic_destroy(rkt); /* Destroy the producer instance */ rd_kafka_destroy(rk); return 0; }
二、consumer librdkafka進行kafka消費操作的大致步驟如下:
1、建立kafka配置 rd_kafka_conf_t *rd_kafka_conf_new (void)
2、建立kafka topic的配置 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)
3、配置kafka各項引數 rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
4、配置kafka topic各項引數 rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf, const char *name, const char *value, char *errstr, size_t errstr_size)
5、建立consumer例項 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
6、為consumer例項新增brokerlist int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
7、開啟consumer訂閱 rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
8、輪詢訊息或事件,並呼叫回撥函式 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk, int timeout_ms)
9、關閉consumer例項 rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)
10、釋放topic list資源 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)
11、銷燬consumer例項 void rd_kafka_destroy (rd_kafka_t *rk)
12、等待consumer物件的銷燬 int rd_kafka_wait_destroyed (int timeout_ms)
完整程式碼如下my_consumer.c #include <string.h> #include <stdlib.h> #include <syslog.h> #include <signal.h> #include <error.h> #include <getopt.h> #include "../src/rdkafka.h" static int run = 1; //`rd_kafka_t`自帶一個可選的配置API,如果沒有呼叫API,Librdkafka將會使用CONFIGURATION.md中的預設配置。 static rd_kafka_t *rk; static rd_kafka_topic_partition_list_t *topics; static void stop (int sig) { if (!run) exit(1); run = 0; fclose(stdin); /* abort fgets() */ } static void sig_usr1 (int sig) { rd_kafka_dump(stdout, rk); } /** * 處理並列印已消費的訊息 */ static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { fprintf(stderr, "%% Consumer reached end of %s [%"PRId32"] " "message queue at offset %"PRId64"\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); return; } if (rkmessage->rkt) fprintf(stderr, "%% Consume error for " "topic \"%s\" [%"PRId32"] " "offset %"PRId64": %s\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rd_kafka_message_errstr(rkmessage)); else fprintf(stderr, "%% Consumer error: %s: %s\n", rd_kafka_err2str(rkmessage->err), rd_kafka_message_errstr(rkmessage)); if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION || rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) run = 0; return; } fprintf(stdout, "%% Message (topic %s [%"PRId32"], " "offset %"PRId64", %zd bytes):\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset, rkmessage->len); if (rkmessage->key_len) { printf("Key: %.*s\n", (int)rkmessage->key_len, (char *)rkmessage->key); } printf("%.*s\n", (int)rkmessage->len, (char *)rkmessage->payload); } /* init all configuration of kafka */ int initKafka(char *brokers, char *group,char *topic){ rd_kafka_conf_t *conf; rd_kafka_topic_conf_t *topic_conf; rd_kafka_resp_err_t err; char tmp[16]; char errstr[512]; /* Kafka configuration */ conf = rd_kafka_conf_new(); //quick termination snprintf(tmp, sizeof(tmp), 
"%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); //topic configuration topic_conf = rd_kafka_topic_conf_new(); /* Consumer groups require a group id */ if (!group) group = "rdkafka_consumer_example"; if (rd_kafka_conf_set(conf, "group.id", group, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); return -1; } /* Consumer groups always use broker based offset storage */ if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method", "broker", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%% %s\n", errstr); return -1; } /* Set default topic config for pattern-matched topics. */ rd_kafka_conf_set_default_topic_conf(conf, topic_conf); //例項化一個頂級物件rd_kafka_t作為基礎容器,提供全域性配置和共享狀態 rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if(!rk){ fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr); return -1; } //Librdkafka需要至少一個brokers的初始化list if (rd_kafka_brokers_add(rk, brokers) == 0){ fprintf(stderr, "%% No valid brokers specified\n"); return -1; } //重定向 rd_kafka_poll()佇列到consumer_poll()佇列 rd_kafka_poll_set_consumer(rk); //建立一個Topic+Partition的儲存空間(list/vector) topics = rd_kafka_topic_partition_list_new(1); //把Topic+Partition加入list rd_kafka_topic_partition_list_add(topics, topic, -1); //開啟consumer訂閱,匹配的topic將被新增到訂閱列表中 if((err = rd_kafka_subscribe(rk, topics))){ fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err)); return -1; } return 1; } int main(int argc, char **argv){ char *brokers = "localhost:9092"; char *group = NULL; char *topic = NULL; int opt; rd_kafka_resp_err_t err; while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){ switch (opt) { case 'b': brokers = optarg; break; case 'g': group = optarg; break; case 't': topic = optarg; break; default: break; } } signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); if(!initKafka(brokers, group, topic)){ fprintf(stderr, "kafka server initialize error\n"); }else{ while(run){ 
rd_kafka_message_t *rkmessage; /*-輪詢消費者的訊息或事件,最多阻塞timeout_ms -應用程式應該定期呼叫consumer_poll(),即使沒有預期的訊息,以服務 所有排隊等待的回撥函式,當註冊過rebalance_cb,該操作尤為重要, 因為它需要被正確地呼叫和處理以同步內部消費者狀態 */ rkmessage = rd_kafka_consumer_poll(rk, 1000); if(rkmessage){ msg_consume(rkmessage, NULL); /*釋放rkmessage的資源,並把所有權還給rdkafka*/ rd_kafka_message_destroy(rkmessage); } } } done: /*此呼叫將會阻塞,直到consumer撤銷其分配,呼叫rebalance_cb(如果已設定), commit offset到broker,並離開consumer group 最大阻塞時間被設定為session.timeout.ms */ err = rd_kafka_consumer_close(rk); if(err){ fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err)); }else{ fprintf(stderr, "%% Consumer closed\n"); } //釋放topics list使用的所有資源和它自己 rd_kafka_topic_partition_list_destroy(topics); //destroy kafka handle rd_kafka_destroy(rk); run = 5; //等待所有rd_kafka_t物件銷燬,所有kafka物件被銷燬,返回0,超時返回-1 while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){ printf("Waiting for librdkafka to decommission\n"); } if(run <= 0){ //dump rdkafka內部狀態到stdout流 rd_kafka_dump(stdout, rk); } return 0; }
在linux下編譯producer和consumer的程式碼:
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
在執行my_producer或my_consumer時可能會報錯"error while loading shared libraries xxx.so", 此時需要在/etc/ld.so.conf中加入xxx.so所在的目錄,並執行ldconfig使其生效