Swoole 實戰:MySQL 查詢器的實現(協程連線池版)
阿新 • • 發佈:2020-05-03
[toc]
### 需求分析
本篇我們將通過 Swoole 實現一個自帶連線池的 MySQL 查詢器:
1. 支援通過鏈式呼叫構造並執行 SQL 語句;
2. 支援連線池技術;
3. 支援多協程事務併發執行(協程安全性);
4. 支援連線物件的健康檢測;
5. 支援連線物件斷線重連;
6. 程式需要可擴充套件,為未來的改造留好擴充套件點;
完整專案地址:[協程版 MySQL 查詢器](https://github.com/linvanda/mysql)
(注:該專案並非示例專案,而是生產可用的,已經在公司內部穩定使用。)
### 使用示例
- 查詢:
```php
$query->select(['uid', 'name'])
->from('users u')
->join('auth_users au', "u.uid=au.uid")
->where(['uid' => $uid])
->groupBy("u.phone")
->having("count(u.phone)>1")
->orderBy("u.uid desc")
->limit(10, 0)
->list();
```
- 插入:
```php
$query->insert('users')
->values(
[
[
'name' => 'linvanda',
'phone' => '18687664562',
'nickname' => '林子',
],
[
'name' => 'xiake',
'phone' => '18989876543',
'nickname' => '俠客',
],
]
)->execute();// 這裡是批量插入,不需要批量插入的話,傳入一維陣列即可
// 延遲插入
$query->insert('users')
->delayed()
->values(
[
'name' => 'linvanda',
'phone' => '18687664562',
'nickname' => '林子',
]
)->execute();
```
- 更新:
```php
$query->update('users u')
->join('auth_users au', "u.uid=au.uid")
->set(['u.name' => '粽子'])
->where("u.uid=:uid", ['uid' => 123])
->execute();
```
- 刪除:
```php
$query->delete('users')
->where("uid=:uid", ['uid' => 123])
->execute();
```
- 事務:
```php
$query->begin();
$query->update('users u')
->join('auth_users au', "u.uid=au.uid")
->set(['u.name' => '粽子'])
->where("u.uid=:uid", ['uid' => 123])
->execute();
...
$query->commit();
```
### 模組設計
1. 查詢模組:
- 查詢器(Query,入口)
- SQL構造器(Builder)
1. 事務模組:
- 事務介面(ITransaction)
- 協程版事務類(CoTransaction)
- 協程上下文(TContext)
1. 連線池模組:
- 連線池介面(IPool)
- 協程連線池類(CoPool)
1. 資料庫連線(驅動)模組:
- 連線介面(IConnector)
- 連線生成器介面(IConnectorBuilder)
- 協程連線類(CoConnector)
- 協程連線生成器(CoConnectorBuilder)
- 資料庫連線配置類(DBConfig)
- 資料庫連線(統計)資訊類(ConnectorInfo)
我們希望通過統一的入口對外提供服務,將複雜性隱藏在內部。該統一入口由**查詢模組**提供。該模組由**查詢器**和**SQL 構造器**構成,其中**查詢器**作為外界唯一入口,而構造器是一個 Trait,因為這樣可以讓外界通過查詢器入口直接使用構造器提供的 SQL 組裝功能。
查詢器通過**事務模組**執行 SQL。這裡的**事務**有兩個層面含義:資料庫操作的事務性(顯式或隱式事務,由 `CoTransaction` 類保障),以及多協程下的執行環境隔離性(由 `TContext` 類保障)。
事務模組需要通過資料庫連線物件執行具體的 SQL。連線物件由**連線池**模組提供。
連線池模組維護(建立、回收、銷燬)**資料庫連線**物件,具體是通過**資料庫連線**模組的**連線生成器**生成新資料庫連線。
模組之間依賴於介面而非具體實現:查詢模組依賴事務模組的 `ITransaction` 介面;事務模組依賴連線池模組的 `IPool` 介面和資料庫連線模組的 `IConnector` 介面;連線池模組依賴資料庫連線模組的 `IConnectorBuilder` 介面。
### UML 類圖
![MySQL 查詢器 UML 類圖](https://img2020.cnblogs.com/blog/1997761/202005/1997761-20200502185303151-151024225.png)
下面,我們分模組具體講解。
### 入口
由查詢模組對外提供統一的使用入口。查詢模組由兩部分構成:查詢器和 SQL 構造器。為了讓呼叫方可以直接通過查詢器來構造 SQL(而不用先例項化一個構造器構造 SQL 然後傳給查詢器),我將構造器設計成 Trait 供查詢器 Query 使用。
我們先看看查詢器類 Query:
```php
class Query
{
use Builder;
public const MODEL_READ = 'read';
public const MODEL_WRITE = 'write';
private $transaction;
public function __construct(ITransaction $transaction)
{
$this->transaction = $transaction;
}
/**
* 開啟事務
*/
public function begin($model = 'write'): bool
{
return $this->transaction->begin($model);
}
/**
* 提交事務
*/
public function commit(): bool
{
return $this->transaction->commit();
}
/**
* 回滾事務
*/
public function rollback(): bool
{
return $this->transaction->rollback();
}
/**
* 便捷方法:列表查詢
*/
public function list(): array
{
$list = $this->transaction->command(...$this->compile());
if ($list === false) {
throw new DBException($this->lastError(), $this->lastErrorNo());
}
return $list;
}
/**
* 便捷方法:查詢一行記錄
*/
public function one(): array
{
$list = $this->transaction->command(...$this->limit(1)->compile());
if ($list === false) {
throw new DBException($this->lastError(), $this->lastErrorNo());
}
if ($list) {
return $list[0];
}
return [];
}
...
/**
* 執行 SQL
* 有兩種方式:
* 1. 調此方法時傳入相關引數;
* 2. 通過 Builder 提供的 Active Record 方法組裝 SQL,調此方法(不傳引數)執行並返回結果
*/
public function execute(string $preSql = '', array $params = [])
{
if (!func_num_args()) {
$result = $this->transaction->command(...$this->compile());
} else {
$result = $this->transaction->command(...$this->prepareSQL($preSql, $params));
}
if ($result === false) {
throw new DBException($this->lastError(), $this->lastErrorNo());
}
return $result;
}
public function lastInsertId()
{
return $this->transaction->lastInsertId();
}
public function affectedRows()
{
return $this->transaction->affectedRows();
}
}
```
該入口類做了以下幾件事情:
- 提供 list()、one()、page()、execute() 等方法執行 SQL 語句,其內部是通過 transaction 實現的;
- 通過 Builder 這個 Trait 對外提供 SQL 構造功能;
- 委託 transaction 實現事務功能;
我們再簡單看下 Builder 的實現:
```php
Trait Builder
{
...
public function select($fields = null)
{
if ($this->type) {
return $this;
}
$this->type = 'select';
$this->fields($fields);
return $this;
}
/**
* 預處理 SQL
* @param string $sql 格式:select * from t_name where uid=:uid
* @param array $params 格式:['uid' => $uid]
* @return array 輸出格式:sql: select * from t_name where uid=?,params: [$uid]
* @throws \Exception
*/
private function prepareSQL(string $sql, array $params)
{
$sql = trim($sql);
if (!$params) {
return [$sql, []];
}
preg_match_all('/:([^\s;]+)/', $sql, $matches);
if (!($matches = $matches[1])) {
return [$sql, []];
}
if (count($matches) !== count($params)) {
throw new \Exception("SQL 佔位數與引數個數不符。SQL:$sql,引數:" . print_r($params, true));
}
$p = [];
foreach ($matches as $flag) {
if (!array_key_exists($flag, $params)) {
throw new \Exception("SQL 佔位符與引數不符。SQL:$sql,引數:" . print_r($params, true));
}
$value = $params[$flag];
if ($this->isExpression($value)) {
$sql = preg_replace("/:$flag(?=\s|$)/", $value, $sql);
} else {
$p[] = $value;
}
}
$sql = preg_replace('/:[-a-zA-Z0-9_]+/', '?', $sql);
return [$sql, $p];
}
/**
* 編譯
* 目前僅支援 select,update,insert,replace,delete
* @param bool $reset 編譯後是否重置構造器
* @return array [$preSql, $params]
*/
private function compile(bool $reset = true)
{
if (!$this->type) {
return ['', []];
}
$method = 'compile' . ucfirst($this->type);
if (method_exists($this, $method)) {
$this->rawSqlInfo = $this->$method();
if ($reset) {
$this->reset();
}
return $this->rawSqlInfo;
}
return ['', []];
}
private function compileSelect()
{
$sql = "select $this->fields ";
$params = [];
if ($this->table) {
$sql .= implode(
' ',
array_filter([
'from',
$this->table,
$this->join,
$this->where,
$this->groupBy,
$this->having,
$this->orderBy,
$this->limitStr()
])
);
$params = array_merge($this->joinParams, $this->whereParams, $this->havingParams);
}
return [$this->trimSpace($sql), $params];
}
...
/**
* 條件(where、on、having 等)
* 為了記憶和使用方便,目前只提供了最基本的一些形式,複雜的條件請使用原生寫法
* $conditions 陣列格式:
* // 基本的 and 查詢
* [
* 'uid' => 232,
* 'name' => '里斯',
* 'b.age' => 34,
* 'level_id' => [1,2,3], // in
* 'count' => new Expression('count + 1'),
* ]
*
* [
* "(uid=:uid1 or uid=:uid2) and count=:count", // 原生預處理 SQL
* ['uid1' => 12, 'uid2' => 13, 'count' => new Expression('count+1')]
* ]
* @param string|array $conditions
* @return array [$preSql, $params],$preSql: 用 ? 佔位的預處理 SQL
* @throws \Exception
*/
private function condition($conditions)
{
if (is_string($conditions)) {
return [$conditions, []];
}
if (!$conditions || !is_array($conditions)) {
return [];
}
if (is_int(key($conditions)) && count($conditions) <= 2) {
if (count($conditions) == 1) {
$conditions[1] = [];
}
return $this->prepareSQL($conditions[0], $conditions[1]);
}
$where = '1=1';
$params = [];
foreach ($conditions as $key => $condition) {
$key = $this->plainText($key);
if (is_array($condition)) {
// in 查詢
$where .= " and $key in(" . implode(',', array_fill(0, count($condition), '?')) . ')';
$params = array_merge($params, $condition);
} else {
// = 查詢
if ($this->isExpression($condition)) {
$where .= " and $key = $condition";
} else {
$where .= " and $key = ?";
$params[] = $condition;
}
}
}
return [str_replace('1=1 and ', '', $where), $params];
}
...
}
```
構造器主要提供和 SQL 子句對應的方法來構造和編譯 SQL,並提供對原生 SQL 的支援。
該構造器並未對所有的 SQL 語句做方法上的實現(比如子查詢),只對最常用的功能提供了支援,複雜的 SQL 建議直接寫 SQL 語句(一些框架對複雜 SQL 構造也提供了方法級別的支援,但這其實會帶來使用和維護上的複雜性,它導致 SQL 不夠直觀)。
[完整的查詢模組程式碼](https://github.com/linvanda/mysql/blob/master/src/Query.php)
### 事務
事務是集中管理 SQL 執行上下文的地方,所有的 SQL 都是在事務中執行的(沒有調 begin() 則是隱式事務)。
我們的查詢器是協程安全的,即一個 Query 例項可以在多個協程中併發執行事務而不會相互影響。協程安全性是通過事務模組保證的,這裡需要處理兩個維度的“事務”:資料庫維度和協程維度。不但需要保證資料庫事務的完整執行,還要保證多個協程間的 SQL 執行不會相互影響。
我們先看一個多協程併發執行事務的例子(在兩個子協程中使用同一個 Query 例項執行事務:先從資料庫查詢使用者資訊,然後更新姓名):
```php
$query = new Query(...);
for ($i = 0; $i < 2; $i++) {
go(function () use ($query) {
$query->begin();
$user = $query->select("uid,name")->from("users")->where("phone=:phone", ["phone" => "13908987654"])->one();
$query->update('users')->set(['name' => "李四"])->where("uid=:uid", ['uid' => $user['uid']])->execute();
$query->commit();
});
}
```
上面程式碼執行步驟如圖:
![](https://img2020.cnblogs.com/blog/1997761/202005/1997761-20200502185606932-219101862.png)
在上圖兩個協程不斷切換過程中,各自的事務是在獨立執行的,互不影響。
現實中,我們會在倉儲中使用查詢器,每個倉儲持有一個查詢器例項,而倉儲是單例模式,多協程共享的,因而查詢器也是多協程共享的。如下:
```php
/**
* MySQL 倉儲基類
* 倉儲是單例模式(通過容器實現單例),多協程會共享同一個倉儲例項
*/
abstract class MySQLRepository extends Repository implements ITransactional
{
/**
* 查詢器
*/
protected $query;
public function __construct()
{
if (!$this->dbAlias()) {
throw new \Exception('dbName can not be null');
}
// 通過工廠建立查詢器例項
$this->query = MySQLFactory::build($this->dbAlias());
}
...
}
```
事務模組是如何實現協程併發事務的隔離性呢?我們用協程上下文 `TContext` 類實現協程間資料的隔離,事務類 `CoTransaction` 持有 `TContext` 例項,事務中所有的狀態資訊都通過 `TContext` 存取,以實現協程間狀態資料互不影響。
我們先看看協程上下文類:
```php
class TContext implements \ArrayAccess
{
private $container = [];
...
public function offsetGet($offset)
{
if (!isset($this->container[Co::getuid()])) {
return null;
}
return $this->container[Co::getuid()][$offset] ?? null;
}
public function offsetSet($offset, $value)
{
$cuid = Co::getuid();
if (!isset($this->container[$cuid])) {
$this->init();
}
$this->container[$cuid][$offset] = $value;
}
private function init()
{
$this->container[Co::getuid()] = [];
// 協程退出時需要清理當前協程上下文
Co::defer(function () {
unset($this->container[Co::getuid()]);
});
}
}
```
協程上下文內部通過 $container 陣列維護每個協程的資料。該類實現了 ArrayAccess 介面,可以通過下標訪問,如:
```php
// 建立上下文例項
$context = new TContext();
// 設定當前協程的資料
$context["model"] = "write";
// 訪問當前協程的資料
$context["model"];
```
再看看事務。
事務介面定義:
```php
interface ITransaction
{
public function begin(string $model = 'write', bool $isImplicit = false): bool;
/**
* 傳送 SQL 指令
*/
public function command(string $preSql, array $params = []);
/**
* 提交事務
* @param bool $isImplicit 是否隱式事務,隱式事務不會向 MySQL 提交 commit (要求資料庫伺服器開啟了自動提交的配置)
* @return bool
* @throws \Exception
*/
public function commit(bool $isImplicit = false): bool;
public function rollback(): bool;
/**
* 獲取或設定當前事務執行模式
* @param string 讀/寫模式 read/write
* @return string 當前事務執行模式
*/
public function model(?string $model = null): string;
...
/**
* 獲取一次事務中執行的 SQL 列表
* @return array
*/
public function sql():array;
}
```
上面介面定義了事務管理器的主要工作:開啟事務、執行 SQL、提交/回滾事務以及和本次事務執行相關的資訊。
我們再來看看它的實現類 `CoTransaction`,該類是整個查詢器中最重要的類,我們把整個類的程式碼完整貼出來:
```php
/**
* 協程版事務管理器
* 注意:事務開啟直到提交/回滾的過程中會一直佔用某個 IConnector 例項,如果有很多長事務,則會很快耗完連線池資源
*/
class CoTransaction implements ITransaction
{
private $pool;
// 事務的所有狀態資訊(執行狀態、SQL、執行模式、執行結果等)都是儲存在上下文中
private $context;
/**
* 建立事務例項時需要提供連線池,並在內部建立該事物的協程上下文例項
*/
public function __construct(IPool $pool)
{
$this->pool = $pool;
$this->context = new TContext();
}
public function __destruct()
{
// 如果事務沒有結束,則回滾
if ($this->isRunning()) {
$this->rollback();
}
}
/**
* 開啟事務
*/
public function begin(string $model = 'write', bool $isImplicit = false): bool
{
// 如果事務已經開啟了,則直接返回
if ($this->isRunning()) {
return true;
}
// 事務模式(決定從讀連線池還是寫連線池拿連線物件)
$this->model($model);
// 設定事務執行狀態
$this->isRunning(true);
// 獲取資料庫連線
try {
if (!($connector = $this->connector())) {
throw new ConnectException("獲取連線失敗");
}
} catch (\Exception $exception) {
$this->isRunning(false);
throw new TransactionException($exception->getMessage(), $exception->getCode());
}
// 開啟新事務前,需要清除上一次事務的資料
$this->resetLastExecInfo();
$this->clearSQL();
// 呼叫資料庫連線物件的 begin 方法開始事務(如果是隱式事務則不呼叫)
return $isImplicit || $connector->begin();
}
/**
* 執行 SQL 指令
* 如果是隱式事務,則在該方法中自動呼叫 begin 和 commit 方法
*/
public function command(string $preSql, array $params = [])
{
if (!$preSql) {
return false;
}
// 是否隱式事務:外界沒有呼叫 begin 而是直接呼叫 command 則為隱式事務
$isImplicit = !$this->isRunning();
// 如果是隱式事務,則需要自動開啟事務
if ($isImplicit && !$this->begin($this->calcModelFromSQL($preSql), true)) {
return false;
}
// 執行 SQL
$result = $this->exec([$preSql, $params]);
// 隱式事務需要及時提交
if ($isImplicit && !$this->commit($isImplicit)) {
return false;
}
return $result;
}
/**
* 提交事務
*/
public function commit(bool $isImplicit = false): bool
{
if (!$this->isRunning()) {
return true;
}
$result = true;
if (!$isImplicit) {
// 顯式事務才需要真正提交到 MySQL 伺服器
if ($conn = $this->connector(false)) {
$result = $conn->commit();
if ($result === false) {
// 執行失敗,試圖回滾
$this->rollback();
return false;
}
} else {
return false;
}
}
// 釋放事務佔用的資源
$this->releaseTransResource();
return $result;
}
/**
* 回滾事務
* 無論是提交還是回滾,都需要釋放本次事務佔用的資源
*/
public function rollback(): bool
{
if (!$this->isRunning()) {
return true;
}
if ($conn = $this->connector(false)) {
$conn->rollback();
}
$this->releaseTransResource();
return true;
}
/**
* 獲取或設定當前事務執行模式
*/
public function model(?string $model = null): string
{
// 事務處於開啟狀態時不允許切換執行模式
if (!isset($model) || $this->isRunning()) {
return $this->context['model'];
}
$this->context['model'] = $model === 'read' ? 'read' : 'write';
return $model;
}
public function lastInsertId()
{
return $this->getLastExecInfo('insert_id');
}
public function affectedRows()
{
return $this->getLastExecInfo('affected_rows');
}
public function lastError()
{
return $this->getLastExecInfo('error');
}
public function lastErrorNo()
{
return $this->getLastExecInfo('error_no');
}
/**
* 本次事務執行的所有 SQL
* 該版本並沒有做記錄
*/
public function sql(): array
{
return $this->context['sql'] ?? [];
}
/**
* 釋放當前事務佔用的資源
*/
private function releaseTransResource()
{
// 儲存本次事務相關執行結果供外界查詢使用
$this->saveLastExecInfo();
// 歸還連線資源
$this->giveBackConnector();
unset($this->context['model']);
$this->isRunning(false);
}
/**
* 儲存事務最終執行的一些資訊
*/
private function saveLastExecInfo()
{
if ($conn = $this->connector(false)) {
$this->context['last_exec_info'] = [
'insert_id' => $conn->insertId(),
'error' => $conn->lastError(),
'error_no' => $conn->lastErrorNo(),
'affected_rows' => $conn->affectedRows(),
];
} else {
$this->context['last_exec_info'] = [];
}
}
private function resetLastExecInfo()
{
unset($this->context['last_exec_info']);
}
private function getLastExecInfo(string $key)
{
return isset($this->context['last_exec_info']) ? $this->context['last_exec_info'][$key] : '';
}
/**
* 執行指令池中的指令
* @param $sqlInfo
* @return mixed
* @throws
*/
private function exec(array $sqlInfo)
{
if (!$sqlInfo || !$this->isRunning()) {
return true;
}
return $this->connector()->query($sqlInfo[0], $sqlInfo[1]);
}
private function clearSQL()
{
unset($this->context['sql']);
}
private function calcModelFromSQL(string $sql): string
{
if (preg_match('/^(update|replace|delete|insert|drop|grant|truncate|alter|create)\s/i', trim($sql))) {
return 'write';
}
return 'read';
}
/**
* 獲取連線資源
*/
private function connector(bool $usePool = true)
{
if ($connector = $this->context['connector']) {
return $connector;
}
if (!$usePool) {
return null;
}
$this->context['connector'] = $this->pool->getConnector($this->model());
return $this->context['connector'];
}
/**
* 歸還連線資源
*/
private function giveBackConnector()
{
if ($this->context['connector']) {
$this->pool->pushConnector($this->context['connector']);
}
unset($this->context['connector']);
}
private function isRunning(?bool $val = null)
{
if (isset($val)) {
$this->context['is_running'] = $val;
} else {
return $this->context['is_running'] ?? false;
}
}
}
```
該類中,一次 SQL 執行(無論是顯式事務還是隱式事務)的步驟:
> begin -> exec -> commit/rollback
1. begin:
- 判斷是否可開啟新事務(如果已有事務在執行,則不可開啟);
- 設定事務執行模式(read/write);
- 將當前事務狀態設定為 running;
- 獲取連線物件;
- 清理本事務例項中上次事務的痕跡(上下文、SQL);
- 調連線物件的 begin 啟動資料庫事務;
2. exec:
- 呼叫連線物件的 query 方法執行 SQL(prepare 模式);
3. commit:
- 判斷當前狀態是否可提交(running 狀態才可以提交);
- 呼叫連線物件的 commit 方法提交資料庫事務(如果失敗則走回滾);
- 釋放本次事務佔用的資源(儲存本次事務執行的相關資訊、歸還連線物件、清除上下文裡面相關資訊)
4. rollback:
- 判斷當前狀態是否可回滾;
- 呼叫連線物件的 rollback 回滾資料庫事務;
- 釋放本次事務佔用的資源(同上);
**優化:**
類 `CoTransaction` 依賴 `IPool` 連線池,這種設計並不合理(違反了迪米特法則)。從邏輯上說,事務管理類真正依賴的是連線物件,而非連線池物件,因而事務模組應該依賴連線模組而不是連線池模組。讓事務管理類依賴連線池,一方面向事務模組暴露了連線管理的細節, 另一方面意味著如果使用該事務管理類,就必須使用連線池技術。
一種優化方案是,在連線模組提供一個連線管理類供外部(事務模組)取還連線:
```php
interface IConnectorManager
{
public function getConnector() IConnector;
public function giveBackConnector(IConnector $conn);
}
```
將 `IConnectorManager` 注入到 `CoTransaction` 中:
```php
class CoTransaction implements ITransaction
{
...
public function __construct(IConnectorManager $connMgr)
{
...
}
}
```
連線管理器 `IConnectorManager` 承擔了工廠方法角色,至此,事務模組僅依賴連線模組,而不用依賴連線池。
### 連線池
連線池模組由 IPool 介面和 CoPool 實現類組成。
連線池模組和連線模組之間的關係比較巧妙(上面優化後的方案)。從高層(介面層面)來說,連線池模組依賴連線模組:連線池操作(取還)`IConnector` 的例項;從實現上來說,連線模組同時又依賴連線池模組:`PoolConnectorManager`(使用連線池技術的連線管理器)依賴連線池模組來操作連線物件(由於該依賴是實現層面的而非介面層面,因而它不是必然的,如果連線管理器不使用連線池技術則不需要依賴連線池模組)。“連線管理器”這個角色很重要:它對外(事務模組)遮蔽了連線池模組的存在,代價是在內部引入了對連線池模組的依賴(也就是用內部依賴換外部依賴)。
經過上面的分析我們得出,連線池模組和連線模組具有較強的耦合性,連線模組可以對外遮蔽掉連線池模組的存在,因而在設計上我們可以將這兩個模組看成一個大模組放在一個目錄下面,在該目錄下再細分成兩個內部模組即可。
我們先看看連線池介面。
```php
interface IPool
{
/**
* 從連線池中獲取連線物件
*/
public function getConnector(string $type = 'write'): IConnector;
/**
* 歸還連線
*/
public function pushConnector(IConnector $connector): bool;
/**
* 連線池中連線數
* @return array ['read' => 3, 'write' => 3]
*/
public function count(): array;
/**
* 關閉連線池
*/
public function close(): bool;
}
```
從上面介面定義我們發現,該連線池並非通用連線池,而是針對資料庫連線做的定製(`count()` 方法返回的資料裡面有 read、write,它暴露了一個細節:該連線池內部維護了讀寫兩種連線池)。此設計也透露出該模組和連線模組的強耦合性。
實現類 `CoPool` 稍顯複雜,我們貼出程式碼:
```php
class CoPool implements IPool
{
...
protected static $container = [];
protected $readPool;
protected $writePool;
// 資料庫連線生成器,連線池使用此生成器建立連線物件
protected $connectorBuilder;
// 連線池大小
protected $size;
// 記錄每個連線的相關資訊
protected $connectsInfo = [];
// 當前存活的連線數(包括不在池中的)
protected $connectNum;
// 讀連線數
protected $readConnectNum;
// 寫連線數
protected $writeConnectNum;
protected $maxSleepTime;
protected $maxExecCount;
protected $status;
// 連續等待連線物件失敗次數(一般是長事務導致某次事務處理長時間佔用連線資源)
protected $waitTimeoutNum;
protected function __construct(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000)
{
// 建立讀寫 Channel
$this->readPool = new co\Channel($size);
$this->writePool = new co\Channel($size);
...
}
/**
* 單例
* 實際是偽單例
*/
public static function instance(IConnectorBuilder $connectorBuilder, int $size = 25, int $maxSleepTime = 600, int $maxExecCount = 1000): CoPool
{
// 同一個連線生成器建立的連線物件由同一個連線池管理
if (!isset(static::$container[$connectorBuilder->getKey()])) {
static::$container[$connectorBuilder->getKey()] = new static($connectorBuilder, $size, $maxSleepTime, $maxExecCount);
}
return static::$container[$connectorBuilder->getKey()];
}
/**
* 從連線池中獲取 MySQL 連線物件
*/
public function getConnector(string $type = 'write'): IConnector
{
if (!$this->isOk()) {
throw new PoolClosedException("連線池已經關閉,無法獲取連線");
}
// 根據讀寫模式選擇是使用讀連線池還是寫連線池
$pool = $this->getPool($type);
// 連線池是空的,試圖建立連線
if ($pool->isEmpty()) {
// 超額,不能再建立,需等待
// 此處試圖建立的數量大於連線池真正大小,是為了應對高峰期等異常情況。歸還的時候多創建出來的會直接被關閉掉(隨建隨銷)
if (($type == 'read' ? $this->readConnectNum : $this->writeConnectNum) > $this->size * 6) {
// 多次等待失敗,則直接返回
// 優化點:這裡面沒有針對讀寫池分別計數
if ($this->waitTimeoutNum > self::MAX_WAIT_TIMEOUT_NUM) {
// 超出了等待失敗次數限制,直接拋異常
throw new ConnectFatalException("多次獲取連線超時,請檢查資料庫伺服器狀態");
}
// 等待連線歸還
$conn = $pool->pop(4);
// 等待失敗
if (!$conn) {
switch ($pool->errCode) {
case SWOOLE_CHANNEL_TIMEOUT:
// 優化:要區分讀池子超時還是寫池子超時
$this->waitTimeoutNum++;
$errMsg = "獲取連線超時";
break;
case SWOOLE_CHANNEL_CLOSED:
$errMsg = "獲取連線失敗:連線池已關閉";
break;
default:
$errMsg = "獲取連線失敗";
break;
}
throw new ConnectException($errMsg);
}
} else {
try {
// 建立新連線
$conn = $this->createConnector($type);
} catch (ConnectException $exception) {
if ($exception->getCode() == 1040) {
// 連線資料庫時失敗:Too many connections,此時需要等待被佔用的連線歸還
// 這裡可以優化下:先判斷下該連線池有無在維護的連線,有的話才等待
$conn = $pool->pop(4);
}
if (!$conn) {
// 等待連線超時,記錄超時次數
if ($pool->errCode == SWOOLE_CHANNEL_TIMEOUT) {
$this->waitTimeoutNum++;
}
throw new ConnectException($exception->getMessage(), $exception->getCode());
}
}
}
} else {
// 從連線池獲取
$conn = $pool->pop(1);
}
// 變更該連線的狀態資訊
$connectInfo = $this->connectInfo($conn);
$connectInfo->popTime = time();
$connectInfo->status = ConnectorInfo::STATUS_BUSY;
// 成功拿到連線,將等待次數清零
$this->waitTimeoutNum = 0;
return $conn;
}
/**
* 歸還連線
* @param IConnector $connector
* @return bool
*/
public function pushConnector(IConnector $connector): bool
{
if (!$connector) {
return true;
}
$connInfo = $this->connectInfo($connector);
$pool = $this->getPool($connInfo->type);
// 如果連線池有問題(如已關閉)、滿了、連線有問題,則直接關閉
if (!$this->isOk() || $pool->isFull() || !$this->isHealthy($connector)) {
return $this->closeConnector($connector);
}
// 變更連線狀態
if ($connInfo) {
$connInfo->status = ConnectorInfo::STATUS_IDLE;
$connInfo->pushTime = time();
}
return $pool->push($connector);
}
/**
* 關閉連線池
* @return bool
*/
public function close(): bool
{
$this->status = self::STATUS_CLOSED;
// 關閉通道中所有的連線。等待5ms為的是防止還有等待push的排隊協程
while ($conn = $this->readPool->pop(0.005)) {
$this->closeConnector($conn);
}
while ($conn = $this->writePool->pop(0.005)) {
$this->closeConnector($conn);
}
// 關閉通道
$this->readPool->close();
$this->writePool->close();
return true;
}
public function count(): array
{
return [
'read' => $this->readPool->length(),
'write' => $this->writePool->length()
];
}
protected function closeConnector(IConnector $connector)
{
if (!$connector) {
return true;
}
$objId = $this->getObjectId($connector);
// 關閉連線
$connector->close();
// 處理計數
$this->untickConnectNum($this->connectsInfo[$objId]->type);
// 刪除統計資訊
unset($this->connectsInfo[$objId]);
return true;
}
protected function isOk()
{
return $this->status == self::STATUS_OK;
}
protected function getPool($type = 'write'): co\Channel
{
if (!$type || !in_array($type, ['read', 'write'])) {
$type = 'write';
}
return $type === 'write' ? $this->writePool : $this->readPool;
}
/**
* 建立新連線物件
* @param string $type
* @return IConnector
* @throws \Dev\MySQL\Exception\ConnectException
*/
protected function createConnector($type = 'write'): IConnector
{
$conn = $this->connectorBuilder->build($type);
if ($conn) {
// 要在 connect 前 tick(計數),否則無法阻止高併發協程打入(因為 connect 會造成本協程控制權讓出,此時本次計數還沒有增加)
$this->tickConnectNum($type);
try {
$conn->connect();
} catch (ConnectException $exception) {
// 撤銷 tick
$this->untickConnectNum($type);
throw new ConnectException($exception->getMessage(), $exception->getCode());
}
// 建立本連線的統計資訊例項
$this->connectsInfo[$this->getObjectId($conn)] = new ConnectorInfo($conn, $type);
}
return $conn;
}
protected function tickConnectNum(string $type)
{
$this->changeConnectNum($type, 1);
}
protected function untickConnectNum(string $type)
{
$this->changeConnectNum($type, -1);
}
private function changeConnectNum(string $type, $num)
{
$this->connectNum = $this->connectNum + $num;
if ($type == 'read') {
$this->readConnectNum = $this->readConnectNum + $num;
} else {
$this->writeConnectNum = $this->writeConnectNum + $num;
}
}
/**
* 檢查連線物件的健康情況,以下情況視為不健康:
* 1. SQL 執行次數超過閾值;
* 2. 連線物件距最後使用時間超過閾值;
* 3. 連線物件不是連線池建立的
* @param IConnector $connector
* @return bool
*/
protected function isHealthy(IConnector $connector): bool
{
$connectorInfo = $this->connectInfo($connector);
if (!$connectorInfo) {
return false;
}
// 如果連線處於忙態(一般是還處於事務未提交狀態),則一律返回 ok
if ($connectorInfo->status === ConnectorInfo::STATUS_BUSY) {
return true;
}
if (
$connectorInfo->execCount() >= $this->maxExecCount ||
time() - $connectorInfo->lastExecTime() >= $this->maxSleepTime
) {
return false;
}
return true;
}
protected function connectInfo(IConnector $connector): ConnectorInfo
{
return $this->connectsInfo[$this->getObjectId($connector)];
}
protected function getObjectId($object): string
{
return spl_object_hash($object);
}
}
```
這裡有幾點需要注意:
1. 連線池使用的是偽單例模式,同一個生成器對應的是同一個連線池例項;
1. 連線池內部維護了讀寫兩個池子,生成器生成的讀寫連線物件分別放入對應的池子裡面;
1. 從連線池取連線物件的時候,如果連線池為空,則根據情況決定是建立新連線還是等待。此處並非是在池子滿了的情況下就等待,而是會超額建立,為的是應對峰值等異常情況。當然一個優化點是,將溢位比例做成可配置的,由具體的專案決定溢位多少。另外,如果建立新連線的時候資料庫伺服器報連線過多的錯誤,也需要轉為等待連線歸還;
1. 如果多次等待連線失敗(超時),則後面的請求會直接丟擲異常(直到池子不為空)。這裡有個優化點:目前的實現沒有區分是讀池子超時還是寫池子超時;
1. 歸還連線時,如果池子滿了,或者連線壽命到期了,則直接關閉連線;
後面在連線模組會講解連線生成器,到時我們會知道一個連線池例項到底維護的是哪些連線物件。
### 連線
連線模組負責和資料庫建立連線併發出 SQL 請求,其底層使用 Swoole 的 MySQL 驅動。連線模組由**連線物件**和**連線生成器**構成,對外暴露 `IConnector` 和 `IConnectorBuilder` 介面。
(在我們的優化版本中,一方面引入了**連線管理器** `IConnectorManager`,另一方面將**連線模組**和**連線池模組**合併成一個大模組,因而整個連線模組對外暴露的是 `IConnectorManager` 和 `IConnector` 兩個介面。)
連線物件的實現比較簡單,我們重點看下 `CoConnector` 裡面查詢的處理:
```php
class CoConnector implements IConnector
{
...
public function __destruct()
{
$this->close();
}
public function connect(): bool
{
if ($this->mysql->connected) {
return true;
}
$conn = $this->mysql->connect($this->config);
if (!$conn) {
throw new ConnectException($this->mysql->connect_error, $this->mysql->connect_errno);
}
return $conn;
}
/**
* 執行 SQL 語句
*/
public function query(string $sql, array $params = [], int $timeout = 180)
{
$prepare = $params ? true : false;
$this->execCount++;
$this->lastExecTime = time();
// 是否要走 prepare 模式
if ($prepare) {
$statement = $this->mysql->prepare($sql, $timeout);
// 失敗,嘗試重新連線資料庫
if ($statement === false && $this->tryReconnectForQueryFail()) {
$statement = $this->mysql->prepare($sql, $timeout);
}
if ($statement === false) {
$result = false;
goto done;
}
// execute
$result = $statement->execute($params, $timeout);
// 失敗,嘗試重新連線資料庫
if ($result === false && $this->tryReconnectForQueryFail()) {
$result = $statement->execute($params, $timeout);
}
} else {
$result = $this->mysql->query($sql, $timeout);
// 失敗,嘗試重新連線資料庫
if ($result === false && $this->tryReconnectForQueryFail()) {
$result = $this->mysql->query($sql, $timeout);
}
}
done:
$this->lastExpendTime = time() - $this->lastExecTime;
$this->peakExpendTime = max($this->lastExpendTime, $this->peakExpendTime);
return $result;
}
...
/**
* 失敗重連
*/
private function tryReconnectForQueryFail()
{
if ($this->mysql->connected || !in_array($this->mysql->errno, [2006, 2013])) {
return false;
}
// 嘗試重新連線
$connRst = $this->connect();
if ($connRst) {
// 連線成功,需要重置以下錯誤(swoole 在重連成功後並沒有重置這些屬性)
$this->mysql->error = '';
$this->mysql->errno = 0;
$this->mysql->connect_error = '';
$this->mysql->connect_errno = 0;
}
return $connRst;
}
...
}
```
這裡重點關注下查詢的時候斷線重連機制:先嚐試執行 SQL,如果返回連線失敗,則嘗試重新連線。
我們再看看連線生成器:
```php
class CoConnectorBuilder implements IConnectorBuilder
{
protected static $container = [];
protected $writeConfig;
protected $readConfigs;
protected $key;
/**
* 建立生成器的時候需要提供讀寫連線配置,其中讀配置是一個數組(可以有多個讀連線)
*/
protected function __construct(DBConfig $writeConfig = null, array $readConfigs = [])
{
$this->writeConfig = $writeConfig;
$this->readConfigs = $readConfigs;
}
/**
* 這裡採用偽單例:同樣的讀寫配置使用同一個生成器
*/
public static function instance(DBConfig $writeConfig = null, array $readConfigs = []): CoConnectorBuilder
{
if ($writeConfig && !$readConfigs) {
$readConfigs = [$writeConfig];
}
$key = self::calcKey($writeConfig, $readConfigs);
if (!isset(self::$container[$key])) {
$builder = new static($writeConfig, $readConfigs);
$builder->key = $key;
self::$container[$key] = $builder;
}
return self::$container[$key];
}
/**
* 建立並返回 IConnector 物件
* @param string $connType read/write
* @return IConnector
*/
public function build(string $connType = 'write'): IConnector
{
/** @var DBConfig */
$config = $connType == 'read' ? $this->getReadConfig() : $this->writeConfig;
if (!($config instanceof DBConfig)) {
return null;
}
return new CoConnector($config->host, $config->user, $config->password, $config->database, $config->port, $config->timeout, $config->charset);
}
public function getKey(): string
{
return $this->key;
}
private function getReadConfig()
{
return $this->readConfigs[mt_rand(0, count($this->readConfigs) - 1)];
}
/**
* 根據配置計算 key,完全相同的配置對應同樣的 key
*/
private static function calcKey(DBConfig $writeConfig = null, array $readConfigs = []): string
{
$joinStr = function ($conf)
{
$arr = [
$conf->host,
$conf->port,
$conf->user,
$conf->password,
$conf->database,
$conf->charset,
$conf->timeout,
];
sort($arr);
return implode('-', $arr);
};
$readArr = [];
foreach ($readConfigs as $readConfig) {
$readArr[] = $joinStr($readConfig);
}
sort($readArr);
return md5($joinStr($writeConfig) . implode('$', $readArr));
}
}
```
該生成器是針對一主多從資料庫架構的(包括未走讀寫分離的),如果使用是是其他資料庫架構(如多主架構),則建立其他生成器即可。
同一套讀寫配置使用同一個生成器,對應的連線池也是同一個。
`DBConfig` 是一個 DTO 物件,不再闡述。
### 查詢器的組裝
使用工廠組裝查詢器例項:
```php
class MySQLFactory
{
/**
* @param string $dbAlias 資料庫配置別名,對應配置檔案中資料庫配置的 key
*/
public static function build(string $dbAlias): Query
{
// 從配置檔案獲取資料庫配置
$dbConf = Config::getInstance()->getConf("mysql.$dbAlias");
if (!$dbConf) {
throw new ConfigNotFoundException("mysql." . $dbAlias);
}
if (!isset($dbConf['read']) && !isset($dbConf['write'])) {
$writeConf = $dbConf;
$readConfs = [$writeConf];
} else {
$writeConf = $dbConf['write'] ?? [];
$readConfs = $dbConf['read'] ?? [$writeConf];
}
$writeConfObj = self::createConfObj($writeConf);
$readConfObjs = [];
foreach ($readConfs as $readConf) {
$readConfObjs[] = self::createConfObj($readConf);
}
// 建立生成器、連線池、事務管理器
// 在優化後版本中,用連線管理器代替連線池的位置即可
$mySQLBuilder = CoConnectorBuilder::instance($writeConfObj, $readConfObjs);
$pool = CoPool::instance($mySQLBuilder, $dbConf['pool']['size'] ?? 30);
$transaction = new CoTransaction($pool);
return new Query($transaction);
}
private static function createConfObj(array $config): DBConfig
{
if (!$config) {
throw new Exception("config is null");
}
return new DBConfig(
$config['host'],
$config['user'],
$config['password'],
$config['database'],
$config['port'] ?? 3306,
$config['timeout'] ?? 3,
$config['charset'] ?? 'utf8'
);
}
}
```
至此,整個查詢器的編寫、建立和使用就完成了。
### 總結
1. 專案的開發需要劃分模組,模組之間儘量減少耦合,通過介面通訊(模組之間依賴介面而不是實現);
1. 如果兩個模組之間具有強耦合性,則往往意味著兩者本身應該歸併到同一個模組中,在其內部劃分子模組,對外遮蔽內部細節,如本專案的連線模組和連線池模組;
1. 如果模組之間存在不合常理的依賴關係,則意味著模組劃分有問題,如本專案中的事務模組依賴連線池模組;
1. 有問題的模組劃分往往違反第一點(也就是迪米特法則),會造成模組暴露細節、過多的依賴關係,影響設計的靈活性、可擴充套件性,如本專案中事務模組依賴連線池模組(雖然是實現層面的依賴而非介面層面),造成要使用 `CoTransaction` 時必須同時使用連線池;
1. 編寫生產可用的專案時需要注意處理異常場景,如本專案中從連線池獲取連線物件,以及在連線物件上執行 SQL 時的斷線重連;
1. 設計本身是迭代式的,並非一蹴而就、一次性設計即可完成的,本專案在開發過程中已經經歷過幾次小重構,在本次分析時仍然發現一些設計上的缺陷。重構屬於專案開發的一部分;
優化版 UML 圖:
![](https://img2020.cnblogs.com/blog/1997761/202005/1997761-20200502185658990-6097055