Swoole Redis 連線池的實現
阿新 • • 發佈:2019-06-03
概述
這是關於 Swoole 入門學習的第九篇文章:Swoole Redis 連線池的實現。
- 第八篇:Swoole MySQL 連線池的實現
- 第七篇:Swoole RPC 的實現
- 第六篇:Swoole 整合成一個小框架
- 第五篇:Swoole 多協議 多埠 的應用
- 第四篇:Swoole HTTP 的應用
- 第三篇:Swoole WebSocket 的應用
- 第二篇:Swoole Task 的應用
- 第一篇:Swoole Timer 的應用
收到讀者反饋,“亮哥,文章能多點圖片嗎?就是將執行結果以圖片的形式展示...”
我個人覺得這是比較懶、動手能力差的表現,恩... 要勤快些。
但誰讓文章是寫給你們看的那,我以後儘量文章寫的圖文並茂一點。
上篇文章 分享了 MySQL 連線池,這篇文章 咱們來分享下 Redis 連線池。
在上篇文章的基礎上進行簡單調整即可,將例項化 MySQL 的地方,修改成例項化 Redis 即可,還要注意一些方法的調整。
這篇文章僅僅只實現一個 Redis 連線池,篇幅就太少了,順便將前幾篇整合一下。
大概 Demo 中包含這些點:
- 實現 MySQL 連線池
- 實現 MySQL CURD 方法的定義
- 實現 Redis 連線池
- 實現 Redis 方法的定義
- 滿足 HTTP、TCP、WebSocket 呼叫
- 提供 Demo 供測試
- 調整 目錄結構
HTTP 呼叫:
- 實現 讀取 MySQL 中資料的 Demo
- 實現 讀取 Redis 中資料的 Demo
TCP 呼叫:
- 實現 讀取 MySQL 中資料的 Demo
- 實現 讀取 Redis 中資料的 Demo
WebSocket 呼叫:
- 實現 每秒展示 API 呼叫量 Demo
目錄結構
├─ client │ ├─ http │ ├── mysql.php //測試 MySQL 連線 │ ├── redis.php //測試 Redis 連線 │ ├─ tcp │ ├── mysql.php //測試 MySQL 連線 │ ├── redis.php //測試 Redis 連線 │ ├─ websocket │ ├── index.html //實現 API 呼叫量展示 ├─ controller │ ├─ Order.php //實現 MySQL CURD │ ├─ Product.php //實現 Redis 呼叫 │ ├─ Statistic.php //模擬 API 呼叫資料 ├─ server │ ├─ config │ ├── config.php //預設配置 │ ├── mysql.php //MySQL 配置 │ ├── redis.php //Redis 配置 │ ├─ core │ ├── Common.php //公共方法 │ ├── Core.php //核心檔案 │ ├── HandlerException.php //異常處理 │ ├── callback //回撥處理 │ ├── OnRequest.php │ ├── OnReceive.php │ ├── OnTask.php │ ├── ... │ ├── mysql │ ├── MysqlDB.php │ ├── MysqlPool.php │ ├── redis │ ├── RedisDB.php │ ├── RedisPool.php │ ├─ log -- 需要 讀/寫 許可權 │ ├── ... ├─ index.php //入口檔案
程式碼
server/core/redis/RedisPool.php
<?php
if (!defined('SERVER_PATH')) exit("No Access");
class RedisPool
{
private static $instance;
private $pool;
private $config;
public static function getInstance($config = null)
{
if (empty(self::$instance)) {
if (empty($config)) {
throw new RuntimeException("Redis config empty");
}
self::$instance = new static($config);
}
return self::$instance;
}
public function __construct($config)
{
if (empty($this->pool)) {
$this->config = $config;
$this->pool = new chan($config['master']['pool_size']);
for ($i = 0; $i < $config['master']['pool_size']; $i++) {
go(function() use ($config) {
$redis = new RedisDB();
$res = $redis->connect($config);
if ($res === false) {
throw new RuntimeException("Failed to connect redis server");
} else {
$this->pool->push($redis);
}
});
}
}
}
public function get()
{
if ($this->pool->length() > 0) {
$redis = $this->pool->pop($this->config['master']['pool_get_timeout']);
if (false === $redis) {
throw new RuntimeException("Pop redis timeout");
}
defer(function () use ($redis) { //釋放
$this->pool->push($redis);
});
return $redis;
} else {
throw new RuntimeException("Pool length <= 0");
}
}
}
server/core/redis/RedisDB.php
<?php
if (!defined('SERVER_PATH')) exit("No Access");
class RedisDB
{
private $master;
private $slave;
private $config;
public function __call($name, $arguments)
{
// TODO 主庫的操作
$command_master = ['set', 'hset', 'sadd'];
if (in_array($name, $command_master)) {
$db = $this->_get_usable_db('slave');
} else {
$db = $this->_get_usable_db('master');
}
$result = call_user_func_array([$db, $name], $arguments);
return $result;
}
public function connect($config)
{
//主庫
$master = new Swoole\Coroutine\Redis();
$res = $master->connect($config['master']['host'], $config['master']['port']);
if ($res === false) {
throw new RuntimeException($master->errCode, $master->errMsg);
} else {
$this->master = $master;
}
//從庫
$slave = new Swoole\Coroutine\Redis();
$res = $slave->connect($config['slave']['host'], $config['slave']['port']);
if ($res === false) {
throw new RuntimeException($slave->errCode, $slave->errMsg);
} else {
$this->slave = $slave;
}
$this->config = $config;
return $res;
}
private function _get_usable_db($type)
{
if ($type == 'master') {
if (!$this->master->connected) {
$master = new Swoole\Coroutine\Redis();
$res = $master->connect($this->config['master']['host'], $this->config['master']['port']);
if ($res === false) {
throw new RuntimeException($master->errCode, $master->errMsg);
} else {
$this->master = $master;
}
}
return $this->master;
} elseif ($type == 'slave') {
if (!$this->slave->connected) {
$slave = new Swoole\Coroutine\Redis();
$res = $slave->connect($this->config['slave']['host'], $this->config['slave']['port']);
if ($res === false) {
throw new RuntimeException($slave->errCode, $slave->errMsg);
} else {
$this->slave = $slave;
}
}
return $this->slave;
}
}
}
client/http/redis.php
<?php
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Product',
'method' => 'set',
'param' => [
'key' => 'C4649',
'value' => '訂單-C4649'
],
],
];
$ch = curl_init();
$options = [
CURLOPT_URL => 'http://10.211.55.4:9509/',
CURLOPT_POST => 1,
CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);
curl_exec($ch);
curl_close($ch);
client/tpc/redis.php
<?php
class Client
{
private $client;
public function __construct() {
$this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on('Connect', [$this, 'onConnect']);
$this->client->on('Receive', [$this, 'onReceive']);
$this->client->on('Close', [$this, 'onClose']);
$this->client->on('Error', [$this, 'onError']);
}
public function connect() {
if(!$fp = $this->client->connect("0.0.0.0", 9510, 1)) {
echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;
return;
}
}
public function onConnect() {
fwrite(STDOUT, "測試RPC (Y or N):");
swoole_event_add(STDIN, function() {
$msg = trim(fgets(STDIN));
if ($msg == 'y') {
$this->send();
}
fwrite(STDOUT, "測試RPC (Y or N):");
});
}
public function onReceive($cli, $data) {
echo '[Received]:'.$data;
}
public function send() {
$demo = [
'type' => 'SW',
'token' => 'Bb1R3YLipbkTp5p0',
'param' => [
'class' => 'Product',
'method' => 'get',
'param' => [
'code' => 'C4649'
],
],
];
$this->client->send(json_encode($demo));
}
public function onClose() {
echo "Client close connection".PHP_EOL;
}
public function onError() {
}
}
$client = new Client();
$client->connect();
client/websocket/index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="description" content="">
<meta name="keywords" content="">
<title>Demo</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
<script src="http://echarts.baidu.com/gallery/vendors/echarts/echarts.min.js"></script>
</head>
<body>
<!-- 為ECharts準備一個具備大小(寬高)的Dom -->
<div id="main" style="width: 900px;height:400px;"></div>
<script type="text/javascript">
if ("WebSocket" in window) {
// 基於準備好的dom,初始化echarts例項
var myChart = echarts.init(document.getElementById('main'));
var wsServer = 'ws://10.211.55.4:9509';
var ws = new WebSocket(wsServer);
ws.onopen = function (evt) {
if (ws.readyState == 1) {
console.log('WebSocket 連線成功...');
} else {
console.log('WebSocket 連線失敗...');
}
if (ws.readyState == 1) {
ws.send('開始請求...');
} else {
alert('WebSocket 連線失敗');
}
};
ws.onmessage = function (evt) {
console.log('Retrieved data from server: ' + evt.data);
var evt_data = jQuery.parseJSON(evt.data);
myChart.setOption({
xAxis: {
data : evt_data.time
},
series: [{
data: evt_data.value
}]
});
};
ws.onerror = function (evt) {
alert('WebSocket 發生錯誤');
console.log(evt);
};
ws.onclose = function() {
alert('WebSocket 連線關閉');
console.log('WebSocket 連線關閉...');
};
// 指定圖表的配置項和資料
$.ajax({
url : 'http://10.211.55.4:9509/', // 請求url
type : "post", // 提交方式
dataType : "json", // 資料型別
data : {
'type' : 'SW',
'token' : 'Bb1R3YLipbkTp5p0',
'param' : {
'class' : 'Statistic',
'method' : 'init'
}
},
beforeSend:function() {
},
success : function(rs) {
if (rs.code != 1) {
alert('獲取資料失敗');
} else {
var option = {
title: {
text: 'API 呼叫量',
x:'center'
},
tooltip: {
trigger: 'axis',
axisPointer: {
animation: false
}
},
xAxis: {
type : 'category',
data : rs.data.time
},
yAxis: {
type: 'value',
boundaryGap: [0, '100%'],
name: '使用量',
splitLine: {
show: false
}
},
series: [{
name: '使用量',
type: 'line',
showSymbol: false,
hoverAnimation: false,
data: rs.data.value
}]
};
// 使用剛指定的配置項和資料顯示圖表。
if (option && typeof option === "object") {
myChart.setOption(option, true);
}
}
},
error : function(){
alert('伺服器請求異常');
}
});
} else {
alert("您的瀏覽器不支援 WebSocket!");
}
</script>
</body>
</html>
還涉及到,OnMessage.php、OnTask.php 、OnWorkerStart.php 等,就不貼程式碼了。
執行
小框架的啟動/關閉/熱載入,看看這篇文章: 第六篇:Swoole 整合成一個小框架
裡面 Demo 在 client 資料夾下。
http 目錄下的檔案,放到自己虛擬目錄下,用瀏覽器訪問。
tcp 目錄下的檔案,在 CLI 下執行。
websocket 目錄下的檔案,直接點選在瀏覽器訪問。
擴充套件
官方協程 Redis 客戶端手冊:
https://wiki.swoole.com/wiki/page/589.html
大家可以嘗試使用官方提供的其他方法。
小結
Demo 程式碼僅供參考,裡面有很多不嚴謹的地方,根據自己的需要進行修改 ...
上面的 Demo 需要原始碼的,加我微信。(選單-> 加我微信-> 掃我)
本文歡迎轉發,轉發請註明作者和出處,謝謝