A Detailed Walkthrough of the Predis Cluster Mode Source Code
阿新 • Published: 2019-12-31
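The core-logic source section can be skipped: the Predis source is quite convoluted and I have simply pasted the code here. Interested readers can trace through it themselves.
More source walkthroughs are published at https://github.com/Zhucola/php_frameworks_analysis. Stars are welcome.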
Overall execution flow
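- Initialize the client (the source is convoluted; interested readers can trace it themselves, it is simply the constructor of Client)
- Suppose the client runs a set operation: a slot is computed from the key, and a node is looked up by that slot
- If the node configuration specifies slots, the node configured for that slot is used (for example, slot 50 maps to port 7000). Note that this may not match the real slot assignment on the Redis side, for instance when slots were reassigned in Redis but the PHP configuration was not updated
- If the node configuration does not specify slots, or the specified slots do not cover this slot, Predis guesses a node (guessNode). The guess assumes slots are spread evenly: with 3 nodes, the first node in the configuration list is assumed to own 0-5460, the second 5461-10921, and the third 10922-16383, and the slot is matched against those ranges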
The configuration that maps slots to nodes looks like this
$redis_list = [
    'redis://192.168.124.10:7000?slots=1-100,500-1000',
    'redis://192.168.124.10:7001?slots=101-499',
    'redis://192.168.124.10:7002',
];
$redis = new Client($redis_list, ['cluster' => 'redis']);
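To make the slots-to-node lookup concrete, here is a minimal sketch of how a spec such as 1-100,500-1000 could be matched against a slot. The helpers parse_slot_ranges and find_configured_node are hypothetical names made up for illustration; this is not Predis's own parser.

```php
<?php
// Hypothetical helper (not part of Predis): turn "1-100,500-1000"
// into a list of [start, end] pairs.
function parse_slot_ranges(string $spec): array
{
    $ranges = [];
    foreach (explode(',', $spec) as $part) {
        $bounds = explode('-', trim($part), 2);
        $start = (int) $bounds[0];
        // A single number such as "42" is treated as the range 42-42.
        $end = isset($bounds[1]) ? (int) $bounds[1] : $start;
        $ranges[] = [$start, $end];
    }
    return $ranges;
}

// Hypothetical helper: return the first node whose configured ranges
// contain $slot, or null when no configured range matches.
function find_configured_node(array $nodes, int $slot): ?string
{
    foreach ($nodes as $node => $spec) {
        foreach (parse_slot_ranges($spec) as [$start, $end]) {
            if ($slot >= $start && $slot <= $end) {
                return $node;
            }
        }
    }
    return null;
}

$nodes = [
    '192.168.124.10:7000' => '1-100,500-1000',
    '192.168.124.10:7001' => '101-499',
];
var_dump(find_configured_node($nodes, 50));   // the node on port 7000
var_dump(find_configured_node($nodes, 2000)); // null: falls through to guessing
```

A null result corresponds to the case where the configured slots do not cover the slot and a node has to be guessed.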
The node-guessing algorithm is as follows
$count = count($this->pool);
$index = min((int) ($slot / (int) (16384 / $count)),$count - 1);
$nodes = array_keys($this->pool);
return $nodes[$index];
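The arithmetic above is easier to follow with concrete numbers. The sketch below wraps the same expression in a standalone function (guess_node_index is a name chosen here for illustration) and prints the guessed index at the range boundaries for 3 nodes.

```php
<?php
// Same expression as in the snippet above, as a standalone function.
// $count is the number of configured nodes; the result is the index of
// the guessed node in the configuration list.
function guess_node_index(int $slot, int $count): int
{
    return min((int) ($slot / (int) (16384 / $count)), $count - 1);
}

foreach ([0, 5460, 5461, 10921, 10922, 16383] as $slot) {
    printf("slot %5d -> node index %d\n", $slot, guess_node_index($slot, 3));
}
```

With 3 nodes the divisor is 5461, so slot 16383 divides to exactly 3; the min() clamp is what keeps it on the last node (index 2).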
- Serialize the set command. If the command is
$redis->set("a",1234);
it becomes
*3
$3
SET
$1
a
$4
1234
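The wire format above follows a simple rule: an element count prefixed with *, then each element as a byte length prefixed with $ followed by the data, every line ending in CRLF. A minimal standalone sketch, where encode_command is a hypothetical helper name rather than a Predis function:

```php
<?php
// Hypothetical helper: serialize a command name and its arguments into
// the multi-bulk request format shown above.
function encode_command(string $name, array $arguments): string
{
    $parts = array_merge([$name], array_map('strval', $arguments));
    $buffer = '*' . count($parts) . "\r\n";
    foreach ($parts as $part) {
        // strlen() counts bytes, which is what the protocol expects.
        $buffer .= '$' . strlen($part) . "\r\n" . $part . "\r\n";
    }
    return $buffer;
}

echo encode_command('SET', ['a', 1234]);
```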
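- Open a connection to the node, then send the password and database number (if configured)
- If the connection fails, the node is removed from the configuration list, and a random node from the list is asked for cluster slots to fetch the real slot assignment and master/slave information of all nodes. The slot is then matched again using the real mapping, and the connection is retried against the real node
- If that second connection also fails, the command fails outright: the code hard-codes a single retry
- Send the serialized command to the Redis node
- If sending the command fails, the node is removed from the configuration list, a random node from the list is connected and asked for cluster slots to fetch the real slot assignment and master/slave information of all nodes, and the slot is matched again using the real mapping before reconnecting to the real node. The newly chosen node may be the very node that just failed; if it fails again, an exception is thrown
- The node that the first send failed against is in fact a master. If that master goes down, Redis promotes a slave to master, and when PHP then fetches cluster slots it sees the newly promoted master (for example: the master on port 7000 is configured and connected, but 7000 goes down while the command is being sent; cluster slots then shows that 7000 has become a slave and 7005 has been promoted to master, so Predis connects to 7005)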
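- Read the response from the Redis node
- A MOVED reply means the client's slot-to-node mapping differs from that of the real Redis server. The MOVED message is parsed to obtain the real slot and node (both taken from the reply)
- cluster slots is sent to the node named in MOVED to fetch the slot assignment and master/slave information of the whole cluster, and the command is then resent to the real owner of the slot
- If sending cluster slots to the MOVED node fails, that node is removed from the configured node list and a random node is used for cluster slots instead, after which the command is resent (the resend may hit the same node that just failed, in which case a second failure means the command cannot be served; or it may hit a slave that has since been promoted to master)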
- On success, an OK reply is received
A MOVED response
-MOVED 15495 127.0.0.1:7002
A successful response
+OK
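Once the leading - is stripped, the MOVED payload is just three space-separated fields: the keyword, the slot, and the host:port of the owning node. A small sketch of that split, using a hypothetical parse_moved helper that is not part of Predis:

```php
<?php
// Hypothetical helper: split a MOVED error payload (without the leading
// "-") into its slot and node address. Returns null for other errors.
function parse_moved(string $payload): ?array
{
    $parts = explode(' ', $payload, 3);
    if (count($parts) !== 3 || $parts[0] !== 'MOVED') {
        return null;
    }
    return ['slot' => (int) $parts[1], 'node' => $parts[2]];
}

print_r(parse_moved('MOVED 15495 127.0.0.1:7002'));
```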
CRC16 algorithm source
The method that derives the slot from the key is getSlot, in predis\src\Cluster\ClusterStrategy.php
public function getSlot(CommandInterface $command)
{
    $slot = $command->getSlot();
    if (!isset($slot) && isset($this->commands[$cmdID = $command->getId()])) {
        $key = call_user_func($this->commands[$cmdID], $command);
        if (isset($key)) {
            $slot = $this->getSlotByKey($key);
            $command->setSlot($slot);
        }
    }
    return $slot;
}
It calls the getSlotByKey method
public function getSlotByKey($key)
{
    $key = $this->extractKeyTag($key); // extract the part of the key that gets hashed
    $slot = $this->hashGenerator->hash($key) & 0x3FFF; // hash, then mask with 16383 (same as modulo 16384)
    return $slot;
}
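extractKeyTag is not shown here. In Redis Cluster, when a key contains a non-empty {...} section, only the text between the first { and the first } after it is hashed. The sketch below implements that rule as a standalone function; hash_tag is a name chosen for illustration and the body is not copied from Predis.

```php
<?php
// Sketch of the Redis Cluster hash tag rule (not Predis's own code):
// if the key has a "{", and the first "}" after it encloses at least
// one character, only that enclosed part is hashed.
function hash_tag(string $key): string
{
    $start = strpos($key, '{');
    if ($start === false) {
        return $key;
    }
    $end = strpos($key, '}', $start + 1);
    if ($end === false || $end === $start + 1) {
        return $key; // no closing brace, or empty "{}": hash the whole key
    }
    return substr($key, $start + 1, $end - $start - 1);
}

var_dump(hash_tag('{user1000}.following')); // "user1000"
var_dump(hash_tag('foo{}{bar}'));           // whole key, the first tag is empty
```

Keys such as {user1000}.following and {user1000}.followers therefore land in the same slot, which is what makes multi-key commands on them possible in a cluster.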
The core CRC16 algorithm lives in predis\src\Cluster\Hash\CRC16.php and can be reused as-is
private static $CCITT_16 = array(
0x0000,0x1021,0x2042,0x3063,0x4084,0x50A5,0x60C6,0x70E7,0x8108,0x9129,0xA14A,0xB16B,0xC18C,0xD1AD,0xE1CE,0xF1EF,0x1231,0x0210,0x3273,0x2252,0x52B5,0x4294,0x72F7,0x62D6,0x9339,0x8318,0xB37B,0xA35A,0xD3BD,0xC39C,0xF3FF,0xE3DE,0x2462,0x3443,0x0420,0x1401,0x64E6,0x74C7,0x44A4,0x5485,0xA56A,0xB54B,0x8528,0x9509,0xE5EE,0xF5CF,0xC5AC,0xD58D,0x3653,0x2672,0x1611,0x0630,0x76D7,0x66F6,0x5695,0x46B4,0xB75B,0xA77A,0x9719,0x8738,0xF7DF,0xE7FE,0xD79D,0xC7BC,0x48C4,0x58E5,0x6886,0x78A7,0x0840,0x1861,0x2802,0x3823,0xC9CC,0xD9ED,0xE98E,0xF9AF,0x8948,0x9969,0xA90A,0xB92B,0x5AF5,0x4AD4,0x7AB7,0x6A96,0x1A71,0x0A50,0x3A33,0x2A12,0xDBFD,0xCBDC,0xFBBF,0xEB9E,0x9B79,0x8B58,0xBB3B,0xAB1A,0x6CA6,0x7C87,0x4CE4,0x5CC5,0x2C22,0x3C03,0x0C60,0x1C41,0xEDAE,0xFD8F,0xCDEC,0xDDCD,0xAD2A,0xBD0B,0x8D68,0x9D49,0x7E97,0x6EB6,0x5ED5,0x4EF4,0x3E13,0x2E32,0x1E51,0x0E70,0xFF9F,0xEFBE,0xDFDD,0xCFFC,0xBF1B,0xAF3A,0x9F59,0x8F78,0x9188,0x81A9,0xB1CA,0xA1EB,0xD10C,0xC12D,0xF14E,0xE16F,0x1080,0x00A1,0x30C2,0x20E3,0x5004,0x4025,0x7046,0x6067,0x83B9,0x9398,0xA3FB,0xB3DA,0xC33D,0xD31C,0xE37F,0xF35E,0x02B1,0x1290,0x22F3,0x32D2,0x4235,0x5214,0x6277,0x7256,0xB5EA,0xA5CB,0x95A8,0x8589,0xF56E,0xE54F,0xD52C,0xC50D,0x34E2,0x24C3,0x14A0,0x0481,0x7466,0x6447,0x5424,0x4405,0xA7DB,0xB7FA,0x8799,0x97B8,0xE75F,0xF77E,0xC71D,0xD73C,0x26D3,0x36F2,0x0691,0x16B0,0x6657,0x7676,0x4615,0x5634,0xD94C,0xC96D,0xF90E,0xE92F,0x99C8,0x89E9,0xB98A,0xA9AB,0x5844,0x4865,0x7806,0x6827,0x18C0,0x08E1,0x3882,0x28A3,0xCB7D,0xDB5C,0xEB3F,0xFB1E,0x8BF9,0x9BD8,0xABBB,0xBB9A,0x4A75,0x5A54,0x6A37,0x7A16,0x0AF1,0x1AD0,0x2AB3,0x3A92,0xFD2E,0xED0F,0xDD6C,0xCD4D,0xBDAA,0xAD8B,0x9DE8,0x8DC9,0x7C26,0x6C07,0x5C64,0x4C45,0x3CA2,0x2C83,0x1CE0,0x0CC1,0xEF1F,0xFF3E,0xCF5D,0xDF7C,0xAF9B,0xBFBA,0x8FD9,0x9FF8,0x6E17,0x7E36,0x4E55,0x5E74,0x2E93,0x3EB2,0x0ED1,0x1EF0,);
public function hash($value)
{
    // CRC-CCITT-16 algorithm
    $crc = 0;
    $CCITT_16 = self::$CCITT_16;
    $value = (string) $value;
    $strlen = strlen($value);
    for ($i = 0; $i < $strlen; ++$i) {
        $crc = (($crc << 8) ^ $CCITT_16[($crc >> 8) ^ ord($value[$i])]) & 0xFFFF;
    }
    return $crc;
}
This algorithm gives the same result as Redis's own slot calculation: the slot computed in PHP for key a is also 15495
127.0.0.1:7001> set a 123
-> Redirected to slot [15495] located at 127.0.0.1:7002
OK
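The slot for key a can be cross-checked without the lookup table. The sketch below is a bit-by-bit CRC16 with polynomial 0x1021 and initial value 0, which is the variant the table encodes; crc16_xmodem is a name chosen here and the function is a substitute for, not a copy of, the Predis class.

```php
<?php
// Bitwise CRC16 (polynomial 0x1021, initial value 0), equivalent to the
// table-driven version above but without the 256-entry table.
function crc16_xmodem(string $data): int
{
    $crc = 0;
    for ($i = 0, $n = strlen($data); $i < $n; ++$i) {
        $crc ^= ord($data[$i]) << 8;
        for ($bit = 0; $bit < 8; ++$bit) {
            $crc = ($crc & 0x8000) ? (($crc << 1) ^ 0x1021) : ($crc << 1);
            $crc &= 0xFFFF; // keep the register at 16 bits
        }
    }
    return $crc;
}

printf("crc = 0x%04X, slot = %d\n", crc16_xmodem('a'), crc16_xmodem('a') & 0x3FFF);
```

For a the CRC is 0x7C87 (31879), and 31879 & 0x3FFF is 15495, matching the redirect shown by redis-cli.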
Core logic source
In cluster mode, executing a command goes through the executeCommand method in predis\src\Connection\Aggregate\RedisCluster.php
public function executeCommand(CommandInterface $command)
{
    $response = $this->retryCommandOnFailure($command, __FUNCTION__);
    if ($response instanceof ErrorResponseInterface) { // reached on MOVED, or when the master is unavailable
        return $this->onErrorResponse($command, $response);
    }
    return $response;
}
The retryCommandOnFailure method obtains the connection, executes the command, and catches connection exceptions. It actually uses goto, which is the first time I have seen goto in a PHP program
private function retryCommandOnFailure(CommandInterface $command, $method)
{
    $failure = false;
    RETRY_COMMAND: {
        try {
            $response = $this->getConnection($command)->$method($command);
        } catch (ConnectionException $exception) {
            $connection = $exception->getConnection();
            $connection->disconnect();
            $this->remove($connection);
            if ($failure) { // note the $failure = true below: if the first attempt fails and the retry fails too, give up
                throw $exception;
            } elseif ($this->useClusterSlots) {
                $this->askSlotsMap();
            }
            $failure = true;
            goto RETRY_COMMAND;
        }
    }
    return $response;
}
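The same retry-once control flow can be written without goto. The following is a minimal sketch, assuming a plain RuntimeException stands in for Predis's ConnectionException; retry_once and its two callbacks are hypothetical names, with $onFirstFailure playing the role of removing the node and refreshing the slots map.

```php
<?php
// Hypothetical helper: run $attempt, and on the first failure call
// $onFirstFailure and try exactly once more. A second failure is rethrown.
function retry_once(callable $attempt, callable $onFirstFailure)
{
    $failed = false;
    while (true) {
        try {
            return $attempt();
        } catch (RuntimeException $exception) {
            if ($failed) {
                throw $exception; // already retried once: give up
            }
            $onFirstFailure($exception);
            $failed = true;
        }
    }
}

// Demo: the first call fails, the retry succeeds.
$calls = 0;
$result = retry_once(
    function () use (&$calls) {
        if (++$calls === 1) {
            throw new RuntimeException('node down');
        }
        return 'OK';
    },
    function (RuntimeException $e) {
        echo "refreshing slots map after: {$e->getMessage()}\n";
    }
);
echo $result, "\n";
```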
Because the PHP client does not know which Redis node this slot belongs to, Predis has to guess a node
public function getConnection(CommandInterface $command)
{
    // compute the slot
    $slot = $this->strategy->getSlot($command);
    if (!isset($slot)) {
        throw new NotSupportedException(
            "Cannot use '{$command->getId()}' with redis-cluster."
        );
    }
    if (isset($this->slots[$slot])) {
        // this slot already has a node mapped to it
        return $this->slots[$slot];
    } else {
        // guess a node from the slot
        return $this->getConnectionBySlot($slot);
    }
}
public function getConnectionBySlot($slot)
{
    // check that the slot is valid
    if ($slot < 0x0000 || $slot > 0x3FFF) {
        throw new \OutOfBoundsException("Invalid slot [$slot].");
    }
    if (isset($this->slots[$slot])) {
        return $this->slots[$slot];
    }
    // guess a node
    $connectionID = $this->guessNode($slot);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
        $this->pool[$connectionID] = $connection;
    }
    // remember the mapping between the slot and the guessed node for now
    return $this->slots[$slot] = $connection;
}
Next come connecting to the server, sending the command, and handling the response
The command is first serialized into a format Redis understands
public function writeRequest(CommandInterface $command)
{
    $commandID = $command->getId();
    $arguments = $command->getArguments();
    $cmdlen = strlen($commandID);
    $reqlen = count($arguments) + 1;
    $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";
    foreach ($arguments as $argument) {
        $arglen = strlen($argument);
        $buffer .= "\${$arglen}\r\n{$argument}\r\n";
    }
    $this->write($buffer);
}
Then it connects to Redis
protected function tcpStreamInitializer(ParametersInterface $parameters)
{
    if (!filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
        $address = "tcp://$parameters->host:$parameters->port";
    } else {
        $address = "tcp://[$parameters->host]:$parameters->port";
    }
    $flags = STREAM_CLIENT_CONNECT;
    if (isset($parameters->async_connect) && $parameters->async_connect) {
        $flags |= STREAM_CLIENT_ASYNC_CONNECT;
    }
    if (isset($parameters->persistent)) {
        if (false !== $persistent = filter_var($parameters->persistent, FILTER_VALIDATE_BOOLEAN, FILTER_NULL_ON_FAILURE)) {
            $flags |= STREAM_CLIENT_PERSISTENT;
            if ($persistent === null) {
                $address = "{$address}/{$parameters->persistent}";
            }
        }
    }
    $resource = $this->createStreamSocket($parameters, $address, $flags);
    return $resource;
}
protected function createStreamSocket(ParametersInterface $parameters, $address, $flags)
{
    $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
    if (!$resource = @stream_socket_client($address, $errno, $errstr, $timeout, $flags)) {
        $this->onConnectionError(trim($errstr), $errno);
    }
    if (isset($parameters->read_write_timeout)) {
        $rwtimeout = (float) $parameters->read_write_timeout;
        $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
        $timeoutSeconds = floor($rwtimeout);
        $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
        stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
    }
    if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
        $socket = socket_import_stream($resource);
        socket_set_option($socket, SOL_TCP, TCP_NODELAY, (int) $parameters->tcp_nodelay);
    }
    return $resource;
}
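Two small details in the connection code are worth isolating: IPv6 hosts must be wrapped in square brackets in the stream address, and the read/write timeout is split into whole seconds plus microseconds for stream_set_timeout. The sketch below reproduces both without opening a socket; build_tcp_address and split_timeout are hypothetical helper names.

```php
<?php
// Hypothetical helper: build the stream address, bracketing IPv6 hosts.
function build_tcp_address(string $host, int $port): string
{
    $isIpv6 = filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) !== false;
    return $isIpv6 ? "tcp://[{$host}]:{$port}" : "tcp://{$host}:{$port}";
}

// Hypothetical helper: split a float timeout into [seconds, microseconds].
function split_timeout(float $seconds): array
{
    $whole = (int) floor($seconds);
    return [$whole, (int) round(($seconds - $whole) * 1000000)];
}

echo build_tcp_address('192.168.124.10', 7000), "\n";
echo build_tcp_address('::1', 7000), "\n";
print_r(split_timeout(1.5));
```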
Finally, the command is sent and the response is read
public function read()
{
    $socket = $this->getResource();
    $chunk = fgets($socket);
    if ($chunk === false || $chunk === '') {
        $this->onConnectionError('Error while reading line from the server.');
    }
    $prefix = $chunk[0];
    $payload = substr($chunk, 1, -2);
    switch ($prefix) {
        case '+':
            return StatusResponse::get($payload);
        case '$':
            $size = (int) $payload;
            if ($size === -1) {
                return;
            }
            $bulkData = '';
            $bytesLeft = ($size += 2);
            do {
                $chunk = fread($socket, min($bytesLeft, 4096));
                if ($chunk === false || $chunk === '') {
                    $this->onConnectionError('Error while reading bytes from the server.');
                }
                $bulkData .= $chunk;
                $bytesLeft = $size - strlen($bulkData);
            } while ($bytesLeft > 0);
            // strip the trailing CRLF that was included in $size
            return substr($bulkData, 0, -2);
        case '*':
            $count = (int) $payload;
            if ($count === -1) {
                return;
            }
            $multibulk = array();
            for ($i = 0; $i < $count; ++$i) {
                $multibulk[$i] = $this->read();
            }
            return $multibulk;
        case ':':
            $integer = (int) $payload;
            return $integer == $payload ? $integer : $payload;
        case '-':
            return new ErrorResponse($payload);
        default:
            $this->onProtocolError("Unknown response prefix: '$prefix'.");
            return;
    }
}
If an error response comes back, for example MOVED, the error details are handled
protected function onMovedResponse(CommandInterface $command, $details)
{
    list($slot, $connectionID) = explode(' ', $details, 2);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
    }
    if ($this->useClusterSlots) {
        $this->askSlotsMap($connection);
    }
    $this->move($connection, $slot);
    $response = $this->executeCommand($command);
    return $response;
}
The real node that MOVED points to is then asked for the slot and master/slave mapping of all nodes, by sending a cluster slots command
public function askSlotsMap(NodeConnectionInterface $connection = null)
{
    if (!$connection && !$connection = $this->getRandomConnection()) {
        return array();
    }
    $this->resetSlotsMap();
    $response = $this->queryClusterNodeForSlotsMap($connection);
    foreach ($response as $slots) {
        // We only support master servers for now, so we ignore subsequent
        // elements in the $slots array identifying slaves.
        list($start, $end, $master) = $slots;
        if ($master[0] === '') {
            $this->setSlots($start, $end, (string) $connection);
        } else {
            $this->setSlots($start, $end, "{$master[0]}:{$master[1]}");
        }
    }
    return $this->slotsMap;
}
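To illustrate what the loop above consumes, here is a minimal sketch that turns a cluster slots style reply into a list of ranges and looks a slot up in it. The reply below is made-up sample data using the ports from this article, and build_slot_ranges and node_for_slot are hypothetical helpers, not Predis methods.

```php
<?php
// Each reply entry is assumed to be [start, end, [masterHost, masterPort], ...slaves].
// Only the master (third element) is used, as in the loop above.
function build_slot_ranges(array $reply): array
{
    $ranges = [];
    foreach ($reply as $entry) {
        [$start, $end, $master] = $entry;
        $ranges[] = [(int) $start, (int) $end, "{$master[0]}:{$master[1]}"];
    }
    return $ranges;
}

// Return the master that owns $slot, or null if no range covers it.
function node_for_slot(array $ranges, int $slot): ?string
{
    foreach ($ranges as [$start, $end, $node]) {
        if ($slot >= $start && $slot <= $end) {
            return $node;
        }
    }
    return null;
}

// Made-up sample reply: 7005 has been promoted in place of 7000.
$reply = [
    [0, 5460, ['127.0.0.1', 7005], ['127.0.0.1', 7000]],
    [5461, 10922, ['127.0.0.1', 7001], ['127.0.0.1', 7003]],
    [10923, 16383, ['127.0.0.1', 7002], ['127.0.0.1', 7004]],
];
$ranges = build_slot_ranges($reply);
echo node_for_slot($ranges, 15495), "\n"; // slot of key "a"
echo node_for_slot($ranges, 50), "\n";
```

Storing ranges instead of one entry per slot keeps the map small; a per-slot array trades memory for a constant-time lookup.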
The original set command is then processed again, going back into executeCommand