A Detailed Walkthrough of the Predis Cluster Mode Source Code

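You can skip the core logic source section: the Predis source is convoluted and I have simply pasted the code here. Interested readers can trace through it themselves.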

More source walkthroughs are published at https://github.com/Zhucola/php_frameworks_analysis, stars are welcome~

Overall execution flow

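  • Initialization (the source is convoluted; interested readers can trace it themselves, it is the Client constructor)
  • When the client executes a set, a slot is computed from the key and a node is looked up by that slot
    1. If the node configuration specifies slots, the node configured for that slot is used (for example, slot 50 is assigned to port 7000; note that this may not match the real slot assignment in Redis, for instance when slots were moved in Redis but the PHP program was not updated)
    2. If the node configuration specifies no slots, or the specified slots do not match, the node is guessed (guessNode). Guessing assumes the slots are evenly distributed (with 3 nodes, the first node in the configured list is assumed to hold 0-5460, the second 5461-10921, and the third 10922-16383, and the slot is then matched against these ranges)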

A configuration that maps slots to nodes looks like this

$redis_list = [
    'redis://192.168.124.10:7000?slots=1-100,500-1000',
    'redis://192.168.124.10:7001?slots=101-499',
    'redis://192.168.124.10:7002',
];
$redis = new Client($redis_list, ['cluster' => 'redis']);

The node-guessing algorithm is as follows

$count = count($this->pool);
$index = min((int) ($slot / (int) (16384 / $count)),$count - 1);
$nodes = array_keys($this->pool);
return $nodes[$index];
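To make the guessed ranges concrete, here is a minimal standalone sketch of the same arithmetic. The function name guessNodeIndex and its parameters are my own for illustration and are not part of Predis; $nodeCount stands in for count($this->pool).

```php
<?php
// Hypothetical helper mirroring the guessNode arithmetic shown above.
function guessNodeIndex(int $slot, int $nodeCount): int
{
    // Width of each assumed equal slot range, e.g. (int) (16384 / 3) = 5461.
    $rangeSize = (int) (16384 / $nodeCount);

    // Clamp to the last node so the leftover slots at the top still map to a node.
    return min((int) ($slot / $rangeSize), $nodeCount - 1);
}
```

With 3 nodes, slot 5460 maps to index 0 and slot 5461 to index 1. Slot 16383 divides to exactly 3, which is why the min() clamp to the last index is needed.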
  • The set command is serialized. If the command is
$redis->set("a",1234);

it becomes

*3
$3
SET
$1
a
$4
1234
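The wire format above is an array of length-prefixed bulk strings, with every line terminated by CRLF. A minimal sketch of that serialization, assuming a hypothetical helper named serializeCommand (this is not the Predis method itself):

```php
<?php
// Hypothetical helper: serialize a command name and its arguments
// as a RESP array of bulk strings.
function serializeCommand(string $commandId, array $arguments): string
{
    $parts = array_merge([$commandId], $arguments);

    // "*<number of parts>" announces the array length.
    $buffer = '*' . count($parts) . "\r\n";

    foreach ($parts as $part) {
        $part = (string) $part;
        // "$<byte length>" followed by the raw bytes of the part.
        $buffer .= '$' . strlen($part) . "\r\n" . $part . "\r\n";
    }

    return $buffer;
}
```

Calling serializeCommand('SET', ['a', 1234]) produces the seven lines shown above; the integer 1234 is sent as the 4-byte string "1234".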
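  • A connection to the node is established, and the password and database number are sent (if configured)
    1. If the connection fails, the node is removed from the configured list, then a random node is taken from the list to run cluster slots, which returns the real slot assignments and master/slave relationships of all nodes. The node is then matched again using the real mapping, and the client reconnects to the real node
    2. If that node also fails to connect, the command fails. The code hard-codes a single retry
  • The serialized command is sent to the Redis node
    1. If sending fails, the node is removed from the configured list, then a random node is taken from the list and connected, cluster slots is run to get the real slot assignments and master/slave relationships of all nodes, the node is matched again using the real mapping, and the client reconnects to the real node. The newly selected node may be the same one that just failed; if it fails again, an exception is thrown
      • The first failed send here was actually to a master. If the master goes down, Redis promotes a slave to master, and when PHP fetches the cluster slots information it gets the newly promoted master (for example: a master on port 7000 is configured and the connection to 7000 succeeds, but 7000 goes down while the command is being sent. cluster slots is then fetched, it shows that 7000 has become a slave and 7005 has been promoted to master, so the client connects to 7005)
  • The response from the Redis node is read
    1. A MOVED response means the client's node-to-slot mapping differs from the real Redis servers. The MOVED message is handled and the real slot and node are taken from the response
      • cluster slots is sent to the node named in MOVED to get the slot assignments and master/slave relationships of the whole cluster, then the command is sent again to the node that really owns the slot
      • If sending cluster slots to the MOVED node fails, that node is removed from the configured list and a random node is used to run cluster slots, then the command is sent again (it may again go to the MOVED node that just failed, and a second failure means the command fails; or it may go to a slave that has already been promoted to master)
    2. The response succeeds and OK is received

A MOVED response

-MOVED 15495 127.0.0.1:7002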

A successful response

+OK
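A MOVED error carries everything the client needs to redirect: the slot and the host:port of the node that owns it. A minimal sketch of pulling those apart, assuming the leading "-" has already been stripped by the response reader; parseMoved is a hypothetical name, not a Predis function:

```php
<?php
// Hypothetical helper: split "MOVED <slot> <host>:<port>" into its parts.
// Returns null when the message is not a MOVED redirection.
function parseMoved(string $errorMessage): ?array
{
    $parts = explode(' ', trim($errorMessage), 3);

    if (count($parts) !== 3 || $parts[0] !== 'MOVED') {
        return null;
    }

    return ['slot' => (int) $parts[1], 'node' => $parts[2]];
}
```

For the message above this yields slot 15495 and node 127.0.0.1:7002.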

CRC16 algorithm source

The method that derives the slot from the key is getSlot, in predis\src\Cluster\ClusterStrategy.php

public function getSlot(CommandInterface $command)
{
    $slot = $command->getSlot();
    if (!isset($slot) && isset($this->commands[$cmdID = $command->getId()])) {

        $key = call_user_func($this->commands[$cmdID],$command);
        if (isset($key)) {
            $slot = $this->getSlotByKey($key);
            $command->setSlot($slot);
        }
    }

    return $slot;
}

It calls the getSlotByKey method

public function getSlotByKey($key)
{
    $key = $this->extractKeyTag($key);  // use only the {hash tag} part of the key, if present
    $slot = $this->hashGenerator->hash($key) & 0x3FFF;   // CRC16 hash, then bitwise AND with 16383 (same as modulo 16384)

    return $slot;
}
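The extractKeyTag step is what lets related keys land in the same slot. The sketch below follows the hash tag rule from the Redis Cluster specification (hash only the text between the first "{" and the first "}" after it, provided that text is non-empty). It is my own illustration under that assumption, not a copy of the Predis extractKeyTag implementation, and the name extractHashTag is hypothetical.

```php
<?php
// Hypothetical helper following the Redis Cluster hash tag rule.
function extractHashTag(string $key): string
{
    $start = strpos($key, '{');

    if ($start !== false) {
        // First closing brace to the right of the first opening brace.
        $end = strpos($key, '}', $start + 1);

        // Only use the tag when at least one character sits between the braces.
        if ($end !== false && $end > $start + 1) {
            return substr($key, $start + 1, $end - $start - 1);
        }
    }

    // No usable tag: the whole key is hashed.
    return $key;
}
```

So {user1000}.following and {user1000}.followers both hash the string user1000 and end up in the same slot, while a key such as foo{}{bar} is hashed as a whole because its first pair of braces is empty.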

The core CRC16 algorithm is in predis\src\Cluster\Hash\CRC16.php and can be reused as-is

private static $CCITT_16 = array(
    0x0000,0x1021,0x2042,0x3063,0x4084,0x50A5,0x60C6,0x70E7,0x8108,0x9129,0xA14A,0xB16B,0xC18C,0xD1AD,0xE1CE,0xF1EF,0x1231,0x0210,0x3273,0x2252,0x52B5,0x4294,0x72F7,0x62D6,0x9339,0x8318,0xB37B,0xA35A,0xD3BD,0xC39C,0xF3FF,0xE3DE,0x2462,0x3443,0x0420,0x1401,0x64E6,0x74C7,0x44A4,0x5485,0xA56A,0xB54B,0x8528,0x9509,0xE5EE,0xF5CF,0xC5AC,0xD58D,0x3653,0x2672,0x1611,0x0630,0x76D7,0x66F6,0x5695,0x46B4,0xB75B,0xA77A,0x9719,0x8738,0xF7DF,0xE7FE,0xD79D,0xC7BC,0x48C4,0x58E5,0x6886,0x78A7,0x0840,0x1861,0x2802,0x3823,0xC9CC,0xD9ED,0xE98E,0xF9AF,0x8948,0x9969,0xA90A,0xB92B,0x5AF5,0x4AD4,0x7AB7,0x6A96,0x1A71,0x0A50,0x3A33,0x2A12,0xDBFD,0xCBDC,0xFBBF,0xEB9E,0x9B79,0x8B58,0xBB3B,0xAB1A,0x6CA6,0x7C87,0x4CE4,0x5CC5,0x2C22,0x3C03,0x0C60,0x1C41,0xEDAE,0xFD8F,0xCDEC,0xDDCD,0xAD2A,0xBD0B,0x8D68,0x9D49,0x7E97,0x6EB6,0x5ED5,0x4EF4,0x3E13,0x2E32,0x1E51,0x0E70,0xFF9F,0xEFBE,0xDFDD,0xCFFC,0xBF1B,0xAF3A,0x9F59,0x8F78,0x9188,0x81A9,0xB1CA,0xA1EB,0xD10C,0xC12D,0xF14E,0xE16F,0x1080,0x00A1,0x30C2,0x20E3,0x5004,0x4025,0x7046,0x6067,0x83B9,0x9398,0xA3FB,0xB3DA,0xC33D,0xD31C,0xE37F,0xF35E,0x02B1,0x1290,0x22F3,0x32D2,0x4235,0x5214,0x6277,0x7256,0xB5EA,0xA5CB,0x95A8,0x8589,0xF56E,0xE54F,0xD52C,0xC50D,0x34E2,0x24C3,0x14A0,0x0481,0x7466,0x6447,0x5424,0x4405,0xA7DB,0xB7FA,0x8799,0x97B8,0xE75F,0xF77E,0xC71D,0xD73C,0x26D3,0x36F2,0x0691,0x16B0,0x6657,0x7676,0x4615,0x5634,0xD94C,0xC96D,0xF90E,0xE92F,0x99C8,0x89E9,0xB98A,0xA9AB,0x5844,0x4865,0x7806,0x6827,0x18C0,0x08E1,0x3882,0x28A3,0xCB7D,0xDB5C,0xEB3F,0xFB1E,0x8BF9,0x9BD8,0xABBB,0xBB9A,0x4A75,0x5A54,0x6A37,0x7A16,0x0AF1,0x1AD0,0x2AB3,0x3A92,0xFD2E,0xED0F,0xDD6C,0xCD4D,0xBDAA,0xAD8B,0x9DE8,0x8DC9,0x7C26,0x6C07,0x5C64,0x4C45,0x3CA2,0x2C83,0x1CE0,0x0CC1,0xEF1F,0xFF3E,0xCF5D,0xDF7C,0xAF9B,0xBFBA,0x8FD9,0x9FF8,0x6E17,0x7E36,0x4E55,0x5E74,0x2E93,0x3EB2,0x0ED1,0x1EF0,);
public function hash($value)
{
    // CRC-CCITT-16 algorithm
    $crc = 0;
    $CCITT_16 = self::$CCITT_16;

    $value = (string) $value;
    $strlen = strlen($value);

    for ($i = 0; $i < $strlen; ++$i) {
        $crc = (($crc << 8) ^ $CCITT_16[($crc >> 8) ^ ord($value[$i])]) & 0xFFFF;
    }

    return $crc;
}

This algorithm gives the same result as Redis's own slot calculation: the slot computed in PHP for the key a is also 15495

127.0.0.1:7001> set a 123
-> Redirected to slot [15495] located at 127.0.0.1:7002
OK
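If you want to check the slot of a key without copying the 256-entry table, the same CRC16 variant (polynomial 0x1021, initial value 0) can be computed bit by bit. This is a slower sketch meant only for verification; the function names crc16Xmodem and keySlot are my own and do not exist in Predis.

```php
<?php
// Hypothetical table-free CRC16 (polynomial 0x1021, initial value 0),
// which should agree with the table-driven hash() above.
function crc16Xmodem(string $value): int
{
    $crc = 0;
    $length = strlen($value);

    for ($i = 0; $i < $length; ++$i) {
        // Mix the next byte into the high byte of the register.
        $crc ^= ord($value[$i]) << 8;

        // Shift out 8 bits, applying the polynomial whenever the top bit is set.
        for ($bit = 0; $bit < 8; ++$bit) {
            if ($crc & 0x8000) {
                $crc = (($crc << 1) ^ 0x1021) & 0xFFFF;
            } else {
                $crc = ($crc << 1) & 0xFFFF;
            }
        }
    }

    return $crc;
}

// Slot = CRC16 of the key, keeping the low 14 bits (0..16383).
function keySlot(string $key): int
{
    return crc16Xmodem($key) & 0x3FFF;
}
```

keySlot('a') gives 15495, matching the redirect shown above. Hash tags are not handled here, so a key containing {...} would need its tag extracted first.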

Core logic source

In cluster mode, executing a command goes through the executeCommand method in predis\src\Connection\Aggregate\RedisCluster.php

public function executeCommand(CommandInterface $command)
{
    $response = $this->retryCommandOnFailure($command,__FUNCTION__);

    if ($response instanceof ErrorResponseInterface) {   // runs on MOVED or when the master is unavailable
        return $this->onErrorResponse($command,$response);
    }

    return $response;
}

The retryCommandOnFailure method gets the connection, executes the command, and catches exceptions. It even uses goto, which is the first time I have seen goto in a PHP program

private function retryCommandOnFailure(CommandInterface $command,$method)
{
    $failure = false;
    RETRY_COMMAND: {
        try {
            $response = $this->getConnection($command)->$method($command);
        } catch (ConnectionException $exception) {
            $connection = $exception->getConnection();
            $connection->disconnect();

            $this->remove($connection);

            if ($failure) {   // note the $failure = true below: if the retry after the first connection exception fails again, the exception is rethrown
                throw $exception;
            } elseif ($this->useClusterSlots) {
                $this->askSlotsMap();
            }

            $failure = true;

            goto RETRY_COMMAND;
        }
    }

    return $response;
}
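The goto is only there to run the try block a second time. For readers who find that hard to follow, here is a sketch of the same "retry exactly once" idea written as a loop. It is a simplification under my own assumptions: runWithOneRetry, $operation and $onFirstFailure are hypothetical names, RuntimeException stands in for ConnectionException, and $onFirstFailure stands in for the slot map refresh.

```php
<?php
// Hypothetical loop-based equivalent of a single retry.
function runWithOneRetry(callable $operation, callable $onFirstFailure)
{
    $failedOnce = false;

    while (true) {
        try {
            return $operation();
        } catch (RuntimeException $exception) {
            // A second failure is final: rethrow to the caller.
            if ($failedOnce) {
                throw $exception;
            }

            // First failure: let the caller repair its state, then try again.
            $onFirstFailure($exception);
            $failedOnce = true;
        }
    }
}
```

An operation that throws once and then succeeds returns normally on the second pass; one that throws twice surfaces the second exception.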

Because the PHP client may not know which Redis node this slot belongs to, Predis has to guess a node

public function getConnection(CommandInterface $command)
{
    // compute the slot
    $slot = $this->strategy->getSlot($command);
    if (!isset($slot)) {
        throw new NotSupportedException(
            "Cannot use '{$command->getId()}' with redis-cluster."
        );
    }

    if (isset($this->slots[$slot])) {
        // this slot already has a node mapped to it
        return $this->slots[$slot];
    } else {
        // guess a node from the slot
        return $this->getConnectionBySlot($slot);
    }
}
public function getConnectionBySlot($slot)
{
    // check that the slot is valid
    if ($slot < 0x0000 || $slot > 0x3FFF) {
        throw new \OutOfBoundsException("Invalid slot [$slot].");
    }

    if (isset($this->slots[$slot])) {
        return $this->slots[$slot];
    }
    // guess a node
    $connectionID = $this->guessNode($slot);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
        $this->pool[$connectionID] = $connection;
    }
    // remember the mapping between the slot and the guessed node for now
    return $this->slots[$slot] = $connection;
}

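Next come connecting to the server, sending the command, and handling the response.
The command is first serialized into a format Redis understands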

public function writeRequest(CommandInterface $command)
{
    $commandID = $command->getId();
    $arguments = $command->getArguments();

    $cmdlen = strlen($commandID);
    $reqlen = count($arguments) + 1;

    $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$commandID}\r\n";

    foreach ($arguments as $argument) {
        $arglen = strlen($argument);
        $buffer .= "\${$arglen}\r\n{$argument}\r\n";
    }
    $this->write($buffer);
}

Then it connects to Redis

protected function tcpStreamInitializer(ParametersInterface $parameters)
{
    if (!filter_var($parameters->host,FILTER_VALIDATE_IP,FILTER_FLAG_IPV6)) {
        $address = "tcp://$parameters->host:$parameters->port";
    } else {
        $address = "tcp://[$parameters->host]:$parameters->port";
    }

    $flags = STREAM_CLIENT_CONNECT;

    if (isset($parameters->async_connect) && $parameters->async_connect) {
        $flags |= STREAM_CLIENT_ASYNC_CONNECT;
    }
    if (isset($parameters->persistent)) {
        if (false !== $persistent = filter_var($parameters->persistent,FILTER_VALIDATE_BOOLEAN,FILTER_NULL_ON_FAILURE)) {
            $flags |= STREAM_CLIENT_PERSISTENT;

            if ($persistent === null) {
                $address = "{$address}/{$parameters->persistent}";
            }
        }
    }
    $resource = $this->createStreamSocket($parameters,$address,$flags);

    return $resource;
}
protected function createStreamSocket(ParametersInterface $parameters,$address,$flags)
{
    $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout : 5.0);
    if (!$resource = @stream_socket_client($address,$errno,$errstr,$timeout,$flags)) {
        $this->onConnectionError(trim($errstr),$errno);
    }

    if (isset($parameters->read_write_timeout)) {
        $rwtimeout = (float) $parameters->read_write_timeout;
        $rwtimeout = $rwtimeout > 0 ? $rwtimeout : -1;
        $timeoutSeconds = floor($rwtimeout);
        $timeoutUSeconds = ($rwtimeout - $timeoutSeconds) * 1000000;
        stream_set_timeout($resource,$timeoutSeconds,$timeoutUSeconds);
    }

    if (isset($parameters->tcp_nodelay) && function_exists('socket_import_stream')) {
        $socket = socket_import_stream($resource);
        socket_set_option($socket,SOL_TCP,TCP_NODELAY,(int) $parameters->tcp_nodelay);
    }

    return $resource;
}

Finally the command is sent and the response is read

public function read()
{
    $socket = $this->getResource();
    $chunk = fgets($socket);
    if ($chunk === false || $chunk === '') {
        $this->onConnectionError('Error while reading line from the server.');
    }
    $prefix = $chunk[0];
    $payload = substr($chunk,1,-2);

    switch ($prefix) {
        case '+':
            return StatusResponse::get($payload);

        case '$':
            $size = (int) $payload;

            if ($size === -1) {
                return;
            }

            $bulkData = '';
            $bytesLeft = ($size += 2);

            do {
                $chunk = fread($socket,min($bytesLeft,4096));

                if ($chunk === false || $chunk === '') {
                    $this->onConnectionError('Error while reading bytes from the server.');
                }

                $bulkData .= $chunk;
                $bytesLeft = $size - strlen($bulkData);
            } while ($bytesLeft > 0);

            return substr($bulkData,0,-2);

        case '*':
            $count = (int) $payload;

            if ($count === -1) {
                return;
            }

            $multibulk = array();

            for ($i = 0; $i < $count; ++$i) {
                $multibulk[$i] = $this->read();
            }

            return $multibulk;

        case ':':
            $integer = (int) $payload;
            return $integer == $payload ? $integer : $payload;

        case '-':
            return new ErrorResponse($payload);

        default:
            $this->onProtocolError("Unknown response prefix: '$prefix'.");

            return;
    }
}
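To experiment with the prefix dispatch in read() without a running server, the same parsing can be pointed at an in-memory stream. The sketch below is a simplified reader under my own assumptions: readReply and replyStream are hypothetical names, errors are returned as a plain array rather than an ErrorResponse object, and status replies are returned as plain strings.

```php
<?php
// Hypothetical reader: parse one RESP reply from an already opened stream.
function readReply($stream)
{
    $line = fgets($stream);
    if ($line === false || $line === '') {
        throw new RuntimeException('Error while reading line from the stream.');
    }

    $prefix = $line[0];
    $payload = substr($line, 1, -2); // drop the prefix and the trailing CRLF

    switch ($prefix) {
        case '+': // status reply, e.g. +OK
            return $payload;
        case '-': // error reply, e.g. -MOVED 15495 127.0.0.1:7002
            return ['error' => $payload];
        case ':': // integer reply
            return (int) $payload;
        case '$': // bulk string, a size of -1 means null
            $size = (int) $payload;
            if ($size === -1) {
                return null;
            }
            // Read the body plus its CRLF, then strip the CRLF.
            $data = stream_get_contents($stream, $size + 2);
            return substr($data, 0, -2);
        case '*': // array of nested replies, read recursively
            $count = (int) $payload;
            if ($count === -1) {
                return null;
            }
            $items = [];
            for ($i = 0; $i < $count; ++$i) {
                $items[] = readReply($stream);
            }
            return $items;
        default:
            throw new RuntimeException("Unknown response prefix: '$prefix'.");
    }
}

// Hypothetical helper: wrap a raw reply string in a readable memory stream.
function replyStream(string $raw)
{
    $stream = fopen('php://memory', 'r+');
    fwrite($stream, $raw);
    rewind($stream);
    return $stream;
}
```

Feeding it "+OK\r\n" returns the string OK, and feeding it the MOVED line returns an array whose error entry holds the text after the "-" prefix.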

If the response is an error, such as MOVED, the error message is handled

protected function onMovedResponse(CommandInterface $command,$details)
{
    list($slot,$connectionID) = explode(' ',$details,2);
    if (!$connection = $this->getConnectionById($connectionID)) {
        $connection = $this->createConnection($connectionID);
    }

    if ($this->useClusterSlots) {
        $this->askSlotsMap($connection);
    }
    $this->move($connection,$slot);
    $response = $this->executeCommand($command);

    return $response;
}

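The real node that MOVED points to is then asked for the slot assignments and master/slave relationships of all nodes, by sending a cluster slots command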

public function askSlotsMap(NodeConnectionInterface $connection = null)
{
    if (!$connection && !$connection = $this->getRandomConnection()) {
        return array();
    }
    $this->resetSlotsMap();

    $response = $this->queryClusterNodeForSlotsMap($connection);
    foreach ($response as $slots) {
        // We only support master servers for now,so we ignore subsequent
        // elements in the $slots array identifying slaves.
        list($start,$end,$master) = $slots;
        if ($master[0] === '') {
            $this->setSlots($start,$end,(string) $connection);
        } else {
            $this->setSlots($start,$end,"{$master[0]}:{$master[1]}");
        }
    }
    return $this->slotsMap;
}
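The reply to cluster slots is a list of ranges, each holding a start slot, an end slot, the master as [host, port, ...], and then any replicas. A sketch of flattening such a reply into a slot-to-node lookup, using made-up sample data; buildSlotsMap is a hypothetical name and stores one entry per slot, which may not be how Predis stores the map internally:

```php
<?php
// Hypothetical helper: turn a CLUSTER SLOTS style reply into slot => "host:port".
function buildSlotsMap(array $clusterSlotsReply): array
{
    $slotsMap = [];

    foreach ($clusterSlotsReply as $range) {
        // Entries after the master describe replicas and are ignored here.
        list($start, $end, $master) = $range;
        $node = "{$master[0]}:{$master[1]}";

        for ($slot = $start; $slot <= $end; ++$slot) {
            $slotsMap[$slot] = $node;
        }
    }

    return $slotsMap;
}
```

With a sample reply in which 7005 has taken over the range 10923-16383 and 7002 is listed after it as a replica, slot 15495 resolves to 127.0.0.1:7005, which is the failover case described in the flow at the top.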

The set command is then processed again, going back into executeCommand