rabbitmq php 學習
阿新 • • 發佈:2017-12-06
php.ini 發布消息 tables nal coff sed bsp hosts 無法
參考文檔:
http://www.cnblogs.com/phpinfo/p/4104551...
http://blog.csdn.net/historyasamirror/ar...
依賴包安裝
yum install ncurses-devel unixODBC unixODBC-devel
erlang環境
wget http://erlang.org/download/otp_src_18.1.tar.gz
tar -zxvf otp_src_18.1.tar.gz
cd otp_src_18.1
./configure --prefix=/usr/local/erlang
make
make install
# 配置erlang環境變量
vim /etc/profile
# 增加內容:
export PATH="$PATH:/usr/local/erlang/bin"
# 保存退出,並刷新變量
source /etc/profile
# 測試erlang是否安裝成功
# 安裝完成以後,執行erl看是否能打開eshell,用’halt().’退出,註意後面的點號,那是erlang的結束符。
[root@localhost src]# erl
Erlang/OTP 17 [erts-6.1] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V6.1 (abort with ^G)
2> 9+3.
12
3> halt().
安裝rabbitmq依賴文件,安裝rabbitmq
# 安裝rabbitmq依賴包
yum install xmlto
# 安裝rabbitmq服務端
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.7/rabbitmq-server-3.5.7.tar.gz
tar zxvf rabbitmq-server-3.5.7.tar.gz
cd rabbitmq-server-3.5.7/
make
make install TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc
# 配置hosts
vim /etc/hosts
# 增加一行內容
# 當前IP地址 綁定HOSTNAME名稱(vim /etc/sysconfig/network)
192.168.2.208 localhost.localdomain
# 這種會提示錯誤(Warning: PID file not written; -detached was passed.)
/usr/local/rabbitmq/sbin/rabbitmq-server -detached 啟動rabbitmq
/usr/local/rabbitmq/sbin/rabbitmqctl status 查看狀態
/usr/local/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq
# 目前我自己使用
/usr/local/rabbitmq/sbin/rabbitmq-server start & 啟動rabbitmq
/usr/local/rabbitmq/sbin/rabbitmqctl status 查看狀態
/usr/local/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq
啟用管理插件
mkdir /etc/rabbitmq
/usr/local/rabbitmq/sbin/rabbitmq-plugins list 查看插件列表
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management (啟用插件)
/usr/local/rabbitmq/sbin/rabbitmq-plugins disable rabbitmq_management (禁用插件)
# 重啟rabbitmq
# 訪問 http://127.0.0.1:15672/
# 如果有iptables
vim /etc/sysconfig/iptables
# 增加一下內容
-A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT
# 重啟動iptable
service iptables restart
開機自啟動配置
#!/bin/sh
#start rabbitMq
sudo /usr/local/rabbitmq/sbin/rabbitmq-server & > /usr/local/rabbitmq/logs/rabbitmq.log 2>&1
RabbitMQ PHP擴展安裝
# 安裝rabbitmq-c依賴包
yum install libtool autoconf
# 安裝rabbitmq-c ( 最好下載 0.5的,0.6安裝可能會報錯)
# 版本下載:https://github.com/alanxz/rabbitmq-c/releases/tag/v0.5.0
wget https://github.com/alanxz/rabbitmq-c/releases/download/v0.5.0/rabbitmq-c-0.5.0.tar.gz
tar -zxvf v0.5.0
cd rabbitmq-c-0.5.0/
autoreconf -i
./configure --prefix=/usr/local/rabbitmq-c
make
make install
# 安裝PHP擴展 amqp
wget http://pecl.php.net/get/amqp-1.6.1.tgz
tar zxvf amqp-1.6.1.tgz
cd amqp-1.6.1
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c
make
make install
# 編輯php.ini文件,增加amqp擴展支持
vim /usr/local/php/etc/php.ini
# 增加下面內容
; rabbitmq擴展支持
extension=amqp.so
# 重啟php-fpm
/etc/init.d/php-fpm restart
驗證是否成功 phpinfo()查看下是否支持amqp擴展
相關配置
hostname mq // 設置hostname名稱
vim /etc/sysconfig/network // 設置hostname
vim /etc/hosts // 編輯hosts
./rabbitmqctl add_user admin admin // 添加用戶
./rabbitmqctl set_user_tags admin administrator // 添加admin 到 administrator分組
./rabbitmqctl set_permissions -p / admin "*." "*." "*." // 添加權限
創建配置文件
#在/usr/rabbitmq/sbin/rabbitmq-defaults 查看config文件路徑
# 創建配置文件
touch/usr/rabbitmq/sbin
#vm_memory_high_watermark 內存低水位線,若低於該水位線,則開啟流控機制,阻止所有請求,默認值是0.4,即內存總量的40%,
#vm_memory_high_watermark_paging_ratio 內存低水位線的多少百分比開始通過寫入磁盤文件來釋放內存
vi /usr/rabbitmq/sbin/rabbitmq.config 輸入
[
{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75},
{vm_memory_high_watermark, 0.7}]}
].
創建環境文件
touch /etc/rabbitmq/rabbitmq-env.conf
#輸入
RABBITMQ_NODENAME=FZTEC-240088 節點名稱
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP
RABBITMQ_NODE_PORT=5672 監聽端口
RABBITMQ_LOG_BASE=/data/rabbitmq/log 日誌目錄
RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 插件目錄
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 後端存儲目錄
操作命令
查看exchange信息
/usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments
查看隊列信息
/usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me
查看綁定信息
/usr/rabbitmq/sbin/rabbitmqctl list_bindings
查看連接信息
/usr/rabbitmq/sbin/rabbitmqctl list_connections
php的server端腳本
<?php
$routingkey=‘key‘;
//設置你的連接
$conn_args = array(‘host‘ => ‘localhost‘, ‘port‘ => ‘5672‘, ‘login‘ => ‘guest‘, ‘password‘ => ‘guest‘);
$conn = new AMQPConnection($conn_args);
if ($conn->connect()) {
echo "Established a connection to the broker \n";
}
else {
echo "Cannot connect to the broker \n ";
}
//你的消息
$message = json_encode(array(‘Hello World3!‘,‘php3‘,‘c++3:‘));
//創建channel
$channel = new AMQPChannel($conn);
//創建exchange
$ex = new AMQPExchange($channel);
$ex->setName(‘exchange‘);//創建名字
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
//$ex->setFlags(AMQP_AUTODELETE);
//echo "exchange status:".$ex->declare();
echo "exchange status:".$ex->declareExchange();
echo "\n";
for($i=0;$i<100;$i++){
if($routingkey==‘key2‘){
$routingkey=‘key‘;
}else{
$routingkey=‘key2‘;
}
$ex->publish($message,$routingkey);
}
/*
$ex->publish($message,$routingkey);
創建隊列
$q = new AMQPQueue($channel);
設置隊列名字 如果不存在則添加
$q->setName(‘queue‘);
$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
echo "queue status: ".$q->declare();
echo "\n";
echo ‘queue bind: ‘.$q->bind(‘exchange‘,‘route.key‘);
將你的隊列綁定到routingKey
echo "\n";
$channel->startTransaction();
echo "send: ".$ex->publish($message, ‘route.key‘); //將你的消息通過制定routingKey發送
$channel->commitTransaction();
$conn->disconnect();
*/
php客戶端腳本
<?php
$bindingkey=‘key2‘;
//連接RabbitMQ
$conn_args = array( ‘host‘=>‘127.0.0.1‘ , ‘port‘=> ‘5672‘, ‘login‘=>‘guest‘ , ‘password‘=> ‘guest‘,‘vhost‘ =>‘/‘);
$conn = new AMQPConnection($conn_args);
$conn->connect();
//設置queue名稱,使用exchange,綁定routingkey
$channel = new AMQPChannel($conn);
$q = new AMQPQueue($channel);
$q->setName(‘queue2‘);
$q->setFlags(AMQP_DURABLE);
$q->declare();
$q->bind(‘exchange‘,$bindingkey);
//消息獲取
$messages = $q->get(AMQP_AUTOACK) ;
if ($messages){
var_dump(json_decode($messages->getBody(), true ));
}
$conn->disconnect();
?>
翻譯了部分mq常量設置,不正確的地方,大家以試驗為準
/**
* Passing in this constant as a flag will forcefully disable all other flags.
* Use this if you want to temporarily disable the amqp.auto_ack ini setting.
* 傳遞這個參數作為標誌將完全禁用其他標誌,如果你想臨時禁用amqp.auto_ack設置起效
*/
define(‘AMQP_NOPARAM‘, 0);
/**
* Durable exchanges and queues will survive a broker restart, complete with all of their data.
* 持久化交換機和隊列,當代理重啟動後依然存在,並包括它們中的完整數據
*/
define(‘AMQP_DURABLE‘, 2);
/**
* Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist.
* 被動模式的交換機和隊列不能被重新定義,但是如果交換機和隊列不存在,代理將扔出一個錯誤提示
*/
define(‘AMQP_PASSIVE‘, 4);
/**
* Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue.
* 僅對隊列有效,這個人標誌定義隊列僅允許一個客戶端連接並且從其消費消息
*/
define(‘AMQP_EXCLUSIVE‘, 8);
/**
* For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound
* to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete
* flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no
* subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be
* automatically deleted with the client disconnects.
* 對交換機而言,自動刪除標誌表示交換機將在沒有隊列綁定的情況下被自動刪除,如果從沒有隊列和其綁定過,這個交換機將不會被刪除.
* 對隊列而言,自動刪除標誌表示如果沒有消費者和你綁定的話將被自動刪除,如果從沒有消費者和其綁定,將不被刪除,獨占隊列在客戶斷
* 開連接的時候將總是會被刪除
*/
define(‘AMQP_AUTODELETE‘, 16);
/**
* Clients are not allowed to make specific queue bindings to exchanges defined with this flag.
* 這個標誌標識不允許自定義隊列綁定到交換機上
*/
define(‘AMQP_INTERNAL‘, 32);
/**
* When passed to the consume method for a clustered environment, do not consume from the local node.
* 在集群環境消費方法中傳遞這個參數,表示將不會從本地站點消費消息
*/
define(‘AMQP_NOLOCAL‘, 64);
/**
* When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag,
* the messages will be immediately marked as acknowledged by the server upon delivery.
* 當在隊列get方法中作為標誌傳遞這個參數的時候,消息將在被服務器輸出之前標誌為acknowledged (已收到)
*/
define(‘AMQP_AUTOACK‘, 128);
/**
* Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty.
* 在隊列建立時候傳遞這個參數,這個標誌表示隊列將在為空的時候被刪除
*/
define(‘AMQP_IFEMPTY‘, 256);
/**
* Passed on queue or exchange creation, this flag indicates that the queue or exchange should be
* deleted when no clients are connected to the given queue or exchange.
* 在交換機或者隊列建立的時候傳遞這個參數,這個標誌表示沒有客戶端連接的時候,交換機或者隊列將被刪除
*/
define(‘AMQP_IFUNUSED‘, 512);
/**
* When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned.
* 當發布消息的時候,消息必須被正確路由到一個有效的隊列,否則將返回一個錯誤
*/
define(‘AMQP_MANDATORY‘, 1024);
/**
* When publishing a message, mark this message for immediate processing by the broker. (High priority message.)
* 當發布消息時候,這個消息將被立即處理.
*/
define(‘AMQP_IMMEDIATE‘, 2048);
/**
* If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple
* messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message.
* If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding
* messages.
* 當在調用AMQPQueue::ack時候設置這個標誌,傳遞標簽將被視為最大包含數量,以便通過單個方法標示多個消息為已收到,如果設置為0
* 傳遞標簽指向單個消息,如果設置了AMQP_MULTIPLE,並且傳遞標簽是0,將所有未完成消息標示為已收到
*/
define(‘AMQP_MULTIPLE‘, 4096);
/**
* If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait
* for a reply method. If the server could not complete the method it will raise a channel or connection exception.
* 當在調用AMQPExchange::bind()方法的時候,服務器將不響應請求,客戶端將不應該等待響應,如果服務器無法完成該方法,將會拋出一個異常
*/
define(‘AMQP_NOWAIT‘, 8192);
/**
* If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue.
* 如果在調用AMQPQueue::nack方法時候設置,消息將會被傳遞回隊列
*/
define(‘AMQP_REQUEUE‘, 16384);
/**
* A direct exchange type.
* direct類型交換機
*/
define(‘AMQP_EX_TYPE_DIRECT‘, ‘direct‘);
/**
* A fanout exchange type.
* fanout類型交換機
*/
define(‘AMQP_EX_TYPE_FANOUT‘, ‘fanout‘);
/**
* A topic exchange type.
* topic類型交換機
*/
define(‘AMQP_EX_TYPE_TOPIC‘, ‘topic‘);
/**
* A header exchange type.
* header類型交換機
*/
define(‘AMQP_EX_TYPE_HEADERS‘, ‘headers‘);
/**
* socket連接超時設置
*/
define(‘AMQP_OS_SOCKET_TIMEOUT_ERRNO‘, 536870947);
rabbitmq php 學習