MQ 學習 3 php 程式碼 操作
阿新 • • 發佈:2018-12-17
<?php
/**
 * Created by PhpStorm.
 * User: ASUS
 * Date: 2018/10/22
 * Time: 19:51
 */

namespace App\Services;

use App\Models\RfImageAttr;
use App\Tools\ApiTools;
use Kafka\Consumer;
use Kafka\ConsumerConfig;
use Kafka\Producer;
use Kafka\ProducerConfig;
use RdKafka\Conf;
use RdKafka\TopicConf;

/**
 * Kafka helper service.
 *
 * Publishes a single message through \Kafka\Producer and consumes partition 0
 * of a topic through the low-level \RdKafka\Consumer, handing every decoded
 * message to parseInfo().
 */
class KafkaService
{
    /** Comma-separated broker list shared by the producer and the consumer. */
    private $brokerList = 'xxx:9092,xxx:9092,xxx:9092';

    /**
     * Sets the process-wide default timezone to 'PRC'.
     */
    public function __construct()
    {
        date_default_timezone_set('PRC');
    }

    /**
     * Producer: send one message to a topic.
     *
     * Uses the callback form of \Kafka\Producer (the message list is supplied
     * by a closure and the outcome is reported through the success/error
     * callbacks). The alternative is the synchronous form: construct
     * `new \Kafka\Producer()` without a callback and pass the same message
     * list directly to `send()`, which then returns the result.
     *
     * @param mixed $topic Topic name the message is published to.
     * @param mixed $value Message payload.
     *
     * @return void
     */
    public function producer($topic, $value)
    {
        // ProducerConfig is obtained via getInstance(); the producer created
        // below is not handed this object explicitly.
        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList($this->brokerList);
        $config->setBrokerVersion('0.11.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);

        $producer = new \Kafka\Producer(function () use ($value, $topic) {
            return [
                [
                    'topic' => $topic, // must match the topic the consumer reads
                    'key'   => '',
                    'value' => $value,
                ],
            ];
        });

        $producer->success(function ($result) {
            // Dumps the raw send result to the output.
            var_dump($result);
        });

        $producer->error(function ($errorCode) {
            \Log::info('kafka producer err:' . $errorCode);
        });

        $producer->send(true);
    }

    /**
     * Consumer: low-level consumption of partition 0 of a topic.
     *
     * Each loop iteration starts the partition consumer at the stored offset,
     * waits up to 1000 ms for one message, processes it, stops the partition
     * consumer again and sleeps for one second. The loop ends when a message
     * carries a non-empty error code or when an exception is thrown; both
     * cases are written to the log.
     *
     * Other approaches that were tried for this method:
     *  - a bare `new \RdKafka\Consumer()` without group configuration,
     *    started once with RD_KAFKA_OFFSET_BEGINNING (re-read from the start;
     *    RD_KAFKA_OFFSET_END would start at the end of the partition);
     *  - the high-level \Kafka\Consumer configured through
     *    \Kafka\ConsumerConfig and driven by `start(callback)`.
     *
     * @param mixed $group     Value for the consumer's `group.id` setting.
     * @param mixed $topicName Topic to consume; also selects the handler in parseInfo().
     *
     * @return void
     */
    public function consumer($group, $topicName)
    {
        try {
            $rcf = new \RdKafka\Conf();
            $rcf->set('group.id', $group);

            $cf = new \RdKafka\TopicConf();
            $cf->set('auto.offset.reset', 'smallest');
            // Config values are strings; pass the boolean explicitly as text.
            $cf->set('auto.commit.enable', 'true');

            $rk = new \RdKafka\Consumer($rcf);
            $rk->setLogLevel(LOG_DEBUG);
            $rk->addBrokers($this->brokerList);

            $topic = $rk->newTopic($topicName, $cf);

            while (true) {
                // First argument is the partition number, second the start offset.
                $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

                try {
                    // First argument is the partition number, second the timeout in ms.
                    $msg = $topic->consume(0, 1000);

                    // NOTE(review): every non-empty error code ends the loop,
                    // including informational ones the client may report -
                    // confirm this is intended.
                    if (!empty($msg->err)) {
                        \Log::info("kafka customer err : " . $msg->errstr());
                        break;
                    }

                    if (!empty($msg->payload)) {
                        // Business handling of the decoded message.
                        $this->parseInfo($topicName, $this->decodePayload($msg->payload));
                    }
                } finally {
                    // Always release the partition, also when leaving via
                    // break or an exception.
                    $topic->consumeStop(0);
                }

                sleep(1);
            }
        } catch (\Exception $e) {
            // Fully qualified on purpose: an unqualified `Exception` would be
            // looked up inside this namespace and never match.
            \Log::info("kafka customer err : " . $e->getMessage());
        }
    }

    /**
     * Decodes a `key=value&key2=value2` payload into an associative array.
     *
     * Only the first `=` of a pair separates key from value, so values may
     * themselves contain `=`. A pair without `=` maps its key to null. Empty
     * segments (e.g. from a trailing `&`) are skipped. No URL-decoding is
     * performed.
     *
     * @param string $payload Raw message payload.
     *
     * @return array Map of field name to value (string or null).
     */
    private function decodePayload($payload)
    {
        $fields = [];

        foreach (explode('&', $payload) as $pair) {
            if ($pair === '') {
                continue;
            }

            $parts = explode('=', $pair, 2);
            $fields[$parts[0]] = isset($parts[1]) ? $parts[1] : null;
        }

        return $fields;
    }

    /**
     * Dispatches a decoded message to the handler registered for its topic.
     *
     * Only the 'image' topic has a handler; messages from any other topic are
     * ignored.
     *
     * @param mixed $topic Topic the message was read from.
     * @param array $info  Decoded message fields.
     *
     * @return void
     */
    private function parseInfo($topic, $info)
    {
        switch ($topic) {
            case 'image':
                $imgModel = RfImageAttr::getInstance();
                $imgModel->updateParseInfo($info);
                break;
        }
    }
}