1. 程式人生 > >Swoole Redis 連線池的實現

Swoole Redis 連線池的實現

概述

這是關於 Swoole 入門學習的第九篇文章:Swoole Redis 連線池的實現。

  • 第八篇:Swoole MySQL 連線池的實現
  • 第七篇:Swoole RPC 的實現
  • 第六篇:Swoole 整合成一個小框架
  • 第五篇:Swoole 多協議 多埠 的應用
  • 第四篇:Swoole HTTP 的應用
  • 第三篇:Swoole WebSocket 的應用
  • 第二篇:Swoole Task 的應用
  • 第一篇:Swoole Timer 的應用

收到讀者反饋,“亮哥,文章能多點圖片嗎?就是將執行結果以圖片的形式展示...”

我個人覺得這是比較懶、動手能力差的表現,恩... 要勤快些。

但誰讓文章是寫給你們看的那,我以後儘量文章寫的圖文並茂一點。

上篇文章 分享了 MySQL 連線池,這篇文章 咱們來分享下 Redis 連線池。

在上篇文章的基礎上進行簡單調整即可,將例項化 MySQL 的地方,修改成例項化 Redis 即可,還要注意一些方法的調整。

這篇文章僅僅只實現一個 Redis 連線池,篇幅就太少了,順便將前幾篇整合一下。

大概 Demo 中包含這些點:

  • 實現 MySQL 連線池
  • 實現 MySQL CURD 方法的定義
  • 實現 Redis 連線池
  • 實現 Redis 方法的定義
  • 滿足 HTTP、TCP、WebSocket 呼叫
  • 提供 Demo 供測試
  • 調整 目錄結構

HTTP 呼叫:

  • 實現 讀取 MySQL 中資料的 Demo
  • 實現 讀取 Redis 中資料的 Demo

TCP 呼叫:

  • 實現 讀取 MySQL 中資料的 Demo
  • 實現 讀取 Redis 中資料的 Demo

WebSocket 呼叫:

  • 實現 每秒展示 API 呼叫量 Demo

目錄結構

├─ client
│  ├─ http
│     ├── mysql.php //測試 MySQL 連線
│     ├── redis.php //測試 Redis 連線
│  ├─ tcp
│     ├── mysql.php //測試 MySQL 連線
│     ├── redis.php //測試 Redis 連線
│  ├─ websocket
│     ├── index.html //實現 API 呼叫量展示
├─ controller
│  ├─ Order.php     //實現 MySQL CURD
│  ├─ Product.php   //實現 Redis 呼叫
│  ├─ Statistic.php //模擬 API 呼叫資料
├─ server
│  ├─ config
│     ├── config.php //預設配置
│     ├── mysql.php  //MySQL 配置
│     ├── redis.php  //Redis 配置
│  ├─ core
│     ├── Common.php //公共方法
│     ├── Core.php   //核心檔案
│     ├── HandlerException.php //異常處理
│     ├── callback //回撥處理
│         ├── OnRequest.php
│         ├── OnReceive.php
│         ├── OnTask.php
│         ├── ...
│     ├── mysql
│         ├── MysqlDB.php
│         ├── MysqlPool.php
│     ├── redis
│         ├── RedisDB.php
│         ├── RedisPool.php
│  ├─ log  -- 需要 讀/寫 許可權
│     ├── ...
├─ index.php //入口檔案

程式碼

server/core/redis/RedisPool.php

<?php

if (!defined('SERVER_PATH')) exit("No Access");

class RedisPool
{
    private static $instance;
    private $pool;
    private $config;

    public static function getInstance($config = null)
    {
        if (empty(self::$instance)) {
            if (empty($config)) {
                throw new RuntimeException("Redis config empty");
            }
            self::$instance = new static($config);
        }
        return self::$instance;
    }

    public function __construct($config)
    {
        if (empty($this->pool)) {
            $this->config = $config;
            $this->pool = new chan($config['master']['pool_size']);
            for ($i = 0; $i < $config['master']['pool_size']; $i++) {
                go(function() use ($config) {
                    $redis = new RedisDB();
                    $res = $redis->connect($config);
                    if ($res === false) {
                        throw new RuntimeException("Failed to connect redis server");
                    } else {
                        $this->pool->push($redis);
                    }
                });
            }
        }
    }

    public function get()
    {
        if ($this->pool->length() > 0) {
            $redis = $this->pool->pop($this->config['master']['pool_get_timeout']);
            if (false === $redis) {
                throw new RuntimeException("Pop redis timeout");
            }
            defer(function () use ($redis) { //釋放
                $this->pool->push($redis);
            });
            return $redis;
        } else {
            throw new RuntimeException("Pool length <= 0");
        }
    }
}

server/core/redis/RedisDB.php

<?php

if (!defined('SERVER_PATH')) exit("No Access");

class RedisDB
{
    private $master;
    private $slave;
    private $config;

    public function __call($name, $arguments)
    {
        // TODO 主庫的操作
        $command_master = ['set', 'hset', 'sadd'];

        if (in_array($name, $command_master)) {
            $db = $this->_get_usable_db('slave');
        } else {
            $db = $this->_get_usable_db('master');
        }
        $result = call_user_func_array([$db, $name], $arguments);
        return $result;
    }

    public function connect($config)
    {
        //主庫
        $master = new Swoole\Coroutine\Redis();
        $res = $master->connect($config['master']['host'], $config['master']['port']);
        if ($res === false) {
            throw new RuntimeException($master->errCode, $master->errMsg);
        } else {
            $this->master = $master;
        }

        //從庫
        $slave = new Swoole\Coroutine\Redis();
        $res = $slave->connect($config['slave']['host'], $config['slave']['port']);
        if ($res === false) {
            throw new RuntimeException($slave->errCode, $slave->errMsg);
        } else {
            $this->slave = $slave;
        }

        $this->config = $config;
        return $res;
    }

    private function _get_usable_db($type)
    {
        if ($type == 'master') {
            if (!$this->master->connected) {
                $master = new Swoole\Coroutine\Redis();
                $res = $master->connect($this->config['master']['host'], $this->config['master']['port']);
                if ($res === false) {
                    throw new RuntimeException($master->errCode, $master->errMsg);
                } else {
                    $this->master = $master;
                }
            }
            return $this->master;
        } elseif ($type == 'slave') {
            if (!$this->slave->connected) {
                $slave = new Swoole\Coroutine\Redis();
                $res = $slave->connect($this->config['slave']['host'], $this->config['slave']['port']);
                if ($res === false) {
                    throw new RuntimeException($slave->errCode, $slave->errMsg);
                } else {
                    $this->slave = $slave;
                }
            }
            return $this->slave;
        }
    }
}

client/http/redis.php

<?php

$demo = [
    'type'  => 'SW',
    'token' => 'Bb1R3YLipbkTp5p0',
    'param' => [
        'class'  => 'Product',
        'method' => 'set',
        'param' => [
            'key'   => 'C4649',
            'value' => '訂單-C4649'
        ],
    ],
];

$ch = curl_init();
$options = [
    CURLOPT_URL  => 'http://10.211.55.4:9509/',
    CURLOPT_POST => 1,
    CURLOPT_POSTFIELDS => json_encode($demo),
];
curl_setopt_array($ch, $options);
curl_exec($ch);
curl_close($ch);

client/tpc/redis.php

<?php

class Client
{
    private $client;

    public function __construct() {
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

        $this->client->on('Connect', [$this, 'onConnect']);
        $this->client->on('Receive', [$this, 'onReceive']);
        $this->client->on('Close', [$this, 'onClose']);
        $this->client->on('Error', [$this, 'onError']);
    }

    public function connect() {
        if(!$fp = $this->client->connect("0.0.0.0", 9510, 1)) {
            echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;
            return;
        }
    }

    public function onConnect() {

        fwrite(STDOUT, "測試RPC (Y or N):");
        swoole_event_add(STDIN, function() {
            $msg = trim(fgets(STDIN));
            if ($msg == 'y') {
                $this->send();
            }
            fwrite(STDOUT, "測試RPC (Y or N):");
        });
    }

    public function onReceive($cli, $data) {
        echo '[Received]:'.$data;
    }

    public function send() {
        $demo = [
            'type'  => 'SW',
            'token' => 'Bb1R3YLipbkTp5p0',
            'param' => [
                'class'  => 'Product',
                'method' => 'get',
                'param' => [
                    'code' => 'C4649'
                ],
            ],
        ];
        $this->client->send(json_encode($demo));
    }

    public function onClose() {
        echo "Client close connection".PHP_EOL;
    }

    public function onError() {

    }
}

$client = new Client();
$client->connect();

client/websocket/index.html

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <meta name="description" content="">
    <meta name="keywords" content="">
    <title>Demo</title>
    <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
    <script src="http://echarts.baidu.com/gallery/vendors/echarts/echarts.min.js"></script>
</head>
<body>
<!-- 為ECharts準備一個具備大小(寬高)的Dom -->
<div id="main" style="width: 900px;height:400px;"></div>
<script type="text/javascript">
    if ("WebSocket" in window) {
        // 基於準備好的dom,初始化echarts例項
        var myChart = echarts.init(document.getElementById('main'));
        var wsServer = 'ws://10.211.55.4:9509';
        var ws = new WebSocket(wsServer);

        ws.onopen = function (evt) {
            if (ws.readyState == 1) {
                console.log('WebSocket 連線成功...');
            } else {
                console.log('WebSocket 連線失敗...');
            }

            if (ws.readyState == 1) {
                ws.send('開始請求...');
            } else {
                alert('WebSocket 連線失敗');
            }
        };

        ws.onmessage = function (evt) {
            console.log('Retrieved data from server: ' + evt.data);
            var evt_data = jQuery.parseJSON(evt.data);
            myChart.setOption({
                xAxis: {
                    data : evt_data.time
                },
                series: [{
                    data: evt_data.value
                }]
            });

        };

        ws.onerror = function (evt) {
            alert('WebSocket 發生錯誤');
            console.log(evt);
        };

        ws.onclose = function() {
            alert('WebSocket 連線關閉');
            console.log('WebSocket 連線關閉...');
        };

        // 指定圖表的配置項和資料
        $.ajax({
            url      : 'http://10.211.55.4:9509/', // 請求url
            type     : "post", // 提交方式
            dataType : "json", // 資料型別
            data : {
                'type'  : 'SW',
                'token' : 'Bb1R3YLipbkTp5p0',
                'param' : {
                    'class'  : 'Statistic',
                    'method' : 'init'
                }
            },
            beforeSend:function() {

            },
            success : function(rs) {
                if (rs.code != 1) {
                    alert('獲取資料失敗');
                } else {
                    var option = {
                        title: {
                            text: 'API 呼叫量',
                            x:'center'
                        },
                        tooltip: {
                            trigger: 'axis',
                            axisPointer: {
                                animation: false
                            }
                        },
                        xAxis: {
                            type : 'category',
                            data : rs.data.time
                        },
                        yAxis: {
                            type: 'value',
                            boundaryGap: [0, '100%'],
                            name: '使用量',
                            splitLine: {
                                show: false
                            }
                        },
                        series: [{
                            name: '使用量',
                            type: 'line',
                            showSymbol: false,
                            hoverAnimation: false,
                            data: rs.data.value
                        }]
                    };

                    // 使用剛指定的配置項和資料顯示圖表。
                    if (option && typeof option === "object") {
                        myChart.setOption(option, true);
                    }
                }
            },
            error : function(){
                alert('伺服器請求異常');
            }
        });
    } else {
        alert("您的瀏覽器不支援 WebSocket!");
    }
</script>
</body>
</html>

還涉及到,OnMessage.php、OnTask.php 、OnWorkerStart.php 等,就不貼程式碼了。

執行

小框架的啟動/關閉/熱載入,看看這篇文章: 第六篇:Swoole 整合成一個小框架

裡面 Demo 在 client 資料夾下。

http 目錄下的檔案,放到自己虛擬目錄下,用瀏覽器訪問。

tcp 目錄下的檔案,在 CLI 下執行。

websocket 目錄下的檔案,直接點選在瀏覽器訪問。

擴充套件

官方協程 Redis 客戶端手冊:

https://wiki.swoole.com/wiki/page/589.html

大家可以嘗試使用官方提供的其他方法。

小結

Demo 程式碼僅供參考,裡面有很多不嚴謹的地方,根據自己的需要進行修改 ...

上面的 Demo 需要原始碼的,加我微信。(選單-> 加我微信-> 掃我)

本文歡迎轉發,轉發請註明作者和出處,謝謝