1. 程式人生 > 其它 >訊息佇列(一) MySQL實現訊息佇列

訊息佇列(一) MySQL實現訊息佇列

訊息佇列(一)MySQL實現訊息佇列

(原創內容,轉載請註明來源,謝謝)

一、概述

訊息佇列(MessageQueue,通常簡稱MQ)是一種程序間通訊或同一程序的不同執行緒間的通訊方式,是分散式應用間交換資訊的一種技術。通過訊息佇列,應用程式可獨立地執行,它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。

訊息佇列有多種實現方式,可以用關係型資料庫(如Mysql)、Nosql(如redis)、現有框架(如rabbitMQ)等。

Mysql處理訊息佇列的場景:主要是在資料處理量大、耗時久、處理流程繁雜、處理內容多、需要持久化(入庫)、業務處理要求相對不實時的場景,如發郵件、發簡訊、訂單後續處理、操作資料記錄日誌等。

二、場景分析

現假設有一個訂單處理系統,包括使用者支付產生訂單、貨物分配及發貨兩個子流程。現由於訂單產生量大,故如果將產生訂單、貨物分配合並在一起執行,對於使用者而言,將要等待較久的時間才能看到支付成功的頁面,使用者體驗不佳。因此可以將這兩個流程分開。

1)使用者支付產生訂單

在校驗有貨後,將貨物表相應的內容取出到臨時表,取出成功即返回使用者支付成功待發貨。

2)定時輪詢臨時表

編寫指令碼,定時(如10分鐘)輪詢此臨時表,每次取若干條支付成功的資料(如1000條,具體看處理能力及資料量而定,保證處理時間在10分鐘內),進行後續處理。

3)將第二步的資料進行後續處理,如通知物流取貨、通知倉庫出貨,成功後將臨時表的這幾條資料清除(或將狀態置成已發貨)。

根據上述場景,對於有10000個使用者同時購買時,也只需要校驗是否有貨,確認有貨就可以給使用者返回支付成功頁面。而後臺的處理即使100分鐘(上述第二步假設每10分鐘處理1000條),對於使用者而言也只是100分鐘後看到已發貨的狀態,相比於在支付頁面多花1分鐘甚至更多的時間來說,這100分鐘反而並算不了什麼。

因此,此場景就非常適合於用Mysql解決此訊息佇列。

三、程式實現

1)引入資料庫處理檔案DbDealer.php,此檔案在PDO的文章中有詳細的描述實現過程,主要是用PDO實現增刪改查。

2)建立MQ工廠類MessageQueueDealer,通過傳入的型別判斷是哪種方式的MQ(本例使用的是Mysql,但預留了Redis等場景以便後續擴充套件),以及判斷是哪種業務場景(對於本例而言是訂單處理場景)。檔名messagequeue.php

<?php
require_once('service/assembly.php');
class MessageQueueDealer{
         private static $ins;
         private function__construct(){}
         private function__clone(){}
         public static functiongetInstance(){
                   if(null ==self::$ins || !($ins instanceof MessageQueueDealer)){
                            self::$ins= new MessageQueueDealer();
                   }
                   returnself::$ins;
         }
         //mq處理函式type=mysql,redis,..(mq型別)
         public functionmqDealer($type, $condition){
                   switch($type){
                            case'mysql':
                                     $this->mysqlMq($condition);
                                     break;
                            default:
                                     break;
                   }
         }
         //mysql的mqcondition=order,...(業務場景)
         private functionmysqlMq($condition){
                   switch($condition){
                            case'order':
                                     $order= new Order();
                                     $orderInfo= $order->transOrder();
                   }
         }
}       
$mq = MessageQueueDealer::getInstance()->mqDealer('mysql','order');

3)引入服務中間檔案,其用於引用各個處理類,本例引入訂單處理類。檔名assembly.php

         <?php
require_once('order.php');

4)訂單處理類Order,用於定時接處理任務。檔名order.php

         <?php
require_once('dbDealer.php');
//訂單處理類,將支付成功的訂單移入另一個表,並且從本表刪除
class Order{
         publicfunction transOrder(){
                   $db= DbDealer::getInstance()->getConnection();
                   $res= $db->setLimit(2)->setOrderBy(array('asc', 'id'))
                                       ->setSelectColumn(' id,name ')->get_mytable_by_status(3);//每次處理兩條,把status=3的記錄挪走
                   $arrOrder= $res->fetchAll(PDO::FETCH_ASSOC);//內容全部取出成關聯陣列
                   //兩個步驟一起,插入新表以及移除舊錶
                   if(!empty($arrOrder)){
                            $maxId= 0;
                            foreach($arrOrderas $order){
                                     $maxId= $maxId < $order['id'] ? $order['id'] : $maxId;
                            }
                            $db->startTrans();
                            $db->insertBatch('mynewtable',$arrOrder);
                            $db->delete('mytable',array('id' => '<'.$maxId));
                            $db->commitTrans();
                   }                
         }
}

5)編寫crontab指令碼,讓其定時執行messagequeue,以完成訂單轉移。

在linux介面輸入crontab –e,進入vim的編輯介面,輸入:

*/10 * * * * php/usr/share/nginx/html/message_queue/messagequeue.php

四、總結

Mysql實現訊息佇列的方式較為簡單,其在處理非實時的資料時具有較好優勢,因為其存取方便,而非實時情況下也不會有大量的資料庫連線,防止正常業務因為大量的連線而讓資料庫伺服器奔潰。

——written by linhxx 2017.07.26