1. 程式人生 > >php RabbitMQ訊息佇列

php RabbitMQ訊息佇列

背景:在更新一張表資料的同時通知其他服務作出相應的更改(考慮到效能問題,不即時操作),查閱資料後,選擇了rabbitMQ訊息佇列實現。在更新資料表時將更新相關資訊放到訊息佇列中,然後在其他服務執行的時候從訊息佇列中取出相關的資訊,然後處理。

RabbitMQ是一個開源的基於AMQP(Advanced Message Queuing Protocol)標準,並且可靠性高的企業級訊息系統,目前很多網站在用,包括reddit,Poppen.de等。

Java程式碼  收藏程式碼
  1. 1. 安裝RabbitMQ  
  2. sudo apt-get install rabbitmq-server  
  3. sudo /etc/init.d/rabbitmq-server start  
Java程式碼  收藏程式碼
  1. 2. 安裝librabbitmq  
  2. sudo apt-get install mercurial  
  3. hg clone http://hg.rabbitmq.com/rabbitmq-c  
  4. cd rabbitmq-c  
  5. hg clone http://hg.rabbitmq.com/rabbitmq-codegen codegen
  6. 紅色字型部分已過期不可用,此時rabbitmq-c檔案目錄下面只有一個readme檔案說明新地址,瀏覽器開啟新地址
  7. 找到新的下載地址:
  8. 下載後將包解壓拷貝rabbitmq-c-0.4.1裡面的檔案到rabbitmq-c檔案目錄下
  9. autoreconf -i && ./configure && make && sudo make install  
  10. 3. 安裝php-rabbit擴充套件  
  11. wget http://php-rabbit.googlecode.com/files/php-rabbit.r91.tar.gz  
  12. tar -zxvf php-rabbit.r91.tar.gz  
  13. cd php-rabbit.r91
  14. 紅色字型部分已過期不可用,命令視窗會一直重複連線
  15. 換成下面的命令安裝amqp
  16. wget http://pecl.php.net/get/amqp-1.0.4.tgz
    tar zxf amqp-1.0.4.tgz && cd amqp-1.0.4

    /usr/local/php5/bin/phpize   (/usr/local/php5  php所在路徑
    )

    ./configure --with-php-config=/usr/local/php5/bin/php-config --with-amqp
    make && make install
  17. 4.編輯 php.ini 新增:  
  18. extension=amqp.so  
  19. 輸出phpinfo看下是否擴充套件已經載入成功,have fun:)  

 下面是get方式獲取訊息佇列內的資訊php程式碼(用consume回撥方式的時候,回撥函式一直連線保持,沒解決.

<?php
class RabbitMqModel extends BaseModel
{
    private $conn;        //rabbitMq伺服器連線
    private $channel;     //rabbitMq虛擬主機
    private $ex;          //rabbitMq交換機
    
    /**
     * 建立佇列需要用到的虛擬主機、交換機
     */
    public function createRabbitMqSer()
    {
        $conn_args = array('host' => '172.18.107.66', 'port' => '5672', 'login' => 'guest',
                           'password' => 'guest','vhost'=>'/');
        $this->conn = new AMQPConnection($conn_args);
        if ($this->conn->connect()) {
            echo "Established a connection to the broker <br />";
        }
        else {
            echo "Cannot connect to the broker <br /> ";
        }

        //建立channel
        $this->channel = new AMQPChannel($this->conn);
        //建立exchange交換機
        $this->ex = new AMQPExchange($this->channel);
        $this->ex->setName('exchange');//建立名字
        $this->ex->setType(AMQP_EX_TYPE_DIRECT);
        $this->ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $this->ex->declare();
    }
    
    /**
     * 
     * 向佇列裡面新增資訊
     */
    public function setRabbitMqInfo()
    {
        $this->createRabbitMqSer();
        
        //建立佇列
        $q = new AMQPQueue($this->channel);
        //設定佇列名字 如果不存在則新增
        $q->setName('queue');
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        echo "queue status: ".$q->declare();
        
        echo "<br />";
        echo 'queue bind: '.$q->bind('exchange','route.key');//將你的佇列繫結到routingKey
        //echo 'queue bind: '.$q->bind('exchange','route.'.$timestr);//將你的佇列繫結到routingKey
        echo "<br />";

        //釋出訊息
        $message = json_encode(array('Hello World!','php','c++'));
        $this->channel->startTransaction();
        echo "send: ".$this->ex->publish($message, 'route.key'); //將你的訊息通過制定routingKey傳送
        //echo "send: ".$ex->publish($message, 'route.'.$timestr); //將你的訊息通過制定routingKey傳送
        $this->channel->commitTransaction();
        $this->conn->disconnect();
    }
    
    /**
     * 
     * 獲取佇列裡的資訊
     */
    public function getRabbitMqInfo ()
    {
        $this->createRabbitMqSer();
        
        //建立佇列
        $q = new AMQPQueue($this->channel);
        //設定佇列名字 如果不存在則新增
        $q->setName('queue');
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        echo "queue status: ".$q->declare();
        
        echo "<br />";
        echo 'queue bind: '.$q->bind('exchange','route.key');//將你的佇列繫結到routingKey
        
        // $q->consume(array($this,'processMessage'));   //需手動應答
        while($q->declare()){
            $messages = $q->get(AMQP_AUTOACK) ;
            if ($messages){
                echo "<pre>";
                print_r(json_decode($messages->getBody(),true))."\n";
            }
        }
        
        // disconnect
        $this->conn->disconnect();
    }
}

參考:
http://www.rabbitmq.com/install.html
http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/
http://code.google.com/p/php-rabbit/