1. 程式人生 > >媒體轉發伺服器-TCP 在 EPOLL 模型中的注意細節

媒體轉發伺服器-TCP 在 EPOLL 模型中的注意細節

前段時間在公司開發了基於udp的流媒體轉發伺服器,在公網udp轉發ts,花屏比較嚴重。課下之餘寫了epoll-tcp模型的轉發伺服器作為測試,比較一下效果,其間遇到不少問題,在此做個筆記。程式碼最後附上

一、業務需求:終端錄製視訊(android編碼h264)  客戶端請求視訊  伺服器負責轉發

因為是測試用沒有考慮配置檔案,負載均衡,安全認證等

二、協議指定

1、音視訊 協議定義:總長度不大於1500 bytes, 終端啟動後就進行傳送資料

長度 short

終端id  int

型別 byte

包序號 short

幀型別 byte

資料

2 byte

4 byte

1 byte

2 byte

1 byte


型別標誌:0xF0  視訊

                    0xF1  音訊

                    0x00  指令

2、指令協議  用於客戶端請求視訊

長度 short

終端id  int

型別 byte

指令 int

2 byte

4 byte

1 byte

4 byte


 指令:0x0010  請求報活

             0x0011   停止請求

三、資料結構關聯

1、會話管理

struct Connection       //終端、客戶端 會話管理
{

    int          term;          // 終端id, 用於客戶端時表示請求目標的id
    int          sock;         //tcp

    time_t   tm;             //上次活動時間

    int         bufsize;       //用於epoll 接收
    int         wantsize;
    int         recvsize;
    char    *recvbuf;

    CBufQue bufque;   //迴圈佇列,客戶端用於快取要傳送的資料

}

typedef map<int, Connection*> MAPConnection;  // socket -Connection   //儲存會話

typedef set<int> SETSocket;                                    //socket

typedef map<int, SETSocket*> MAPTermClient;  // 一個終端可以轉發到多個客戶端,儲存客戶端的key

2、流程管理

     執行緒1-terminal:接收終端視訊 : epoll  ET模式  非阻塞socket

     執行緒2-media:將接受的視訊資料分發到對應的客戶端傳送緩衝佇列

     執行緒3-client:接收客戶端的請求, 分發視訊資料,epoll LT模式 非阻塞 socket

     執行緒4-cmd:處理客戶端的指令,請求停止等 

四、程式碼注意細節

1、socket 設定非堵塞

bool NetCommon::SetSockBlock(const int &fd, bool block)
{
    if(block)
    {
        int flags = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
    }
    else
    {
        int flags = fcntl(fd, F_GETFL, 0);
        fcntl(fd, F_SETFL, flags|O_NONBLOCK);
    }
    return true;
}

2、socket 設定SO_REUSEADDR
bool NetCommon::SetReuseAddr(const int &fd, bool reuse)
 {
     int opt = 0;
     if(reuse)
     {
         opt = 1;
     }
     else
     {
         opt = 0;
     }

     if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
     {
         return false;
     }
     return true;
 }

為什麼要設定此項呢?參考  http://blog.csdn.net/liuhongxiangm/article/details/17301311

3、accept 

 LT模式比較清晰,就不說了     

 ET模式下accept存在的問題
 考慮這種情況:多個連線同時到達,伺服器的TCP就緒佇列瞬間積累多個就緒連線,由於是邊緣觸發模式,epoll只會通知一次,accept只處理一個連線,導致TCP就緒佇列中剩下的連線都得不到處理。

解決辦法是用while迴圈抱住accept呼叫,處理完TCP就緒佇列中的所有連線後再退出迴圈。如何知道是否處理完就緒佇列中的所有連線呢?accept返回-1並且errno設定為EAGAIN就表示所有連線都處理完。

int ctcpserver::acceptterminal(Connection *pConn)
{
    while(true)
    {
        int newsock = accept(pConn->sock,NULL, NULL);

        if(newsock < 0)
        {
            if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
            {
                break;
            }
            return -1;
        }
        if( newsock > 0)
        {
            NetCommon::SetSockBlock(newsock,false);
            NetCommon::SetReuseAddr(newsock, true);

           if(m_mapConnTerminal.size() > 500)
           {
                close(newsock);
                return 0;
           }

            Connection *newcon = new Connection;
            newcon->sock = newsock;
            newcon->wantsize = 2;
            newcon->recvsize = 0;
            time( &(newcon->tm));

            if(m_epollTerminal.EpollAdd(newsock, EPOLLIN|EPOLLET, newcon) < 0)
            {
                delete newcon;
                newcon = NULL;
                close(newsock);
                return -1;
            }
            m_mapConnTerminal.insert(make_pair<int, Connection*>(newsock,newcon));
        }
    }

    return 0;
}


4、recv

 Epoll ET模式 非阻塞socket,必須獨到沒有資料可讀,此處的設計是每個socket對應一個connection,connection中有一個數據包的長度wantsize,接收完一個數據包之後,會將此資料包放入佇列等待處理,迴圈接收下一個資料包

int ctcpserver::recvn(Connection *pConn)
{
    int iret = 0;
    //wantsize < 2048
    while(pConn->recvsize < pConn->wantsize)
    {
        iret = recv(pConn->sock, pConn->buf+pConn->recvsize,pConn->wantsize-pConn->recvsize, 0);
        if(iret == -1)
        {
            if(errno == EINTR)
            {
                 break;
            }
            else if(errno == EWOULDBLOCK || errno == EAGAIN)
            {
                break;
            }
            else
            {
                return -1;
            }
        }
        if(iret == 0)
        {
            return -1;
        }

        pConn->recvsize += iret;
    }

    time(&(pConn->tm));

    return  pConn->recvsize;
}
此函式是在epoll接到事件後呼叫,協議約定資料包前倆位元組是長度
int ctcpserver::recvtrminal(Connection *pConn)
{
    int iret = 0;
    while(true)
    {
        iret = recvn(pConn);
        if(iret < 0)
        {
            close(pConn->sock);
            m_epollTerminal.EpollDel(pConn->sock,pConn);
            m_mapConnTerminal.erase(pConn->sock);
            delete pConn;
            pConn = NULL;

            return -1; //error
        }
        if(pConn->recvsize != pConn->wantsize) //no data recv
        {
            break;
        }
        else
        {
            if(pConn->wantsize == 2) //接收玩的是資料的長度資訊,去設定資料的接收大小
            {
                pConn->wantsize = *(short*)(pConn->buf);
                if(pConn->wantsize > 2048 || pConn->wantsize < 2) //設計是不大於1500,此處為了相容其他測試
                {
                    close(pConn->sock);
                    m_epollTerminal.EpollDel(pConn->sock,pConn);
                    m_mapConnTerminal.erase(pConn->sock);
                    delete pConn;
                    pConn = NULL;

                    return -1; //something error with data
                }
            }
            else            //接收完一個數據包
            {
                BufNode *pnode = m_bufQueMedia.AllocNode();
                if(pnode != NULL )
                {
                    memset( pnode->pBuf,0,pnode->nMaxLen);
                    pnode->nLen = pConn->recvsize;
                    memcpy(pnode->pBuf, pConn->buf,pConn->recvsize);
                    pnode->sock = pConn->sock;
                    m_bufQueMedia.PushNode(pnode);   //media 執行緒負責處理資料
                    m_semMediaTask.Post();

                }
                pConn->recvsize = 0;
                pConn->wantsize = 2;
            }
        }
    }

    return 0;
}

epoll 迴圈,epoll新增socket時,epoll_eveny關聯的是connection* ,在收到事件是,可以直接取來接收存放資料
int ctcpserver::terminalloop()
{
    int event_count = 0;
    while(true)
    {
        event_count = m_epollTerminal.EpollWait();
        if(event_count < 0)
        {
            if(errno == EINTR)
            {
                continue;
            }
            return 0;
        }
        else if(event_count > 0)
        {
            m_mutexTerminal.Lock();
            for(int index=0; index < event_count; index++)
            {
                Connection *pConn  = (Connection*)(m_epollTerminal.GetEVPtr(index));
                if(pConn->sock == m_terminalsock)
                {
                    if(acceptterminal(pConn) < 0)
                    {
                        return 0;
                    }
                }
                else
                {
                    if( m_epollTerminal.GetEVEvents(index)&EPOLLIN )
                    {
                        if(recvtrminal(pConn) < 0)
                        {
                            continue;
                        }
                    }
                    else if(m_epollTerminal.GetEVEvents(index)&EPOLLOUT)
                    {

                    }
                }
            }
            m_mutexTerminal.UnLock();
        }
    }
    return 0;
}

5、send

這個問題比較謹慎處理,EPOLLOUT 到底在什麼時候觸發??

ET模式稱為邊緣觸發模式,顧名思義,不到邊緣情況,是死都不會觸發的。

EPOLLOUT事件:
EPOLLOUT事件只有在連線時觸發一次,表示可寫,其他時候想要觸發,那你要先準備好下面條件:
1.某次write,寫滿了傳送緩衝區,返回錯誤碼為EAGAIN。
2.對端讀取了一些資料,又重新可寫了,此時會觸發EPOLLOUT。
簡單地說:EPOLLOUT事件只有在不可寫到可寫的轉變時刻,才會觸發一次,所以叫邊緣觸發,這叫法沒錯的!

其實,如果你真的想強制觸發一次,也是有辦法的,直接呼叫epoll_ctl重新設定一下event就可以了,event跟原來的設定一模一樣都行(但必須包含EPOLLOUT),關鍵是重新設定,就會馬上觸發一次EPOLLOUT事件。

EPOLLIN事件:
EPOLLIN事件則只有當對端有資料寫入時才會觸發,所以觸發一次後需要不斷讀取所有資料直到讀完EAGAIN為止。否則剩下的資料只有在下次對端有寫入時才能一起取出來了。
現在明白為什麼說epoll必須要求非同步socket了吧?如果同步socket,而且要求讀完所有資料,那麼最終就會在堵死在阻塞裡。

LT模式 比較明瞭,只要可寫就一直觸發EPOLLOUT

在此測試專案,用了LT模式,但如何避免一直觸發?

當有資料需要傳送時,新增EPOLLOUT監聽事件。當傳送完成後移除此事件

int ctcpserver::sendclient(Connection *pConn)
{
   //
    BufNode *pnode = pConn->bufque.FrontNode(); //
    if(pnode != NULL)
    {
      int  iret =   send(pConn->sock, pnode->pBuf,pnode->nLen,0);
      if(iret <= 0)
       {
          if(errno != EAGAIN || errno == EWOULDBLOCK || errno == EAGAIN)
          {
                 return 0;
          }
          else
          {
              close(pConn->sock);
              m_epollClient.EpollDel(pConn->sock,pConn);
              m_mapConClient.erase(pConn->sock);
              delete pConn;
              pConn = NULL;

        //      cout << "m_mapConClient count:" << m_mapConClient.size() << endl;
              return -1;
          }

       }
      else
      {
          pConn->bufque.PopNode();
          pConn->bufque.FreeNode(pnode);
      }

    }
    if(pConn->bufque.GetUsedNodeCount() == 0)
    {
        m_epollClient.EpollMod(pConn->sock, EPOLLIN, pConn);//移除EPOLLOUT 監聽    }

    return 0;
}


int ctcpserver::mediataskloop()
{
    while (true)
    {
        m_semMediaTask.Wait();
        BufNode *pnode = m_bufQueMedia.PopNode();
        if(pnode == NULL)
        {
            continue;
        }
        //id-client
        int id = *(int*)(pnode->pBuf+2);

        m_mutexTermClient.Lock();
        MAPTermClientIt itTermClient = m_mapTermClient.find(id);
        if(itTermClient == m_mapTermClient.end())
        {
            SETSocket *pSetSock = new  SETSocket;
            pSetSock->clear();
            m_mapTermClient.insert(make_pair<int, SETSocket*>(id,pSetSock));
        }
        else
        {
            SETSocket *pSetSock= itTermClient->second;
            if(!pSetSock->empty())
            {
                SETSocketIt itSock;
                m_mutexClient.Lock();
                for(itSock=pSetSock->begin(); itSock != pSetSock->end(); ++ itSock)
                {
                    int sock = *itSock;
                    MAPConnectionIt itConnClient = m_mapConClient.find(sock);
                    if(itConnClient != m_mapConClient.end())
                    {
                        Connection *pConn = itConnClient->second;
                        BufNode *pnodeClient = pConn->bufque.AllocNode();
                        if(pnodeClient == NULL)
                        {
                            continue;
                        }
                        memcpy(pnodeClient->pBuf, pnode->pBuf, pnode->nLen );
                        pnodeClient->nLen = pnode->nLen;
                        pConn->bufque.PushNode(pnodeClient);
                        m_epollClient.EpollMod(pConn->sock, EPOLLIN|EPOLLOUT, pConn); //新增EPOLLOUT 監聽
                    }
                    else
                    {
                        pSetSock->erase(itSock);
                    }
                }
                m_mutexClient.UnLock();
            }
        }
        m_mutexTermClient.UnLock();

        m_bufQueMedia.FreeNode(pnode);
    }

    return 0;
}

附上程式碼:http://download.csdn.net/detail/liuhongxiangm/6709063