1. 程式人生 > >librdkafak消費者 最小c語言版本

librdkafak消費者 最小c語言版本

這是我自己看的librdkafka的樣例程式碼整理來的,去掉很多修飾的程式碼

#include <ctype.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <syslog.h>
#include <time.h>
#include <sys/time.h>
#include <getopt.h>
#include <librdkafka/rdkafka.h>

static int run = 1;
static rd_kafka_t *rk;

static void stop(int sig) {
    run = 0;
    fclose(stdin); /* abort fgets() */
}

static void sig_usr1 (int sig) {
    rd_kafka_dump(stdout, rk);
}

static void logger (const rd_kafka_t *rk, int level,
            const char *fac, const char *buf) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
            (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
            level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}

static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            fprintf(stderr, "%% Consumer reached end of %s [%d] message queue at offset %ld\n",
                   rd_kafka_topic_name(rkmessage->rkt),
                   rkmessage->partition, rkmessage->offset);
            run--;
            return;
        }

        return;
    }

    fprintf(stderr, "offset=%d, len=%d,   content=%s\n", rkmessage->offset, rkmessage->len, rkmessage->payload);
}

int main (int argc, char **argv) {
    rd_kafka_topic_t *rkt;
    char *brokers = "192.168.1.103:9092";
    char mode = 'C';
    char *topic =  "rtb";
    int partition = 1;
    int opt;
    int parti;      //迴圈partition
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char errstr[512];
    int64_t start_offset = 0;
    char tmp[16];
    int64_t seek_offset = 0;

    conf = rd_kafka_conf_new();
    rd_kafka_conf_set_log_cb(conf, logger);
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
    topic_conf = rd_kafka_topic_conf_new();
    while ((opt = getopt(argc, argv, "p:s:")) != -1) {
        switch (opt) {
        case 'p':
            partition = strtoll(optarg, NULL, 10);
            run = partition;
            break;
        case 's':
            seek_offset = strtoll(optarg, NULL, 10);
            break;
        default:
            break;
        }
    }

    signal(SIGINT, stop);
    signal(SIGUSR1, sig_usr1);

    //Consumer
    if (mode == 'C') {
        /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)))) {
            fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr);
            exit(1);
        }

        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
            fprintf(stderr, "%% No valid brokers specified\n");
            exit(1);
        }

        /* Create topic */
        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        topic_conf = NULL; /* Now owned by topic */

        for(parti = 0; parti < partition; parti++)
        {
            /* Start consuming */
            if (rd_kafka_consume_start(rkt, parti, start_offset) == -1){
                rd_kafka_resp_err_t err = rd_kafka_last_error();
                fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(err));
                if (err == RD_KAFKA_RESP_ERR__INVALID_ARG)
                    fprintf(stderr, "%% Broker based offset storage requires a group.id, add: -X group.id=yourGroup\n");
                exit(1);
            }

            while (run > 0) {
                rd_kafka_message_t *rkmessage;
                rd_kafka_resp_err_t err;

                if (seek_offset) {
                    err = rd_kafka_seek(rkt, parti, seek_offset, 2000);
                    if (err)
                        printf("Seek failed: %s\n", rd_kafka_err2str(err));
                    else
                        printf("Seeked to %ld\n", seek_offset);
                    seek_offset = 0;
                }

                rd_kafka_poll(rk, 0);
                rkmessage = rd_kafka_consume(rkt, parti, 1000);
                if (! rkmessage) /* timeout */
                    continue;

                msg_consume(rkmessage, NULL);
                rd_kafka_message_destroy(rkmessage);
            }
            rd_kafka_consume_stop(rkt, parti);
            while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 10);
        }

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy handle */
        rd_kafka_destroy(rk);
        }

    if (topic_conf)
        rd_kafka_topic_conf_destroy(topic_conf);

    /* Let background threads clean up and terminate cleanly. */
    run = 5;
    while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
        printf("Waiting for librdkafka to decommission\n");
    if (run <= 0)
        rd_kafka_dump(stdout, rk);

    return 0;
}


Makefile檔案
all:
	gcc -g *.c -o consumer -lrdkafka -lpthread -lrt -lssl

clean:
	rm -rf consumer