1. 程式人生 > >RTMP推流及協議學習

RTMP推流及協議學習

前期準備

瞭解RTMP定義

RTMP是Real Time Messaging Protocol(實時訊息傳輸協議)的首字母縮寫。該協議基於TCP,是一個協議族,包括RTMP基本協議及RTMPT/RTMPS/RTMPE等多種變種。RTMP是一種設計用來進行實時資料通訊的網路協議,主要用來在Flash/AIR平臺和支援RTMP協議的流媒體/互動伺服器之間進行音視訊和資料通訊。

準備RTMPDump中的librtmp

  • 下載
    在librtmp的官網上面可以下載:http://rtmpdump.mplayerhq.hu,如果觀望無法訪問,也可以在GitHub上面找到相應的程式碼。最新的版本是:rtmpdump-2.3.tgz。
  • 編譯
    解壓rtmpdump-2.3.tgz,可以檢視README來獲取編譯方法:

To compile type “make” with SYS=, e.g.
make SYS=posix
for Linux, MacOSX, Unix, etc. or
make SYS=mingw
for Windows.
You can cross-compile for other platforms using the CROSS_COMPILE variable:
make CROSS_COMPILE=arm-none-linux- INC=-I/my/cross/includes

編譯完成以後,可以在librtmp目錄下面找到librtmp.a的庫檔案。在需要使用librtmp的工程裡面,將這個lib連結進去就可以了。

使用openssl中的libssl+libcrypto

  • 下載
    在OpenSSL的官網上面可以獲取最新的OpenSSL的原始碼。https://www.openssl.org/source/。最新版本的是:1.1.0e版本。
  • 編譯
    編譯openssl很簡單,如果在linux上面,直接./Configure + make 就可以完成。如果要交叉編譯,在Configure對應平臺以後,需要去修改一下Makefile中的cc ar prefix相關的變數。

./Configure android-armv7 修改makefile cc ar prefix…. (edited)

編譯完成以後,我們在libs目錄可以看到libssl.a和libcrypto.a兩個lib。在rtmp push的時候,需要這兩個libs的支援。

推流工作

整體框架圖

Created with Raphaël 2.1.0Streaming from RTSPGet Audio/Video frameConvert frameRTMP push

使用libtrmp提供的API

librtmp提供了推流的API,可以在rtmp.h檔案中檢視所有API。我們只需要使用常用的幾個API就可以將streaming推送到伺服器。
- RTMP_Init()//初始化結構體
- RTMP_Free()
- RTMP_Alloc()
- RTMP_SetupURL()//設定rtmp server地址
- RTMP_EnableWrite()//開啟可寫選項,設定為推流狀態
- RTMP_Connect()//建立NetConnection
- RTMP_Close()//關閉連線
- RTMP_ConnectStream()//建立NetStream
- RTMP_DeleteStream()//刪除NetStream
- RTMP_SendPacket()//傳送資料

流程圖

Created with Raphaël 2.1.0StartRTMP_Init()RTMP_Alloc()RTMP_Setup()RTMP_EnableWrite()RTMP_Connect()RTMP_ConnectStream()RTMP_SendPacket()End

將streaming封裝成為RTMP格式

在傳送第一幀Audio和Video的時候,需要將Audio和Video的資訊封裝成為RTMP header,傳送給rtmp server。
Audio頭有4位元組,包含:頭部標記0xaf 0x00、 profile、channel、bitrate 資訊。
Video頭有16位元組,包含IFrame、PFrame、AVC標識,除此之外,還需要將sps和pps放在header 裡面。
RTMP協議定義了message Type,其中Type ID為8,9的訊息分別用於傳輸音訊和視訊資料:

#define RTMP_PACKET_TYPE_AUDIO 0x08
#define RTMP_PACKET_TYPE_VIDEO 0x09
  • Audio 格式封裝的原始碼:
    AAC header packet:
body = (unsigned char *)malloc(4 + size);
memset(body, 0, 4);
body[0] = 0xaf;
body[1] = 0x00;

switch (profile){
 case 0:
    body[2]|=(1<<3);//main
    break;
 case 1:
    body[2]|=(1<<4);//LC
    break;
 case 2:
    body[2]|=(1<<3);//SSR
    body[2]|=(1<<4);
    break;
 default:
    ;
}
switch(this->channel){
 case 1:
    body[3]|=(1<<3);//channel1
    break;
 case 2:
    body[3]|=(1<<4);//channel2
    break;
 default:
    ;
}
switch(this->rate){
 case 48000:
    body[2]|=(1);
    body[3]|=(1<<7);
    break;
 case 44100:
    body[2]|=(1<<1);
    break;
 case 32000:
    body[2]|=(1<<1);
    body[3]|=(1<<7);
    break;
 default:
    ;
}
sendPacket(RTMP_PACKET_TYPE_AUDIO, body, 4, 0);
free(body);
  • Video 格式封裝的原始碼:
    H264 header packet:
body = (unsigned char *)malloc(16 + sps_len + pps_len);
this->videoFist = false;

memset(body, 0, 16 + sps_len + pps_len);
body[i++] = 0x17;   // 1: IFrame, 7: AVC
                    // AVC Sequence Header
body[i++] = 0x00;
body[i++] = 0x00;
body[i++] = 0x00;
body[i++] = 0x00;

// AVCDecoderConfigurationRecord
body[i++] = 0x01;
body[i++] = sps[1];
body[i++] = sps[2];
body[i++] = sps[3];
body[i++] = 0xff;
body[i++] = 0xe1;
body[i++] = (sps_len >> 8) & 0xff;
body[i++] = sps_len & 0xff;
for (size_t j = 0; j < sps_len; j++)
{
    body[i++] = sps[j];
}
body[i++] = 0x01;
body[i++] = (pps_len >> 8) & 0xff;
body[i++] = pps_len & 0xff;
for (size_t j = 0; j < pps_len; j++)
{
    body[i++] = pps[j];
}
sendPacket(RTMP_PACKET_TYPE_VIDEO, body, i, nTimeStamp);

free(body);

只有第一幀Audio和第一幀video才需要傳送header資訊。之後就直接傳送幀資料。
傳送Audio的時候,只需要在資料幀前面加上2 byte的header資訊:

spec_info[0] = 0xAF;
spec_info[1] = 0x01;

傳送Video的時候,需要在header裡面標識出I P幀的資訊,以及視訊幀的長度資訊:

body = (unsigned char *)malloc(9 + size);
memset(body, 0, 9);
i = 0;
if (bIsKeyFrame== 0) {
    body[i++] = 0x17;   // 1: IFrame, 7: AVC
}
else {
    body[i++] = 0x27;   // 2: PFrame, 7: AVC
}
// AVCVIDEOPACKET
body[i++] = 0x01;
body[i++] = 0x00;
body[i++] = 0x00;
body[i++] = 0x00;

// NALUs
body[i++] = size >> 24 & 0xff;
body[i++] = size >> 16 & 0xff;
body[i++] = size >> 8 & 0xff;
body[i++] = size & 0xff;
memcpy(&body[i], data, size);

進階

RTMP client與RTMP server互動流程

  • 簡介
    播放一個RTMP協議的流媒體需要經過:握手、建立連結、建立流、播放/傳送四個步驟。握手成功之後,需要在建立連結階段去建立客戶端和伺服器之間的“網路連結”。建立流階段用於建立客戶端和伺服器之間的“網路流”。播放階段用於傳輸音視訊資料。
  • 握手(HandsShake)
    伺服器和客戶端需要傳送大小固定的三個資料塊,步驟如下:
    1. 握手開始於客戶端傳送C0、C1塊。伺服器收到C0或C1後傳送S0和S1。
    2. 當客戶端收齊S0和S1後,開始傳送C2。當伺服器收齊C0和C1後,開始傳送S2。
    3. 當客戶端和伺服器分別收到S2和C2後,握手完成。
      使用雷霄驊所畫的握手互動圖|center|450*0
  • 建立連結(NetConnnet)
    1. 客戶端傳送命令訊息中的“連線”(connect)到伺服器,請求與一個服務應用例項建立連線。
    2. 伺服器接收到連線命令訊息後,傳送確認視窗大小(Window Acknowledgement Size)協議訊息到客戶端,同時連線到連線命令中提到的應用程式。
    3. 伺服器傳送設定頻寬()協議訊息到客戶端。
    4. 客戶端處理設定頻寬協議訊息後,傳送確認視窗大小(Window Acknowledgement Size)協議訊息到伺服器端。
    5. 伺服器傳送使用者控制訊息中的“流開始”(Stream Begin)訊息到客戶端。
    6. 伺服器傳送命令訊息中的“結果”(_result),通知客戶端連線的狀態。
      建立連線-雷霄驊|center|500*0
  • 建立流(NetStream)
    1. 客戶端傳送命令訊息中的“建立流”(createStream)命令到伺服器端。
    2. 伺服器端接收到“建立流”命令後,傳送命令訊息中的“結果”(_result),通知客戶端流的狀態。
      建立流-雷霄驊|center|450*0
  • 播放(Play)
    1. 客戶端傳送命令訊息中的“播放”(play)命令到伺服器。
    2. 接收到播放命令後,伺服器傳送設定塊大小(ChunkSize)協議訊息。
    3. 伺服器傳送使用者控制訊息中的“streambegin”,告知客戶端流ID。
    4. 播放命令成功的話,伺服器傳送命令訊息中的“響應狀態” NetStream.Play.Start & NetStream.Play.reset,告知客戶端“播放”命令執行成功。
    5. 在此之後伺服器傳送客戶端要播放的音訊和視訊資料。
      播放|center|480*0
  • 傳送(send)
    send streaming

RTMPDump原始碼分析

握手(HandsShake)

static int HandShake(RTMP * r, int FP9HandShake);

HandShake函式在:/rtmp/rtmplib/handshack.h中。
./rtmp.c:69:#define RTMP_SIG_SIZE 1536

/*client HandShake*/
 695 static int HandShake(RTMP * r, int FP9HandShake){
 709 uint8_t clientbuf[RTMP_SIG_SIZE + 4], *clientsig=clientbuf+4;

/*C0 欄位已經寫入clientsig*/
 721 if (encrypted){  
 722      clientsig[-1] = 0x06; /* 0x08 is RTMPE as well */  
 723      offalg = 1;  
 724 }else  
    //0x03代表RTMP協議的版本(客戶端要求的)  
    //陣列竟然能有“-1”下標,因為clientsig指向的是clientbuf+4,所以不存在非法地址
    //C0中的欄位(1B)  
 725   clientsig[-1] = 0x03;
 /*準備C1欄位過程略去,C1欄位的資料寫入clientsig中, clientsig的大小為1536個位元組*/

/*1st part of shakehand .......*/
/*C ------- S*/
/*c0 C1-->   */
/*  <-- S0 S1*/
/*C2 -->     */
/*send clientsig C0 和 C1一起傳送*/
 814   if (!WriteN(r, (char *)clientsig-1, RTMP_SIG_SIZE + 1))
 815     return FALSE;

/*get server response->read type, if get response type not match handshake failed*/
 817   if (ReadN(r, (char *)&type, 1) != 1)  /* 0x03 or 0x06 */
 818     return FALSE; /*encrypt type = 0x06*/

/*get server response->read serversig*/
 826 if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
 827     return FALSE;

/*如果是加密協議,則需要校驗收到的serversig是否和傳送的匹配,如果沒有加密則直接傳送收到的serversig*/
 968   if (!WriteN(r, (char *)reply, RTMP_SIG_SIZE))
 969     return FALSE;

/*2nd part of shakehand .....*/
/*C ----- S*/
/*   <-- S2*/
 972   if (ReadN(r, (char *)serversig, RTMP_SIG_SIZE) != RTMP_SIG_SIZE)
 973     return FALSE;
/* compare info between serversig and clientsig*/
 1060  if (memcmp(serversig, clientsig, RTMP_SIG_SIZE) != 0)
/*如果相等,則握手成功*/
}

建立連結(NetConnnet)

RTMP_Connect(RTMP *r, RTMPPacket *cp);

建立連線的程式碼位於:librtmp/rtmp.c中,定義函式:RTMP_Connect()。RTMP_Conncet()裡面又分別呼叫了兩個函式:RTMP_Connect0(), RTMP_Connect1()。RTMP_Connect0()主要進行的是socket的連線,RTMP_Connct1()進行的是RTMP相關的連線動作。

1031 int RTMP_Connect(RTMP *r, RTMPPacket *cp)
1032 {
1033   struct sockaddr_in service;
1034   if (!r->Link.hostname.av_len)
1035     return FALSE;
1036 
1037   memset(&service, 0, sizeof(struct sockaddr_in));
1038   service.sin_family = AF_INET;
1039 
1040   if (r->Link.socksport)
1041     {
1042       /* Connect via SOCKS */
1043       if (!add_addr_info(&service, &r->Link.sockshost, r->Link.socksport))
1044   return FALSE;
1045     }
1046   else
1047     {
1048       /* Connect directly */
1049       if (!add_addr_info(&service, &r->Link.hostname, r->Link.port))
1050   return FALSE;
1051     }
1052 
1053   if (!RTMP_Connect0(r, (struct sockaddr *)&service))
1054     return FALSE;
1055 
1056   r->m_bSendCounter = TRUE;
1057 
1058   return RTMP_Connect1(r, cp);
1059 }
int RTMP_Connect0(RTMP r, struct sockaddr service);

RTMP_Connect0函式分析:

 905 int RTMP_Connect0(RTMP *r, struct sockaddr * service){
     /*建立socket*/
 913   r->m_sb.sb_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
     /*通過socket連線到伺服器地址*/
 916   if (connect(r->m_sb.sb_socket, service, sizeof(struct sockaddr)) < 0)
     /*如果指定了socket埠到,則進行socks Negotiate*/
 928     if (!SocksNegotiate(r)){}
     /*連線成功之後,返回TRUE*/
 956   return TRUE;
    }
int RTMP_Connect1(RTMP *r, RTMPPacket *cp);

RTMP_Connect1函式分析:
根據不同的傳輸協議,選擇傳送資料的方式。之後進行HandShake,最後呼叫SendConnectPacket()送Connect packet。

int
RTMP_Connect1(RTMP *r, RTMPPacket *cp)
{
/*if crypto use tls_conncet*/
  if (r->Link.protocol & RTMP_FEATURE_SSL){
#if defined(CRYPTO) && !defined(NO_SSL)
      TLS_client(RTMP_TLS_ctx, r->m_sb.sb_ssl);
      TLS_setfd(r->m_sb.sb_ssl, r->m_sb.sb_socket);
      if (TLS_connect(r->m_sb.sb_ssl) < 0){...}
#else
      return FALSE;
#endif
    }
  /*if no crypto, use http post*/  
  if (r->Link.protocol & RTMP_FEATURE_HTTP){
      HTTP_Post(r, RTMPT_OPEN, "", 1);
      if (HTTP_read(r, 1) != 0){...}
        ...
    }
    /*進行HandShake*/
  if (!HandShake(r, TRUE)){...}
    /*握手成功之後,傳送Connect Packet*/
  if (!SendConnectPacket(r, cp)){...}
  return TRUE;
}

SendConnectPacket() 裡面主要對RTMP資訊進行打包,然後呼叫RTMP_SendPacket函式,將內容傳送出去。

static int
SendConnectPacket(RTMP *r, RTMPPacket *cp)
{
  RTMPPacket packet;
  char pbuf[4096], *pend = pbuf + sizeof(pbuf);
  char *enc;

  if (cp)
    return RTMP_SendPacket(r, cp, TRUE);

  packet.m_nChannel = 0x03; /* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = RTMP_PACKET_TYPE_INVOKE;
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_connect);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_OBJECT;

  /*encrypto 部分省略 主要就是呼叫AMF函式進行*/
  ...

  packet.m_nBodySize = enc - packet.m_body;

  return RTMP_SendPacket(r, &packet, TRUE);
}

建立流(NetStream)

RTMP_ConnectStream()函式主要用於在NetConnection基礎上面建立一個NetStream。

int RTMP_ConnectStream(RTMP *r, int seekTime);
int RTMP_ConnectStream(RTMP *r, int seekTime)
{
  RTMPPacket packet = { 0 };
  /* seekTime was already set by SetupStream / SetupURL.
   * This is only needed by ReconnectStream.
   */
  if (seekTime > 0)
    r->Link.seekTime = seekTime;

  r->m_mediaChannel = 0;

  // 接收到的實際上是塊(Chunk),而不是訊息(Message),因為訊息在網上傳輸的時候要分割成塊.
  while (!r->m_bPlaying && RTMP_IsConnected(r) && RTMP_ReadPacket(r, &packet)){
      // 一個訊息可能被封裝成多個塊(Chunk),只有當所有塊讀取完才處理這個訊息包
      if (RTMPPacket_IsReady(&packet)){
        if (!packet.m_nBodySize)
          continue;
        // 讀取到flv資料包,則繼續讀取下一個包
        if ((packet.m_packetType == RTMP_PACKET_TYPE_AUDIO) ||
            (packet.m_packetType == RTMP_PACKET_TYPE_VIDEO) ||
            (packet.m_packetType == RTMP_PACKET_TYPE_INFO)){
            RTMP_Log(RTMP_LOGWARNING, "Received FLV packet before play()! Ignoring.");
            RTMPPacket_Free(&packet);
            continue;
          }
        RTMP_ClientPacket(r, &packet);// 處理收到的資料包
        RTMPPacket_Free(&packet);// 處理完畢,清除資料
      }
    }
  return r->m_bPlaying;
}

簡單的一個邏輯判斷,重點在while迴圈裡。首先,必須要滿足三個條件。其次,進入迴圈以後只有出錯或者建立流(NetStream)完成後,才能退出迴圈。
有兩個重要的函式:

int RTMP_ReadPacket(RTMP *r, RTMPPacket *packet);

塊格式:

basic header(1-3位元組) chunk msg header(0/3/7/11位元組) Extended Timestamp(0/4位元組) chunk data

訊息格式:

timestamp(3位元組) msg length(3位元組) msg type id(1位元組,小端) msg stream id(4位元組)
/**
 * @brief 讀取接收到的訊息塊(Chunk),存放在packet中. 對接收到的訊息不做任何處理。 塊的格式為:
 *
 *   | basic header(1-3位元組)| chunk msg header(0/3/7/11位元組) | Extended Timestamp(0/4位元組) | chunk data |
 *
 *   其中 basic header還可以分解為:| fmt(2位) | cs id (3 <= id <= 65599) |
 *   RTMP協議支援65597種流,ID從3-65599。ID 0、1、2作為保留。
 *      id = 0,表示ID的範圍是64-319(第二個位元組 + 64);
 *      id = 1,表示ID範圍是64-65599(第三個位元組*256 + 第二個位元組 + 64);
 *      id = 2,表示低層協議訊息。
 *   沒有其他的位元組來表示流ID。3 -- 63表示完整的流ID。
 *
 *    一個完整的chunk msg header 還可以分解為 :
 *     | timestamp(3位元組) | msg length(3位元組) | msg type id(1位元組,小端) | msg stream id(4位元組) |
 */
int  
RTMP_ReadPacket(RTMP *r, RTMPPacket *packet)  
{
  uint8_t hbuf[RTMP_MAX_HEADER_SIZE] = { 0 };
  // Chunk Header長度最大值為3 + 11 + 4 = 18
  char *header = (char *)hbuf;
  // header指向從socket接收到的資料
  int   nSize, hSize, nToRead, nChunk;
  // nSize是塊訊息頭長度,hSize是塊頭長度
  int   didAlloc = FALSE;

  RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d", __FUNCTION__, r->m_sb.sb_socket);

  // 讀取1個位元組存入 hbuf[0]
  if (ReadN(r, (char *)hbuf, 1) == 0)
  {
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header", __FUNCTION__);
    return FALSE;
  }

  packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
  // 塊型別fmt
  packet->m_nChannel    = (hbuf[0] & 0x3f);
  // 塊流ID(2 - 63)
  header++;

  // 塊流ID第一個位元組為0,表示塊流ID佔2個位元組,表示ID的範圍是64-319(第二個位元組 + 64)
  if (packet->m_nChannel == 0)
  {
    // 讀取接下來的1個位元組存放在hbuf[1]中
    if (ReadN(r, (char *)&hbuf[1], 1) != 1)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 2nd byte", __FUNCTION__);
      return FALSE;
    }

    // 塊流ID = 第二個位元組 + 64 = hbuf[1] + 64
    packet->m_nChannel = hbuf[1];
    packet->m_nChannel += 64;
    header++;
  }
  // 塊流ID第一個位元組為1,表示塊流ID佔3個位元組,表示ID範圍是64 -- 65599(第三個位元組*256 + 第二個位元組 + 64)
  else if (packet->m_nChannel == 1){
    int tmp;
    // 讀取2個位元組存放在hbuf[1]和hbuf[2]中
    if (ReadN(r, (char *)&hbuf[1], 2) != 2)
    {
      RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header 3nd byte", __FUNCTION__);
      return FALSE;
    }

    // 塊流ID = 第三個位元組*256 + 第二個位元組 + 64
    tmp = (hbuf[2] << 8) + hbuf[1];
    packet->m_nChannel = tmp + 64;
    RTMP_Log(RTMP_LOGDEBUG, "%s, m_nChannel: %0x", __FUNCTION__, packet->m_nChannel);
    header += 2;
  }

  // 塊訊息頭(ChunkMsgHeader)有四種類型,大小分別為11、7、3、0,每個值加1 就得到該陣列的值
  // 塊頭 = BasicHeader(1-3位元組) + ChunkMsgHeader + ExtendTimestamp(0或4位元組)
  nSize = packetSize[packet->m_headerType];

  // 塊型別fmt為0的塊,在一個塊流的開始和時間戳返回的時候必須有這種塊
  // 塊型別fmt為1、2、3的塊使用與先前塊相同的資料
  // 關於塊型別的定義,可參考官方協議:流的分塊 --- 6.1.2節
  if (nSize == RTMP_LARGE_HEADER_SIZE)
  /* if we get a full header the timestamp is absolute */
  {
    packet->m_hasAbsTimestamp = TRUE;
    // 11個位元組的完整ChunkMsgHeader的TimeStamp是絕對時間戳
  }else if (nSize < RTMP_LARGE_HEADER_SIZE){
    /* using values from the last message of this channel */
    if (r->m_vecChannelsIn[packet->m_nChannel])
    memcpy(packet, r->m_vecChannelsIn[packet->m_nChannel], sizeof(RTMPPacket));
  }

  nSize--;
  // 真實的ChunkMsgHeader的大小,此處減1是因為前面獲取包型別的時候多加了1

  // 讀取nSize個位元組存入header
  if (nSize > 0 && ReadN(r, header, nSize) != nSize){
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet header. type: %x", 
     __FUNCTION__, (unsigned int)hbuf[0]);
    return FALSE;
  }

  // 目前已經讀取的位元組數 = chunk msg header + basic header
  hSize = nSize + (header - (char *)hbuf);
  // chunk msg header為11、7、3位元組,fmt型別值為0、1、2
  if (nSize >= 3){
    // 首部前3個位元組為timestamp
    packet->m_nTimeStamp = AMF_DecodeInt24(header);

    /* RTMP_Log(RTMP_LOGDEBUG, "%s, reading RTMP packet chunk on channel %x, 
    headersz %i, timestamp %i, abs timestamp %i", __FUNCTION__, 
    packet.m_nChannel, nSize, packet.m_nTimeStamp, packet.m_hasAbsTimestamp); */

    // chunk msg header為11或7位元組,fmt型別值為0或1
    if (nSize >= 6)
    {
      packet->m_nBodySize = AMF_DecodeInt24(header + 3);
      packet->m_nBytesRead = 0;
      RTMPPacket_Free(packet);

      if (nSize > 6)
      {
        packet->m_packetType = header[6];
        // msg type id
        if (nSize == 11)
          packet->m_nInfoField2 = DecodeInt32LE(header + 7); // msg stream id,小端位元組序
      }
    }

  // Extend Tiemstamp,佔4個位元組
    if (packet->m_nTimeStamp == 0xffffff){
      if (ReadN(r, header + nSize, 4) != 4)
      {
        RTMP_Log(RTMP_LOGERROR, "%s, failed to read extended timestamp", __FUNCTION__);
        return FALSE;
      }
      packet->m_nTimeStamp = AMF_DecodeInt32(header + nSize);
      hSize += 4;
    }
  }

  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)hbuf, hSize);

  // 如果訊息長度非0,且訊息資料緩衝區為空,則為之申請空間
  if (packet->m_nBodySize > 0 && packet->m_body == NULL){
    if (!RTMPPacket_Alloc(packet, packet->m_nBodySize)){
      RTMP_Log(RTMP_LOGDEBUG, "%s, failed to allocate packet", __FUNCTION__);
      return FALSE;
    }
    didAlloc = TRUE;
    packet->m_headerType = (hbuf[0] & 0xc0) >> 6;
  }

  // 剩下的訊息資料長度如果比塊尺寸大,則需要分塊,否則塊尺寸就等於剩下的訊息資料長度
  nToRead = packet->m_nBodySize - packet->m_nBytesRead;
  nChunk = r->m_inChunkSize;
  if (nToRead < nChunk)
  nChunk = nToRead;

  /* Does the caller want the raw chunk? */
  if (packet->m_chunk){
    packet->m_chunk->c_headerSize = hSize;
    // 塊頭大小
    memcpy(packet->m_chunk->c_header, hbuf, hSize);
    // 填充塊頭資料
    packet->m_chunk->c_chunk = packet->m_body + packet->m_nBytesRead;
    // 塊訊息資料緩衝區指標
    packet->m_chunk->c_chunkSize = nChunk;
    // 塊大小
  }

  // 讀取一個塊大小的資料存入塊訊息資料緩衝區
  if (ReadN(r, packet->m_body + packet->m_nBytesRead, nChunk) != nChunk){
    RTMP_Log(RTMP_LOGERROR, "%s, failed to read RTMP packet body. len: %u",
     __FUNCTION__, packet->m_nBodySize);
    return FALSE;
  }

  RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)packet->m_body + packet->m_nBytesRead, nChunk);

  // 更新已讀資料位元組個數
  packet->m_nBytesRead += nChunk;

  /* keep the packet as ref for other packets on this channel */
  // 將這個包作為通道中其他包的參考
  if (!r->m_vecChannelsIn[packet->m_nChannel])
   r->m_vecChannelsIn[packet->m_nChannel] = malloc(sizeof(RTMPPacket));
  memcpy(r->m_vecChannelsIn[packet->m_nChannel], packet, sizeof(RTMPPacket));

  // 包讀取完畢
  if (RTMPPacket_IsReady(packet)){
    /* make packet's timestamp absolute,絕對時間戳 = 上一次絕對時間戳 + 時間戳增量 */
    if (!packet->m_hasAbsTimestamp)
      /* timestamps seem to be always relative!! */
      packet->m_nTimeStamp += r->m_channelTimestamp[packet->m_nChannel];

    // 當前絕對時間戳儲存起來,供下一個包轉換時間戳使用
    r->m_channelTimestamp[packet->m_nChannel] = packet->m_nTimeStamp;

    /* reset the data from the stored packet.  we keep the header since we may use it later if 
                       a new packet for this channel arrives and requests to re-use some info (small packet header) */
    // 重置儲存的包。保留塊頭資料,因為通道中新到來的包(更短的塊頭)可能需要使用前面塊頭的資訊.
    r->m_vecChannelsIn[packet->m_nChannel]->m_body = NULL;
    r->m_vecChannelsIn[packet->m_nChannel]->m_nBytesRead = 0;
    r->m_vecChannelsIn[packet->m_nChannel]->m_hasAbsTimestamp = FALSE; // can only be false if we reuse header
  }
  else{
  packet->m_body = NULL;
  /* so it won't be erased on free */
  }

  return TRUE;
}
int ReadN(RTMP *r, char *buffer, int n);
/**
 * @brief 從HTTP或SOCKET中讀取n個數據存放在buffer中.
 */
static int ReadN(RTMP *r, char *buffer, int n)
{
    int  nOriginalSize = n;
    int  avail;
    char *ptr;

    r->m_sb.sb_timedout = FALSE;
    #ifdef _DEBUG
    memset(buffer, 0, n);
    #endif

    ptr = buffer;
    while (n > 0){
    int nBytes = 0, nRead;
    if (r->Link.protocol & RTMP_FEATURE_HTTP)
                   {
    while (!r->m_resplen)
    {
        if (r->m_sb.sb_size < 144)
        {
            if (!r->m_unackd)
            HTTP_Post(r, RTMPT_IDLE, "", 1);
            if (RTMPSockBuf_Fill(r, &r->m_sb) < 1){
                if (!r->m_sb.sb_timedout)
                RTMP_Close(r);
                return 0;
            }
        }

        if (HTTP_read(r, 0) == -1){
            RTMP_Log(RTMP_LOGDEBUG, "%s, No valid HTTP response found", __FUNCTION__);
            RTMP_Close(r);
            return 0;
        }
    }

    if (r->m_resplen && !r->m_sb.sb_size)
    RTMPSockBuf_Fill(r, &r->m_sb);

    avail = r->m_sb.sb_size;
    if (avail > r->m_resplen)
        avail = r->m_resplen;
    }else{
        avail = r->m_sb.sb_size;
        if (avail == 0){
            if (RTMPSockBuf_Fill(r, &r->m_sb) < 1){
                if (!r->m_sb.sb_timedout)
                    RTMP_Close(r);
                return 0;
            }
            avail = r->m_sb.sb_size;
        }
    }

    nRead = ((n < avail) ? n : avail);
    if (nRead > 0){
        memcpy(ptr, r->m_sb.sb_start, nRead);
        r->m_sb.sb_start += nRead;
        r->m_sb.sb_size -= nRead;
        nBytes = nRead;
        r->m_nBytesIn += nRead;
        if (r->m_bSendCounter && r->m_nBytesIn > ( r->m_nBytesInSent + r->m_nClientBW / 10))
        if (!SendBytesReceived(r))
        return FALSE;
    }
    /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d bytes\n", __FUNCTION__, nBytes); */
    //#ifdef _DEBUG
    //      fwrite(ptr, 1, nBytes, netstackdump_read);
    //#endif

    if (nBytes == 0){
        RTMP_Log(RTMP_LOGDEBUG, "%s, RTMP socket closed by peer", __FUNCTION__);
        /*goto again; */
        RTMP_Close(r);
        break;
    }

    if (r->Link.protocol & RTMP_FEATURE_HTTP){
        r->m_resplen -= nBytes;
        n -= nBytes;
        ptr += nBytes;
    }

    return nOriginalSize - n;
}
int RTMPSockBuf_Fill(RTMP *r, RTMPSockBuf *sb);
/**
 * @brief 呼叫Socket程式設計中的recv()函式,接收資料
 */
int RTMPSockBuf_Fill(RTMP *r, RTMPSockBuf *sb)
{
    int nBytes;
    if  (!sb->sb_size)
        sb->sb_start = sb->sb_buf;

    while (1)
    {
        // 緩衝區長度:總長-未處理位元組-已處理位元組  
        // |-----已處理--------|-----未處理--------|---------緩衝區----------|  
        // sb_buf        sb_start    sb_size  
        nBytes = sizeof(sb->sb_buf) - sb->sb_size - (sb->sb_start - sb->sb_buf);
        {
            // int recv( SOCKET s, char * buf, int len, int flags);  
            // s    :一個標識已連線套介面的描述字。  
            // buf  :用於接收資料的緩衝區。   
            // len  :緩衝區長度。  
            // flags:指定呼叫方式。  
            // 從sb_start(待處理的下一位元組) + sb_size()還未處理的位元組開始buffer為空,可以儲存
            nBytes = r->m_sock.recv(sb->sb_socket, sb->sb_start + sb->sb_size, nBytes, 0);
        }

        if (nBytes != -1){
            // 未處理的位元組又多了
            sb->sb_size += nBytes;
        }else{
            int sockerr = r->m_sock.getsockerr();
            RTMP_Log(RTMP_LOGDEBUG, "%s, recv returned %d. GetSockError(): %d (%s)", 
            __FUNCTION__, nBytes, sockerr, strerror(sockerr));
            if (sockerr == EINTR && !RTMP_ctrlC)
                continue;

            if (sockerr == EWOULDBLOCK || sockerr == EAGAIN){
                sb->sb_timedout = TRUE;
                nBytes = 0;
            }
        }
        break;
    }
    return nBytes;
}

參考資料