用 Redis 實現 PHP 的簡單訊息佇列
阿新 • • 發佈:2018-12-09
訊息佇列就是在訊息的傳輸過程中,可以儲存訊息的容器。
常見用途:
- 儲存轉發:非同步處理耗時的任務
- 分散式事務:多個消費者消費同一個訊息佇列
- 應對高併發:通過訊息佇列儲存任務,慢慢處理
- 釋出訂閱:實現解耦
PHP 可以基於 Redis 的 List 資料型別實現簡單的訊息佇列,可以參考 php-resque。當然也可以使用更強大的 RabbitMQ。
實現方式
PHP 守護程序
PHP 業務程式碼:
<?php
class MyDaemon
{
public $procNum = 8; // 程序總數
// 啟動程序
public function run()
{
for ($i = 0; $i < $this->procNum; $i++) {
$nPID = \pcntl_fork();//建立子程序
if ($nPID == 0) {
//子程序
\Org\Util\MsgQ::init();
$this->work();
exit(0);
}
}
// 等待子程序執行完畢,避免殭屍程序
$n = 0;
while ($n < $this->procNum) {
$nStatus = -1;
$nPID = \pcntl_wait($nStatus);
if ($nPID > 0) {
++$n;
}
}
}
//業務程式碼
public function work()
{
$MsgData = "";
while (true) {
usleep(10000); // 10 ms 執行一次
$ret = MsgQ::BlockSubsribe("MyMsgName", $MsgData);
// 業務程式碼
}
}
訊息佇列(基於Redis)庫程式碼:
<?php
namespace Org\Util;
class MsgQ {
public static $errCode = 0;
public static $errMsg = "";
public static $redis;
private static $preFix = "MsgQ.";
private static $timeOut = 10;
private static $redisHost = '';
private static $redisPort = '';
private static $redisAuth = '';
function __construct()
{
self::$redis = new \Redis();
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
$ret = self::$redis->auth($redisAuth);
}
function __destruct()
{
if(self::$redis) {
self::$redis->close();
}
}
public static function init($timeOut = 0){
if (!self::$redis) {
self::$redis = new \Redis();
if(!empty($timeOut)){
self::$timeOut = $timeOut;
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
$ret = self::$redis->auth($redisAuth);
}
else{
self::$timeOut = 0;
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
$ret = self::$redis->auth($redisAuth);
}
}
}
public static function Publish($pubKey,$data){
if(!self::PingAndConnect()){
return false;
}
$ret = self::$redis->rPush(self::$preFix.$pubKey,$data);
if ($ret === false){
return false;
}
return true;
}
public static function GetListLen($pubKey,&$len){
if(!self::PingAndConnect()){
return false;
}
$len = 0;
$ret = self::$redis->lLen(self::$preFix.$pubKey);
if ($ret === false){
return false;
}
$len = $ret;
return true;
}
public static function Subsribe($pubKey,&$data){
if(!self::PingAndConnect()){
return false;
}
$ret = self::$redis->lPop(self::$preFix.$pubKey);
if ($ret === false){
return false;
}
$data = $ret;
return true;
}
public static function BlockSubsribe($pubKey,&$data){
if(!self::PingAndConnect()){
return false;
}
try{
$ret = self::$redis->blPop(array(self::$preFix.$pubKey),0);
}
catch(Exception $e){
if(!self::PingAndConnect(true)){
return false;
}
return false;
}
if ($ret === false){
return false;
}
if ($ret === array()){
return false;
}
$data = $ret[1];
return true;
}
private static function PingAndConnect($isException = false){
if (!self::$redis) {
self::$redis = new \Redis();
if (self::$timeOut == 0){
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
}
else{
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
}
if ($ret === false){
return false;
}
$ret = self::$redis->auth($redisAuth);
if ($ret === false){
return false;
}
}
else{
if (self::$timeOut == 0 && !$isException){
return true;
}
$ret = self::$redis->ping();
if ($ret === false){
if (self::$timeOut == 0){
ini_set('default_socket_timeout', 259200);
$ret = self::$redis->connect($redisHost,$redisPort,259200);
}
else{
$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
}
if ($ret === false){
return false;
}
$ret = self::$redis->auth($redisAuth);
if ($ret === false){
return false;
}
}
}
return true;
}
}
重啟守護程序的 shell 指令碼 restartprocess.sh
:
#!/bin/sh
if [ ! -n "$1" ]; then
echo "input proc name"
exit
else
procname=$1
fi
pids=`(ps -ef | grep "$procname" | grep -v "grep" | grep -v $0) | awk '{print $2}'`
for pid in ${pids[*]}
do
kill -9 $pid
done
cd /path/to/your/project/
setsid $procname &
啟動守護程序的命令:
restartprocess.sh "php index.php /path/to/your/MyDaemon/func/run"
Linux 定時任務
可以設定一分鐘或一秒鐘執行一次 PHP 指令碼。因為每次處理訊息的時間不固定,可能導致訊息積壓或伺服器負載過大。
手工執行指令碼
用於處理偶然需求,簡單。