redis和kafka讀取程式碼
kafka讀取程式碼如下所示:
<?php
$conf = new RdKafka\Conf();
//設定消費組
$conf->set('group.id', 'myConsumerGroup');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");
$topicConf = new RdKafka\TopicConf();
$topicConf->set('request.required.acks', 1);
// 設定offset的儲存為file
$topic = $rk->newTopic("test", $topicConf);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
while (true) {
//引數1表示消費分割槽,這裡是分割槽0
//引數2表示同步阻塞多久
$message = $topic->consume(0, 12 * 1000);
if (is_null($message)) {
echo "No more messages\n";
continue;
}
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
redis讀取程式碼如下所示:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
for ($i = 0; $i < 20; $i++) {
//$redis->set("huancai . $i","huancai.$i");
$value=$redis->get("huancai . $i");
var_dump($value);
}
?>