dubbo-php-framework的資料包的解包和組包邏輯解析
阿新 • • 發佈:2018-12-10
前面分析了框架的請求和回覆實體的封裝,而在進行底層網路通訊時,需要進行解包(收到資料)和組包(傳送資料),框架底層使用了dubbo協議,資料序列化方式是json格式,這篇文章我們就重點看看解包和組包的邏輯,這部分邏輯在DubboParser類中完成,這個類定義在dubbo-php-framework-master/common/protocol/fsof/DubboParser.php檔案中。
下面程式碼註釋中明確列了dubbo協議的格式,大家在看解包組包邏輯時,對照著協議看會清楚很多。另外組包和解包是個相互的過程,也就是組包是將業務資料(DubboRequest,DubboResponse)按規則拼接為二進位制串,而解包是將二進位制串按規則解析後set屬性到業務資料中(DubboRequest,DubboResponse)。
/** * * Dubbo網路協議 * +------------------------------------------------------------------------------------------+ * | 包頭(二進位制資料 16bit) | 包體 | * +-----------------------------------------------------------------------------------------+ * | 版本號 | 命令&serialize | 空白 | 包序號| 長度 | 資料 | * +-----------------------------------------------------------------------------------------+ * | magic(2) | cmd&serialize(1)|(1) |sn(8) | len(4) | data(N)| * +-----------------------------------------------------------------------------------------+ * * magic:協議包起始標識, 0xdabb * -------------------------------------------------------------------------------------------- * cmd:命令型別:FLAG_REQUEST為0x80, FLAG_TWOWAY為0x40, FLAG_EVENT為0x20 * serialize:序列化方案編號:與cmd共用一個位元組,採用json,對應dubbo中編號為6 * -------------------------------------------------------------------------------------------- * sn:請求序號,consumer會為每個請求編制一個程序內唯一序號 * ,provider處理完請求後在返回的資料包中會攜帶該sn號,供consumer判斷當前的資料是對應哪個請求 * --------------------------------------------------------------------------------------------* * len:資料報文長度 * -------------------------------------------------------------------------------------------- * data:資料報文,目前採用json進行序列化 * -------------------------------------------------------------------------------------------- */ class DubboParser { //dubbo協議基本資訊 const PACKAGE_HEDA_LEN = 16; const MAX_RECV_LEN = 1048576;//1024*1024; const RESPONSE_TCP_SEGMENT_LEN = 1048576;//1*1024*1024; //fsof協議ver欄位,ver欄位既指示協議版本資訊,也作為magic使用 const DUBBO_PROTOCOL_MAGIC = 0xdabb; //serialize 方案編號 const DUBBO_PROTOCOL_SERIALIZE_FAST_JSON = 6; //serialization code //fsof協議包頭cmd欄位含義 const FLAG_REQUEST = 0x80; //request const FLAG_TWOWAY = 0x40; //two_way const FLAG_HEARTBEAT_EVENT = 0x20; //heart_event const SERIALIZATION_MASK = 0x1f; //serialization_mask const UPPER_MASK = 0xffffffff00000000; const LOWER_MASK = 0x00000000ffffffff; const RESPONSE_WITH_EXCEPTION = 0; const RESPONSE_VALUE = 1; const RESPONSE_NULL_VALUE = 2; private $logger; public function __construct() { $this->logger = \Logger::getLogger(__CLASS__); } //將DubboRequest編碼為二進位制字串 public function packRequest(DubboRequest &$request) { //資料按PHP_EOL的分隔符拼接 $reqData = json_encode($request->getDubboVersion()) . PHP_EOL . json_encode($request->getService()) . PHP_EOL . json_encode($request->getVersion()) . PHP_EOL . json_encode($request->getMethod()) . PHP_EOL ; $typeStr = ""; for($i=0;$i < count($request->getTypes());$i++){ $typeStr = $typeStr.$request->getTypes()[$i]; } $reqData = $reqData.json_encode($typeStr) . PHP_EOL; for ($x = 0; $x < count($request->getParams()); $x++) { $reqData = $reqData . json_encode($request->getParams()[$x]) . PHP_EOL; } $attach = array(); $attach['path'] = $request->getService(); $attach['interface'] = $request->getService(); $attach['group'] = $request->getGroup(); $attach['timeout'] = $request->getTimeout(); $attach['version'] = $request->getVersion(); $request->setAttach($attach); $reqData = $reqData . json_encode($request->getAttach()); $upper = ($request->getSn() & self::UPPER_MASK) >> 32; $lower = $request->getSn() & self::LOWER_MASK; $flag = (self::FLAG_REQUEST | self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON); if ($request->isTwoWay()) $flag |= self::FLAG_TWOWAY; if ($request->isHeartbeatEvent()) $flag |= self::FLAG_HEARTBEAT_EVENT; //呼叫PHP的系統函式pack,按格式編碼資料 $out = pack("n1C1a1N1N1N1", self::DUBBO_PROTOCOL_MAGIC, $flag, "", $upper, $lower, strlen($reqData)); return $out . $reqData; } //解析回覆包的頭部資訊 public function parseResponseHeader(DubboResponse &$response) { //頭部資料長度為16byte,這裡擷取16個byte後進行解析 $res_header = substr($response->getFullData(), 0, self::PACKAGE_HEDA_LEN); $format = 'n1magic/C1flag/C1status/N1upper/N1lower/N1len'; //呼叫PHP函式按上述格式解析資料 $_arr = unpack($format, $res_header); $response->setStatus($_arr['status']); $response->setSn($_arr["upper"] << 32 | $_arr["lower"]); $flag = $_arr["flag"]; if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) { $response->setHeartbeatEvent(true); } $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON); $response->setLen($_arr["len"]); return $response; } //解包回覆包的body部分 public function parseResponseBody(DubboResponse &$response) { //從第16byte開始的所有資料當前body部分 $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN); $response->setResponseBody($_data); $dataArr = array(); //資料不為空時,按PHP_EOL分隔符拆分資料 if ($_data){ $dataArr = explode(PHP_EOL,$_data); } if ($response->getStatus() == DubboResponse::OK) { if ($response->isHeartbeatEvent()) { $response->setResult(json_decode($dataArr[0], true)); } else { switch ($dataArr[0]) { case self::RESPONSE_NULL_VALUE: break; case self::RESPONSE_VALUE: $response->setResult(json_decode($dataArr[1], true)); break; case self::RESPONSE_WITH_EXCEPTION: $exception = json_decode($dataArr[1], true); if(is_array($exception) && array_key_exists('message', $exception)){ throw new \Exception($exception['message']); }else if(is_string($exception)){ throw new \Exception($exception); }else{ throw new \Exception("provider occur error"); } break; default: return false; } } return $response; } else { throw new \Exception(json_decode($dataArr[0], true)); } } //解包請求包的頭部部分 public function parseRequestHeader(DubboRequest &$request) { $_data = substr($request->getFullData(), 0, self::PACKAGE_HEDA_LEN); $format = 'n1magic/C1flag/C1blank/N1upper/N1lower/N1len'; $_arr = unpack($format, $_data); $flag = $_arr['flag']; $request->setTwoWay(($flag & self::FLAG_TWOWAY) != 0); if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) { $request->setHeartbeatEvent(true); } $request->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON); $request->setSn($_arr['upper'] << 32 | $_arr['lower']); $request->setDataLen($_arr['len']); $request->setRequestLen($request->getDataLen() + self::PACKAGE_HEDA_LEN); return $request; } //解包請求包的body部分 public function parseRequestBody(DubboRequest &$request) { if ($request->getSerialization() != self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON) { $this->logger->error("unknown serialization type :" . $request->getSerialization()); return false; } $cliData = substr($request->getFullData(), self::PACKAGE_HEDA_LEN); if ($cliData) { if ($request->isHeartbeatEvent()) { //心跳請求,不需要資料回送 } else { $dataArr = explode(PHP_EOL, $cliData); $request->setDubboVersion(json_decode($dataArr[0], true)); $request->setService(json_decode($dataArr[1], true)); $request->setVersion(json_decode($dataArr[2], true)); $methodName = json_decode($dataArr[3],true); if ($methodName == "\$invoke") { //泛化呼叫 $request->setMethod(json_decode($dataArr[5],true)); $request->setParams(json_decode($dataArr[7],true)); $attach = json_decode($dataArr[8],true); } else { //非泛化呼叫 $request->setMethod($methodName); $paramTypes = json_decode($dataArr[4], true); if ($paramTypes == "") { //呼叫沒有引數的方法 $request->setTypes(NULL); $request->setParams(NULL); $attach = json_decode($dataArr[5],true); } else { $typeArr = explode(";", $paramTypes); $typeArrLen = count($typeArr); $request->setParamNum($typeArrLen - 1); $params = array(); for ($i = 0; $i < $typeArrLen - 1; $i++) { $params[$i] = json_decode($dataArr[5 + $i],true); } $request->setParams($params); $attach = json_decode($dataArr[5 + ($typeArrLen - 1)],true); } } $request->setAttach($attach); if (array_key_exists('group', $attach)) { $request->setGroup($attach['group']); } return $request; } } return false; } //組包回覆包部分 public function packResponse(DubboResponse &$response) { if ($response->getStatus() != DubboResponse::OK) { $resData = json_encode($response->getErrorMsg()) ; } else { if($response->getErrorMsg() != NULL && $response->getErrorMsg() != ""){ $resData = json_encode(self::RESPONSE_WITH_EXCEPTION) . PHP_EOL . json_encode($response->getErrorMsg()); }else if ($response->getResult() == NULL) { $resData = json_encode(self::RESPONSE_NULL_VALUE); } else { $resData = json_encode(self::RESPONSE_VALUE) . PHP_EOL . json_encode($response->getResult()); } } $resData = $resData.PHP_EOL; $upper = ($response->getSn() & self::UPPER_MASK) >> 32; $lower = $response->getSn() & self::LOWER_MASK; $flag = self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON; if ($response->isHeartbeatEvent()) { $flag |= self::FLAG_HEARTBEAT_EVENT; } $out = pack("n1C1C1N1N1N1", self::DUBBO_PROTOCOL_MAGIC, $flag, $response->getStatus(), $upper, $lower, strlen($resData)); return $out . $resData; } //判斷是否是業務回包,這裡判斷邏輯也比較簡單,非心跳類的包都當做業務包處理。 public function isNormalResponse(DubboResponse $response) { return !($response->isHeartbeatEvent()); } //判斷是否是業務請求包 public function isNormalRequest(DubboRequest $request) { return !($request->isHeartbeatEvent()); } public function isOneWayRequest(DubboRequest $request) { return !($request->isTwoWay()); } //心跳請求包 public function isHearBeatRequest(DubboRequest $request) { return $request->isHeartbeatEvent(); } //心跳回復包 public function isHearBeatResponse(DubboResponse $response) { return $response->isHeartbeatEvent(); } //請求在佇列中的時間,這個主要是做監控用的 public static function getReqInQueueTime(DubboRequest $request) { $ret = 0; if (!empty($request->reqInfo)) { $ret = isset($request->reqInfo['inqueue_time']) ? $request->reqInfo['inqueue_time'] : 0; } return $ret; } }