php RabbitMQ訊息佇列
阿新 • • 發佈:2018-12-30
背景:在更新一張表資料的同時通知其他服務作出相應的更改(考慮到效能問題,不即時操作),查閱資料後,選擇了rabbitMQ訊息佇列實現。在更新資料表時將更新相關資訊放到訊息佇列中,然後在其他服務執行的時候從訊息佇列中取出相關的資訊,然後處理。
RabbitMQ是一個開源的基於AMQP(Advanced Message Queuing Protocol)標準,並且可靠性高的企業級訊息系統,目前很多網站在用,包括reddit,Poppen.de等。
- 1. 安裝RabbitMQ
- sudo apt-get install rabbitmq-server
-
sudo /etc/init.d/rabbitmq-server start
- 2. 安裝librabbitmq
- sudo apt-get install mercurial
- hg clone http://hg.rabbitmq.com/rabbitmq-c
- cd rabbitmq-c
- hg clone http://hg.rabbitmq.com/rabbitmq-codegen codegen
- 紅色字型部分已過期不可用,此時rabbitmq-c檔案目錄下面只有一個readme檔案說明新地址,瀏覽器開啟新地址
- 找到新的下載地址:
- 下載後將包解壓拷貝rabbitmq-c-0.4.1裡面的檔案到rabbitmq-c檔案目錄下
-
autoreconf -i && ./configure && make && sudo make install
- 3. 安裝php-rabbit擴充套件
- wget http://php-rabbit.googlecode.com/files/php-rabbit.r91.tar.gz
- tar -zxvf php-rabbit.r91.tar.gz
- cd php-rabbit.r91
- 紅色字型部分已過期不可用,命令視窗會一直重複連線
- 換成下面的命令安裝amqp
-
wget http://pecl.php.net/get/amqp-1.0.4.tgz
tar zxf amqp-1.0.4.tgz && cd amqp-1.0.4
/usr/local/php5/bin/phpize (/usr/local/php5 php所在路徑
./configure --with-php-config=/usr/local/php5/bin/php-config --with-amqp
make && make install - 4.編輯 php.ini 新增:
- extension=amqp.so
- 輸出phpinfo看下是否擴充套件已經載入成功,have fun:)
下面是get方式獲取訊息佇列內的資訊php程式碼(用consume回撥方式的時候,回撥函式一直連線保持,沒解決).
<?php
class RabbitMqModel extends BaseModel
{
private $conn; //rabbitMq伺服器連線
private $channel; //rabbitMq虛擬主機
private $ex; //rabbitMq交換機
/**
* 建立佇列需要用到的虛擬主機、交換機
*/
public function createRabbitMqSer()
{
$conn_args = array('host' => '172.18.107.66', 'port' => '5672', 'login' => 'guest',
'password' => 'guest','vhost'=>'/');
$this->conn = new AMQPConnection($conn_args);
if ($this->conn->connect()) {
echo "Established a connection to the broker <br />";
}
else {
echo "Cannot connect to the broker <br /> ";
}
//建立channel
$this->channel = new AMQPChannel($this->conn);
//建立exchange交換機
$this->ex = new AMQPExchange($this->channel);
$this->ex->setName('exchange');//建立名字
$this->ex->setType(AMQP_EX_TYPE_DIRECT);
$this->ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
$this->ex->declare();
}
/**
*
* 向佇列裡面新增資訊
*/
public function setRabbitMqInfo()
{
$this->createRabbitMqSer();
//建立佇列
$q = new AMQPQueue($this->channel);
//設定佇列名字 如果不存在則新增
$q->setName('queue');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "<br />";
echo 'queue bind: '.$q->bind('exchange','route.key');//將你的佇列繫結到routingKey
//echo 'queue bind: '.$q->bind('exchange','route.'.$timestr);//將你的佇列繫結到routingKey
echo "<br />";
//釋出訊息
$message = json_encode(array('Hello World!','php','c++'));
$this->channel->startTransaction();
echo "send: ".$this->ex->publish($message, 'route.key'); //將你的訊息通過制定routingKey傳送
//echo "send: ".$ex->publish($message, 'route.'.$timestr); //將你的訊息通過制定routingKey傳送
$this->channel->commitTransaction();
$this->conn->disconnect();
}
/**
*
* 獲取佇列裡的資訊
*/
public function getRabbitMqInfo ()
{
$this->createRabbitMqSer();
//建立佇列
$q = new AMQPQueue($this->channel);
//設定佇列名字 如果不存在則新增
$q->setName('queue');
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "<br />";
echo 'queue bind: '.$q->bind('exchange','route.key');//將你的佇列繫結到routingKey
// $q->consume(array($this,'processMessage')); //需手動應答
while($q->declare()){
$messages = $q->get(AMQP_AUTOACK) ;
if ($messages){
echo "<pre>";
print_r(json_decode($messages->getBody(),true))."\n";
}
}
// disconnect
$this->conn->disconnect();
}
}
參考:
http://www.rabbitmq.com/install.html
http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/
http://code.google.com/p/php-rabbit/