1. 程式人生 > >dubbo-php-framework的資料包的解包和組包邏輯解析

dubbo-php-framework的資料包的解包和組包邏輯解析

前面分析了框架的請求和回覆實體的封裝,而在進行底層網路通訊時,需要進行解包(收到資料)和組包(傳送資料),框架底層使用了dubbo協議,資料序列化方式是json格式,這篇文章我們就重點看看解包和組包的邏輯,這部分邏輯在DubboParser類中完成,這個類定義在dubbo-php-framework-master/common/protocol/fsof/DubboParser.php檔案中。

下面程式碼註釋中明確列了dubbo協議的格式,大家在看解包組包邏輯時,對照著協議看會清楚很多。另外組包和解包是個相互的過程,也就是組包是將業務資料(DubboRequest,DubboResponse)按規則拼接為二進位制串,而解包是將二進位制串按規則解析後set屬性到業務資料中(DubboRequest,DubboResponse)。

/**
 *
 * Dubbo網路協議
 * +------------------------------------------------------------------------------------------+
 * |                        包頭(二進位制資料 16bit)   |  包體  |
 * +-----------------------------------------------------------------------------------------+
 * | 版本號  |   命令&serialize   | 空白 | 包序號|  長度  |  資料  |
 * +-----------------------------------------------------------------------------------------+
 * | magic(2) |  cmd&serialize(1)|(1)    |sn(8) | len(4)  | data(N)|
 * +-----------------------------------------------------------------------------------------+
 *
 * magic:協議包起始標識, 0xdabb
 * --------------------------------------------------------------------------------------------
 * cmd:命令型別:FLAG_REQUEST為0x80, FLAG_TWOWAY為0x40, FLAG_EVENT為0x20
 * serialize:序列化方案編號:與cmd共用一個位元組,採用json,對應dubbo中編號為6
 * --------------------------------------------------------------------------------------------
 * sn:請求序號,consumer會為每個請求編制一個程序內唯一序號
 *    ,provider處理完請求後在返回的資料包中會攜帶該sn號,供consumer判斷當前的資料是對應哪個請求
 * --------------------------------------------------------------------------------------------*
 * len:資料報文長度
 * --------------------------------------------------------------------------------------------
 * data:資料報文,目前採用json進行序列化
 * --------------------------------------------------------------------------------------------
 */
class DubboParser
{
    //dubbo協議基本資訊
    const PACKAGE_HEDA_LEN = 16;
    const MAX_RECV_LEN = 1048576;//1024*1024;
    const RESPONSE_TCP_SEGMENT_LEN = 1048576;//1*1024*1024;

    //fsof協議ver欄位,ver欄位既指示協議版本資訊,也作為magic使用
    const DUBBO_PROTOCOL_MAGIC = 0xdabb;

    //serialize 方案編號
    const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6;     //serialization code


    //fsof協議包頭cmd欄位含義
    const FLAG_REQUEST = 0x80;           //request
    const FLAG_TWOWAY = 0x40;            //two_way
    const FLAG_HEARTBEAT_EVENT = 0x20;  //heart_event
    const SERIALIZATION_MASK = 0x1f;     //serialization_mask

    const UPPER_MASK = 0xffffffff00000000;
    const LOWER_MASK = 0x00000000ffffffff;

    const RESPONSE_WITH_EXCEPTION = 0;
    const RESPONSE_VALUE = 1;
    const RESPONSE_NULL_VALUE = 2;

    private $logger;

    public function  __construct()
    {
        $this->logger = \Logger::getLogger(__CLASS__);
    }

    //將DubboRequest編碼為二進位制字串
    public function packRequest(DubboRequest &$request)
    {
        //資料按PHP_EOL的分隔符拼接
        $reqData = json_encode($request->getDubboVersion()) . PHP_EOL .
            json_encode($request->getService()) . PHP_EOL .
            json_encode($request->getVersion()) . PHP_EOL .
            json_encode($request->getMethod()) . PHP_EOL ;
        $typeStr = "";
        for($i=0;$i < count($request->getTypes());$i++){
           $typeStr = $typeStr.$request->getTypes()[$i];
        }
        $reqData = $reqData.json_encode($typeStr) . PHP_EOL;
        for ($x = 0; $x < count($request->getParams()); $x++) {
            $reqData = $reqData . json_encode($request->getParams()[$x]) . PHP_EOL;
        }

        $attach = array();
        $attach['path'] = $request->getService();
        $attach['interface'] = $request->getService();
        $attach['group'] = $request->getGroup();
        $attach['timeout'] = $request->getTimeout();
        $attach['version'] = $request->getVersion();
        $request->setAttach($attach);
        $reqData = $reqData . json_encode($request->getAttach());

        $upper = ($request->getSn() & self::UPPER_MASK) >> 32;
        $lower = $request->getSn() & self::LOWER_MASK;
        $flag = (self::FLAG_REQUEST | self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
        if ($request->isTwoWay()) $flag |= self::FLAG_TWOWAY;
        if ($request->isHeartbeatEvent()) $flag |= self::FLAG_HEARTBEAT_EVENT;
        //呼叫PHP的系統函式pack,按格式編碼資料
        $out = pack("n1C1a1N1N1N1",
            self::DUBBO_PROTOCOL_MAGIC,
            $flag,
            "",
            $upper,
            $lower,
            strlen($reqData));
        return $out . $reqData;
    }

    //解析回覆包的頭部資訊
    public function parseResponseHeader(DubboResponse &$response)
    {
        //頭部資料長度為16byte,這裡擷取16個byte後進行解析
        $res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN);
        $format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len';
        //呼叫PHP函式按上述格式解析資料
        $_arr = unpack($format, $res_header);
        $response->setStatus($_arr['status']);
        $response->setSn($_arr["upper"] << 32 | $_arr["lower"]);
        $flag = $_arr["flag"];
        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
        {
            $response->setHeartbeatEvent(true);
        }
        $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
        $response->setLen($_arr["len"]);
        return $response;
    }

    //解包回覆包的body部分
    public function parseResponseBody(DubboResponse &$response)
    {
        //從第16byte開始的所有資料當前body部分
        $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN);
        $response->setResponseBody($_data);
        $dataArr = array();
        //資料不為空時,按PHP_EOL分隔符拆分資料
        if ($_data){
            $dataArr = explode(PHP_EOL,$_data);
        }

        if ($response->getStatus() == DubboResponse::OK)
        {
            if ($response->isHeartbeatEvent())
            {
                $response->setResult(json_decode($dataArr[0], true));
            } else
            {
                switch ($dataArr[0])
                {
                    case self::RESPONSE_NULL_VALUE:
                        break;
                    case self::RESPONSE_VALUE:
                        $response->setResult(json_decode($dataArr[1], true));
                        break;
                    case self::RESPONSE_WITH_EXCEPTION:
                        $exception = json_decode($dataArr[1], true);
                        if(is_array($exception) && array_key_exists('message', $exception)){
                            throw new \Exception($exception['message']);
                        }else if(is_string($exception)){
                            throw new \Exception($exception);
                        }else{
                            throw new \Exception("provider occur error");
                        }
                        break;
                    default:
                        return false;
                }
            }
            return $response;
        } else {
            throw new \Exception(json_decode($dataArr[0], true));
        }
    }

    //解包請求包的頭部部分
    public function parseRequestHeader(DubboRequest &$request)
    {
        $_data = substr($request->getFullData(), 0, self::PACKAGE_HEDA_LEN);
        $format = 'n1magic/C1flag/C1blank/N1upper/N1lower/N1len';
        $_arr = unpack($format, $_data);
        $flag = $_arr['flag'];
        $request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0);
        if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0)
        {
            $request->setHeartbeatEvent(true);
        }

        $request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON);
        $request->setSn($_arr['upper'] << 32 | $_arr['lower']);
        $request->setDataLen($_arr['len']);
        $request->setRequestLen($request->getDataLen() + self::PACKAGE_HEDA_LEN);
        return $request;
    }
    //解包請求包的body部分
    public function parseRequestBody(DubboRequest &$request)
    {
        if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON)
        {
            $this->logger->error("unknown serialization type :" . $request->getSerialization());
            return false;
        }
        $cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN);
        if ($cliData)
        {
            if ($request->isHeartbeatEvent())
            {
                //心跳請求,不需要資料回送
            } else
            {
                $dataArr = explode(PHP_EOL, $cliData);
                $request->setDubboVersion(json_decode($dataArr[0], true));
                $request->setService(json_decode($dataArr[1], true));
                $request->setVersion(json_decode($dataArr[2], true));
                $methodName = json_decode($dataArr[3],true);
                if ($methodName == "\$invoke")
                {
                    //泛化呼叫
                    $request->setMethod(json_decode($dataArr[5],true));
                    $request->setParams(json_decode($dataArr[7],true));
                    $attach = json_decode($dataArr[8],true);
                } else
                {
                    //非泛化呼叫
                    $request->setMethod($methodName);
                    $paramTypes = json_decode($dataArr[4], true);
                    if ($paramTypes == "")
                    {
                        //呼叫沒有引數的方法
                        $request->setTypes(NULL);
                        $request->setParams(NULL);
                        $attach = json_decode($dataArr[5],true);
                    } else
                    {
                        $typeArr = explode(";", $paramTypes);
                        $typeArrLen = count($typeArr);
                        $request->setParamNum($typeArrLen - 1);
                        $params = array();
                        for ($i = 0; $i < $typeArrLen - 1; $i++) {
                            $params[$i] = json_decode($dataArr[5 + $i],true);
                        }
                        $request->setParams($params);
                        $attach = json_decode($dataArr[5 + ($typeArrLen - 1)],true);
                    }
                }
                $request->setAttach($attach);
                if (array_key_exists('group', $attach))
                {
                    $request->setGroup($attach['group']);
                }
                return $request;
            }
        }
        return false;
    }

    //組包回覆包部分
    public function packResponse(DubboResponse &$response)
    {
        if ($response->getStatus() != DubboResponse::OK) {
            $resData = json_encode($response->getErrorMsg()) ;
        } else {
            if($response->getErrorMsg() != NULL && $response->getErrorMsg() != ""){
                $resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg());
            }else if ($response->getResult() == NULL) {
                $resData = json_encode(self::RESPONSE_NULL_VALUE);
            } else {
                $resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult());
            }
        }
        $resData =  $resData.PHP_EOL;
        $upper = ($response->getSn() & self::UPPER_MASK) >> 32;
        $lower = $response->getSn() & self::LOWER_MASK;
        $flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON;
        if ($response->isHeartbeatEvent()) {
            $flag |= self::FLAG_HEARTBEAT_EVENT;
        }
        $out = pack("n1C1C1N1N1N1",
            self::DUBBO_PROTOCOL_MAGIC,
            $flag,
            $response->getStatus(),
            $upper,
            $lower,
            strlen($resData));

        return $out . $resData;
    }
    
    //判斷是否是業務回包,這裡判斷邏輯也比較簡單,非心跳類的包都當做業務包處理。
    public function isNormalResponse(DubboResponse $response)
    {
        return !($response->isHeartbeatEvent());
    }
    //判斷是否是業務請求包
    public function isNormalRequest(DubboRequest $request)
    {
        return !($request->isHeartbeatEvent());
    }

    public function isOneWayRequest(DubboRequest $request)
    {
        return !($request->isTwoWay());
    }
    //心跳請求包
    public function isHearBeatRequest(DubboRequest $request)
    {
        return $request->isHeartbeatEvent();
    }
    //心跳回復包
    public function isHearBeatResponse(DubboResponse $response)
    {
        return $response->isHeartbeatEvent();
    }
    //請求在佇列中的時間,這個主要是做監控用的
    public static function getReqInQueueTime(DubboRequest $request)
    {
        $ret = 0;
        if (!empty($request->reqInfo)) {
            $ret = isset($request->reqInfo['inqueue_time']) ? $request->reqInfo['inqueue_time'] : 0;
        }
        return $ret;
    }
}