php+redis實現延遲佇列(訂單超時未支付。會員時間過期)
阿新 • • 發佈:2019-01-09
基於redis有序集實現延遲任務執行,比如某個時間給某個使用者發簡訊,訂單過期處理,等等
我是在tp5框架上寫的,實現起來很簡單,對於一些不是很複雜的應用足夠了,目前在公司專案中使用,後臺程序並沒有實現多程序,
1、命令列指令碼 執行方法:php think delay-queue queuename(這是有序集的key)
namespace app\command; use app\common\lib\delayqueue\DelayQueue; use think\console\Command; use think\console\Input; use think\console\Output; use think\Db; class DelayQueueWorker extends Command { const COMMAND_ARGV_1 = 'queue'; protected function configure() { $this->setName('delay-queue')->setDescription('延遲佇列任務程序'); $this->addArgument(self::COMMAND_ARGV_1); } protected function execute(Input $input, Output $output) { $queue = $input->getArgument(self::COMMAND_ARGV_1); //引數1 延遲隊列表名,對應與redis的有序集key名 while (true) { DelayQueue::getInstance($queue)->perform(); usleep(300000); } } }
庫類目錄結構
config.php 裡是redis連線引數配置
RedisHandler.php只實現有序集的操作,重連機制還沒有實現
namespace app\common\lib\delayqueue; class RedisHandler { public $provider; private static $_instance = null; private function __construct() { $this->provider = new \Redis(); //host port $config = require_once 'config.php'; $this->provider->connect($config['redis_host'], $config['redis_port']); } final private function __clone() {} public static function getInstance() { if(!self::$_instance) { self::$_instance = new RedisHandler(); } return self::$_instance; } /** * @param string $key 有序集key * @param number $score 排序值 * @param string $value 格式化的資料 * @return int */ public function zAdd($key, $score, $value) { return $this->provider->zAdd($key, $score, $value); } /** * 獲取有序集資料 * @param $key * @param $start * @param $end * @param null $withscores * @return array */ public function zRange($key, $start, $end, $withscores = null) { return $this->provider->zRange($key, $start, $end, $withscores); } /** * 刪除有序集資料 * @param $key * @param $member * @return int */ public function zRem($key,$member) { return $this->provider->zRem($key,$member); } }
延遲佇列類
namespace app\common\lib\delayqueue;
class DelayQueue
{
private $prefix = 'delay_queue:';
private $queue;
private static $_instance = null;
private function __construct($queue) {
$this->queue = $queue;
}
final private function __clone() {}
public static function getInstance($queue = '') {
if(!self::$_instance) {
self::$_instance = new DelayQueue($queue);
}
return self::$_instance;
}
/**
* 新增任務資訊到佇列
*
* demo DelayQueue::getInstance('test')->addTask(
* 'app\common\lib\delayqueue\job\Test',
* strtotime('2018-05-02 20:55:20'),
* ['abc'=>111]
* );
*
* @param $jobClass
* @param int $runTime 執行時間
* @param array $args
*/
public function addTask($jobClass, $runTime, $args = null)
{
$key = $this->prefix.$this->queue;
$params = [
'class' => $jobClass,
'args' => $args,
'runtime' => $runTime,
];
RedisHandler::getInstance()->zAdd(
$key,
$runTime,
serialize($params)
);
}
/**
* 執行job
* @return bool
*/
public function perform()
{
$key = $this->prefix.$this->queue;
//取出有序集第一個元素
$result = RedisHandler::getInstance()->zRange($key, 0 ,0);
if (!$result) {
return false;
}
$jobInfo = unserialize($result[0]);
print_r('job: '.$jobInfo['class'].' will run at: '. date('Y-m-d H:i:s',$jobInfo['runtime']).PHP_EOL);
$jobClass = $jobInfo['class'];
if( [email protected]_exists($jobClass)) {
print_r($jobClass.' undefined'. PHP_EOL);
RedisHandler::getInstance()->zRem($key, $result[0]);
return false;
}
// 到時間執行
if (time() >= $jobInfo['runtime']) {
$job = new $jobClass;
$job->setPayload($jobInfo['args']);
$jobResult = $job->preform();
if ($jobResult) {
// 將任務移除
RedisHandler::getInstance()->zRem($key, $result[0]);
return true;
}
}
return false;
}
}
非同步任務基類:
namespace app\common\lib\delayqueue;
class DelayJob
{
protected $payload;
public function preform ()
{
// todo
return true;
}
public function setPayload($args = null)
{
$this->payload = $args;
}
}
所有非同步執行的任務都解除安裝job目錄下,且要繼承DelayJob,你可以實現任何你想延遲執行的任務
如:
namespace app\common\lib\delayqueue\job;
use app\common\lib\delayqueue\DelayJob;
class Test extends DelayJob
{
public function preform()
{
// payload 裡應該有處理任務所需的引數,通過DelayQueue的addTask傳入
print_r('test job'.PHP_EOL);
return true;
}
}
使用方法:
假設使用者建立了一個訂單,訂單在10分鐘後失效,那麼在訂單建立後加入:
DelayQueue::getInstance('close_order')->addTask(
'app\common\lib\delayqueue\job\CloseOrder', // 自己實現的job
strtotime('2018-05-02 20:55:20'), // 訂單失效時間
['order_id'=>123456] // 傳遞給job的引數
);
close_order 是有序集的key
命令列啟動程序
php think delay-queue close_order