使用過redis做非同步佇列麼,你是怎麼用的?有什麼缺點?
Redis設計主要是用來做快取的,但是由於它自身的某種特性使得它可以用來做訊息佇列。
它有幾個阻塞式的API可以使用,正是這些阻塞式的API讓其有能力做訊息佇列;
另外,做訊息佇列的其他特性例如FIFO(先入先出)也很容易實現,只需要一個list物件從頭取資料,從尾部塞資料即可;
Redis能做訊息佇列還得益於其list物件blpop brpop介面以及Pub/Sub(釋出/訂閱)的某些介面,它們都是阻塞版的,所以可以用來做訊息佇列。(List : lpush / rpop)
方式一:生產者與消費者模式
使用list結構作為佇列,rpush生產訊息,lpop消費訊息,當lpop沒有訊息的時候,要適當sleep一會再重試。
或者,不用sleep,直接用blpop指令,在沒有訊息的時候,它會阻塞住直到訊息到來。
具體文章可檢視:PHP與redis佇列實現電商訂單自動確認收貨
方式二:釋出訂閱者模式
使用pub/sub主題訂閱者模式,可以實現1:N的訊息佇列。
基於事件的系統中,Pub/Sub是目前廣泛使用的通訊模型,它採用事件作為基本的通訊機制,提供大規模系統所要求的鬆散耦合的互動模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;釋出者(如伺服器)可將訂閱者感興趣的事件隨時通知相關訂閱者。
訊息釋出者,即publish客戶端,無需獨佔連結,你可以在publish訊息的同時,使用同一個redis-client連結進行其他操作(例如:INCR等)
訊息訂閱者,即subscribe客戶端,需要獨佔連結,即進行subscribe期間,redis-client無法穿插其他操作,此時client以阻塞的方式等待“publish端”的訊息;這一點很好理解,因此subscribe端需要使用單獨的連結,甚至需要在額外的執行緒中使用。
redis做非同步佇列的缺點
在消費者下線的情況下,生產的訊息會丟失。此場景,建議用MQ。
使用MQ可檢視:在分散式系統上,你是如何處理資料的一致性的
下面用程式碼實際實現,用釋出訂閱者模式,給大家講解講解
訊息訂閱者subscribe.php
<?php
ini_set('default_socket_timeout',-1);//php配置設定不超時
$redis=newRedis();
$redis->connect("127.0.0.1",6379);
//$redis->setOption(Redis::OPT_READ_TIMEOUT,-1);//redis方式設定不超時,推薦
$redis->subscribe(['chan'],'callback');//callback為回撥函式名稱
//$redis->subscribe(['chan'],array(newTestCall(),'callback'));//如果回撥函式是類中的方法名,這樣寫
//回撥函式,這裡寫處理邏輯
functioncallback($instance,$channelName,$message)
{
echo$channelName,"==>",$message,PHP_EOL;
//$instance,即為上面建立的redis例項物件,在回撥函式中,預設的這個引數就是,因此不需專門傳參。這裡除了SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE這4條命令之外其它命令都不能使用
//如果要使用redis中的其他命令,這樣實現
$newredis=newRedis();
$newredis->connect("127.0.0.1",6379);
echo$newredis->get('test').PHP_EOL;
$newredis->close();
//可以根據$channelName,$message,處理不同的業務邏輯
switch($chan){
case'chan-1':
...
break;
case'chan-2':
...
break;
}
switch($message){
case'msg1':
...
break;
case'msg2':
...
break;
}
}
subscribe.php中設定不超時
方法1:ini_set('default_socket_timeout', -1);方法2:$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
如果不設定不超時,60s後會報一個錯誤
PHPFatalerror:UncaughtRedisException:readerroronconnectionto127.0.0.1:6379insubscribe.php:6
方式一的實現,是通過臨時修改ini的配置值,default_socket_timeout預設為60s,default_socket_timeout是socket流的超時引數,即socket流從建立到傳輸再到關閉整個過程必須要在這個引數設定的時間以內完成,如果不能完成,那麼PHP將自動結束這個socket並返回一個警告。
方式二是通過修改redis的配置項,因此僅對redis連線生效,相對於方式1,不會產生意外的對其他方法的影響。
訊息釋出者publish.php
<?php
$redis=newRedis();
$redis->connect("127.0.0.1",6379);
$redis->publish('chan','thisisamessage');
批量訂閱
redis的psubscribe支援通過模式匹配的方式實現批量訂閱,訂閱方式
$redis->psubscribe(['my*'],'psubscribe'); //回撥函式寫函式名
或者
$redis->psubscribe(['my*'],array(new TestCall(),'psubscribe')); //回撥函式為類中的方法,類名寫你自己定義的類
subscribe.php
<?php
//ini_set('default_socket_timeout',-1);//不超時
$redis=newRedis();
$redis->connect("127.0.0.1",6379);
$redis->setOption(Redis::OPT_READ_TIMEOUT,-1);
//匹配方式1:釋出可用$redis->publish('mymest','thisisamessage');
//$redis->psubscribe(['my*'],'psubscribe');
//匹配方式2:釋出可用$redis->publish('mydest','thisisamessage');
//$redis->psubscribe(['my?est'],'psubscribe');
//匹配方式3:釋出可用$redis->publish('myaest','thisisamessage');或$redis->publish('myeest','thisisamessage');
$redis->psubscribe(['my[ae]est'],'psubscribe');
functionpsubscribe($redis,$pattern,$chan,$msg){
echo"Pattern:$pattern\n";
echo"Channel:$chan\n";
echo"Payload:$msg\n";
}
模式匹配規則
支援以下幾種,以hello舉例:
h?llo subscribes to hello, hallo and hxllo
h*llo subscribes to hllo and heeeello
h[ae]llo subscribes to hello and hallo, but not hillo
特殊字元用\轉義
pubsub方法介紹
public function pubsub( $keyword, $argument )
pubsub獲取pub/sub系統的資訊,$keyword可用為"channels", "numsub", 或者"numpat",三種,傳入不同的keyword返回的資料不同
*$redis->pubsub('channels');//Allchannels獲取所有的頻道,返回陣列
*$redis->pubsub('channels','*pattern*');//Justchannelsmatchingyourpattern,返回符合匹配模式的頻道
*$redis->pubsub('numsub',array('chan1','chan2'));//Getsubscribercountsfor'chan1'and'chan2'//返回每個訂閱頻道的數量,返回陣列
*$redis->pubsub('numpat');//Getthenumberofpatternsubscribers獲取模式匹配方式的訂閱