php搭建簡單rpc(解決mongodb連線數的問題)
阿新 • • 發佈:2019-02-11
rpc解釋
RPC 的全稱是 Remote Procedure Call 是一種程序間通訊方式。它允許程式呼叫另一個地址空間(通常是共享網路的另一臺機器上)的過程或函式,而不用程式設計師顯式編碼這個遠端呼叫的細節。即程式設計師無論是呼叫本地的還是遠端的,本質上編寫的呼叫程式碼基本相同。
某網上解釋,具體基礎解釋可以自行百度,其實我通俗的解釋好了就是遠端呼叫方法
現在有兩臺伺服器A和B,這時候兩臺伺服器分別執行不同的php服務。這時候兩個服務想要互相呼叫彼此的程式碼最原始的方式就是彼此開放api介面進行http協議互動。
然後我們可以使用rpc進行代理直接本地伺服器代理執行返回結果,rpc使用的C(client)/S(server)結構
目前支援rpc的框架有很多 swoole 和 workman 不一定需要自己手動構建rpc
流程
- server端tcp伺服器(steam_socket_server)
- client連線tcp伺服器(steam_socket_clinet)
- server端解析接受的位元組
- 本地執行結果
- server端將執行結果傳輸回client端
rpc可以解決mongodb無限連線池的問題,mongodb的連線數跟伺服器php-fpm掛載數相關係 理論值為mongodb的連線數 = 伺服器php-fpm總數 - 2 如果負載的伺服器多了 連線數很容易撐不下去。因為我們採用的是api伺服器直接去連線mongodb沒有架設一層代理層,這就是為什麼我要去構建rpc層的主要原因
server層程式碼
<?php
$errno = 0;
$errstr = '';
//先握手 判斷 埠是否能正常註冊
// creating the socket...
$socket = stream_socket_server('tcp://127.0.0.1:8850', $errno, $errstr);
if (!$socket)
{
exit("[create socket error]:{$errno}-{$errstr}");
}
fclose($socket );
//receive message
while (true)
{
// disconnected every 1 seconds...
$this->receiveMessage('127.0.0.1','8850',1);
}
function receiveMessage($ipServer,$portNumber,$nbSecondsIdle)
{
$errno = 0;
$errstr = '';
// creating the socket...
$socket = stream_socket_server('tcp://'.$ipServer.':'.$portNumber, $errno, $errstr);
//設定堵塞 類似 redis的佇列堵塞 直到拿到資料為止
stream_set_blocking($socket,1);
if (!$socket)
{
$logData = [
'errno' => $errno,
'errstr' => $errstr,
'msg' => 'create socket error'
];
CoobarLog::error($logData,'rpcerror');
echo "$errstr ($errno)" . PHP_EOL;
}
else
{
// while there is connection, i'll receive it... if I didn't receive a message within $nbSecondsIdle seconds, the following function will stop.
while ($conn = @stream_socket_accept($socket,$nbSecondsIdle))
{
$message = fread($conn, 2048);
//1.解析message 進行安全驗證
//2.根據message的引數進行本地代理執行 參考我目前的協議 module class function parmas 通過module+class+function 定位需要代理的方法然後call_user_func_array去呼叫
//3.返回本地執行結果
$result = **********
fwrite($conn,json_encode($result));
fclose($conn);
}
fclose($socket);
}
}
推薦rpc的啟動方式使用nohup 後臺註冊執行緒方式
nohup php rpcserver.php &
client程式碼
<?php
/**
* Created by PhpStorm.
* User: hls
* Date: 2017/11/16
* Time: 下午3:03
*/
namespace phprpc;
class RpcClient
{
private static $config = [];
private $requestBody = [];
private static $init = true;
const LENGTH_MAX = 4028;
/**
* 例項化
* @param $config
*/
public static function instance($config)
{
if (!isset($config['host']))
throw new \Exception('no rpc host',100);
if (!isset($config['port']))
throw new \Exception('no rpc port',101);
self::$config = $config;
self::$init = false;
return new self();
}
/**
* @param $module 模組名
* @param $class 類名
* @param array $params 引數 陣列
* @return array|mixed|\stdClass
* @throws RpcClientException
*/
public function sendMessage($module,$class,$function,array $params)
{
if (self::$init)
throw new \Exception('no instance rpc',102);
$connection = $this->connect();
$this->setRequestBody($module,$class,$function,$params);
return $this->send($connection,json_encode(self::getRequestBody()));
}
/**
* 傳送訊息
* @param $connection
* @param $string
* @return array|mixed|\stdClass
* @throws RpcClientException
*/
private function send($connection,$string)
{
fwrite($connection,$string);
$returnData = fgets($connection,self::LENGTH_MAX);
fclose($connection);
if (empty(json_decode($returnData,true)))
throw new \Exception('rpc server return data not illegal');
return json_decode($returnData,true);
}
/**
* 連線rpc伺服器
* @param array $config
* @return bool|null|resource
* @throws RpcClientException
*/
private function connect($config = [])
{
$errno = '';
$errstr = '';
if (empty($config)) {
$config = self::$config;
}
$connection = stream_socket_client("tcp://{$config['host']}:{$config['port']}",$errno,$errstr);
//重連兩次
for ($i = 0;$i < 2;$i++) {
if (!$connection) {
$connection = stream_socket_client("tcp://{$config['host']}:{$config['port']}",$errno,$errstr);
} else {
break;
}
}
if (!$connection)
throw new \Exception($errstr,$errno);
return $connection;
}
/**
* 獲取請求包體
* @return array
*/
private function getRequestBody()
{
return [
'username' => $this->requestBody['username'],
'password' => $this->requestBody['password'],
'module' => $this->requestBody['module'],
'class' => $this->requestBody['class'],
'function' => $this->requestBody['function'],
'params' => $this->requestBody['params'],
];
}
/**
* 設定請求包體
* @param $module
* @param $class
* @param $params
* @return bool
*/
public function setRequestBody($module,$class,$function,$params)
{
$this->requestBody['username'] = isset(self::$config['username']) ? self::$config['username'] : '';
$this->requestBody['password'] = isset(self::$config['password']) ? self::$config['password'] : '';
$this->requestBody['module'] = $module;
$this->requestBody['class'] = $class;
$this->requestBody['function'] = $function;
$this->requestBody['params'] = $params;
return true;
}
}
客戶端呼叫
$config = ['host' => '10.46.231.6','port' => 5200,'username' => 'coobar','password' => '52pAVVvxBAIGDPPj'];
$result = RpcClient::instance($config)->sendMessage('mongodb','Handle','getConfig',[TurntableConfig::getConfigName(),'172']);
就是比較簡單的rpc的原型
還需要在這上面增加優化的地方
- rpc-client端需要支援多個rpc-server的輪詢支援,防止單臺rpc-server伺服器壓力過大或者伺服器崩潰 導致rpc伺服器失敗
- 對rpc-server端進行許可權認證
- 對rpc-client 進行重連機制,我目前程式碼使用的預設重連2次
- rpc-server端的日誌體系,需要記錄每次執行的上行引數和下行返回結果,方便排查問題
- rpc-server端進行異常監聽,這個很重要,很多東西都是通過實踐進行優化
最後開心的是
使用rpc呼叫mongodb後,連線數不漲啦不漲啦!!!但是現在還是需要進行各種的壓測和特殊情況實踐,如果直接用於生產環境估計夠嗆的!