1. 程式人生 > >PHP(Mysql/Redis)消息隊列的介紹及應用場景案例--轉載

PHP(Mysql/Redis)消息隊列的介紹及應用場景案例--轉載

接收 根據 好友 學習 range nod 存取 情況下 ble

鄭重提示:本博客轉載自好友博客,個人覺得寫的很牛逼所以未經同意強行轉載,原博客連接 http://www.cnblogs.com/wt645631686/p/8243438.html 歡迎訪問

在進行網站設計的時候,有時候會遇到給用戶大量發送短信,或者訂單系統有大量的日誌需要記錄,還有做秒殺設計的時候,服務器無法承受這種瞬間的壓力,無法正常處理,咱們怎麽才能保證系統正常有效的運行呢?這時候我們就要引用消息隊列來實現這類的需求,這時候就需要一個中間的系統進行分流和解壓。消息隊列就是一個中間件,需要配合其他合理使用。

消息隊列的概念、原理和場景

本質上講,消息隊列結構就是一個隊列結構的中間件
,也就是說把消息和內容放入這個容器之後,就可以直接的返回,不需要等它後期處理的結果,另外會有一個程序會讀取這些數據,並按照順序進行逐個的處理,也就是說按照並發非常大的一個環節的時候,同時呢你又不需要立即獲得這個環節的返回結果,那麽使用消息隊列可以比較好的解決這個問題。一個經典的消息隊列結果應該是這樣的過程:
由一個業務系統進行入隊,把消息逐個插入消息隊列中,插入成功之後直接返回成功的結果,然後後續有一個消息處理系統,這個系統會把消息隊列中的記錄逐個進行取出並且進行處理,進行出隊的操作。
消息系統適合的場景
冗余
首先數據需要冗余的時候,比如經常做訂單系統,後續需要嚴格的轉換和記錄,消息隊列可以把這些數據持久化存儲在隊列中,然後由訂單處理程序進行獲取,後續處理完成之後再把這條記錄刪除,保證每條記錄都能處理完成。
解耦
消息隊列分離了兩套系統,解決了兩套系統深度耦合的問題。使用消息隊列後,入隊的系統和出隊的系統沒有直接的關系,入隊系統和出隊系統其中一套系統崩潰的時候,都不會影響到另一個系統的正常運轉。
流量削峰
這種場景最經典的就是秒殺搶購,這種情況會出現很大的流量劇增,大量的需求集中在短短的幾秒內,對服務器的瞬間壓力非常大,而我們配合緩存使用消息隊列能非常有效的頂住瞬間訪問量,防止服務器頂不住而崩潰。
異步通訊
消息本身可以使入隊的系統直接返回,所以實現了程序的異步操作,因此只要適合於異步的場景都可以使用消息隊列來實現。
擴展性
比如訂單入隊之後或許會有財務系統進行處理,但是後期我想加配貨系統,我只需要讓配貨系統訂閱消息隊列就可以了,這樣就很容易擴展。
排序保證
這種情況指的是在有些場景下數據處理順序是非常重要的,這種情況非常適合隊列處理,因為隊列本身就可以做成單線程的單進單出的系統,從而保證數據按照順序進行處理。

常見消息隊列實現優缺點
隊列介質有哪些?
Mysql:可靠性高、易實現,速度慢,比如表就可以。
Redis:速度快,單條大消息包時效率低。redis提供了list,適合做消息隊列,但是redis有一個問題,消息包過大的時候,效率就慢了,一般單條內容都不大
消息系統:專業性強、可靠,但學習成本高,如RabbitMQ

消息處理三種觸發機制
死循環方式讀取處理:讓一個死循環的程序不斷地讀取一個隊列,並且進行後期處理,這種方式失效性是比較強的,因為這種程序不斷地掃描消息隊列,因此消息隊列裏一旦有數據,就可以進行後續處理。但是這樣會造成服務器壓力,最關鍵的是也不會知道程序什麽時候會掛掉,一旦出現故障,沒辦法及時恢復,這種情況比較適合做秒殺,因為秒殺的時間點比較集中,一旦有秒殺可以立即處理。
定時任務:每隔幾秒或者幾分鐘執行一次,這樣做的最大好處就是把壓力分開了,無論入隊的系統在哪個時間點入隊的峰值是多麽不平均,但由於出隊的系統是定時執行的,所以會把壓力均攤,每個時間點的壓力會差不太多,所以還是比較流行的,尤其是訂單系統和物流配貨系統這類的,如訂單系統會把寫入隊列,用戶就可以看到我的訂單在等物流配貨了,這樣物流系統就會定時把訂單進行匯總處理,這樣壓力就不會太大,唯一的缺點就是定時和間隔和數量要把握好,不要
等上一個定時任務沒有執行完呢,下一個定時任務又開始了,這樣容易出現不可預測的問題。
守護進程:類似於PHP-FPM和PHP-CGI進程,需要linux的shell基礎。

解耦案例:隊列處理訂單系統和配送系統
在網購的時候,提交訂單之後,看到自己的訂單貨物在配送中,這樣就參與進來一個系統是配送系統,如果我們在做架構的時候,把訂單系統和配送系統設計到一起,就會出現問題。首先對於訂單系統來說,訂單系統處理壓力較大,對於配送系統來說沒必要對這些壓力做及時反映,我們沒必要在訂單系統出現問題的情況下,同時配送系統出現問題,這時候就會同時影響兩個系統的運轉,我們可以解耦解決。這兩個系統分開之後,我們可以通過一個隊列表來實現兩個系統的溝通。首先,訂單系統會接收用戶的訂單,進行訂單的處理,會把這些訂單寫到隊列表中,這個隊列表是溝通兩個系統的關鍵,由配送系統中的定時執行的程序來讀取隊列表進行處理,配送系統處理之後,會把已經處理的記錄進行標記,這就是流程。
具體細節設計如下(Mysql隊列舉例):

技術分享圖片

首先,由order.php的文件接收用戶的訂單,然後生成訂單號並對訂單進行處理,訂單系統處理完成之後會把配送系統需要的數據增加到隊列表中,隊列表可以這麽設計,大概六個字段,order_id(訂單主鍵id),status(訂單狀態),mobile(用戶手機號),address(收獲地址),created_at(創建的時間),updated_at(後期配送系統處理完成時間),然後有一個定時腳本,每分鐘啟動配送處理程序,配送處理程序這個goods.php用來處理隊列表中的數據,當處理完成之後,會把隊列表中的字段狀態改為處理完成,這樣整個流程結束。

創建訂單表

技術分享圖片
CREATE TABLE `order_queue` (  
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘訂單的id號‘,  
  `order_id` int(11) NOT NULL,  
  `mobile` varchar(20) NOT NULL COMMENT ‘用戶的手機號‘,  
  `address` varchar(100) NOT NULL COMMENT ‘用戶的地址‘,  
  `created_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘訂單創建的時間‘,  
  `updated_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘物流系統處理完成的時間‘,  
  `status` tinyint(2) NOT NULL COMMENT ‘當前狀態,0 未處理,1 已處理,2處理中‘,  
  PRIMARY KEY (`id`)  
) ENGINE=InnoDB  DEFAULT CHARSET=utf8;  
技術分享圖片

處理訂單的order.php文件

技術分享圖片
<?php   
include ‘class/db.php‘;  
  
if(!empty($_GET[‘mobile‘])){  
    $order_id = rand(10000,99999);  
    $insert_data = array(  
        ‘order_id‘=>$order_id,  
        ‘mobile‘=>$_GET[‘mobile‘],      //記得過濾
        ‘created_at‘=>date(‘Y-m-d H:i:s‘,time()),  
        ‘order_id‘=>$order_id,  
        ‘status‘=>0,    //0,未處理狀態
    );  
    $db = DB::getIntance();  
//把數據放入隊列表中
    $res = $db->insert(‘order_queue‘,$insert_data);  
    if($res){  
        echo $insert_data[‘order_id‘]."保存成功";  
    }else{  
        echo "保存失敗";  
    }  
}else{  
    echo "1";  
}  
?> 
技術分享圖片

配送系統處理訂單的文件goods.php

技術分享圖片
<?php   
//配送系統處理訂單並進行標記
include ‘class/db.php‘;  
$db = DB::getIntance();  
//1:先要把要處理的數據狀態改為待處理
$waiting = array(‘status‘=>0,);  
$lock = array(‘status‘=>2,);  
$res_lock = $db->update(‘order_queue‘,$lock,$waiting,2);  
//2:選擇出剛剛更新的數據,然後進行配送系統處理
if($res_lock){  
    //選擇出要處理的訂單內容
    $res = $db->selectAll(‘order_queue‘,$lock);  
     //然後由配貨系統進行處理.....等操作
    //3:把處理過的改為已處理狀態
    $success = array(  
        ‘status‘=>1,  
        ‘updated_at‘=>date(‘Y-m-d H;i:s‘,time()),    
    );  
    $res_last = $db->update(‘order_queue‘,$success,$lock);  
    if($res_last){  
       echo "Success:".$res_last;   
    }else{  
        echo "Fail:".$res_last;  
    }  
}else{  
    echo "ALL Finished";  
}  
?>  
技術分享圖片

定時執行腳本的goods.sh,每分鐘執行一次

#!/bin/bash  
date "+%G-%m-%d %H:%M:%S"    //當前年月日
cd /data/wwwroot/default/mq/  
php goods.php  

然後crontab任務定時執行腳本,並創建日誌文件,然後指定輸出格式

*/1 * * * * /data/wwwroot/default/mq/good.sh >> /data/wwwroot/default/mq/log.log 2>&1 //指定腳本目錄並格式化輸出//當然要創建log.log文件
tail -f log.log  //監控日誌

這樣訂單系統和配送系統是相互獨立的,並不影響另一個系統的正常運行。

再舉一個關於Mysql消息隊列的例子,以發送短信為例:

技術分享圖片
<?php
$db = new Db();
$sms = new Sms();
while(true){
    $item = $db->getFirstRecord(); //獲取數據表第一條記錄
    if(!$item){
        //如果隊列中沒有數據,則結束定時器
        break;
    }
    $res = $sms->send($item[‘phone‘],$item[‘content‘]); //發送短信
    if($res){
        $db->deleteFristRecord(); //刪除發送成功的記錄
        echo $item[‘phone‘].‘發送成功‘;
    }else{
        echo $item[‘phone‘].‘發送失敗,稍後繼續嘗試‘;
    }
    sleep(10); //每隔十秒循環一次            
}
 
echo ‘發送完畢!‘;
?>
技術分享圖片

將代碼保存為timer_sms.php,打開命令行,執行定時器:

php定時器將會根據設定的時間間隔(這裏設的是10秒),自動完成發送短信的任務。任務完成後將自動退出定時器,不再占用服務器資源。

根據我的測試,PHP定時器占用資源並不多,不會對服務器造成壓力。而且是異步訪問數據庫,也不會影響數據庫的運行。

這種方式的優點是:

1、後臺運行,前臺無需等待

2、成功率高,失敗的記錄會自動重發,直到成功

流量削峰案例:Redis的List類型實現秒殺

為什麽要使用Redis而不適用Mysql呢?因為Redis是基於內存,速度要快很多,而Mysql需要往硬盤裏寫,因為其他業務還要使用Mysql,如果秒殺使用Mysql的話,會把Mysql的資源耗光,這樣其他的業務在讀取Mysql肯定出問題。另外Redis對數據有一個持久化作用,這樣要比Memcache要有優勢,並且數據類型要多,這次要用的就是Redis的List,可以向頭部或者尾部向Redis的鏈表增加元素,這樣Redis在實現一個輕量級的隊列非常有優勢。
LPUSH/LPUSHX:LPUSH是將值插入到鏈表的頭部,LPUSHX是檢測這個鏈表是否存在,如果存在的話會插入頭部,如果不存在會忽略這個數據
RPUSH/RPUSHX:將值插入到鏈表的尾部。同上,位置相反
LPOP:移除並獲取鏈表中的第一個元素。
RPOP:移除並獲取鏈表中最後一個元素。
LTRIM:保留指定區間內的元素。
LLEN:獲取鏈表的長度。
LSET:用索引設置鏈表元素的值。
LINDEX:通過索引獲取鏈表中的元素。
LRANGE:獲取鏈表指定範圍內的元素。

技術分享圖片

【秒殺業務程序】記錄哪個用戶參與了秒殺,同時記錄時間,這樣方便後續處理,用戶的ID會存儲到【Redis】的鏈表裏進行排隊,比如打算讓前10個人秒殺成功,後面的人秒殺失敗,這樣讓redis鏈表的長度保持為10就可以了,10個以後如果再到redis請求追加數據,那麽程序上拒絕請求,在redis存取之後,後面的程序會對redis進行取值,因為數據不能長久放在緩存,後面有一個程序遍歷處理redis的值,放入數據庫永久保存,因為秒殺本來不會太長,可以用腳本循環掃描。
詳細說明:
首先Redis程序會把用戶的請求數據放入redis,主要是uid和微秒時間戳;然後檢查redis鏈表的長度,超出長度就放棄處理;死循環數據讀取redis鏈表的內容,入庫。

數據庫代碼:

技術分享圖片
 CREATE TABLE `redis_queue` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `uid` int(11) NOT NULL DEFAULT ‘0‘,
  `time_stamp` varchar(24) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=88 DEFAULT CHARSET=utf8;
技術分享圖片

另外兩個程序,一個是把用戶的請求接收寫入redis的程序,另一個是把redis的數據拿出寫入數據庫的程序。

接收用戶請求的程序:

技術分享圖片
<?php

$redis = new Redis();
$redis->connect(‘127.0.0.1‘,6379);
$redis-_name = ‘miaosha‘;


//秒殺用戶湧入模擬,100個用戶
for ($i =0; $i < 100; $i++) {
    $uid = rand(1000000,99999999);
}
//檢查redis鏈表長度(已存在數量)
$num = 10;
if ($redis->lLen($redis_name) < 10 ) {
    //加入鏈表尾部
    $redis->rPush($redis_name, $uid.‘%‘.microtime());
} else {  //如果達到10個
    //秒殺結束
}
$redis->close();
技術分享圖片

處理程序(拿redis數據寫入mysql)

技術分享圖片
<?php
//從隊列頭部讀一個值,判斷這個值是否存在,如果存在則切割出時間、uid保存到數據庫中。(對於redis而言,如果從redis取出這個值,那麽這個值就不在redis隊列裏了,如果出現問題失敗了,那麽我們需要有一個機制把失敗的數據重新放入redis鏈表中)
$redis = new Redis();
$redis->connect(‘127.0.0.1‘,6379);
$redis-_name = ‘miaosha‘;

//死循環檢測redis隊列
while(1) {
    $user = $redis->lpop($redis_name);
    if (!$user || $user == ‘null‘) {  //如果沒有數據跳出循環
        //如果一直執行,速度是非常快的,那麽服務器壓力大,這裏2秒一次
        sleep(2);
        //跳出循環
        continue;
    } 
    //拿出微秒時間戳和uid
    $user_arr = explode(‘%‘, $user);
    $insert_data = array(
        ‘uid‘ => $user_arr[0];
        ‘time_stamp‘ => $user_arr[1];
    );
    $res = $db->insert(‘redis_queue‘, $insert_data);
    //如果插入失敗
    if (!$res) {
        //從哪個方向取出,從哪個方向插回
        $redis->lpush($redis_name, $user);
        sleep(2);
    }
} 
$redis->close();
技術分享圖片

測試的話,可以先執行循環檢測腳本,然後再執行秒殺腳本開始測試,監測Mysql數據庫的變化。

RabbitMQ:更專業額消息系統實現方案

可以看本人博客當前欄目了解

http://www.cnblogs.com/wt645631686/category/1171220.html

PHP(Mysql/Redis)消息隊列的介紹及應用場景案例--轉載