1. 程式人生 > >srs程式碼學習(1)--listen建立過程

srs程式碼學習(1)--listen建立過程

srs的服務偵聽的建立過程。

以rtmp服務為例 srs服務偵聽的建立依靠從上到下的三個類。分別是

SrsServer  

SrsStreamListener  

SrsTcpListener

埠偵聽過程為

1)main函式中呼叫全域性變數_srs_server的  listen()函式

if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
        return ret;
    }
在SrsServer的listen()函式如下
<pre name="code" class="cpp">int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}


listen_rtmp()會建立一個rtmp的steamlistener
<pre name="code" class="cpp">int SrsServer::listen_rtmp()
{
    int ret = ERROR_SUCCESS;
    
    // stream service port.
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) {
        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener);
        
        std::string ip;
        int port;
        srs_parse_endpoint(ip_ports[i], ip, port);
        
        if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
            return ret;
        }
    }
    
    return ret;
}


這個streamlistener並不是真正的底層監聽層。只是一個業務封裝層。其類的繼承順序如下

在整個程式碼中,和其相識的類有SrsRtspListener  SrsHttpFlvListener兩個類。這種類的主要作用是在底層連結建立有。給不同的上層放回連結資訊。

程式碼走到這一步,偵聽的socket還麼米有建立起來,看SrsStreamListener的listen()函式

int SrsStreamListener::listen(string i, int p)
{
    int ret = ERROR_SUCCESS;
    
    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port);

    if ((ret = listener->listen()) != ERROR_SUCCESS) {
        srs_error("tcp listen failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);

    srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}
在這段程式碼裡面。建立了真正的底層監聽類
 listener = new SrsTcpListener(this, ip, port);
然後呼叫其listen()函式。程式碼如下
int SrsTcpListener::listen()
{
    int ret = ERROR_SUCCESS;
    
    if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        ret = ERROR_SOCKET_CREATE;
        srs_error("create linux socket error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
    
    int reuse_socket = 1;
    if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
        ret = ERROR_SOCKET_SETREUSE;
        srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
        return ret;
    }
    srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
    
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip.c_str());
    if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
        ret = ERROR_SOCKET_BIND;
        srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
        ret = ERROR_SOCKET_LISTEN;
        srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
        ret = ERROR_ST_OPEN_SOCKET;
        srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
    
    if ((ret = pthread->start()) != ERROR_SUCCESS) {
        srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
        return ret;
    }
    srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
    
    return ret;
}
到此,rtmp的監聽埠就建立起來了。

在仔細看SrsTcpListener類。繼承關係如下



這個類中很有趣的有了個執行緒類SrsReusableThread* pthread,這個執行緒是什麼時候建立的?答案是在建構函式中

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p)
{
    handler = h;
    ip = i;
    port = p;

    _fd = -1;
    _stfd = NULL;

    pthread = new SrsReusableThread("tcp", this);
}
這個類的作用是什麼呢?首先我們發現,SrsTcpListener 這個類繼承了一個執行緒回撥介面ISrsReusableThreadHandler。這個說明可以擁有可以線上程中執行的能力

觀察執行緒類的結構



很奇怪,SrsReusableThread竟然也繼承了一個handler類。這就表示。這個類還是一個封裝類,並不是底層的真正的執行緒類。果然我們發現了其一個變數 internal::SrsThread* pthread,這個應該靠近點底層了吧。先放下不講。我們是來追這個執行緒類也我的業務類啥關係的。這時另外一個有意思的變數ISrsReusableThreadHandler* handler,還記得
SrsTcpListener 類麼,其有一個ISrsReusableThreadHandler的介面。通過這個介面,把執行緒和業務連結起來。看看其cycle()函式

int SrsReusableThread::cycle()
{
    return handler->cycle();
}
這個是執行緒這執行函式。它會呼叫上層類,具體到我們的例子裡,就是SrsTcpListener 的cycle().看看是什麼
int SrsTcpListener::cycle()
{
    int ret = ERROR_SUCCESS;
    
    st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);
    
    if(client_stfd == NULL){
        // ignore error.
        if (errno != EINTR) {
            srs_error("ignore accept thread stoppped for accept client error");
        }
        return ret;
    }
    srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));
    
    if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }
    
    return ret;
}
原來是accept函式。在有連結後直接上調給上層,具體來講就是SrsStreamListener類的on_tcp_client()函式
int SrsStreamListener::on_tcp_client(st_netfd_t stfd)
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {
        srs_warn("accept client error. ret=%d", ret);
        return ret;
    }

    return ret;
}
歐耶,這下呼叫到最上層去了。server裡,建立一個連結。

到現在還有兩個問題要搞明白:

1)srs的執行緒模型

2)如何管理各個連結。

這個下一次在摸索