非同步redis佇列實現 資料入庫
業務需求
app客戶端向服務端介面傳送來json 資料 每天 發一次 清空快取後會再次傳送
出問題之前業務邏輯:
php 介面 首先將 json 轉為陣列 去重 在一張大表中插入不存在的資料
該使用者已經存在 和新增的id
入另一種詳情表
問題所在:
當用戶因特殊情況清除快取 導致app 傳送json串 入庫併發高 導致CPU 暴增到88% 並且居高不下
優化思路:
1、非同步佇列處理
2、redis 過濾(就是隻處理當天第一次請求)
3、redis 輔助儲存app名稱(驗證過後批量插入資料app名稱表中)
4、拼接插入的以及新增的如詳細表中
解決辦法:
1、介面修改 redis 過濾 + 如list佇列 並將結果存入redis中
首先 redis將之前的歷史資料放在redis 雜湊裡面 中文為鍵名 id 為鍵值
<?php /** * Created by haiyong. * User: jia * Date: 2017/9/18 * Time: 20:06 */ namespace App\Http\Controllers\App; use App\Http\Controllers\Controller; use Illuminate\Http\Request; use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Redis; class OtherAppController extends Controller{ /** * app應用統計介面 * @param Request $request * @return string */ public function appTotal(Request $request) { // //歷史資料入庫 //$redis = Redis::connection('web_active'); // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName'); // $str = ''; // foreach ($app_name as $key => $val) { // $str.= "{$val} {$key} "; // } // $redis->hmset('app_name', $app_name); // echo $str;exit; $result = $request->input('res'); $list = json_decode($result, true); if (empty ($list) || !is_array($list)) { return json_encode(['result' => 'ERROR', 'msg' => 'parameter error']); } $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['time'] = date('Y-m-d'); $redis_key = 'log_app:'.$data['time']; //redis 過濾 $redis = Redis::connection('web_active'); //redis 鍵值過期設定 if (empty($redis->exists($redis_key))) { $redis->hset($redis_key, 1, 'start'); $redis->EXPIREAT($redis_key, strtotime($data['time'].'+2 day')); } //值確定 if ($redis->hexists($redis_key, $data['uid'])) { return json_encode(['result' => 'SUCCESS']); } else { //推入佇列 $redis->hset($redis_key, $data['uid'], $result); $redis->rpush('log_app_list', $data['time'] . ':' . $data['uid']); return json_encode(['result' => 'SUCCESS']); } } }
2、php 指令碼迴圈 監控redis 佇列 執行邏輯 防止記憶體溢位
mget 獲取該使用者的app id 不存在就會返回null
通過判斷null 運用redis 新值作為自增id指標 將null 補齊 之後批量入mysql 並跟新redis 雜湊 和指標值 併入庫 詳情表
<?php namespace App\Console\Commands; use Illuminate\Console\Command; use Illuminate\Support\Facades\Redis; use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Storage; class AppTotal extends Command { /** * The name and signature of the console command. * * @var string */ protected $signature = 'AppTotal:run'; /** * The console command description. * * @var string */ protected $description = 'Command description'; /** * Create a new command instance. * * @return void */ public function __construct() { parent::__construct(); } /** * Execute the console command. * * @return mixed */ public function handle() { //歷史資料入庫 // $redis = Redis::connection('web_active'); // $app_name = DB::connection('phpLog')->table('app_set_name')->where("appName", '<>', ' ')->lists('id', 'appName'); // $redis->hmset('app_name', $app_name); // exit; while(1) { $redis = Redis::connection('web_active'); //佇列名稱 $res = $redis->lpop('log_app_list'); //開關按鈕 $lock = $redis->get('log_app_lock'); if (!empty($res)) { list($date,$uid) = explode(':',$res); $result = $redis->hget('log_app:'.$date, $uid); if (!empty($result)) { $table_name = 'app_total'.date('Ym'); $list = json_decode($result, true); $data['uid'] = isset($list['uid']) ? $list['uid'] : '20001' ; $data['sex'] = isset($list['sex']) ? $list['sex'] : '' ; $data['device'] = isset($list['device']) ? $list['device'] : '' ; $data['appList'] = isset($list['list']) ? $list['list'] : '' ; //資料去重 flip比unique更節約效能 $data['appList'] = array_flip($data['appList']); $data['appList'] = array_flip($data['appList']); $data['time'] = date('Y-m-d'); //app應用過濾 $app_res = $redis->hmget('app_name', $data['appList']); //新增加app陣列 $new_app = []; //mysql 入庫陣列 $mysql_new_app = []; //獲取當前redis 自增指標 $total = $redis->get('app_name_total'); foreach ($app_res as $key =>& $val) { if (is_null($val)) { $total += 1; $new_app[$data['appList'][$key]] = $total; $val = $total; array_push($mysql_new_app,['id' => $total, 'appName'=> $data['appList'][$key]]); } } if (count($new_app)){ $str = "INSERT IGNORE INTO app_set_name (id,appName) values"; foreach ($new_app as $key => $val) { $str.= "(".$val.",'".$key."'),"; } $str = trim($str, ','); //$mysql_res = DB::connection('phpLog')->table('app_set_name')->insert($mysql_new_app); $mysql_res = DB::connection('phpLog')->statement($str); if ($mysql_res) { // 設定redis 指標 $redis->set('app_name_total', $total); // redis 資料入庫 $redis->hmset('app_name', $new_app); } } // 詳情資料入庫 $data['appList'] = implode(',', $app_res); //app統計入庫 DB::connection('phpLog')->statement("INSERT IGNORE INTO ".$table_name." (uid,sex,device,`time`,appList) values('".$data['uid']."',".$data['sex'].",'".$data['device']."','".$data['time']."','".$data['appList']."')"); //log 記錄 當檔案達到123MB的時候產生記憶體保錯 所有這個地方可是利用日誌切割 或者 不寫入 日誌 Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' success '.$result."\n"); } else { Storage::disk('local')->append(DIRECTORY_SEPARATOR.'total'.DIRECTORY_SEPARATOR.'loaAppTotal.txt', date('Y-m-d H:i:s').' error '.$result."\n"); } } //執行間隔 sleep(1); //結束按鈕 if ($lock == 2) { exit; } //記憶體檢測 if(memory_get_usage()>1000*1024*1024){ exit('記憶體溢位');//大於100M記憶體退出程式,防止記憶體洩漏被系統殺死導致任務終端 } } } }
3、執定 定時任務監控指令碼執行情況
crontab -e
/2 * * * * /bin/bash /usr/local/nginx/html/test.sh 1>>/usr/local/nginx/html/log.log 2>&1
test.sh 內容 (檢視執行命令返回的程序id 如果沒有就執行命令開啟)
#!/bin/bash
alive=`ps -ef | grep AppTotal | grep -v grep | awk '{print $2}'`
if [ ! $alive ]
then
/usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
fi
記得授權哦 chmod +x test.sh
筆者用的laravel 框架 將命令啟用丟入後臺
執行命令
/usr/local/php/bin/php /var/ms/artisan AppTotal:run > /dev/null &
完事直接 ctrl -c 結束就行 命令以在後臺執行 可以用shell 中的命令檢視程序id
這樣就實現佇列非同步入庫
還有很多問題需要優化!!大致功能已經實現!!!!!!
優化完成後cpu