1. 程式人生 > >redis和kafka讀取程式碼

redis和kafka讀取程式碼

kafka讀取程式碼如下所示:

<?php
$conf = new RdKafka\Conf();
//設定消費組
$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);

// 設定offset的儲存為file
$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    //引數1表示消費分割槽,這裡是分割槽0
    //引數2表示同步阻塞多久
    $message = $topic->consume(0, 12 * 1000);
    if (is_null($message)) {
        echo "No more messages\n";
        continue;
    }
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>
redis讀取程式碼如下所示:

<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

for ($i = 0; $i < 20; $i++) {
    //$redis->set("huancai . $i","huancai.$i");
    $value=$redis->get("huancai . $i");
    var_dump($value);
}

?>