1. 程式人生 > >Doubango RTP包傳輸使用UDT可靠傳輸協議,解決RTP丟包問題

Doubango RTP包傳輸使用UDT可靠傳輸協議,解決RTP丟包問題

使用SIP做過VOIP通話的同學,肯定被RTP丟包弄的焦頭爛額,必定嘗試過不少的辦法,比方:
1、丟包重傳(NACK)
2、前向糾錯(FEC)
3、丟幀處理
但效果往往不盡如人意,那有沒有一勞永逸的方法?在網路發展到家庭頻寬隨隨便便都是百兆的今天,為什麼還有讓人困擾不已的丟包問題?在延時容許的情況下,為何不改用像TCP那樣的可靠傳輸?UDT正是為了解決這個問題而設計的。

UDT建立於UDP之上,並引入新的擁塞控制和資料可靠性控制機制。UDT是面向連線的雙向應用層協議,同時支援可靠的資料流傳輸和部分可靠的資料報傳輸。UDT的特點是不需要自行開發可靠性機制,直接呼叫庫提供的傳送函式就可以實現可靠的資料傳輸。

Doubango rtp接收和傳送函式修改:

/* use transport udt*/
#include "udt_api.h"

void *udt_transport_mainthread(void *param)
{
    trtp_manager_t *self = param;


    if(!self){
        TSK_DEBUG_ERROR("udt_transport_mainthread Invalid parameter");
        return 0;
    }
    TSK_DEBUG_INFO("ii udt_transport_mainthread enter, self->is_originated:%d", self->is_originated);

    int rcv_buf = (int)tmedia_defaults_get_rtpbuff_size();
    int snd_buf = (int)tmedia_defaults_get_rtpbuff_size();

    int clientFD = -1;               
    int socketfDUDT = -1;         
    int clientSocket = -1; 
    if (self->is_originated){
        //first listen
        int ret = init_udt(&socketfDUDT, (self->rtp.public_port), 0);
        self->serverSocket = socketfDUDT;
        TSK_DEBUG_INFO("socketfDUDT:%d listent", socketfDUDT);

        set_connect_buff_udt(&socketfDUDT, rcv_buf, snd_buf);//set buff size
        
        listen_udt(&socketfDUDT);        
        clientFD = accept_udt(&socketfDUDT);
        if (clientFD == -1){
            TSK_DEBUG_ERROR("clientFd is failed");
            tsk_safeobj_lock(self);
            self->mainThreadId[0] = NULL;
            tsk_safeobj_unlock(self);//
[email protected]
add for crash. goto FAILED; } TSK_DEBUG_INFO("clientFD:%d enter", clientFD); if (!tmedia_defaults_get_single_connect()){ //after connect 2 user ret = init_udt(&clientSocket, (self->rtp.public_port + 3), 0); set_connect_buff_udt(&clientSocket, rcv_buf, snd_buf);//set buff size if (0 != ret || clientSocket == -1){ TSK_DEBUG_ERROR("clientSocket connect init is failed"); tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//
[email protected]
add for crash. goto FAILED; } ret = connect_udt(&clientSocket, self->rtp.remote_ip, self->rtp.remote_port); if (-1 == ret){ TSK_DEBUG_ERROR("clientSocket connect is failed"); tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//
[email protected]
add for crash. goto FAILED; } TSK_DEBUG_INFO("clientSocket:%d enter", clientSocket); self->clientSocket = clientSocket; } }else{ //first connect 2 user int ret = init_udt(&clientSocket, (self->rtp.public_port + 3), 0); if (0 != ret || clientSocket == -1){ TSK_DEBUG_ERROR("ii clientSocket connect init is failed"); tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//[email protected] add for crash. goto FAILED; } set_connect_buff_udt(&clientSocket, rcv_buf, snd_buf);//set buff size ret = connect_udt(&clientSocket, self->rtp.remote_ip, self->rtp.remote_port); if (-1 == ret){ TSK_DEBUG_ERROR("clientSocket connect is failed"); tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//[email protected] add for crash. goto FAILED; } self->clientSocket = clientSocket; TSK_DEBUG_INFO("ii clientSocket:%d enter", clientSocket); if (!tmedia_defaults_get_single_connect()){ //second listen ret = init_udt(&socketfDUDT, (self->rtp.public_port), 0); self->serverSocket = socketfDUDT; TSK_DEBUG_INFO("ii socketfDUDT:%d listent", socketfDUDT); set_connect_buff_udt(&socketfDUDT, rcv_buf, snd_buf);//set buff size listen_udt(&socketfDUDT); clientFD = accept_udt(&socketfDUDT); if (clientFD == -1){ TSK_DEBUG_ERROR("clientFd is failed"); tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//[email protected] add for crash. goto FAILED; } TSK_DEBUG_INFO("ii clientFD:%d enter", clientFD); }else{ tsk_safeobj_lock(self); self->mainThreadId[0] = NULL; tsk_safeobj_unlock(self);//[email protected] add for crash. 
} } if (tmedia_defaults_get_single_connect()){ if (self->is_originated){ char buffer[4096]; while(self->is_started){ int received = recvdata_udt(&clientFD, buffer, sizeof(buffer), 0); if (received <= -1) { continue; } _trtp_manager_recv_data(self, buffer, received, -1, NULL); } } }else{ char buffer[4096]; while(self->is_started){ int received = recvdata_udt(&clientFD, buffer, sizeof(buffer), 0); if (received <= -1) { continue; } _trtp_manager_recv_data(self, buffer, received, -1, NULL); } } FAILED: if (-1 !=clientFD){ close_udt(clientFD); } if (-1 !=socketfDUDT){ close_udt(socketfDUDT); } TSK_DEBUG_INFO("udt_transport_mainthread end."); return 0; } int udt_transport_send(trtp_manager_t* self, char* encodedData, int encodedDataLen){ TSK_DEBUG_INFO("senddata_udt START:%d", encodedDataLen); int ret = senddata_udt(&self->clientSocket, encodedData, encodedDataLen, 0); if (-1 == ret){ TSK_DEBUG_ERROR("senddata_udt failed"); return -1; } TSK_DEBUG_INFO("senddata_udt ret:%d", ret); return ret; } /* add end */

啟動:在 int trtp_manager_start(trtp_manager_t* self)中:

	/* Excerpt from trtp_manager_start(): taken when the manager has no transport or the
	 * transport has no master. If the UDT option is enabled, the UDT thread is started
	 * instead; otherwise the "not prepared" error path below is taken. */
	if(!self->transport || !self->transport->master){
              //use other transport --udt:
              if (tmedia_defaults_get_use_udt_socket() != 0){
                    //use other transport.
                    /* Both descriptors start at -1 and is_started is raised before the thread is
                     * created, because udt_transport_mainthread() loops on self->is_started. */
                    self->clientSocket = -1;
                    self->serverSocket = -1;
                    self->is_started = tsk_true;

                    if ((ret = tsk_thread_create(self->mainThreadId, udt_transport_mainthread, self))){ /* More important than "tsk_runnable_start" ==> start it first. */
                        /* NOTE(review): on this failure is_started stays tsk_true and control falls
                         * through to the "not prepared" error below, which overwrites ret with -2. */
                        TSK_DEBUG_FATAL("Failed to create udt main thread [%d]", ret);
                    }else{           
		         TSK_DEBUG_INFO("udt thread create success.");      	
                 
#if !TNET_UNDER_APPLE
                        /* The priority result is stored in ret but not checked; 0 is returned regardless. */
                        ret = tsk_thread_set_priority(self->mainThreadId[0], TSK_THREAD_PRIORITY_TIME_CRITICAL);
#endif
                        /* NOTE(review): presumably pairs with a tsk_safeobj_lock(self) taken earlier in
                         * trtp_manager_start(), outside this excerpt -- confirm. */
                        tsk_safeobj_unlock(self);
   
                        return 0;
                    }
              }
              //add end.
              
		TSK_DEBUG_ERROR("RTP/RTCP manager not prepared");
		ret = -2;
		goto bail;
	}
    



UDT協議封裝:

/* Creates a UDT socket bound to local `port` and stores its descriptor in *usock.
 * `rendezvous` is applied as the UDT_RENDEZVOUS option. Returns 0 on success, -1 on failure. */
int init_udt(int *usock, int port, bool rendezvous);
/* Connects the socket that `usocket` points to with the IPv4 peer `peerip`:`port`.
 * Returns the result of UDT::connect, or -1 on NULL arguments or address-resolution failure. */
int connect_udt(void* usocket, char *peerip, int port);
/* Applies UDT_SNDBUF / UDT_RCVBUF to the socket that `usocket` points to.
 * Returns 0 on success and -1 on failure (at minimum when `usocket` is NULL). */
int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size);
/* Receives one message (UDT::recvmsg) of up to `size` bytes into `buf`.
 * Returns the received length, or -1 on error. `flags` is not used by the implementation. */
int recvdata_udt(void* usocket, char* buf, int size, int flags);
/* Sends `size` bytes from `buf` with UDT::sendmsg.
 * Returns the number of bytes sent, or -1 on error. `flags` is not used by the implementation. */
int senddata_udt(void* usocket, char* buf, int size, int flags);
/* Puts the socket that `usocket` points to into listening state (backlog 1024).
 * Returns the result of UDT::listen, or -1 when `usocket` is NULL. */
int listen_udt(void* usocket);
/* Accepts one incoming connection on the listening socket that `usocket` points to.
 * Returns the new descriptor, or -1 on failure. */
int accept_udt(void* usocket);
/* Closes the UDT socket `usock`. Returns 0. */
int close_udt(int usock);

實現: 

#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#include <arpa/inet.h> 
#include <android/log.h>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <udt.h>
#define TAG "UDT_API"

// C-linkage declarations of the wrapper API, so the functions defined below can be
// called from C translation units.
extern "C" int init_udt(int *usock, int port, bool rendezvous);
extern "C" int connect_udt(void* usocket, char *peerip, int port);
extern "C" int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size);
extern "C" int recvdata_udt(void* usocket, char* buf, int size, int flags);
extern "C" int senddata_udt(void* usocket, char* buf, int size, int flags);
extern "C" int listen_udt(void* usocket);
extern "C" int accept_udt(void* usocket);
extern "C" int close_udt(int usock);

// The functions below use cout/endl unqualified and rely on this directive.
using namespace std;

// Address family used when resolving peers (IPv4).
const int g_IP_Version = AF_INET;
// Socket type requested from getaddrinfo and UDT::socket.
const int g_Socket_Type = SOCK_DGRAM;
// Loopback address; not referenced by the functions shown in this section.
const char g_Localhost[] = "127.0.0.1";
  
/**
 * Creates a UDT socket and binds it to the given local port.
 *
 * The socket is configured with 16000-byte send/receive buffers, address reuse,
 * and the requested rendezvous mode before it is bound.
 *
 * @param usock       Output: receives the new socket descriptor. Left untouched on failure.
 * @param port        Local port to bind to.
 * @param rendezvous  Value applied to the UDT_RENDEZVOUS option.
 * @return 0 on success, -1 on failure (NULL output pointer, address resolution,
 *         socket creation or bind error).
 */
int init_udt(int *usock, int port, bool rendezvous){
	if (usock == NULL){
		return -1;
	}

	addrinfo hints;
	memset(&hints, 0, sizeof(hints));
	hints.ai_flags = AI_PASSIVE;
	hints.ai_family = AF_INET;
	hints.ai_socktype = g_Socket_Type;

	char service[16];
	snprintf(service, sizeof(service), "%d", port);

	addrinfo* res = NULL;
	const int gai_rc = getaddrinfo(NULL, service, &hints, &res);
	if (0 != gai_rc)
	{
		// getaddrinfo reports through its own return code, not through UDT's last error.
		cout << "illegal port number or port is busy.\n" << endl;
		__android_log_print(ANDROID_LOG_WARN, TAG, "illegal port number or port is busy:%s", gai_strerror(gai_rc));
		return -1;
	}

	UDTSOCKET retFD = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (retFD == UDT::INVALID_SOCK)
	{
		__android_log_print(ANDROID_LOG_WARN, TAG, "err, init failed:%s", UDT::getlasterror().getErrorMessage() );
		cout << "socket: " << UDT::getlasterror().getErrorMessage() << endl;
		freeaddrinfo(res);
		return -1;
	}

	// since we will start a lot of connections, we set the buffer size to smaller value.
	int snd_buf = 16000;
	int rcv_buf = 16000;
	UDT::setsockopt(retFD, 0, UDT_SNDBUF, &snd_buf, sizeof(int));
	UDT::setsockopt(retFD, 0, UDT_RCVBUF, &rcv_buf, sizeof(int));
	bool reuse = true;
	UDT::setsockopt(retFD, 0, UDT_REUSEADDR, &reuse, sizeof(bool));
	UDT::setsockopt(retFD, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));

	const int bind_rc = UDT::bind(retFD, res->ai_addr, res->ai_addrlen);
	// The resolved address is no longer needed, whatever the bind outcome.
	freeaddrinfo(res);

	if (UDT::ERROR == bind_rc)
	{
		// Log before closing so the bind error is still the "last error".
		__android_log_print(ANDROID_LOG_WARN, TAG, "err, init failed:%s", UDT::getlasterror().getErrorMessage() );
		cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
		UDT::close(retFD); // do not leak the half-initialised socket
		return -1;
	}

	*usock = retFD;//set output socket fd.
	return 0;
}
 
/**
 * Connects an existing UDT socket to an IPv4 peer.
 *
 * @param usocket  Pointer to the UDT socket descriptor.
 * @param peerip   Peer host/address string passed to getaddrinfo.
 * @param port     Peer port.
 * @return The result of UDT::connect, or -1 when an argument is NULL or the
 *         peer address cannot be resolved.
 */
int connect_udt(void* usocket, char *peerip, int port){
	if (!usocket || !peerip){
		return -1;
	}

	// Port goes to getaddrinfo as a decimal service string.
	char service[16];
	sprintf(service, "%d", port);

	addrinfo hints;
	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_socktype = g_Socket_Type;
	hints.ai_family = g_IP_Version;
	hints.ai_flags = AI_PASSIVE;

	addrinfo* resolved;
	if (getaddrinfo(peerip, service, &hints, &resolved) != 0)
	{
		return -1;
	}

	const UDTSOCKET handle = *(UDTSOCKET*)usocket;
	const int status = UDT::connect(handle, resolved->ai_addr, resolved->ai_addrlen);

	freeaddrinfo(resolved);
	return status;
}

int recvdata_udt(void* usocket, char* buf, int size, int flags){	
	if (usocket == NULL){
		return -1;
	}
	UDTSOCKET recver = *(UDTSOCKET*)usocket;  
	int rs;  
	//if (UDT::ERROR == (rs = UDT::recv(recver, buf, size, 0)))
	if (UDT::ERROR == (rs = UDT::recvmsg(recver, buf, size)))
	{
		cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
              if (UDT::getlasterror().getErrorCode() == 2001 || UDT::getlasterror().getErrorCode() == 5004){
                  return  -1;
              }
        	__android_log_print(ANDROID_LOG_WARN, TAG, "err, recvdata_udt failed:%s", UDT::getlasterror().getErrorMessage() );
		return -1;
	} 
    return rs;
}

int set_connect_buff_udt(void* usocket, int send_buf_size, int recv_buf_size){
	if (usocket == NULL){
		return -1;
	}
	UDTSOCKET sock = *(UDTSOCKET*)usocket;   
	UDT::setsockopt(sock, 0, UDT_SNDBUF, &send_buf_size, sizeof(int));
	UDT::setsockopt(sock, 0, UDT_RCVBUF, &recv_buf_size, sizeof(int)); 
	
	return 0;
}

/**
 * Sends a buffer over a UDT socket with UDT::sendmsg (infinite TTL, in-order delivery).
 *
 * @param usocket  Pointer to the UDT socket descriptor.
 * @param buf      Data to send.
 * @param size     Number of bytes to send.
 * @param flags    Not used.
 * @return Number of bytes handed to UDT (may be less than size if UDT reports a
 *         zero-length send), or -1 on invalid arguments or send error.
 */
int senddata_udt(void* usocket, char* buf, int size, int flags){

    if (usocket == NULL || (buf == NULL && size > 0)){
	   return -1;
    }
	UDTSOCKET client = *(UDTSOCKET*)usocket;

	int sent = 0;
    while (sent < size){
        const int ret = UDT::sendmsg(client, (((const char*)buf) + sent), (int)(size - sent), -1, true);
        // The result of this call decides success; the original tested the running
        // total `sent`, so a failed send was never detected and `sent` was decremented.
        if (ret < 0)
        {
            __android_log_print(ANDROID_LOG_WARN, TAG, "err, senddata_udt failed:%s", UDT::getlasterror().getErrorMessage() );

            cout << "send: " << UDT::getlasterror().getErrorMessage() << endl;
            return -1;
        }
        if (ret == 0)
        {
            // Nothing was accepted (e.g. a send timeout); stop instead of spinning.
            break;
        }

        __android_log_print(ANDROID_LOG_WARN, TAG, " senddata_udt ret:%d, size:%d", ret, size);
        sent += ret;
    }

    return sent;
}
  
int listen_udt(void* usocket){
    if (usocket == NULL){
	   return -1;
    }
	UDTSOCKET serv = *(UDTSOCKET*)usocket; 
	
	return UDT::listen(serv, 1024);
}

int accept_udt(void* usocket){
    if (usocket == NULL){
	   return -1;
    }
	UDTSOCKET serv = *(UDTSOCKET*)usocket; 
	sockaddr_in  clientaddr;
	int addrlen = sizeof(clientaddr);
	UDTSOCKET new_sock = UDT::accept(serv, (sockaddr*)&clientaddr, &addrlen); 

	if (new_sock == UDT::INVALID_SOCK)
	{
		return -1;
	}
	char ip[16];
	cout << "new connection: " << inet_ntoa(clientaddr.sin_addr) << ":" << ntohs(clientaddr.sin_port) << endl;

	
	return new_sock;
}

int close_udt(int usock){
	UDT::close(usock);
	return 0;
}