PHP(Mysql/Redis)消息隊列的介紹及應用場景案例--轉載
鄭重提示:本博客轉載自好友博客,個人覺得寫的很牛逼所以未經同意強行轉載,原博客連接 http://www.cnblogs.com/wt645631686/p/8243438.html 歡迎訪問
在進行網站設計的時候,有時候會遇到給用戶大量發送短信,或者訂單系統有大量的日誌需要記錄,還有做秒殺設計的時候,服務器無法承受這種瞬間的壓力,無法正常處理,咱們怎麽才能保證系統正常有效的運行呢?這時候我們就要引用消息隊列來實現這類的需求,這時候就需要一個中間的系統進行分流和解壓。消息隊列就是一個中間件,需要配合其他合理使用。
消息隊列的概念、原理和場景
本質上講,消息隊列結構就是一個隊列結構的中間件
,也就是說把消息和內容放入這個容器之後,就可以直接的返回,不需要等它後期處理的結果,另外會有一個程序會讀取這些數據,並按照順序進行逐個的處理,也就是說按照並發非常大的一個環節的時候,同時呢你又不需要立即獲得這個環節的返回結果,那麽使用消息隊列可以比較好的解決這個問題。一個經典的消息隊列結果應該是這樣的過程:
由一個業務系統進行入隊,把消息逐個插入消息隊列中,插入成功之後直接返回成功的結果,然後後續有一個消息處理系統,這個系統會把消息隊列中的記錄逐個進行取出並且進行處理,進行出隊的操作。
消息系統適合的場景
冗余
首先數據需要冗余的時候,比如經常做訂單系統,後續需要嚴格的轉換和記錄,消息隊列可以把這些數據持久化存儲在隊列中,然後由訂單處理程序進行獲取,後續處理完成之後再把這條記錄刪除,保證每條記錄都能處理完成。
解耦
消息隊列分離了兩套系統,解決了兩套系統深度耦合的問題。使用消息隊列後,入隊的系統和出隊的系統沒有直接的關系,入隊系統和出隊系統其中一套系統崩潰的時候,都不會影響到另一個系統的正常運轉。
流量削峰
這種場景最經典的就是秒殺搶購,這種情況會出現很大的流量劇增,大量的需求集中在短短的幾秒內,對服務器的瞬間壓力非常大,而我們配合緩存使用消息隊列能非常有效的頂住瞬間訪問量,防止服務器頂不住而崩潰。
異步通訊
消息本身可以使入隊的系統直接返回,所以實現了程序的異步操作,因此只要適合於異步的場景都可以使用消息隊列來實現。
擴展性
比如訂單入隊之後或許會有財務系統進行處理,但是後期我想加配貨系統,我只需要讓配貨系統訂閱消息隊列就可以了,這樣就很容易擴展。
排序保證
這種情況指的是在有些場景下數據處理順序是非常重要的,這種情況非常適合隊列處理,因為隊列本身就可以做成單線程的單進單出的系統,從而保證數據按照順序進行處理。
常見消息隊列實現優缺點
隊列介質有哪些?
Mysql:可靠性高、易實現,速度慢,比如表就可以。
Redis:速度快,單條大消息包時效率低。redis提供了list,適合做消息隊列,但是redis有一個問題,消息包過大的時候,效率就慢了,一般單條內容都不大
消息系統:專業性強、可靠,但學習成本高,如RabbitMQ
消息處理三種觸發機制
死循環方式讀取處理:讓一個死循環的程序不斷地讀取一個隊列,並且進行後期處理,這種方式失效性是比較強的,因為這種程序不斷地掃描消息隊列,因此消息隊列裏一旦有數據,就可以進行後續處理。但是這樣會造成服務器壓力,最關鍵的是也不會知道程序什麽時候會掛掉,一旦出現故障,沒辦法及時恢復,這種情況比較適合做秒殺,因為秒殺的時間點比較集中,一旦有秒殺可以立即處理。
定時任務:每隔幾秒或者幾分鐘執行一次,這樣做的最大好處就是把壓力分開了,無論入隊的系統在哪個時間點入隊的峰值是多麽不平均,但由於出隊的系統是定時執行的,所以會把壓力均攤,每個時間點的壓力會差不太多,所以還是比較流行的,尤其是訂單系統和物流配貨系統這類的,如訂單系統會把寫入隊列,用戶就可以看到我的訂單在等物流配貨了,這樣物流系統就會定時把訂單進行匯總處理,這樣壓力就不會太大,唯一的缺點就是定時和間隔和數量要把握好,不要
等上一個定時任務沒有執行完呢,下一個定時任務又開始了,這樣容易出現不可預測的問題。
守護進程:類似於PHP-FPM和PHP-CGI進程,需要linux的shell基礎。
解耦案例:隊列處理訂單系統和配送系統
在網購的時候,提交訂單之後,看到自己的訂單貨物在配送中,這樣就參與進來一個系統是配送系統,如果我們在做架構的時候,把訂單系統和配送系統設計到一起,就會出現問題。首先對於訂單系統來說,訂單系統處理壓力較大,對於配送系統來說沒必要對這些壓力做及時反映,我們沒必要在訂單系統出現問題的情況下,同時配送系統出現問題,這時候就會同時影響兩個系統的運轉,我們可以解耦解決。這兩個系統分開之後,我們可以通過一個隊列表來實現兩個系統的溝通。首先,訂單系統會接收用戶的訂單,進行訂單的處理,會把這些訂單寫到隊列表中,這個隊列表是溝通兩個系統的關鍵,由配送系統中的定時執行的程序來讀取隊列表進行處理,配送系統處理之後,會把已經處理的記錄進行標記,這就是流程。
具體細節設計如下(Mysql隊列舉例):
首先,由order.php的文件接收用戶的訂單,然後生成訂單號並對訂單進行處理,訂單系統處理完成之後會把配送系統需要的數據增加到隊列表中,隊列表可以這麽設計,大概六個字段,order_id(訂單主鍵id),status(訂單狀態),mobile(用戶手機號),address(收獲地址),created_at(創建的時間),updated_at(後期配送系統處理完成時間),然後有一個定時腳本,每分鐘啟動配送處理程序,配送處理程序這個goods.php用來處理隊列表中的數據,當處理完成之後,會把隊列表中的字段狀態改為處理完成,這樣整個流程結束。
創建訂單表
CREATE TABLE `order_queue` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT ‘訂單的id號‘, `order_id` int(11) NOT NULL, `mobile` varchar(20) NOT NULL COMMENT ‘用戶的手機號‘, `address` varchar(100) NOT NULL COMMENT ‘用戶的地址‘, `created_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘訂單創建的時間‘, `updated_at` datetime NOT NULL DEFAULT ‘0000-00-00 00:00:00‘ COMMENT ‘物流系統處理完成的時間‘, `status` tinyint(2) NOT NULL COMMENT ‘當前狀態,0 未處理,1 已處理,2處理中‘, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
處理訂單的order.php文件
<?php include ‘class/db.php‘; if(!empty($_GET[‘mobile‘])){ $order_id = rand(10000,99999); $insert_data = array( ‘order_id‘=>$order_id, ‘mobile‘=>$_GET[‘mobile‘], //記得過濾 ‘created_at‘=>date(‘Y-m-d H:i:s‘,time()), ‘order_id‘=>$order_id, ‘status‘=>0, //0,未處理狀態 ); $db = DB::getIntance(); //把數據放入隊列表中 $res = $db->insert(‘order_queue‘,$insert_data); if($res){ echo $insert_data[‘order_id‘]."保存成功"; }else{ echo "保存失敗"; } }else{ echo "1"; } ?>
配送系統處理訂單的文件goods.php
<?php //配送系統處理訂單並進行標記 include ‘class/db.php‘; $db = DB::getIntance(); //1:先要把要處理的數據狀態改為待處理 $waiting = array(‘status‘=>0,); $lock = array(‘status‘=>2,); $res_lock = $db->update(‘order_queue‘,$lock,$waiting,2); //2:選擇出剛剛更新的數據,然後進行配送系統處理 if($res_lock){ //選擇出要處理的訂單內容 $res = $db->selectAll(‘order_queue‘,$lock); //然後由配貨系統進行處理.....等操作 //3:把處理過的改為已處理狀態 $success = array( ‘status‘=>1, ‘updated_at‘=>date(‘Y-m-d H;i:s‘,time()), ); $res_last = $db->update(‘order_queue‘,$success,$lock); if($res_last){ echo "Success:".$res_last; }else{ echo "Fail:".$res_last; } }else{ echo "ALL Finished"; } ?>
定時執行腳本的goods.sh,每分鐘執行一次
#!/bin/bash date "+%G-%m-%d %H:%M:%S" //當前年月日 cd /data/wwwroot/default/mq/ php goods.php
然後crontab任務定時執行腳本,並創建日誌文件,然後指定輸出格式
*/1 * * * * /data/wwwroot/default/mq/good.sh >> /data/wwwroot/default/mq/log.log 2>&1 //指定腳本目錄並格式化輸出//當然要創建log.log文件
tail -f log.log //監控日誌
這樣訂單系統和配送系統是相互獨立的,並不影響另一個系統的正常運行。
再舉一個關於Mysql消息隊列的例子,以發送短信為例:
<?php $db = new Db(); $sms = new Sms(); while(true){ $item = $db->getFirstRecord(); //獲取數據表第一條記錄 if(!$item){ //如果隊列中沒有數據,則結束定時器 break; } $res = $sms->send($item[‘phone‘],$item[‘content‘]); //發送短信 if($res){ $db->deleteFristRecord(); //刪除發送成功的記錄 echo $item[‘phone‘].‘發送成功‘; }else{ echo $item[‘phone‘].‘發送失敗,稍後繼續嘗試‘; } sleep(10); //每隔十秒循環一次 } echo ‘發送完畢!‘; ?>
將代碼保存為timer_sms.php,打開命令行,執行定時器:
php定時器將會根據設定的時間間隔(這裏設的是10秒),自動完成發送短信的任務。任務完成後將自動退出定時器,不再占用服務器資源。
根據我的測試,PHP定時器占用資源並不多,不會對服務器造成壓力。而且是異步訪問數據庫,也不會影響數據庫的運行。
這種方式的優點是:
1、後臺運行,前臺無需等待
2、成功率高,失敗的記錄會自動重發,直到成功
流量削峰案例:Redis的List類型實現秒殺
為什麽要使用Redis而不適用Mysql呢?因為Redis是基於內存,速度要快很多,而Mysql需要往硬盤裏寫,因為其他業務還要使用Mysql,如果秒殺使用Mysql的話,會把Mysql的資源耗光,這樣其他的業務在讀取Mysql肯定出問題。另外Redis對數據有一個持久化作用,這樣要比Memcache要有優勢,並且數據類型要多,這次要用的就是Redis的List,可以向頭部或者尾部向Redis的鏈表增加元素,這樣Redis在實現一個輕量級的隊列非常有優勢。
LPUSH/LPUSHX:LPUSH是將值插入到鏈表的頭部,LPUSHX是檢測這個鏈表是否存在,如果存在的話會插入頭部,如果不存在會忽略這個數據
RPUSH/RPUSHX:將值插入到鏈表的尾部。同上,位置相反
LPOP:移除並獲取鏈表中的第一個元素。
RPOP:移除並獲取鏈表中最後一個元素。
LTRIM:保留指定區間內的元素。
LLEN:獲取鏈表的長度。
LSET:用索引設置鏈表元素的值。
LINDEX:通過索引獲取鏈表中的元素。
LRANGE:獲取鏈表指定範圍內的元素。
【秒殺業務程序】記錄哪個用戶參與了秒殺,同時記錄時間,這樣方便後續處理,用戶的ID會存儲到【Redis】的鏈表裏進行排隊,比如打算讓前10個人秒殺成功,後面的人秒殺失敗,這樣讓redis鏈表的長度保持為10就可以了,10個以後如果再到redis請求追加數據,那麽程序上拒絕請求,在redis存取之後,後面的程序會對redis進行取值,因為數據不能長久放在緩存,後面有一個程序遍歷處理redis的值,放入數據庫永久保存,因為秒殺本來不會太長,可以用腳本循環掃描。
詳細說明:
首先Redis程序會把用戶的請求數據放入redis,主要是uid和微秒時間戳;然後檢查redis鏈表的長度,超出長度就放棄處理;死循環數據讀取redis鏈表的內容,入庫。
數據庫代碼:
CREATE TABLE `redis_queue` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `uid` int(11) NOT NULL DEFAULT ‘0‘, `time_stamp` varchar(24) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=88 DEFAULT CHARSET=utf8;
另外兩個程序,一個是把用戶的請求接收寫入redis的程序,另一個是把redis的數據拿出寫入數據庫的程序。
接收用戶請求的程序:
<?php $redis = new Redis(); $redis->connect(‘127.0.0.1‘,6379); $redis-_name = ‘miaosha‘; //秒殺用戶湧入模擬,100個用戶 for ($i =0; $i < 100; $i++) { $uid = rand(1000000,99999999); } //檢查redis鏈表長度(已存在數量) $num = 10; if ($redis->lLen($redis_name) < 10 ) { //加入鏈表尾部 $redis->rPush($redis_name, $uid.‘%‘.microtime()); } else { //如果達到10個 //秒殺結束 } $redis->close();
處理程序(拿redis數據寫入mysql)
<?php //從隊列頭部讀一個值,判斷這個值是否存在,如果存在則切割出時間、uid保存到數據庫中。(對於redis而言,如果從redis取出這個值,那麽這個值就不在redis隊列裏了,如果出現問題失敗了,那麽我們需要有一個機制把失敗的數據重新放入redis鏈表中) $redis = new Redis(); $redis->connect(‘127.0.0.1‘,6379); $redis-_name = ‘miaosha‘; //死循環檢測redis隊列 while(1) { $user = $redis->lpop($redis_name); if (!$user || $user == ‘null‘) { //如果沒有數據跳出循環 //如果一直執行,速度是非常快的,那麽服務器壓力大,這裏2秒一次 sleep(2); //跳出循環 continue; } //拿出微秒時間戳和uid $user_arr = explode(‘%‘, $user); $insert_data = array( ‘uid‘ => $user_arr[0]; ‘time_stamp‘ => $user_arr[1]; ); $res = $db->insert(‘redis_queue‘, $insert_data); //如果插入失敗 if (!$res) { //從哪個方向取出,從哪個方向插回 $redis->lpush($redis_name, $user); sleep(2); } } $redis->close();
測試的話,可以先執行循環檢測腳本,然後再執行秒殺腳本開始測試,監測Mysql數據庫的變化。
RabbitMQ:更專業額消息系統實現方案
可以看本人博客當前欄目了解
http://www.cnblogs.com/wt645631686/category/1171220.html
PHP(Mysql/Redis)消息隊列的介紹及應用場景案例--轉載