1. 程式人生 > >Darwin Streaming Server 支援UDP打洞

Darwin Streaming Server 支援UDP打洞

RTSP客戶端點播Darwin 視訊時,SDP協商後的客戶端埠可能是在NAT後面,所以需要Darwin支援NAT打洞的功能,從Darwin的原始碼看,官方的原始碼是不支援這個能力的。


通過抓取VLC客戶端的包發現,VLC在播放RTSP流時,兩次SETUP(音訊流和視訊分別協商埠)之後,會發送4個UDP打洞的包,但Darwin沒有接收這些包,並且根據這些包來源的埠修改遠端RTP和RTCP的埠。


文章Darwin Streaming Server 支援UDP穿透中給出了修改方法,嘗試之後,發現有兩個問題:

1、兩次SETUP協商後,Darwin給返回的伺服器的RTP和RTCP埠兩次都一樣

2、客戶端傳送的打洞的RTP和RTCP打洞的包,並沒有接收完全。


針對問題1的修改就是將兩次SETUP協商後,Darwin返回的埠不同並且唯一

問題2的修改方法是,在接收到SETUP協商後,開啟一個執行緒接收發送到伺服器的RTP和RTCP埠的包,並根據接收到的包的源埠更新遠端的RTP和RTCP埠,即使沒有收到打洞的包,不做任何處理,還是使用之前協商的埠往外發包。


第一個問題是將RTPStream::Setup方法中的:

fSockets = QTSServerInterface::GetServer()->GetSocketPool()->GetUDPSocketPair(sourceAddr, 0, fRemoteAddr, 
                                                                                        fRemoteRTCPPort);

修改為:

fSockets = QTSServerInterface::GetServer()->GetSocketPool()->CreateUDPSocketPair(sourceAddr, 0);


並將UDPSocketPool::CreateUDPSocketPair方法中兩個變數的初值修改為如下:


    UInt16 curPort = kLowestUDPPort + usedNum++;
    UInt16 stopPort = kHighestUDPPort -1; // prevent roll over when iterating over port nums
    UInt16 socketBPort = curPort + 1;



第二個問題修改,標頭檔案增加下面的方法和變數:

        void start_thread_for_nat();
        void setRemoteRTPPort(int value){fRemoteRTPPort = value; }
        void setRemoteRTCPPort(int value){fRemoteRTCPPort = value;}
        UInt16    getRemoteRTPPort(){return  fRemoteRTPPort;}
        UInt32    getRemoteRTPAddr(){return  fRemoteAddr;}
        Bool16    getQuitValue(){return  bQuit;}
        Bool16    getRunningValue(){return  bRunning;}
        void    setRunningValue(Bool16 value){ bRunning = value;}
        UInt16    getRemoteRTCPPort(){return  fRemoteRTCPPort;}
        UDPSocketPair*  getUDPSocketPair(){ return fSockets;}

        Bool16      bQuit;
        Bool16      bRunning;


在setup方法最後啟動一個監聽執行緒: 

    this->start_thread_for_nat();
    
    //errors should only be returned if there is a routing problem, there should be none
    Assert(err == QTSS_NoErr);
    return QTSS_NoErr;
}

實現:

#include <pthread.h>
void* thread_for_nat(void *parms){
    Bool16      fUpdateRtpPort = false;
    Bool16      fUpdateRtcpPort = false;
    SInt64                  currentTime = OS::Milliseconds();
    RTPStream *pRTPStream = (RTPStream *)parms;

    if (pRTPStream == NULL){
        return NULL;
    }
    if (pRTPStream->getUDPSocketPair() == NULL){
        return NULL;
    }

    if (pRTPStream->getUDPSocketPair()->GetSocketA() == NULL ||
        pRTPStream->getUDPSocketPair()->GetSocketB() == NULL){
        return NULL;
    }
    
    qtss_printf("thread_for_nat enter, pRTPStream:%08x, rtp_port:%d\n", pRTPStream, pRTPStream->getRemoteRTPPort());
    pRTPStream->setRunningValue(true);
    while(1){
        UInt32 iRemoteAddr = 0;
        UInt16 iRemotePort = 0;
        char szBuff[64];
        UInt32 iBufLen = sizeof(szBuff);
        UInt32 iRecvLen = 0;

        if (pRTPStream->getQuitValue()){
            break;
        }
        if (!fUpdateRtpPort){
            OS_Error iRet = pRTPStream->getUDPSocketPair()->GetSocketA()->RecvFrom(&iRemoteAddr, &iRemotePort, szBuff, iBufLen, &iRecvLen);
            if (OS_NoErr == iRet){
                if (iRemoteAddr == pRTPStream->getRemoteRTPAddr()
                    && iRemotePort != pRTPStream->getRemoteRTPPort()
                    && iRecvLen > 0){
                    qtss_printf("thread_for_nat update GetSocketA iRet:%d, fRemoteRTPPort:%d, fRemoteRTCPPort:%d, iRemotePort:%d\n", iRet, pRTPStream->getRemoteRTPPort(), 
                            pRTPStream->getRemoteRTCPPort(), iRemotePort);
                    pRTPStream->setRemoteRTPPort(iRemotePort);
                    fUpdateRtpPort = true;
                }else{
                    qtss_printf("thread_for_nat update GetSocketA received the same port value, fRemoteRTPPort:%d, fRemoteRTCPPort:%d\n", pRTPStream->getRemoteRTPPort(), 
                            pRTPStream->getRemoteRTCPPort());
                    fUpdateRtpPort = true;
                }
            }else{
                //qtss_printf("Setup update GetSocketA iRet:%d, fRemoteRTPPort:%d, fRemoteRTCPPort:%d\n", iRet, fRemoteRTPPort, fRemoteRTCPPort);
            }

        }
        
        if (!fUpdateRtcpPort){
        OS_Error iRet = pRTPStream->getUDPSocketPair()->GetSocketB()->RecvFrom(&iRemoteAddr, &iRemotePort, szBuff, iBufLen, &iRecvLen);
            if (OS_NoErr == iRet){
                if (iRemoteAddr == pRTPStream->getRemoteRTPAddr()
                    && iRemotePort != pRTPStream->getRemoteRTCPPort()
                    && iRecvLen > 0){
                    qtss_printf("thread_for_nat update GetSocketB iRet:%d, fRemoteRTPPort:%d, fRemoteRTCPPort:%d, iRemotePort:%d\n", iRet, pRTPStream->getRemoteRTPPort(), 
                            pRTPStream->getRemoteRTCPPort(), iRemotePort);
                    pRTPStream->setRemoteRTCPPort(iRemotePort);
                    fUpdateRtcpPort = true;
                } else{
                    qtss_printf("thread_for_nat update GetSocketB received the same port value, fRemoteRTPPort:%d, fRemoteRTCPPort:%d\n", pRTPStream->getRemoteRTPPort(), 
                            pRTPStream->getRemoteRTCPPort());
                    fUpdateRtcpPort = true;
                }
            }else{
                //wait.
                //qtss_printf("Setup update GetSocketB iRet:%d, fRemoteRTPPort:%d, fRemoteRTCPPort:%d\n", iRet, fRemoteRTPPort, fRemoteRTCPPort);
            }

        }

        if (fUpdateRtcpPort && fUpdateRtpPort){
            qtss_printf("thread_for_nat exit for update end, break, pRTPStream:%08x, rtp_port:%d\n", pRTPStream, pRTPStream->getRemoteRTPPort());
            break;
        }
        if ( (OS::Milliseconds() - currentTime ) > 2000){
            qtss_printf("thread_for_nat exaust 2000 ms, break, pRTPStream:%08x, rtp_port:%d\n", pRTPStream, pRTPStream->getRemoteRTPPort());
            break;
        }
    }

    pRTPStream->setRunningValue(false);
    gid_thread = -1;
    return NULL;
}

void RTPStream::start_thread_for_nat(){

    int ret=pthread_create(&gid_thread, NULL, thread_for_nat, (void*)this);
    if (ret != 0){
        qtss_printf("err:%d\n", ret);
    }else{

        qtss_printf("start_thread_for_nat create OK\n");
    }
}