RabbitMQ的安裝與基本使用
阿新 • • 發佈:2017-10-19
表示 消息生產者 routing .cn error 回調 方法名 while bar
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。
二、安裝RabbitMQ
Ubuntu:
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
CentOS:
1.先安裝erlang
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
yum -y install ncurses-devel
yum install ncurses-devel
wget http://www.erlang.org/download/otp_src_17.5.tar.gz
tar -xzvf otp_src_17.5.tar.gz
cd otp_src_17.5
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe
make && make install
配置erlang環境
vi /etc/profile #在最後添加下文
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
使環境變量生效
source /etc/profile
測試一下是否安裝成功,在控制臺輸入命令erl
安裝完後輸入“erl”以下提示即為安裝成功
八:特別說明: 1.如果某一次消費數據沒有ACK,則此條消息會記錄為Unacknowledged,如果對於某一節點有連續三條則RabbitMQ認為此節點有故障,則不會再對它進行分發.當它斷開後或一定時間後Unacknowledged狀態消息會重新放回交換機中。手動no-ack後此Message將會放回隊列中且不會再轉發給此節點。 2.對於計算密集型的工作,我們需要建立多個Consumer,對於多個Consumer,默認的分發機制是“公平分發”,將第n個發給第n個Consumer,超過個數取n的模。 3.Exchange中的類型有:direct, topic 和fanout。 direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息。 topic :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息,direct的區別是它的routingkey可 以模糊匹配,#代表一個或多個字符,*代表任何字符。一般為確保程序嚴謹性而使用direct。註意當使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout fanout:是廣播模式,所有bind到此exchange的queue都可以接收消息,即該一個Message可以對應多個Consumer,應用場景舉例:生產了了添加到購物車的Message,一個Consumer寫推薦商品日誌,一個Consumer寫購物車表。 4.兩個不相同的queue名的隊列綁定到同一exchange和rotingky上,此時相當於同一queue名的fanout廣播模式,兩個queue都會得都到 所有Message。如圖所示: 5.RPC的實現流程:
運行環境:https://oneinstack.com/install/
在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。如發送短信、郵件、過濾非法關鍵字等等。它還可以用於RPC。
先看一張官方圖:
一、概念: Broker:簡單來說就是消息隊列服務器實體。 Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。 Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。 Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
[root@cloud bin]# erl Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false] Eshell V5.10.3 (abort with ^G) 1>2.安裝rabbitmq wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1.tar.gz tar -zxvf rabbitmq-server-3.5.1.tar.gz cd rabbitmq-server-3.5.1 make make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install 報錯處理: /bin/sh: xmlto: command not found /bin/sh: line 2: xmlto: command not found 解決:yum install xmlto 三、管理命令 切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/ 啟動:./rabbitmq-server start 關閉:./rabbitmqctl stop 狀態:./rabbitmqctl status 四、插件 啟動web管理插件,切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/ ./rabbitmq-plugins enable rabbitmq_management 錯誤解決: Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins", enoent} mkdir /etc/rabbitmq 重新啟動輸入地址:localhost:15672,帳號默認為guest,密碼guest,此帳號默認只能在本機訪問。不建議打開遠程訪問。你可以創建一個帳戶,並設置可以遠程訪問的角色進行訪問。 如:./rabbitmqctl add_user luo 123456 ./rabbitmqctl set_user_tags luo administrator 五、用戶管理 默認的guest帳戶相當於root帳戶 rabbitmqctl add_user username password 添加帳戶 rabbitmqctl change_password username newpassword 修改密碼 rabbitmqctl delete_user username 刪除帳戶 rabbitmqctl list_users 列出所有帳戶 rabbitmqctl set_user_tags User Tag 設置角色(administrator、monitoring、policymaker、management、其它) 立即生效,不需重啟 六、安裝PHP擴展(amqp) 不一定非要安裝擴展,你也可以直接下載類庫:https://github.com/videlalvaro/php-amqplib。 https://pecl.php.net/get/amqp-1.3.0.tgz tar -zxvf amqp-1.3.0.tgz cd amqp-1.3.0 /usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路徑 )
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp
(可使用find / -name php-config查找php-config路徑) 在php.ini中extenstion部分寫入extension=amqp.so 最後phpinfo檢查是否成功 七、使用PHP與之交互 Produce端://設置連接屬性
$connArgs = array(
‘host‘ => ‘localhost‘,
‘port‘ => ‘5672‘,
‘login‘ => ‘guest‘,
‘password‘ => ‘guest‘,
‘vhost‘ => ‘/‘
);
//創建RabbitMQ連接
$conn = new AMQPConnection($connArgs);
if (!$conn->connect()) {
echo "Cannot connect to the broker <br/>\n ";
}
//創建channel
$channel = new AMQPChannel($conn);
//創建exchange
$exchangeName = ‘exchange‘;
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);//創建名字
$exchange->setType(AMQP_EX_TYPE_DIRECT);//設置為direct類型
$exchange->setFlags(AMQP_DURABLE);//持久化交換機,當代理重啟動後依然存在,並包括它們中的完整數據
$exchange->setFlags(AMQP_AUTODELETE);//對交換機而言,自動刪除標誌表示交換機將在沒有隊列綁定的情況下被自動刪除,如果從沒有隊列和其綁定過,這個交換機將不會被刪除.
//發送消息
$routingKey = ‘key_test‘;
for($i=0;$i<10;$i++){
sleep(3);
$message = ‘Hello World ‘.$i;
$exchange->publish($message,$routingKey);
}
//斷開連接
$conn->disconnect();
Consumer端:
//設置連接屬性
$connArgs = array(
‘host‘ => ‘localhost‘,
‘port‘ => ‘5672‘,
‘login‘ => ‘guest‘,
‘password‘ => ‘guest‘,
‘vhost‘ => ‘/‘
);
//創建RabbitMQ連接
$conn = new AMQPConnection($connArgs);
if (!$conn->connect()) {
echo ‘cannot connect to the broker‘;
}
//創建channel
$channel = new AMQPChannel($conn);
//設置隊列名稱
$exchangeName = ‘exchange‘;
$routingKey = ‘key_test‘;
$queueName = ‘queue_test_1‘;
$queue = new AMQPQueue($channel);
$queue->setName($queueName);//設置名稱
$queue->setFlags(AMQP_DURABLE);//持久化隊列,當代理重啟動後依然存在,並包括它們中的完整數據
$queue->declare();//聲明此隊列
$queue->bind(‘exchange‘,$routingKey);//使用某交換機,並綁定某路由關鍵字
//消費數據方式一:非阻塞方式
// while(true)
// {
// sleep(1);
// $envelope = $queue->get(AMQP_AUTOACK) ;//第一個參數表示自動ACK應答
// if ($envelope){
// $messages = $envelope->getBody();
// echo $messages;
// }
// }
//消費數據方式二:以阻塞模式消費數據(推薦)
while(true)
{
$queue->consume(‘processMessage‘);//第一個參數表示要回調的方法名,第二個參數設置為AMQP_AUTOACK,表示自動ACK應答
}
/**
* 定義回調方法
*/
function processMessage($envelope, $queue)
{
$messages = $envelope->getBody();#獲取消息數據
echo $messages;
$queue->ack($envelope->getDeliveryTag()); //處理成功後,手動發送ACK應答
//$queue->nack($envelope->getDeliveryTag()); //處理不成功,手動發送NO-ACK應答,放回隊列中
}
八:特別說明: 1.如果某一次消費數據沒有ACK,則此條消息會記錄為Unacknowledged,如果對於某一節點有連續三條則RabbitMQ認為此節點有故障,則不會再對它進行分發.當它斷開後或一定時間後Unacknowledged狀態消息會重新放回交換機中。手動no-ack後此Message將會放回隊列中且不會再轉發給此節點。 2.對於計算密集型的工作,我們需要建立多個Consumer,對於多個Consumer,默認的分發機制是“公平分發”,將第n個發給第n個Consumer,超過個數取n的模。 3.Exchange中的類型有:direct, topic 和fanout。 direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息。 topic :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息,direct的區別是它的routingkey可 以模糊匹配,#代表一個或多個字符,*代表任何字符。一般為確保程序嚴謹性而使用direct。註意當使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout fanout:是廣播模式,所有bind到此exchange的queue都可以接收消息,即該一個Message可以對應多個Consumer,應用場景舉例:生產了了添加到購物車的Message,一個Consumer寫推薦商品日誌,一個Consumer寫購物車表。 4.兩個不相同的queue名的隊列綁定到同一exchange和rotingky上,此時相當於同一queue名的fanout廣播模式,兩個queue都會得都到 所有Message。如圖所示: 5.RPC的實現流程:
RabbitMQ的安裝與基本使用