librdkafak消費者 最小c語言版本
阿新 • • 發佈:2018-10-31
這是我自己看的librdkafka的樣例程式碼整理來的,去掉很多修飾的程式碼
#include <ctype.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <syslog.h> #include <time.h> #include <sys/time.h> #include <getopt.h> #include <librdkafka/rdkafka.h> static int run = 1; static rd_kafka_t *rk; static void stop(int sig) { run = 0; fclose(stdin); /* abort fgets() */ } static void sig_usr1 (int sig) { rd_kafka_dump(stdout, rk); } static void logger (const rd_kafka_t *rk, int level, const char *fac, const char *buf) { struct timeval tv; gettimeofday(&tv, NULL); fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", (int)tv.tv_sec, (int)(tv.tv_usec / 1000), level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) { if (rkmessage->err) { if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { fprintf(stderr, "%% Consumer reached end of %s [%d] message queue at offset %ld\n", rd_kafka_topic_name(rkmessage->rkt), rkmessage->partition, rkmessage->offset); run--; return; } return; } fprintf(stderr, "offset=%d, len=%d, content=%s\n", rkmessage->offset, rkmessage->len, rkmessage->payload); } int main (int argc, char **argv) { rd_kafka_topic_t *rkt; char *brokers = "192.168.1.103:9092"; char mode = 'C'; char *topic = "rtb"; int partition = 1; int opt; int parti; //迴圈partition rd_kafka_conf_t *conf; rd_kafka_topic_conf_t *topic_conf; char errstr[512]; int64_t start_offset = 0; char tmp[16]; int64_t seek_offset = 0; conf = rd_kafka_conf_new(); rd_kafka_conf_set_log_cb(conf, logger); snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); topic_conf = rd_kafka_topic_conf_new(); while ((opt = getopt(argc, argv, "p:s:")) != -1) { switch (opt) { case 'p': partition = strtoll(optarg, NULL, 10); run = partition; break; case 's': seek_offset = strtoll(optarg, NULL, 10); break; default: break; } } signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); //Consumer if (mode == 'C') { /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); exit(1); } /* Add brokers */ if (rd_kafka_brokers_add(rk, brokers) == 0) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Create topic */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); topic_conf = NULL; /* Now owned by topic */ for(parti = 0; parti < partition; parti++) { /* Start consuming */ if (rd_kafka_consume_start(rkt, parti, start_offset) == -1){ rd_kafka_resp_err_t err = rd_kafka_last_error(); fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__INVALID_ARG) fprintf(stderr, "%% Broker based offset storage requires a group.id, add: -X group.id=yourGroup\n"); exit(1); } while (run > 0) { rd_kafka_message_t *rkmessage; rd_kafka_resp_err_t err; if (seek_offset) { err = rd_kafka_seek(rkt, parti, seek_offset, 2000); if (err) printf("Seek failed: %s\n", rd_kafka_err2str(err)); else printf("Seeked to %ld\n", seek_offset); seek_offset = 0; } rd_kafka_poll(rk, 0); rkmessage = rd_kafka_consume(rkt, parti, 1000); if (! rkmessage) /* timeout */ continue; msg_consume(rkmessage, NULL); rd_kafka_message_destroy(rkmessage); } rd_kafka_consume_stop(rkt, parti); while (rd_kafka_outq_len(rk) > 0) rd_kafka_poll(rk, 10); } /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy handle */ rd_kafka_destroy(rk); } if (topic_conf) rd_kafka_topic_conf_destroy(topic_conf); /* Let background threads clean up and terminate cleanly. */ run = 5; while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) printf("Waiting for librdkafka to decommission\n"); if (run <= 0) rd_kafka_dump(stdout, rk); return 0; }
all:
gcc -g *.c -o consumer -lrdkafka -lpthread -lrt -lssl
clean:
rm -rf consumer