1. 程式人生 > 其它 >Tp5使用Kafka:封裝生產者、消費者操作類

Tp5使用Kafka:封裝生產者、消費者操作類

【相關文章】PHP操作Kafka:php-rdkafka擴充套件的安裝

1、config.php中配置:

//kafka連線配置
'kafka_server' => [
    'host' => '127.0.0.1:9092',
    'topic' => 'topic1',
],

2、建立一個生產者 KafkaProducer.php :

_config = config('kafka_server');
        $this->_rk = new \RdKafka\Producer();
        $this->_rk->setLogLevel(LOG_DEBUG);
        
$this->_rk->addBrokers($this->_config['host']); $this->_topic = $this->_rk->newTopic($this->_config['topic']); } public function add($data){ $this->_topic->produce(RD_KAFKA_PARTITION_UA, 0, $data); $this->_rk->poll(0); while ($this
->_rk->getOutQLen() > 0) { $this->_rk->poll(50); } } }

3、建立一個消費者 KafkaConsumer.php :

_partition = $partition;
        $this->_config = config('kafka_server');

        $conf = new \RdKafka\Conf();
        $conf->set('group.id', $groupId);

        $rk = new
\RdKafka\Consumer($conf); $rk->addBrokers($this->_config['host']); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', sys_get_temp_dir()); $topicConf->set('auto.offset.reset', 'smallest'); $this->_topic = $rk->newTopic($this->_config['topic'], $topicConf); $this->_topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED); } public function run(){ while (true) { $message = $this->_topic->consume($this->_partition, 120*10000); Logger('KafkaConsumer::run::1', [$message]); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $this->exceTask($message->payload); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: //等待接收資訊 error_log("No more messages; will wait for more\n"); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: //超時 error_log("Timed out\n"); break; default: throw new \Exception($message->errstr(), $message->err); break; } } } private function exceTask($jsonData){ error_log($jsonData); $paramsArr = json_decode($jsonData,true); if(!isset($paramsArr['className']) || !isset($paramsArr['funcName'])){ error_log("Param error\n"); } $objUrl = '\\app\\index\\service\\' . $paramsArr['className']; try { $obj = new $objUrl(); return call_user_func([$obj, $paramsArr['funcName']], $jsonData); } catch (\Exception $e) { error_log("Func not found\n"); } } }

4、測試:

  呼叫生產者生產Test.php:

 'TestService',
          'funcName' => 'index',
          'user_id' => 1,
        ];
        KafkaProducer::getInstance()->add(json_encode($data));
    }
}

  消費者消費回撥TestService.php :

啟動消費者KafkaConsumer -> run() 後,呼叫生產者生產 Test.php,此時在消費者消費回撥TestService.php 中可看到如下日誌記錄,則消費者消費成功:

IT成長中的那些事兒