快速搭建基於beanstalk的php訊息佇列服務
本專案實現基於beanstalk的php訊息佇列服務,包括生產與消費訊息案例
<?php /** * 訊息生產/接收類 * @example * // 生產單條訊息,goods管道組 * $mq = new MessageQueue(); * $mq->product('goods', 111111); * // 生產多條訊息 * $mq->product('goods', array(111111, 111112)); * * // 訊息佇列監聽處理指令碼,goods管道組,solr管道 * <?php * $mq = new MessageQueue('solr'); * $mq->watch('goods', function($message) { * $goods_id = intval($message); * // 以下為具體業務處理邏輯 * // * // ... * // 返回true表示已處理完畢,伺服器將刪除該條訊息 * return true; * }); */ require_once dirname(__DIR__).'/BeanstalkClient.php'; class MessageQueue { // 訂閱者ID private $_clientID = null; // 訂閱者清單 private $_subscribers = array(); // beanstalkd連線配置資訊 private $_beanstalkdConfig = array(); /** * beanstalk client * @var BeanstalkClient */ private $_beanstalk = null; /** * 初始化訊息客戶端 * @param string $clientID 分配給訊息接受端的ID標識 */ public function __construct($clientID = null) { $this->_clientID = $clientID; $this->_setConfig(); } /** * 生產訊息, 對管道內的所有事件推送訊息 * @param string $queue 佇列名 -- 管道組 * @param [string|array] $messages 訊息內容,多條使用陣列 */ public function product($queue, $messages) { try { if(!isset($this->_subscribers[$queue])) { throw new Exception('queue of "'.$queue.'" havn\'t configured, ' .'go '.__DIR__.'/../config/params.php and configure it'); } $beanstalk = $this->getBeanstalkClient(); if(!is_array($messages)) { $messages = array($messages); } foreach($this->_subscribers[$queue] as $clientID) { $beanstalk->useTube($queue.'.'.$clientID); foreach($messages as $message) { if(strlen($message)){ $beanstalk->put(11, 0, 60, $message); $this->_log('product', $queue, $clientID, $message); } } } } catch (Exception $e) { throw new Exception($e->getMessage()); } return true; } /** * 根據佇列名和事件名投遞訊息, 只對指定管道和事件推送訊息 * @param [string] $queue [佇列名] * @param [string] $event [事件名] * @param [string] $message [訊息內容] * @param [int] $delay [延時時間] * @return [void] */ public function product_conf($queue, $event, $message, $delay = 0) { $beanstalk = $this->getBeanstalkClient(); $beanstalk->useTube("{$queue}.{$event}"); $beanstalk->put(11, $delay, 60, $message); } /** * 監聽佇列並處理訊息 * @param string $queue 訂閱的佇列名 * @param function $callback 回撥方法(訊息處理函式,會將訊息內容作為引數給$callback) */ public function watch($queue, $callback) { try { if(!$this->_checkQueueExist($queue, $this->_clientID)) { $this->_log('checkQueue', $queue, $this->_clientID, ''); throw new Exception($this->_clientID.' is not allow to access this queue'); } if(!is_object($callback)) { $this->_log('isObject', $queue, $this->_clientID,''); throw new Exception('param of callback is not a function'); } $this->_beanstalkdConfig['persistent'] = false; $beanstalk = new BeanstalkClient($this->_beanstalkdConfig); $beanstalk->connect(); $beanstalk->watch($queue.'.'.$this->_clientID); $retry = 0; for(;;) { $job = $beanstalk->reserve(); if($job) { $result = $callback($job['body']); //處理任務 if(true === $result) { $beanstalk->delete($job['id']); $this->_log('consume', $queue, $this->_clientID, $job['body']); }else{ $beanstalk->bury($job['id'],''); $this->_log('bury', $queue, $this->_clientID, $job['body']); } } else { $this->_log('error', $queue, $this->_clientID, $job['body']); // 設定 error_reporting(0) 時watcher指令碼會陷入死迴圈,這裡設定重連 if ($retry++ >= 10) { $retry = 0; $this->_log('error', $queue, $this->_clientID, 'try to reconnect.'); sleep(5); // 等待beanstalkd服務恢復 $beanstalk->connect(); $beanstalk->watch($queue.'.'.$this->_clientID); } } } $beanstalk->disconnect(); } catch (Exception $e) { $this->_log('error', $queue, $this->_clientID,''); throw new Exception($e->getMessage()); } } /** * 初始化配置資訊 */ private function _setConfig() { $config = require dirname(__DIR__).'/config/params.php'; $this->_subscribers = $config['subscribers']; $this->_beanstalkdConfig = $config['beanstalkd']; } /** * 檢查當前客戶端監聽的佇列是否存在 * @param string $queue 佇列名 * @param string $clientID 客戶端ID * @return boolean */ private function _checkQueueExist($queue, $clientID) { return isset($this->_subscribers[$queue]) && in_array($clientID, $this->_subscribers[$queue]); } /** * 獲取beanstalk client * @param array $config 連線配置 * @return BeanstalkClient */ private function getBeanstalkClient() { if (is_null($this->_beanstalk)) { $this->_beanstalk = new BeanstalkClient($this->_beanstalkdConfig); $this->_beanstalk->connect(); } try { // 檢查連線 $this->_beanstalk->stats(); } catch (Exception $e) { // 若出錯則重連 $this->_beanstalk->connect(); } return $this->_beanstalk; } /** * 記錄日誌 * @param string $operation 操作型別 * @param string $queue 佇列名 -- 管道組 * @param string $clientID 客戶端ID -- 管道 * @param string $message 訊息體 */ public function _log($operation, $queue, $clientID, $message, $folder = 'mqlog') { $dir = MQ_LOG_PATH; (file_exists($dir) && is_dir($dir)) || mkdir($dir, 0777, true); $file = $dir.'/'.$queue.'.'.$clientID.'.log'; $mode = (is_file($file) && filesize($file)/1024/1024 < 20) ? "ab+" : "wb"; // 日誌大於20M則清空, 微分銷系統穩定之前先手動清log $fp = fopen($file , $mode); if(flock($fp , LOCK_EX)){ fwrite($fp , '['.date('Y-m-d H:i:s').'] '.$operation.': '.$message.PHP_EOL); flock($fp , LOCK_UN); @chmod($file, 0777); } fclose($fp); } /** * destruct * disconnect */ public function __destruct() { if (!is_null($this->_beanstalk)) { $this->_beanstalk->disconnect(); } } }
相關推薦
Java語言快速實現簡單MQ訊息佇列服務
目錄 MQ基礎回顧 主要角色 自定義協議 流程順序 專案構建流程 具體使用流程 程式碼演示 訊息處理中心 Broker 訊息處理中心服務 BrokerServer 客戶端 MqClient 測試MQ 小結
快速搭建基於beanstalk的php訊息佇列服務
本專案實現基於beanstalk的php訊息佇列服務,包括生產與消費訊息案例<?php /** * 訊息生產/接收類 * @example * // 生產單條訊息,goods管道組 * $mq = new MessageQueue(); * $mq-
滴滴出行基於RocketMQ構建企業級訊息佇列服務的實踐
本文整理自滴滴出行訊息佇列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。通過本文,您將瞭解到滴滴出行: 在訊息佇列技術選型方面的思考; 為什麼選擇 RocketMQ 作為出行業務的訊息佇列解決方案; 如何構建自己的訊息佇列服務; 在 RocketMQ
HTTPSQS:基於 HTTP協議的輕量級開源簡單訊息佇列服務
HTTPSQS(HTTP Simple Queue Service)是一款基於 HTTP GET/POST 協議的輕量級開源簡單訊息佇列服務,使用 Tokyo Cabinet 的 B+Tree Key/Value 資料庫來做資料的持久化儲存。 專案網址:http://code.google.co
(四)RabbitMQ訊息佇列-服務詳細配置與日常監控管理
RabbitMQ服務管理 啟動服務:rabbitmq-server -detached【 /usr/local/rabbitmq/sbin/rabbitmq-server -detached 】 檢視狀態:rabbitmqctl status 關閉服務:rabbitmqctl stop
在阿里雲伺服器上搭建基於nginx的直播服務
對於沒有接觸過nginx的我,在看了別人搭建的直播服務後心癢癢了,也就照著搭建了一個直播服務,我是在阿里雲伺服器上搭建的,首先來說一下阿里雲伺服器,我買的是一個ECS的雲伺服器,系統是CentOS7 然後用Xshell連線上我的伺服器,發現連線不上,原來是阿里雲伺服器的
騰訊雲+tipask快速搭建基於laravel的CMS網站
一、購買騰訊雲伺服器,服務市場->基礎環境->選擇WordPress平臺映象二、按照tipask教程安裝tipask官方教程地址https://wenda.tipask.com/article/22官方教程對新手不太友好,我整理如下:1.ftp上傳檔案雲伺服器映象
SpringBoot(9) 基於Redis訊息佇列實現非同步操作
什麼是訊息佇列?所謂訊息佇列,就是一個以佇列資料結構為基礎的一個真實存在的實體,如陣列,redis中的佇列集合等等,都可以。為什麼要使用佇列?主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MyS
高可用訊息佇列服務構建-RABBITMQ
Rabbitmq has his own buildin cluster management system. Here, we don’t need Pacemaker, everything is managed by RabbitMQ itself. RabbitMQ or more genera
Spring Boot(二):快速搭建web專案或微服務
上一篇部落格對spring boot的來世今生進行了介紹,這篇就帶領大家快速的建立一個spring boot的web專案或者微服務。 一、.新建專案 1.方法一 (1)自己建立java工程 (2)新建spring boot的application (3
Amazon SQS 訊息佇列服務_訊息佇列mq解決方案
Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲
Amazon SQS價格_SQS訊息佇列服務
如果資料傳輸量超出 500TB/月,請聯絡我們。 除非另行說明,否則我們的價格不包含適用的稅費和關稅(包括增值稅和適用銷售稅)。使用日本賬單地址的客戶若要使用 AWS,則需支付日本消費稅。瞭解更多。 資料傳入和傳出是指傳入和傳出 Amazon
NoSQL初探之人人都愛Redis:(3)使用Redis作為訊息佇列服務場景應用案例
一、訊息佇列場景簡介 “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。 在目前廣泛的Web應用中,都會出現一種場景:在某一個時刻,網站會迎來一個使用者請求的高峰期(
使用SpringBoot快速搭建WebSocket實現訊息推送
本文旨在幫助未掌握此技能的小白掃清障礙,快速搭建websocket訊息推送服務,高手請繞行。謝謝! 首先,筆者的寫作背景也是一名剛剛打通websocket訊息推送服務的小白。在連續幾日的蒐集資料下,最終在沒有找到一個完整的解決方案的情況下。摸索出正確的結果,倍
ubuntu搭建基於pptpd的vpn服務遇到619 800等問題
ubuntu搭建vpn伺服器的文章非常多,但是開啟ufw往往連結失敗。 解決方案如下: 我們需要王before ufw scripts新增iptables命令,新增命令有如下兩種方式: 1: iptables -I INPUT -p 47 -m stat
幾種常見的微服務架構方案簡述——ZeroC IceGrid、Spring Cloud、基於訊息佇列
2017-07-26 http://www.broadview.com.cn/article/348 微服務架構是當前很熱門的一個概念,它不是憑空產生的,是技術發展的必然結果。雖然微服務架構沒有公認的技術標準和規範草案,但業界已經有一些很有影響力的開源微服務架構平臺,架構師可以根據公司的技術實力並結合專案
基於Docker搭建分散式訊息佇列Kafka
本文基於Docker搭建一套單節點的Kafka訊息佇列,Kafka依賴Zookeeper為其管理叢集資訊,雖然本例不涉及叢集,但是該有的元件都還是會有,典型的kafka分散式架構如下圖所示。本例搭建的示例包含Zookeeper + Kafka + Kafka-manger mark &
快速搭建訊息佇列的企業框架
RocketMQ 是阿里團隊應用的強大的訊息中介軟體,而 Spring Boot 是目前企業 Java 流行的框架,如何快速應用二者為企業服務是 Java 工程師必修之課。 本場 Chat 首先會帶大家先熟悉 Spring Boot 專案以及搭建,然後整合 RocketMQ 訊息中介軟體實現單機部
linux實訓第二天總結--快速搭建Httpd服務&部署基於Httpd的網路Yum&搭建NFS共享&兩個終端之間”聊天室”
DAY02 案例一-->部署網路yum源 1.0快速搭建Httpd服務 1.1部署基於Httpd的網路Yum 案例1.0-->
.NET Core微服務之基於EasyNetQ使用RabbitMQ訊息佇列
一、訊息佇列與RabbitMQ 1.1 訊息佇列 “訊息”是在兩臺計算機間傳送的資料單位。訊息可以非常簡單,例如只包含文字字串;也可以更復雜,可能包含嵌入物件。訊息被髮送到佇列中,“訊息佇列”是在訊息的傳輸過程中儲存訊息的容器。 訊息佇列(Message Queue),是分散式系統中重要