1. 程式人生 > 實用技巧 >寶塔中極速安裝的PHP如何使用AMQP連線RabbitMQ

寶塔中極速安裝的PHP如何使用AMQP連線RabbitMQ

前言:

有些人為了讓專案快速上線,伺服器往往安裝寶塔面板,然後再極速安裝LNMP。儘管環境搭建的時間省了,但是寶塔上PHP中擴充套件包沒有提供AMQP。這時候只是為了使用訊息佇列而對PHP大動干戈,不如使用一個PHP AMQP的庫,即用即裝,不對環境造成影響。

簡介:

php-amqplib客戶端庫,通過composer安裝,不需要在PHP中安裝擴充套件,以下為兩種不同的安裝方式。

1. 專案中新建composer.json,新增如下程式碼,然後composer install

{
    "require": {
        "php-amqplib/php-amqplib": " 2.6.*"
    }
}

2. 命令進入到專案,然後 composer require php-amqplib/php-amqplib 2.6.*

RabbitMQ設定:

1. 進入web管控臺,新增新使用者,角色管理員,任何IP上都可以登入,授權指定虛擬機器。

2. 新增交換機

3. 新增佇列並與互動機繫結。

編碼:

1. 封裝rabbitMQ類。

<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class RabbitMQ.
 */
class RabbitMQ
{
    const READ_LINE_NUMBER = 0;
    const READ_LENGTH      = 1;
    const READ_DATA        = 2;

    public $config;

    public static $prefix   = 'autoinc_key:';
    protected $exchangeName = 'flow';
    protected $queueName    = 'flow_queue';

    /**
     * @var \PhpAmqpLib\Connection\AMQPStreamConnection
     */
    protected $connection;
    /**
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    protected $channel;
    protected $queue;
	
    //配置項
    private $host;
    private $port;
    private $user;
    private $pass;
    private $vhost;

    public function __construct($config = [])
    {
        //$this->config = $config;

        //設定rabbitmq配置值
        $this->host  = '192.168.1.101';
        $this->port  = 5672;
        $this->user  = 'beiqiaosu';
        $this->pass  = 'beiqiaosu';
        $this->vhost = 'report';

        $this->connect();
    }

    public function __call($method, $args = [])
    {
        $reConnect = false;
        while (1) {
            try {
                $this->initChannel();
                $result = call_user_func_array([$this->channel, $method], $args);
            } catch (\Exception $e) {
                //已重連過,仍然報錯
                if ($reConnect) {
                    throw $e;
                }

                \Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1));
                if ($this->connection) {
                    $this->close();
                }

                $this->connect();

                $reConnect = true;
                continue;
            }

            return $result;
        }
        //不可能到這裡
        return false;
    }

    /**
     * 連線rabbitmq訊息佇列.
     *
     * @return bool
     */
    public function connect()
    {
        try {
            if ($this->connection) {
                unset($this->connection);
            }
            $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
        } catch (\Exception $e) {
			echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage();
            return false;
        }
    }

    /**
     * 關閉連線.
     */
    public function close()
    {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 設定交換機名稱.
     *
     * @param string $exchangeName
     */
    public function setExchangeName($exchangeName = '')
    {
        $exchangeName && $this->exchangeName = $exchangeName;
    }

    /**
     * 設定佇列名稱.
     *
     * @param string $queueName
     */
    public function setQueueName($queueName = '')
    {
        $queueName && $this->queueName = $queueName;
    }

    /**
     * 設定頻道.
     */
    public function initChannel()
    {
        if (!$this->channel) {
            //通道
            $this->channel = $this->connection->channel();
            $this->channel->queue_declare($this->queueName, false, true, false, false);
            $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
            $this->channel->queue_bind($this->queueName, $this->exchangeName);
        }
    }

    /**
     * 獲取佇列資料.
     *
     * @return mixed
     */
    public function pop()
    {
        while (1) {
            try {
                $this->connect();
                $this->initChannel();
                $message = $this->channel->basic_get($this->queueName);
				
                if ($message) {
                    $this->channel->basic_ack($message->delivery_info['delivery_tag']);
                    $result = $message->body;
                } else {
                    throw new \Exception('Empty Queue Data');
                }
            } catch (\Exception $e) {
                //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")");
                sleep(1);
                continue;
            }

            return $result;
        }
		
        //不可能到這裡
        return false;
    }

    /**
     * 插入佇列資料.
     *
     * @param $data
     *
     * @return bool
     */
    public function push($data)
    {	
        while (1) {		
            try {
                $this->connect();
                $this->initChannel();
                $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
                $this->channel->basic_publish($message, $this->exchangeName);
            } catch (\Exception $e) {
				echo "$e->getMessage()";
                continue;
			}

            return true;
        }
		
        //不可能到這裡
        return false;
    }
}

2. 操作mq,出隊,入隊。

<?php

require_once "vendor/autoload.php";
require_once "component/RabbitMQ.php";

$mq = new RabbitMQ();

// 訊息消費測試
/*try {
	$res = $mq->pop();
	
}catch(\Exception $e) {
	
	var_dump($e->getMessage());die;
}*/


// 訊息生產測試
try {
	$res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155']));
	
}catch(\Exception $e) {
	
	var_dump($e->getMessage());die;
}


var_dump($res);die;

測試:

1. 先通過生產訊息(入隊)方法執行一下,然後進入佇列中get message檢視訊息總數。

2. 測試呼叫消費,再檢視總數。