程式人生 > MQ 學習 3：PHP 程式碼操作

MQ 學習 3：PHP 程式碼操作

<?php
/**
 * Created by PhpStorm.
 * User: ASUS
 * Date: 2018/10/22
 * Time: 19:51
 */

namespace App\Services;
use App\Models\RfImageAttr;
use App\Tools\ApiTools;
use Kafka\Consumer;
use Kafka\ConsumerConfig;
use Kafka\Producer;
use Kafka\ProducerConfig;
use RdKafka\Conf;
use RdKafka\TopicConf;

/**
 * Kafka helper service: publishes messages through the \Kafka producer client
 * and consumes them through the \RdKafka low-level (per-partition) consumer.
 */
class KafkaService
{
    /** Comma-separated "host:port" broker list (the hosts here are placeholders). */
    private $brokerList = 'xxx:9092,xxx:9092,xxx:9092';

    /**
     * Sets the process-wide default timezone to 'PRC'.
     */
    public function __construct()
    {
        date_default_timezone_set('PRC');
    }

    /**
     * Producer: send one message to a topic.
     *
     * Uses the callback form of \Kafka\Producer: the closure passed to the
     * constructor returns the batch to send, and success()/error() register
     * result handlers before send(true) is called.
     *
     * The alternative form is to construct \Kafka\Producer without a callback
     * and pass the same array of messages directly to send(), which returns
     * the result.
     *
     * @param string $topic Topic to publish to.
     * @param string $value Message body.
     */
    public function producer($topic, $value)
    {
        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList($this->brokerList);
        $config->setBrokerVersion('0.11.0.0');

        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);

        $producer = new \Kafka\Producer(function () use ($topic, $value) {
            return array(
                array(
                    'topic' => $topic, // must match the topic the consumer reads
                    'key' => '',
                    'value' => $value,
                ),
            );
        });
        $producer->success(function ($result) {
            // NOTE(review): dumps the send result to stdout; kept as-is because
            // callers may rely on this output — confirm and move to the logger.
            var_dump($result);
        });
        $producer->error(function ($errorCode) {
            \Log::INFO('kafka producer err:'.$errorCode);
        });
        $producer->send(true);
    }

    /**
     * Consumer (low-level API): read partition 0 of a topic and dispatch each
     * message to parseInfo().
     *
     * Consumption starts at the stored offset (RD_KAFKA_OFFSET_STORED, i.e.
     * after the last consumed message). The other start positions are
     * RD_KAFKA_OFFSET_BEGINNING (from the start) and RD_KAFKA_OFFSET_END.
     *
     * The method blocks in a loop and only returns after a fatal consumer
     * error or an exception, both of which are logged rather than rethrown.
     *
     * @param string $group     Consumer group id.
     * @param string $topicName Topic to consume; also selects the handler in parseInfo().
     */
    public function consumer($group, $topicName)
    {
        try {
            $conf = new \RdKafka\Conf();
            $conf->set('group.id', $group);

            $topicConf = new \RdKafka\TopicConf();
            $topicConf->set('auto.offset.reset', 'smallest');
            // Configuration values are strings; pass 'true' instead of relying
            // on bool-to-string coercion.
            $topicConf->set('auto.commit.enable', 'true');

            $rk = new \RdKafka\Consumer($conf);
            $rk->setLogLevel(LOG_DEBUG);
            $rk->addBrokers($this->brokerList);

            $topic = $rk->newTopic($topicName, $topicConf);

            // Start once instead of start/stop on every iteration.
            $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
            try {
                while (true) {
                    // First argument: partition; second: timeout in ms.
                    // Blocks while the partition is idle, so no extra sleep
                    // is needed between iterations.
                    $msg = $topic->consume(0, 1000);

                    if ($this->isIdleResult($msg)) {
                        continue;
                    }
                    if (!empty($msg->err)) {
                        \Log::INFO("kafka customer err : ".$msg->errstr());
                        break;
                    }
                    if (empty($msg->payload)) {
                        continue;
                    }

                    $fields = $this->parsePayload($msg->payload);
                    if (!empty($fields)) {
                        // Business handling for the decoded message.
                        $this->parseInfo($topicName, $fields);
                    }
                }
            } finally {
                // Always stop the partition, including on the error paths.
                $topic->consumeStop(0);
            }
        } catch (\Exception $e) {
            // Fully qualified: a bare `Exception` would resolve inside the
            // current namespace and never match.
            \Log::INFO("kafka customer err : ".$e->getMessage());
        }
    }

    /**
     * Tell whether a consume() result means "nothing to process right now"
     * (no message within the timeout, or end of partition reached) rather
     * than a real failure.
     *
     * @param object|null $msg Value returned by consume().
     * @return bool
     */
    private function isIdleResult($msg)
    {
        if ($msg === null) {
            return true;
        }

        return in_array(
            $msg->err,
            array(RD_KAFKA_RESP_ERR__PARTITION_EOF, RD_KAFKA_RESP_ERR__TIMED_OUT),
            true
        );
    }

    /**
     * Decode a "k1=v1&k2=v2" payload into an associative array.
     *
     * Empty segments are skipped, a segment without '=' maps to an empty
     * string, and only the first '=' splits key from value so values may
     * themselves contain '='. Values are not URL-decoded.
     *
     * @param string $payload Raw message payload.
     * @return array Map of field name to raw string value.
     */
    private function parsePayload($payload)
    {
        $fields = array();
        foreach (explode('&', $payload) as $pair) {
            if ($pair === '') {
                continue;
            }
            $parts = explode('=', $pair, 2);
            $fields[$parts[0]] = isset($parts[1]) ? $parts[1] : '';
        }

        return $fields;
    }

    /**
     * Route a decoded message to the handler for its topic.
     *
     * Only the 'image' topic is handled; other topics are ignored.
     *
     * @param string $topic Topic the message was read from.
     * @param array  $info  Decoded message fields.
     */
    private function parseInfo($topic, $info)
    {
        switch ($topic) {
            case 'image':
                $imgModel = RfImageAttr::getInstance();
                $imgModel->updateParseInfo($info);
                break;
        }
    }
}