媒體轉發伺服器-TCP 在 EPOLL 模型中的注意細節
前段時間在公司開發了基於udp的流媒體轉發伺服器,在公網udp轉發ts,花屏比較嚴重。課下之餘寫了epoll-tcp模型的轉發伺服器作為測試,比較一下效果,其間遇到不少問題,在此做個筆記。程式碼最後附上
一、業務需求:終端錄製視訊(android編碼h264) 客戶端請求視訊 伺服器負責轉發
因為是測試用沒有考慮配置檔案,負載均衡,安全認證等
二、協議指定
1、音視訊 協議定義:總長度不大於1500 bytes, 終端啟動後就進行傳送資料
長度 short |
終端id int |
型別 byte |
包序號 short |
幀型別 byte |
資料 |
2 byte |
4 byte |
1 byte |
2 byte |
1 byte |
型別標誌:0xF0 視訊
0xF1 音訊
0x00 指令
2、指令協議 用於客戶端請求視訊
長度 short |
終端id int |
型別 byte |
指令 int |
2 byte |
4 byte |
1 byte |
4 byte |
指令:0x0010 請求報活
0x0011 停止請求
三、資料結構關聯
1、會話管理
struct Connection //終端、客戶端 會話管理
{
int term; // 終端id, 用於客戶端時表示請求目標的id
int sock; //tcp
time_t tm; //上次活動時間
int bufsize; //用於epoll 接收
int wantsize;
int recvsize;
char *recvbuf;
CBufQue bufque; //迴圈佇列,客戶端用於快取要傳送的資料
}
typedef map<int, Connection*> MAPConnection; // socket -Connection //儲存會話
typedef set<int> SETSocket; //socket
typedef map<int, SETSocket*> MAPTermClient; // 一個終端可以轉發到多個客戶端,儲存客戶端的key
2、流程管理
執行緒1-terminal:接收終端視訊 : epoll ET模式 非阻塞socket
執行緒2-media:將接受的視訊資料分發到對應的客戶端傳送緩衝佇列
執行緒3-client:接收客戶端的請求, 分發視訊資料,epoll LT模式 非阻塞 socket
執行緒4-cmd:處理客戶端的指令,請求停止等
四、程式碼注意細節
1、socket 設定非堵塞
bool NetCommon::SetSockBlock(const int &fd, bool block)
{
if(block)
{
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
}
else
{
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags|O_NONBLOCK);
}
return true;
}
2、socket 設定SO_REUSEADDR
bool NetCommon::SetReuseAddr(const int &fd, bool reuse)
{
int opt = 0;
if(reuse)
{
opt = 1;
}
else
{
opt = 0;
}
if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
{
return false;
}
return true;
}
為什麼要設定此項呢?參考 http://blog.csdn.net/liuhongxiangm/article/details/17301311
3、accept
LT模式比較清晰,就不說了
ET模式下accept存在的問題
考慮這種情況:多個連線同時到達,伺服器的TCP就緒佇列瞬間積累多個就緒連線,由於是邊緣觸發模式,epoll只會通知一次,accept只處理一個連線,導致TCP就緒佇列中剩下的連線都得不到處理。
解決辦法是用while迴圈抱住accept呼叫,處理完TCP就緒佇列中的所有連線後再退出迴圈。如何知道是否處理完就緒佇列中的所有連線呢?accept返回-1並且errno設定為EAGAIN就表示所有連線都處理完。
int ctcpserver::acceptterminal(Connection *pConn)
{
while(true)
{
int newsock = accept(pConn->sock,NULL, NULL);
if(newsock < 0)
{
if(errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
{
break;
}
return -1;
}
if( newsock > 0)
{
NetCommon::SetSockBlock(newsock,false);
NetCommon::SetReuseAddr(newsock, true);
if(m_mapConnTerminal.size() > 500)
{
close(newsock);
return 0;
}
Connection *newcon = new Connection;
newcon->sock = newsock;
newcon->wantsize = 2;
newcon->recvsize = 0;
time( &(newcon->tm));
if(m_epollTerminal.EpollAdd(newsock, EPOLLIN|EPOLLET, newcon) < 0)
{
delete newcon;
newcon = NULL;
close(newsock);
return -1;
}
m_mapConnTerminal.insert(make_pair<int, Connection*>(newsock,newcon));
}
}
return 0;
}
4、recv
Epoll ET模式 非阻塞socket,必須獨到沒有資料可讀,此處的設計是每個socket對應一個connection,connection中有一個數據包的長度wantsize,接收完一個數據包之後,會將此資料包放入佇列等待處理,迴圈接收下一個資料包
int ctcpserver::recvn(Connection *pConn)
{
int iret = 0;
//wantsize < 2048
while(pConn->recvsize < pConn->wantsize)
{
iret = recv(pConn->sock, pConn->buf+pConn->recvsize,pConn->wantsize-pConn->recvsize, 0);
if(iret == -1)
{
if(errno == EINTR)
{
break;
}
else if(errno == EWOULDBLOCK || errno == EAGAIN)
{
break;
}
else
{
return -1;
}
}
if(iret == 0)
{
return -1;
}
pConn->recvsize += iret;
}
time(&(pConn->tm));
return pConn->recvsize;
}
此函式是在epoll接到事件後呼叫,協議約定資料包前倆位元組是長度int ctcpserver::recvtrminal(Connection *pConn)
{
int iret = 0;
while(true)
{
iret = recvn(pConn);
if(iret < 0)
{
close(pConn->sock);
m_epollTerminal.EpollDel(pConn->sock,pConn);
m_mapConnTerminal.erase(pConn->sock);
delete pConn;
pConn = NULL;
return -1; //error
}
if(pConn->recvsize != pConn->wantsize) //no data recv
{
break;
}
else
{
if(pConn->wantsize == 2) //接收玩的是資料的長度資訊,去設定資料的接收大小
{
pConn->wantsize = *(short*)(pConn->buf);
if(pConn->wantsize > 2048 || pConn->wantsize < 2) //設計是不大於1500,此處為了相容其他測試
{
close(pConn->sock);
m_epollTerminal.EpollDel(pConn->sock,pConn);
m_mapConnTerminal.erase(pConn->sock);
delete pConn;
pConn = NULL;
return -1; //something error with data
}
}
else //接收完一個數據包
{
BufNode *pnode = m_bufQueMedia.AllocNode();
if(pnode != NULL )
{
memset( pnode->pBuf,0,pnode->nMaxLen);
pnode->nLen = pConn->recvsize;
memcpy(pnode->pBuf, pConn->buf,pConn->recvsize);
pnode->sock = pConn->sock;
m_bufQueMedia.PushNode(pnode); //media 執行緒負責處理資料
m_semMediaTask.Post();
}
pConn->recvsize = 0;
pConn->wantsize = 2;
}
}
}
return 0;
}
epoll 迴圈,epoll新增socket時,epoll_eveny關聯的是connection* ,在收到事件是,可以直接取來接收存放資料
int ctcpserver::terminalloop()
{
int event_count = 0;
while(true)
{
event_count = m_epollTerminal.EpollWait();
if(event_count < 0)
{
if(errno == EINTR)
{
continue;
}
return 0;
}
else if(event_count > 0)
{
m_mutexTerminal.Lock();
for(int index=0; index < event_count; index++)
{
Connection *pConn = (Connection*)(m_epollTerminal.GetEVPtr(index));
if(pConn->sock == m_terminalsock)
{
if(acceptterminal(pConn) < 0)
{
return 0;
}
}
else
{
if( m_epollTerminal.GetEVEvents(index)&EPOLLIN )
{
if(recvtrminal(pConn) < 0)
{
continue;
}
}
else if(m_epollTerminal.GetEVEvents(index)&EPOLLOUT)
{
}
}
}
m_mutexTerminal.UnLock();
}
}
return 0;
}
5、send
這個問題比較謹慎處理,EPOLLOUT 到底在什麼時候觸發??
ET模式稱為邊緣觸發模式,顧名思義,不到邊緣情況,是死都不會觸發的。
EPOLLOUT事件:
EPOLLOUT事件只有在連線時觸發一次,表示可寫,其他時候想要觸發,那你要先準備好下面條件:
1.某次write,寫滿了傳送緩衝區,返回錯誤碼為EAGAIN。
2.對端讀取了一些資料,又重新可寫了,此時會觸發EPOLLOUT。
簡單地說:EPOLLOUT事件只有在不可寫到可寫的轉變時刻,才會觸發一次,所以叫邊緣觸發,這叫法沒錯的!
其實,如果你真的想強制觸發一次,也是有辦法的,直接呼叫epoll_ctl重新設定一下event就可以了,event跟原來的設定一模一樣都行(但必須包含EPOLLOUT),關鍵是重新設定,就會馬上觸發一次EPOLLOUT事件。
EPOLLIN事件:
EPOLLIN事件則只有當對端有資料寫入時才會觸發,所以觸發一次後需要不斷讀取所有資料直到讀完EAGAIN為止。否則剩下的資料只有在下次對端有寫入時才能一起取出來了。
現在明白為什麼說epoll必須要求非同步socket了吧?如果同步socket,而且要求讀完所有資料,那麼最終就會在堵死在阻塞裡。
在此測試專案,用了LT模式,但如何避免一直觸發?
當有資料需要傳送時,新增EPOLLOUT監聽事件。當傳送完成後移除此事件
int ctcpserver::sendclient(Connection *pConn)
{
//
BufNode *pnode = pConn->bufque.FrontNode(); //
if(pnode != NULL)
{
int iret = send(pConn->sock, pnode->pBuf,pnode->nLen,0);
if(iret <= 0)
{
if(errno != EAGAIN || errno == EWOULDBLOCK || errno == EAGAIN)
{
return 0;
}
else
{
close(pConn->sock);
m_epollClient.EpollDel(pConn->sock,pConn);
m_mapConClient.erase(pConn->sock);
delete pConn;
pConn = NULL;
// cout << "m_mapConClient count:" << m_mapConClient.size() << endl;
return -1;
}
}
else
{
pConn->bufque.PopNode();
pConn->bufque.FreeNode(pnode);
}
}
if(pConn->bufque.GetUsedNodeCount() == 0)
{
m_epollClient.EpollMod(pConn->sock, EPOLLIN, pConn);//移除EPOLLOUT 監聽 }
return 0;
}
int ctcpserver::mediataskloop()
{
while (true)
{
m_semMediaTask.Wait();
BufNode *pnode = m_bufQueMedia.PopNode();
if(pnode == NULL)
{
continue;
}
//id-client
int id = *(int*)(pnode->pBuf+2);
m_mutexTermClient.Lock();
MAPTermClientIt itTermClient = m_mapTermClient.find(id);
if(itTermClient == m_mapTermClient.end())
{
SETSocket *pSetSock = new SETSocket;
pSetSock->clear();
m_mapTermClient.insert(make_pair<int, SETSocket*>(id,pSetSock));
}
else
{
SETSocket *pSetSock= itTermClient->second;
if(!pSetSock->empty())
{
SETSocketIt itSock;
m_mutexClient.Lock();
for(itSock=pSetSock->begin(); itSock != pSetSock->end(); ++ itSock)
{
int sock = *itSock;
MAPConnectionIt itConnClient = m_mapConClient.find(sock);
if(itConnClient != m_mapConClient.end())
{
Connection *pConn = itConnClient->second;
BufNode *pnodeClient = pConn->bufque.AllocNode();
if(pnodeClient == NULL)
{
continue;
}
memcpy(pnodeClient->pBuf, pnode->pBuf, pnode->nLen );
pnodeClient->nLen = pnode->nLen;
pConn->bufque.PushNode(pnodeClient);
m_epollClient.EpollMod(pConn->sock, EPOLLIN|EPOLLOUT, pConn); //新增EPOLLOUT 監聽
}
else
{
pSetSock->erase(itSock);
}
}
m_mutexClient.UnLock();
}
}
m_mutexTermClient.UnLock();
m_bufQueMedia.FreeNode(pnode);
}
return 0;
}
附上程式碼:http://download.csdn.net/detail/liuhongxiangm/6709063