guzzle post傳送body json_Mix PHP V2 例項:AliCloud 簡訊協程池非同步傳送守護程式
前些時間我們釋出了 Mix PHP V2 例項:協程池非同步郵件傳送守護程式 範例,這一次我們提供一個使用大廠 SDK 通過 Swoole Hook 協程化來並行執行簡訊傳送任務,本文是一個程式碼簡單、IO 效能極強的範例。
請先升級到 mix-framework >= v2.0.5。
本範例依然使用訊息佇列的方式接收簡訊傳送任務,訊息中介軟體使用:
- redis
生產者
通常框架中使用 Redis 會安裝一個類庫來使用,本例使用原生程式碼,便於理解。
// 連線 $redis = new Redis(); if (!$redis->connect('127.0.0.1', 6379)) { throw new Exception('Redis connect failed.'); } $redis->auth(''); $redis->select(0); // 投遞任務 for($i = 0; $i < 3; $i++){ $data = [ 'phone' => '***', 'templateCode' => 'SMS_***', 'templateParam' => ['code' => 123456], ]; $redis->lpush('queue:sms', serialize($data)); }
消費者
使用的是 ali 雲的簡訊服務,檢視官方 PHP SDK 文件 ,使用的庫為:
composer require alibabacloud/client
通過檢視該庫的 composer 依賴檔案,我們得知該庫基於 guzzlehttp 開發,因為 Mix PHP 提供了無需修改程式碼就可 Hook Guzzle 庫可在協程中使用的工具 Mix PHP V2 生態:讓 Guzzle 支援 Swoole 的 Hook 協程,所以能基本確定該庫可在 Swoole 協程中使用。
首先我們安裝 https://github.com/mix-php/guzzle-hook 讓 alibabacloud/client
composer require mix/guzzle-hook
然後在專案的 composer.json
檔案中增加 extra 配置項,如下:
"extra": {
"include_files": [
"vendor/mix/guzzle-hook/src/functions_include.php"
]
}
更新自動載入:
composer dump-autoload
下面我們採用 Mix PHP V2 的守護程式、協程池來完成一個超高效能的簡訊傳送程式。
首先我們在配置 applications/console/config/main.php
// 命令
'commands' => [
'smser' => [
'Smser',
'description' => "SMS send daemon demo.",
'options' => [
[['d', 'daemon'], 'description' => 'Run in the background'],
],
],
],
註冊的命令中指定的 Smser 命令類,接下來我們編寫一個 SmserCommand 類:
applications/console/src/Commands/SmserCommand.php
<?php
namespace ConsoleCommands;
use ConsoleLibrariesSmserWorker;
use MixConcurrentCoroutinePoolDispatcher;
use MixConsoleCommandLineFlag;
use MixCoreCoroutine;
use MixCoreCoroutineChannel;
use MixCoreEvent;
use MixHelperProcessHelper;
use AlibabaCloudClientAlibabaCloud;
/**
* Class SmserCommand
* @package DaemonCommands
* @author liu,jian <[email protected]>
*/
class SmserCommand
{
const ACCESS_KEY = '***';
const ACCESS_SECRET = '***';
/**
* 退出
* @var bool
*/
public $quit = false;
/**
* 主函式
*/
public function main()
{
// 守護處理
$daemon = Flag::bool(['d', 'daemon'], false);
if ($daemon) {
ProcessHelper::daemon();
}
// 捕獲訊號
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], function ($signal) {
$this->quit = true;
ProcessHelper::signal([SIGHUP, SIGINT, SIGTERM, SIGQUIT], null);
});
// 設定ali雲全域性引數
AlibabaCloud::accessKeyClient(static::ACCESS_KEY, static::ACCESS_SECRET)->regionId('cn-hangzhou')->asDefaultClient();
// 手動關閉Swoole檔案Hook,因為ali雲依賴的uuid庫有檔案hook協程相容問題,Swoole 4.4已經適配該問題
Coroutine::enableHook(SWOOLE_HOOK_ALL ^ SWOOLE_HOOK_FILE);
// 協程池執行任務
xgo(function () {
$maxWorkers = 20;
$maxQueue = 20;
$jobQueue = new Channel($maxQueue);
$dispatch = new Dispatcher([
'jobQueue' => $jobQueue,
'maxWorkers' => $maxWorkers,
]);
$dispatch->start(SmserWorker::class);
// 投放任務
$redis = app()->redisPool->getConnection();
while (true) {
if ($this->quit) {
$dispatch->stop();
return;
}
try {
$data = $redis->brPop(['queue:sms'], 3);
} catch (Throwable $e) {
$dispatch->stop();
return;
}
if (!$data) {
continue;
}
$data = array_pop($data); // brPop命令最後一個鍵才是值
$jobQueue->push($data);
}
});
// 等待事件
Event::wait();
}
}
從$data = $redis->brPop(['queue:sms'], 3);
外部的異常捕獲可得知,當 Redis 連接出錯時,比如 Redis 重啟、連線異常時協程池會安全退出,也就是說當程序異常退出後用戶需使用supervisor
、pm2
等工具重啟守護程序。
上面是一個 Mix PHP 協程池的使用程式碼,基本可以直接複製使用,框架預設包含了協程池的 Demo,本次例項只是修改了協程池的 Worker,本命令主要是完成從 Redis 佇列中獲取訊息然後 push 到 jobQueue 中,jobQueue 中的資料會被 20 個 Worker 例項中某一個搶佔後並行執行,本例的傳送程式碼邏輯就在 SmserWorker 類中:
applications/console/src/Libraries/SmserWorker.php
<?php
namespace ConsoleLibraries;
use MixConcurrentCoroutinePoolAbstractWorker;
use MixConcurrentCoroutinePoolWorkerInterface;
/**
* Class SmserWorker
* @package DaemonLibraries
* @author liu,jian <[email protected]>
*/
class SmserWorker extends AbstractWorker implements WorkerInterface
{
/**
* 郵件傳送器
* @var Smser
*/
public $smser;
/**
* 初始化事件
*/
public function onInitialize()
{
parent::onInitialize(); // TODO: Change the autogenerated stub
// 例項化一些需重用的物件
$this->smser = new Smser();
}
/**
* 處理
* @param $data
*/
public function handle($data)
{
// TODO: Implement handle() method.
$data = unserialize($data);
if (empty($data)) {
return;
}
try {
$result = $this->smser->send($data['phone'], $data['templateCode'], $data['templateParam']);
app()->log->info("SMS sent successfully:phone {phone} templateCode {templateCode} result {result}", array_merge($data, ['result' => json_encode($result, JSON_UNESCAPED_UNICODE)]));
} catch (Throwable $e) {
app()->log->error("SMS failed to send:phone {phone} templateCode {templateCode} error {error}", array_merge($data, ['error' => $e->getMessage()]));
}
}
}
由以上程式碼可見,Worker 在初始化時,新增了一個 Smser 類的屬性,當 jobQueue 訊息投遞過來時訊息會傳遞到 handle 方法,在該方法中使用 Mailer 類的例項完成郵件傳送任務,所以我們要編寫了一個 Smser 傳送程式:
applications/console/src/Libraries/Smser.php
<?php
namespace ConsoleLibraries;
use AlibabaCloudClientAlibabaCloud;
use AlibabaCloudClientExceptionClientException;
use AlibabaCloudClientExceptionServerException;
use MixCoreCoroutine;
/**
* Class Smser
* @package ConsoleLibraries
* @author liu,jian <[email protected]>
*/
class Smser
{
/**
* 配置資訊
*/
const SIGN_NAME = '***';
/**
* Smser constructor.
*/
public function __construct()
{
// 開啟協程鉤子
Coroutine::enableHook();
}
/**
* 傳送
* @param $phone
* @param $templateCode
* @param $templateParam
* @return array
* @throws ClientException
* @throws ServerException
*/
public function send($phone, $templateCode, $templateParam)
{
$result = AlibabaCloud::rpc()
->product('Dysmsapi')
// ->scheme('https') // https | http
->version('2017-05-25')
->action('SendSms')
->method('POST')
->options([
'query' => [
'PhoneNumbers' => $phone,
'SignName' => static::SIGN_NAME,
'TemplateCode' => $templateCode,
'TemplateParam' => json_encode($templateParam),
],
])
->request();
return $result->toArray();
}
}
以上就完成了全部的程式碼邏輯,現在我們開始測試,先啟動消費者守護程式:
[[email protected] bin]# ./mix-console smser
將上文的生產者指令碼命名為 push.php
然後在 CLI 中執行 (開一個新終端):
[[email protected] bin]# php /tmp/push.php
消費者守護程式結果:
[[email protected] bin]# ./mix-console smser
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"OK","RequestId":"4071D031-6D9E-4F70-9269-6C1979080858","BizId":"939807358670612546^0","Code":"OK"}
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"觸發分鐘級流控Permits:1","RequestId":"490B73D7-317E-4362-B2DD-5E2153A7B891","Code":"isv.BUSINESS_LIMIT_CONTROL"}
[info] 2019-05-24 12:03:32 <101014> [message] SMS sent successfully:phone *** templateCode SMS_*** result {"Message":"觸發分鐘級流控Permits:1","RequestId":"1FD22EDB-BAA4-4416-8FF9-242EDCF34359","Code":"isv.BUSINESS_LIMIT_CONTROL"}
命令列終端列印了傳送成功的日誌,傳送完成。