RabbitMQ訊息佇列在PHP下的應用
訊息佇列的實現中,RabbitMQ以其健壯和可靠見長.公司的專案中選擇了它作為訊息佇列的實現.關於MQ的機制和原理網上有很多文章可以看,這裡就不再贅述,只講幾個比較容易混淆的問題
1,binding key和routing key
binding key和routing key是都不過是自己設定的一組字元,只是用的地方不同,binding key是在繫結交換機和佇列時候通過方法傳遞的字串,routing key是在釋出訊息時候,順便帶上的字串,有些人說這兩個其實是一個東西,也對也不對,說對,是因為這兩個可以完全一樣,說不對,是因為這兩個起的作用不同,一個交換機可以繫結很多佇列,但是每個佇列也許需要的訊息型別不同,binding key就是這個繫結時候留在交換機和佇列之間的提示資訊,當訊息傳送出來後,隨著訊息一起傳送的routing key如果和binding key一樣就說明訊息是這個佇列要的東西,如果不一樣那就不要給這個佇列,交換機你找找下個佇列看看要不要.明白了吧,這兩個key就是暗號,對上了就是自己人,對不上那麻煩你再找找去.
binding key和routing key的配對其實也不是就要完全一樣,還可以'相似'配對,建立交換機的時候,就要告訴MQ,我要宣告的這個交換機和它上面的佇列之間傳輸訊息時候要求routing key和binding key完全一樣,這種模式叫Direct,如果routing key和binding key可以'模糊'匹配,這種模式叫Topic,如果不需要匹配,儘管發,叫Fanout.
2,持久化
交換機和佇列都可以在建立時候設定為持久化,重啟以後會回覆,但是其中的訊息未不會,如果要訊息也恢復,將訊息釋出到交換機的時候,可以指定一個標誌“Delivery Mode”(投遞模式), 1為非持久化,2為持久化.
3,流控機制
當訊息生產的速度更快,而程序的處理能力低時,訊息就會堆積起來,佔用記憶體越來越多,導致MQ崩潰,所以rabbitmq有一個流控機制,當超過限定時候就會阻止接受訊息,mq流控有三種機制
1,主動阻塞住發訊息太快的連線,這個無法調整,如果被阻塞了,在abbitmqctl 控制檯上會顯示一個blocked的狀態。 2,記憶體超過限量,會阻塞連線,在vm_memory_high_watermark可調 3,剩餘磁碟在限定以下mq會 主動阻塞所有的生產者,預設為50m,在disk_free_limit可調. 下面是在centos7上面的,MQ安裝過程.yum install ncurses-devel unixODBC unixODBC-devel2,erlang環境
wget http://www.erlang.org/download/ otp_src_17.3.tar.gz tar zxvf otp_src_17.3.tar.gz cd otp_src_17.3 ./configure --without-javac #忽略警告 make && make install
3,安裝rabbitmq依賴檔案,安裝rabbitmq
yum install xmlto wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.1/rabbitmq-server-3.4.1.tar.gz tar zxvf rabbitmq-server-3.4.1.tar.gz cd rabbitmq-server-3.4.1/ make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc make TARGET_DIR=/usr/rabbitmq SBIN_DIR=/usr/rabbitmq/sbin MAN_DIR=/usr/rabbitmq/man DOC_INSTALL_DIR=/usr/rabbitmq/doc install /usr/rabbitmq/sbin/rabbitmq-server -detached 啟動rabbitmq /usr/rabbitmq/sbin/rabbitmqctl status 檢視狀態 /usr/rabbitmq/sbin/rabbitmqctl stop 關閉rabbitmq
4,啟用管理外掛
mkdir /etc/rabbitmq cd /usr/rabbitmq/sbin ./rabbitmq-plugins enable rabbitmq_management (啟用外掛) ./rabbitmq-plugins disable rabbitmq_management (禁用外掛) # 重啟rabbitmq # 訪問 http://127.0.0.1:15672/ # 如果有iptables # vi /etc/sysconfig/iptables 增加 # -A INPUT -m state --state NEW -m tcp -p tcp --dport 15672 -j ACCEPT # 重啟動iptable systemctl restart iptables.service
5,建立配置檔案
#在/usr/rabbitmq/sbin/rabbitmq-defaults 檢視config檔案路徑 # 建立配置檔案 touch/usr/rabbitmq/sbin #vm_memory_high_watermark 記憶體低水位線,若低於該水位線,則開啟流控機制,阻止所有請求,預設值是0.4,即記憶體總量的40%, #vm_memory_high_watermark_paging_ratio 記憶體低水位線的多少百分比開始通過寫入磁碟檔案來釋放記憶體 vi /usr/rabbitmq/sbin/rabbitmq.config 輸入 [ {rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.7}]} ].
6,建立環境檔案
touch /etc/rabbitmq/rabbitmq-env.conf #輸入 RABBITMQ_NODENAME=FZTEC-240088 節點名稱 RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 監聽IP RABBITMQ_NODE_PORT=5672 監聽埠 RABBITMQ_LOG_BASE=/data/rabbitmq/log 日誌目錄 RABBITMQ_PLUGINS_DIR=/data/rabbitmq/plugins 外掛目錄 RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia 後端儲存目錄
7,安裝php的rabbitmq擴充套件
yum install librabbitmq-devel.x86_64 wget http://pecl.php.net/get/amqp-1.4.0.tgz tar zxvf amqp-1.4.0.tgz cd amqp-1.4.0 /usr/local/php/bin/phpize ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp make && make install vim /usr/local/php/etc/php.ini #輸入 extension=amqp.so service nginx reload service php-fpm restart
操作命令
檢視exchange資訊 /usr/rabbitmq/sbin/rabbitmqctl list_exchanges name type durable auto_delete arguments 檢視佇列資訊 /usr/rabbitmq/sbin/rabbitmqctl list_queues name durable auto_delete messages consumers me 檢視繫結資訊 /usr/rabbitmq/sbin/rabbitmqctl list_bindings 檢視連線資訊 /usr/rabbitmq/sbin/rabbitmqctl list_connections
php的server端指令碼
<?php $routingkey='key'; //設定你的連線 $conn_args = array('host' => 'localhost', 'port' => '5672', 'login' => 'guest', 'password' => 'guest'); $conn = new AMQPConnection($conn_args); if ($conn->connect()) { echo "Established a connection to the broker \n"; } else { echo "Cannot connect to the broker \n "; } //你的訊息 $message = json_encode(array('Hello World3!','php3','c++3:')); //建立channel $channel = new AMQPChannel($conn); //建立exchange $ex = new AMQPExchange($channel); $ex->setName('exchange');//建立名字 $ex->setType(AMQP_EX_TYPE_DIRECT); $ex->setFlags(AMQP_DURABLE); //$ex->setFlags(AMQP_AUTODELETE); //echo "exchange status:".$ex->declare(); echo "exchange status:".$ex->declareExchange(); echo "\n"; for($i=0;$i<100;$i++){ if($routingkey=='key2'){ $routingkey='key'; }else{ $routingkey='key2'; } $ex->publish($message,$routingkey); } /* $ex->publish($message,$routingkey); 建立佇列 $q = new AMQPQueue($channel); 設定佇列名字 如果不存在則新增 $q->setName('queue'); $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE); echo "queue status: ".$q->declare(); echo "\n"; echo 'queue bind: '.$q->bind('exchange','route.key'); 將你的佇列繫結到routingKey echo "\n"; $channel->startTransaction(); echo "send: ".$ex->publish($message, 'route.key'); //將你的訊息通過制定routingKey傳送 $channel->commitTransaction(); $conn->disconnect(); */
php客戶端指令碼
<?php $bindingkey='key2'; //連線RabbitMQ $conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'guest' , 'password'=> 'guest','vhost' =>'/'); $conn = new AMQPConnection($conn_args); $conn->connect(); //設定queue名稱,使用exchange,繫結routingkey $channel = new AMQPChannel($conn); $q = new AMQPQueue($channel); $q->setName('queue2'); $q->setFlags(AMQP_DURABLE); $q->declare(); $q->bind('exchange',$bindingkey); //訊息獲取 $messages = $q->get(AMQP_AUTOACK) ; if ($messages){ var_dump(json_decode($messages->getBody(), true )); } $conn->disconnect(); ?>
翻譯了部分mq常量設定,不正確的地方,大家以試驗為準
/** * Passing in this constant as a flag will forcefully disable all other flags. * Use this if you want to temporarily disable the amqp.auto_ack ini setting. * 傳遞這個引數作為標誌將完全禁用其他標誌,如果你想臨時禁用amqp.auto_ack設定起效 */ define('AMQP_NOPARAM', 0); /** * Durable exchanges and queues will survive a broker restart, complete with all of their data. * 持久化交換機和佇列,當代理重啟動後依然存在,幷包括它們中的完整資料 */ define('AMQP_DURABLE', 2); /** * Passive exchanges and queues will not be redeclared, but the broker will throw an error if the exchange or queue does not exist. * 被動模式的交換機和佇列不能被重新定義,但是如果交換機和佇列不存在,代理將扔出一個錯誤提示 */ define('AMQP_PASSIVE', 4); /** * Valid for queues only, this flag indicates that only one client can be listening to and consuming from this queue. * 僅對佇列有效,這個人標誌定義佇列僅允許一個客戶端連線並且從其消費訊息 */ define('AMQP_EXCLUSIVE', 8); /** * For exchanges, the auto delete flag indicates that the exchange will be deleted as soon as no more queues are bound * to it. If no queues were ever bound the exchange, the exchange will never be deleted. For queues, the auto delete * flag indicates that the queue will be deleted as soon as there are no more listeners subscribed to it. If no * subscription has ever been active, the queue will never be deleted. Note: Exclusive queues will always be * automatically deleted with the client disconnects. * 對交換機而言,自動刪除標誌表示交換機將在沒有佇列繫結的情況下被自動刪除,如果從沒有佇列和其繫結過,這個交換機將不會被刪除. * 對佇列而言,自動刪除標誌表示如果沒有消費者和你繫結的話將被自動刪除,如果從沒有消費者和其繫結,將不被刪除,獨佔佇列在客戶斷 * 開連線的時候將總是會被刪除 */ define('AMQP_AUTODELETE', 16); /** * Clients are not allowed to make specific queue bindings to exchanges defined with this flag. * 這個標誌標識不允許自定義佇列繫結到交換機上 */ define('AMQP_INTERNAL', 32); /** * When passed to the consume method for a clustered environment, do not consume from the local node. * 在叢集環境消費方法中傳遞這個引數,表示將不會從本地站點消費訊息 */ define('AMQP_NOLOCAL', 64); /** * When passed to the {@link AMQPQueue::get()} and {@link AMQPQueue::get()} methods as a flag, * the messages will be immediately marked as acknowledged by the server upon delivery. * 當在佇列get方法中作為標誌傳遞這個引數的時候,訊息將在被伺服器輸出之前標誌為acknowledged (已收到) */ define('AMQP_AUTOACK', 128); /** * Passed on queue creation, this flag indicates that the queue should be deleted if it becomes empty. * 在佇列建立時候傳遞這個引數,這個標誌表示佇列將在為空的時候被刪除 */ define('AMQP_IFEMPTY', 256); /** * Passed on queue or exchange creation, this flag indicates that the queue or exchange should be * deleted when no clients are connected to the given queue or exchange. * 在交換機或者佇列建立的時候傳遞這個引數,這個標誌表示沒有客戶端連線的時候,交換機或者佇列將被刪除 */ define('AMQP_IFUNUSED', 512); /** * When publishing a message, the message must be routed to a valid queue. If it is not, an error will be returned. * 當釋出訊息的時候,訊息必須被正確路由到一個有效的佇列,否則將返回一個錯誤 */ define('AMQP_MANDATORY', 1024); /** * When publishing a message, mark this message for immediate processing by the broker. (High priority message.) * 當釋出訊息時候,這個訊息將被立即處理. */ define('AMQP_IMMEDIATE', 2048); /** * If set during a call to {@link AMQPQueue::ack()}, the delivery tag is treated as "up to and including", so that multiple * messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message. * If the AMQP_MULTIPLE flag is set, and the delivery tag is zero, this indicates acknowledgement of all outstanding * messages. * 當在呼叫AMQPQueue::ack時候設定這個標誌,傳遞標籤將被視為最大包含數量,以便通過單個方法標示多個訊息為已收到,如果設定為0 * 傳遞標籤指向單個訊息,如果設定了AMQP_MULTIPLE,並且傳遞標籤是0,將所有未完成訊息標示為已收到 */ define('AMQP_MULTIPLE', 4096); /** * If set during a call to {@link AMQPExchange::bind()}, the server will not respond to the method.The client should not wait * for a reply method. If the server could not complete the method it will raise a channel or connection exception. * 當在呼叫AMQPExchange::bind()方法的時候,伺服器將不響應請求,客戶端將不應該等待響應,如果伺服器無法完成該方法,將會丟擲一個異常 */ define('AMQP_NOWAIT', 8192); /** * If set during a call to {@link AMQPQueue::nack()}, the message will be placed back to the queue. * 如果在呼叫AMQPQueue::nack方法時候設定,訊息將會被傳遞迴佇列 */ define('AMQP_REQUEUE', 16384); /** * A direct exchange type. * direct型別交換機 */ define('AMQP_EX_TYPE_DIRECT', 'direct'); /** * A fanout exchange type. * fanout型別交換機 */ define('AMQP_EX_TYPE_FANOUT', 'fanout'); /** * A topic exchange type. * topic型別交換機 */ define('AMQP_EX_TYPE_TOPIC', 'topic'); /** * A header exchange type. * header型別交換機 */ define('AMQP_EX_TYPE_HEADERS', 'headers'); /** * socket連線超時設定 */ define('AMQP_OS_SOCKET_TIMEOUT_ERRNO', 536870947);