SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動
SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動
一、前言
小卒最近看SRS原始碼,隨手寫下部落格,其一為了整理思路,其二也是為日後翻看方便。如果不足之處,請指教!
首先總結一下SRS原始碼的優點:
1、輕量級,程式碼結構清楚,目前SRS3.0程式碼8萬行左右,但幾乎滿足直播業務的所有要求。
2、SRS採用State Threads,支援高併發量,高效能。
3、SRS支援rtmp和hls,滿足PC和移動直播要求。
4、SRS支援叢集部署。小叢集Forward,大叢集edge。
程式碼分析可分為兩個階段:
一:分析程式碼框架,理清楚組織流程
二:分析程式碼細節,熟悉SRS工作原理
二、程式碼分析
相關SRS原始碼其他總結:
SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動
SRS(simple-rtmp-server)流媒體伺服器原始碼分析--RTMP訊息play
SRS(simple-rtmp-server)流媒體伺服器原始碼分析--RTMP資訊Publish
SRS(simple-rtmp-server)流媒體伺服器原始碼分析--HLS切片
現階段,我主要以程式碼框架梳理為主。Srs原始碼框架如下圖:
- int run_master()
- {
- int ret = ERROR_SUCCESS;
- if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
- return ret;
- }
- //將pid程序號寫進檔案
- if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
- return ret;
- }
- //客戶端監聽
- if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
- return ret;
- }
- return 0;
- }
進入客戶監聽
監聽內容: 不同的連線請求,有不同的監聽 。
- if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
- return ret;
- }
- int SrsServer::listen()
- {
- int ret = ERROR_SUCCESS;
- // 建立一個rtmp的Streamlistener
- if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_http_api()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
- return ret;
- }
- if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
- return ret;
- }
- return ret;
- }
1、首先分析RTMP連線
-
int SrsServer::listen_rtmp()
-
{
-
int ret = ERROR_SUCCESS;
-
-
// stream service port.
-
std::vector
<std::string> ip_ports = _srs_config->get_listens();
-
srs_assert((int)ip_ports.size() > 0);
-
-
close_listeners(SrsListenerRtmpStream);
-
-
for (int i = 0; i
< (int)ip_ports.size(); i++) {
-
SrsListener*
listener =
new
SrsStreamListener(
this,
SrsListenerRtmpStream);
-
listeners.push_back(
listener);
-
-
std::string
ip;
-
int
port;
-
srs_parse_endpoint(
ip_ports[
i],
ip,
port);
-
-
if ((
ret =
listener->listen(ip, port)) != ERROR_SUCCESS) {
-
srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
-
return ret;
-
}
-
}
-
-
return ret;
-
}
這裡是listen_rtmp()函式,你也可以去看看listen_http_api()函式、listen_http_stream()函式,其實結構都很相似,只是在建立SrsStreamListener物件時,傳入了不同的引數SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同型別的監聽物件。
注意,這裡有大量純虛擬函式,不要走錯路了。進入TCP監聽程式碼
- // listen_rtmp 中listen監聽走這裡了。
- int SrsStreamListener::listen(string i, int p)
- {
- int ret = ERROR_SUCCESS;
- ip = i;
- port = p;
- srs_freep(listener);
- listener = new SrsTcpListener(this, ip, port);
- if ((ret = listener->listen()) != ERROR_SUCCESS) {
- srs_error("tcp listen failed. ret=%d", ret);
- return ret;
- }
- srs_info("listen thread current_cid=%d, "
- "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
- _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
- srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
- return ret;
- }
此程式碼為C++ TCP Socket程式碼,思路比較清晰,可以看到,每接受到一個rtmp訪問請求,建立一個”執行緒“,這裡暫時將其稱為執行緒,後面再做具體介紹。建立執行緒程式碼如下:
- // rtmp tcp監聽
- int SrsTcpListener::listen()
- {
- //C++ Socket程式設計
- int ret = ERROR_SUCCESS;
- // 1、建立套接字,流式Socket(SOCK_STREAM)
- if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- ret = ERROR_SOCKET_CREATE;
- srs_error("create linux socket error. port=%d, ret=%d", port, ret);
- return ret;
- }
- srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
- int reuse_socket = 1;
- if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
- ret = ERROR_SOCKET_SETREUSE;
- srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
- return ret;
- }
- srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
- sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = inet_addr(ip.c_str());
- // 2、繫結套接字到一個IP地址和一個埠上
- if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
- ret = ERROR_SOCKET_BIND;
- srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- // 3、將套接字設定為監聽模式等待連線請求
- if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
- ret = ERROR_SOCKET_LISTEN;
- srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
- ret = ERROR_ST_OPEN_SOCKET;
- srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
- // 4、等到連線一個客戶之後,開啟一個新的執行緒
- if ((ret = pthread->start()) != ERROR_SUCCESS) {
- srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
- return ret;
- }
- srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
- return ret;
- }
- int SrsReusableThread::start()
- {
- return pthread->start();
- }
-
int SrsThread::start()
-
{
-
int ret = ERROR_SUCCESS;
-
-
if(tid) {
-
srs_info(
"thread %s already running.", _name);
-
return ret;
-
}
-
if((tid = st_thread_create(thread_fun,
this, (_joinable?
1:
0),
0)) ==
NULL){
-
ret = ERROR_ST_CREATE_CYCLE_THREAD;
-
srs_error(
"st_thread_create failed. ret=%d", ret);
-
return ret;
-
}
-
disposed =
false;
-
// we set to loop to true for thread to run.
-
loop =
true;
-
// wait for cid to ready, for parent thread to get the cid.
-
while (_cid <
0) {
-
st_usleep(
10 *
1000);
-
}
-
// now, cycle thread can run.
-
can_run =
true;
-
return ret;
-
}
來到了st_thread_create,這裡要注意,這是SRS開源專案具有高併發,高效能的重要一步。這裡建立的是協程,不是執行緒。協程是有別於程序和執行緒的一種元件,具有程序的獨立性和執行緒的輕量級,聽說微信能夠支援8億使用者量,也是採用協程這種網路服務框架:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。
從這裡可以看出,srs是一個單執行緒的伺服器,採用協程,主持高併發,高效能。
建立協程,協程函式為:thread_fun()
-
// 每連連結一個使用者,建立一個協程程,該函式為協程函式
-
void* SrsThread::thread_fun(void* arg)
-
{
-
SrsThread* obj = (SrsThread*)arg;
-
srs_assert(obj);
-
// 進入執行緒迴圈
-
obj->thread_cycle();
-
-
// for valgrind to detect.
-
SrsThreadContext* ctx = dynamic_cast
<SrsThreadContext*>(_srs_context);
-
if (ctx) {
-
ctx->clear_cid();
-
}
-
-
st_thread_exit(NULL);
-
-
return NULL;
-
}
此時,真正進入了協程迴圈處理
- void SrsThread::thread_cycle()
- {
- int ret = ERROR_SUCCESS;
- _srs_context->generate_id();
- srs_info("thread %s cycle start", _name);
- _cid = _srs_context->get_id();
- srs_assert(handler);
- handler->on_thread_start();
- // thread is running now.
- really_terminated = false;
- // wait for cid to ready, for parent thread to get the cid.
- while (!can_run && loop) {
- st_usleep(10 * 1000);
- }
- while (loop) {
- if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
- srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
- goto failed;
- }
- srs_info("thread %s on before cycle success", _name); <