1. 程式人生 > >c語言使用librdkafka庫實現kafka的生產和消費例項

c語言使用librdkafka庫實現kafka的生產和消費例項

關於librdkafka庫的介紹,可以參考kafka的c/c++高效能客戶端librdkafka簡介,本文使用librdkafka庫來進行kafka的簡單的生產、消費

一、producer librdkafka進行kafka生產操作的大致步驟如下:

1、建立kafka配置 rd_kafka_conf_t *rd_kafka_conf_new (void)

2、配置kafka各項引數 rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,                                        const char *name,                                        const char *value,                                        char *errstr, size_t errstr_size)

3、設定傳送回撥函式 void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,                                   void (*dr_msg_cb) (rd_kafka_t *rk,                                   const rd_kafka_message_t *                                   rkmessage,                                   void *opaque))

4、建立producer例項 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)

5、例項化topic rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)

6、非同步呼叫將訊息傳送到指定的topic int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,               int msgflags,               void *payload, size_t len,               const void *key, size_t keylen,               void *msg_opaque)

7、阻塞等待訊息傳送完成 int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)

8、等待完成producer請求完成 rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

9、銷燬topic void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)

10、銷燬producer例項 void rd_kafka_destroy (rd_kafka_t *rk)

完整程式碼如下my_producer.c: #include <stdio.h> #include <signal.h> #include <string.h>   #include "../src/rdkafka.h"   static int run = 1;   static void stop(int sig){     run = 0;     fclose(stdin); }   /*     每條訊息呼叫一次該回調函式,說明訊息是傳遞成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)     還是傳遞失敗(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)     該回調函式由rd_kafka_poll()觸發,在應用程式的執行緒上執行  */ static void dr_msg_cb(rd_kafka_t *rk,                       const rd_kafka_message_t *rkmessage, void *opaque){         if(rkmessage->err)             fprintf(stderr, "%% Message delivery failed: %s\n",                      rd_kafka_err2str(rkmessage->err));         else             fprintf(stderr,                         "%% Message delivered (%zd bytes, "                         "partition %"PRId32")\n",                         rkmessage->len, rkmessage->partition);         /* rkmessage被librdkafka自動銷燬*/ }   int main(int argc, char **argv){     rd_kafka_t *rk;            /*Producer instance handle*/     rd_kafka_topic_t *rkt;     /*topic物件*/     rd_kafka_conf_t *conf;     /*臨時配置物件*/     char errstr[512];               char buf[512];                  const char *brokers;            const char *topic;                if(argc != 3){         fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);         return 1;     }       brokers = argv[1];     topic = argv[2];       /* 建立一個kafka配置佔位 */     conf = rd_kafka_conf_new();       /*建立broker叢集*/     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,                 sizeof(errstr)) != RD_KAFKA_CONF_OK){         fprintf(stderr, "%s\n", errstr);         return 1;     }       /*設定傳送報告回撥函式,rd_kafka_produce()接收的每條訊息都會呼叫一次該回調函式      *應用程式需要定期呼叫rd_kafka_poll()來服務排隊的傳送報告回撥函式*/     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);       /*建立producer例項       rd_kafka_new()獲取conf物件的所有權,應用程式在此呼叫之後不得再次引用它*/     rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));     if(!rk){         fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);         return 1;     }       /*例項化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic     物件儲存topic特定的配置,並在內部填充所有可用分割槽和leader brokers,*/     rkt = rd_kafka_topic_new(rk, topic, NULL);     if (!rkt){         fprintf(stderr, "%% Failed to create topic object: %s\n",                  rd_kafka_err2str(rd_kafka_last_error()));         rd_kafka_destroy(rk);         return 1;     }       /*用於中斷的訊號*/     signal(SIGINT, stop);       fprintf(stderr,                 "%% Type some text and hit enter to produce message\n"                 "%% Or just hit enter to only serve delivery reports\n"                 "%% Press Ctrl-C or Ctrl-D to exit\n");        while(run && fgets(buf, sizeof(buf), stdin)){          size_t len = strlen(buf);            if(buf[len-1] == '\n')              buf[--len] = '\0';            if(len == 0){             /*輪詢用於事件的kafka handle,             事件將導致應用程式提供的回撥函式被呼叫             第二個引數是最大阻塞時間,如果設為0,將會是非阻塞的呼叫*/              rd_kafka_poll(rk, 0);              continue;          }        retry:          /*Send/Produce message.            這是一個非同步呼叫,在成功的情況下,只會將訊息排入內部producer佇列,            對broker的實際傳遞嘗試由後臺執行緒處理,之前註冊的傳遞迴調函式(dr_msg_cb)            用於在訊息傳遞成功或失敗時嚮應用程式發回訊號*/          if (rd_kafka_produce(                     /* Topic object */                      rkt,                     /*使用內建的分割槽來選擇分割槽*/                      RD_KAFKA_PARTITION_UA,                     /*生成payload的副本*/                      RD_KAFKA_MSG_F_COPY,                     /*訊息體和長度*/                      buf, len,                     /*可選鍵及其長度*/                      NULL, 0,                      NULL) == -1){              fprintf(stderr,                   "%% Failed to produce to topic %s: %s\n",                   rd_kafka_topic_name(rkt),                  rd_kafka_err2str(rd_kafka_last_error()));                if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){                 /*如果內部佇列滿,等待訊息傳輸完成並retry,                 內部隊列表示要傳送的訊息和已傳送或失敗的訊息,                 內部佇列受限於queue.buffering.max.messages配置項*/                  rd_kafka_poll(rk, 1000);                  goto retry;              }              }else{              fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",                   len, rd_kafka_topic_name(rkt));          }           /*producer應用程式應不斷地通過以頻繁的間隔呼叫rd_kafka_poll()來為         傳送報告佇列提供服務。在沒有生成訊息以確定先前生成的訊息已傳送了其         傳送報告回撥函式(和其他註冊過的回撥函式)期間,要確保rd_kafka_poll()         仍然被呼叫*/          rd_kafka_poll(rk, 0);      }        fprintf(stderr, "%% Flushing final message.. \n");      /*rd_kafka_flush是rd_kafka_poll()的抽象化,      等待所有未完成的produce請求完成,通常在銷燬producer例項前完成      以確保所有排列中和正在傳輸的produce請求在銷燬前完成*/      rd_kafka_flush(rk, 10*1000);        /* Destroy topic object */      rd_kafka_topic_destroy(rkt);        /* Destroy the producer instance */      rd_kafka_destroy(rk);        return 0; }

二、consumer librdkafka進行kafka消費操作的大致步驟如下:

1、建立kafka配置 rd_kafka_conf_t *rd_kafka_conf_new (void) 2、建立kafka topic的配置 rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)  3、配置kafka各項引數 rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,                                        const char *name,                                        const char *value,                                        char *errstr, size_t errstr_size) 4、配置kafka topic各項引數 rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,                          const char *name,                          const char *value,                          char *errstr, size_t errstr_size) 5、建立consumer例項 rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size) 6、為consumer例項新增brokerlist int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

7、開啟consumer訂閱 rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)

8、輪詢訊息或事件,並呼叫回撥函式 rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms) 9、關閉consumer例項 rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

10、釋放topic list資源 rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

11、銷燬consumer例項 void rd_kafka_destroy (rd_kafka_t *rk) 

12、等待consumer物件的銷燬 int rd_kafka_wait_destroyed (int timeout_ms)

完整程式碼如下my_consumer.c #include <string.h> #include <stdlib.h> #include <syslog.h> #include <signal.h> #include <error.h> #include <getopt.h>   #include "../src/rdkafka.h"   static int run = 1; //`rd_kafka_t`自帶一個可選的配置API,如果沒有呼叫API,Librdkafka將會使用CONFIGURATION.md中的預設配置。 static rd_kafka_t *rk; static rd_kafka_topic_partition_list_t *topics;   static void stop (int sig) {   if (!run)     exit(1);   run = 0;   fclose(stdin); /* abort fgets() */ }   static void sig_usr1 (int sig) {   rd_kafka_dump(stdout, rk); }   /**  * 處理並列印已消費的訊息  */ static void msg_consume (rd_kafka_message_t *rkmessage,        void *opaque) {   if (rkmessage->err) {     if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {       fprintf(stderr,         "%% Consumer reached end of %s [%"PRId32"] "              "message queue at offset %"PRId64"\n",              rd_kafka_topic_name(rkmessage->rkt),              rkmessage->partition, rkmessage->offset);         return;     }       if (rkmessage->rkt)             fprintf(stderr, "%% Consume error for "                     "topic \"%s\" [%"PRId32"] "                     "offset %"PRId64": %s\n",                     rd_kafka_topic_name(rkmessage->rkt),                     rkmessage->partition,                     rkmessage->offset,                     rd_kafka_message_errstr(rkmessage));     else             fprintf(stderr, "%% Consumer error: %s: %s\n",                     rd_kafka_err2str(rkmessage->err),                     rd_kafka_message_errstr(rkmessage));       if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||         rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)           run = 0;     return;   }     fprintf(stdout, "%% Message (topic %s [%"PRId32"], "                       "offset %"PRId64", %zd bytes):\n",                       rd_kafka_topic_name(rkmessage->rkt),                       rkmessage->partition,     rkmessage->offset, rkmessage->len);     if (rkmessage->key_len) {     printf("Key: %.*s\n",              (int)rkmessage->key_len, (char *)rkmessage->key);   }     printf("%.*s\n",            (int)rkmessage->len, (char *)rkmessage->payload);    }   /*   init all configuration of kafka  */ int initKafka(char *brokers, char *group,char *topic){   rd_kafka_conf_t *conf;   rd_kafka_topic_conf_t *topic_conf;   rd_kafka_resp_err_t err;   char tmp[16];   char errstr[512];     /* Kafka configuration */   conf = rd_kafka_conf_new();     //quick termination   snprintf(tmp, sizeof(tmp), "%i", SIGIO);   rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);     //topic configuration   topic_conf = rd_kafka_topic_conf_new();     /* Consumer groups require a group id */   if (!group)           group = "rdkafka_consumer_example";   if (rd_kafka_conf_set(conf, "group.id", group,                         errstr, sizeof(errstr)) !=       RD_KAFKA_CONF_OK) {           fprintf(stderr, "%% %s\n", errstr);           return -1;   }     /* Consumer groups always use broker based offset storage */   if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",                               "broker",                               errstr, sizeof(errstr)) !=       RD_KAFKA_CONF_OK) {           fprintf(stderr, "%% %s\n", errstr);           return -1;   }     /* Set default topic config for pattern-matched topics. */   rd_kafka_conf_set_default_topic_conf(conf, topic_conf);     //例項化一個頂級物件rd_kafka_t作為基礎容器,提供全域性配置和共享狀態   rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));   if(!rk){     fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);     return -1;   }     //Librdkafka需要至少一個brokers的初始化list   if (rd_kafka_brokers_add(rk, brokers) == 0){     fprintf(stderr, "%% No valid brokers specified\n");     return -1;   }     //重定向 rd_kafka_poll()佇列到consumer_poll()佇列   rd_kafka_poll_set_consumer(rk);     //建立一個Topic+Partition的儲存空間(list/vector)   topics = rd_kafka_topic_partition_list_new(1);   //把Topic+Partition加入list   rd_kafka_topic_partition_list_add(topics, topic, -1);   //開啟consumer訂閱,匹配的topic將被新增到訂閱列表中   if((err = rd_kafka_subscribe(rk, topics))){       fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));       return -1;   }     return 1; }   int main(int argc, char **argv){   char *brokers = "localhost:9092";   char *group = NULL;   char *topic = NULL;      int opt;   rd_kafka_resp_err_t err;     while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){     switch (opt) {       case 'b':         brokers = optarg;         break;       case 'g':         group = optarg;         break;       case 't':         topic = optarg;         break;       default:         break;     }   }      signal(SIGINT, stop);   signal(SIGUSR1, sig_usr1);     if(!initKafka(brokers, group, topic)){     fprintf(stderr, "kafka server initialize error\n");   }else{     while(run){       rd_kafka_message_t *rkmessage;       /*-輪詢消費者的訊息或事件,最多阻塞timeout_ms         -應用程式應該定期呼叫consumer_poll(),即使沒有預期的訊息,以服務         所有排隊等待的回撥函式,當註冊過rebalance_cb,該操作尤為重要,         因為它需要被正確地呼叫和處理以同步內部消費者狀態 */       rkmessage = rd_kafka_consumer_poll(rk, 1000);       if(rkmessage){         msg_consume(rkmessage, NULL);         /*釋放rkmessage的資源,並把所有權還給rdkafka*/         rd_kafka_message_destroy(rkmessage);       }     }   }   done:     /*此呼叫將會阻塞,直到consumer撤銷其分配,呼叫rebalance_cb(如果已設定),     commit offset到broker,並離開consumer group     最大阻塞時間被設定為session.timeout.ms     */     err = rd_kafka_consumer_close(rk);     if(err){       fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));     }else{       fprintf(stderr, "%% Consumer closed\n");     }       //釋放topics list使用的所有資源和它自己     rd_kafka_topic_partition_list_destroy(topics);       //destroy kafka handle     rd_kafka_destroy(rk);        run = 5;     //等待所有rd_kafka_t物件銷燬,所有kafka物件被銷燬,返回0,超時返回-1     while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){       printf("Waiting for librdkafka to decommission\n");     }     if(run <= 0){       //dump rdkafka內部狀態到stdout流       rd_kafka_dump(stdout, rk);     }       return 0; }

在linux下編譯producer和consumer的程式碼: gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt 在執行my_producer或my_consumer時可能會報錯"error while loading shared libraries xxx.so", 此時需要在/etc/ld.so.conf中加入xxx.so所在的目錄