1. 程式人生 > >【轉】Darwin Streaming Server 核心程式碼分析

【轉】Darwin Streaming Server 核心程式碼分析

基本概念

首先,我針對的程式碼是Darwin StreamingServer 6.0.3未經任何改動的版本。

DarwinStreaming Server從設計模式上看,採用了Reactor的併發伺服器設計模式,如果對Reactor有一定的瞭解會有助於對DarwinStreaming Server核心程式碼的理解。

Reactor模式是典型的事件觸發模式,當有事件發生時則完成相應的Task,Task的完成是通過呼叫相應的handle來實現的,對於handle的呼叫是由有限個數的Thread來完成的。

DarwinStreaming Server中定義了一個Task類。Task類有兩個典型的方法,一個是Signal,一個是Run。呼叫Signal把一個Task加入到TaskThread的Task佇列中,等待完成,Run就是完成一個任務的handle。基於Task類,定義了三種類型的Task,分別是IdleTask,TimeoutTask,以及普通的Task。

在Darwin StreamingServer中,除了主執行緒以外,有三種類型的執行緒,分別是TaskThread,EventThread以及IdleTaskThread:

1.    TaskThread,TaskThread通過執行Task型別物件的Run方法來完成相應Task的處理。典型的Task型別是RTSPSession和RTPSession。TaskThread的個數是可配置的,預設情況下TaskThread的個數與處理器的個數一致。等待被TaskThread呼叫並執行的Task放在佇列或者堆中。

2.    EventThread,EventThread負責偵聽套介面事件,在DarwinStreaming Server中,有兩種被偵聽的事件,分別是建立RTSP連線請求的到達和RTSP請求的到達。對於RTSP連線請求的事件,EventThread建立一個RTSPSession,並啟動針對相應的socket的偵聽。對於RTSP請求的事件,EventThread把對應的RTSPSession型別的Task加入到TaskThread的佇列中,等待RTSP請求被處理。

3.    IdleTaskThread,IdleTaskThread管理IdleTask型別物件的佇列,根據預先設定的定時器觸發IdleTask的排程。TCPListenerSocket就是一個IdleTask的派生類,當併發連線數達到設定的最大值時,會把派生自TCPListenerSocket的RTSPListenerSocket加入到IdleTaskThread管理的IdleTask佇列中,暫時停止對RTSP埠的偵聽,直到被設定好的定時器觸發。

核心架構

下圖是DarwinStreaming Server核心架構的示意圖。在這個示意圖中有三種類型的要素,分別是執行緒,Task佇列或者堆,被偵聽的事件。圖中的文字都是從原始碼中copy出來的,便於讀者通過查詢與原始碼對應起來。

 【圖片】

圖中給出了三個執行緒,分別是TaskThread::Entry,EventThread::Entry以及IdleTaskThread::Entry。前文已經對這三種執行緒進行了概要描述。

除了三個執行緒,圖中還有另外五個矩形塊。與TaskThread::Entry執行緒相關聯的有兩個,分別是TaskThread::fTaskQueue佇列和TaskThread::fHeap堆,通過呼叫Signal被排程等待完成的Task就放在佇列或者堆中。與IdleTaskThread::Entry執行緒相關聯的有一個,是IdleTaskThread::IdleHeap堆。與EventThread::Entry相關聯的是EventContext::fEventReq,是被偵聽的埠。還有一個是TimeoutTaskThread::fQueue佇列,它事實上是通過TimeoutTask與TaskThread::Entry相關聯。

圖中指向執行緒的連線線表明從佇列或者堆中取出Task,而對於EventThread::Entry執行緒來說,則是被偵聽事件的發生。指向被偵聽的埠的連線線表明把埠加入偵聽,指向Task的佇列或堆的連線線,表明把Task加入到佇列或者堆中。連線線的文字給出的是相應的函式呼叫,可以直接在原始碼中搜索到。

EventThread

系統啟動的時候呼叫QTSServer::StartTasks()把RTSP服務埠加入到偵聽佇列中。此時便開始接收客戶端的RTSP連線請求了。

在EventThread::Entry中呼叫select_waitevent函式等待事件的發生,當有事件發生的時候,就通過呼叫ProcessEvent方法對事件進行相應的處理。注意ProcessEvent是一個虛擬函式,共有兩個實現。EventContext類中實現了ProcessEvent方法,EventContext的派生類TCPListenerSocket中也實現了ProcessEvent方法。

對於建立RTSP連線的請求,呼叫TCPListenerSocket::ProcessEvent方法來處理,此方法呼叫RTSPListenerSocket的GetSessionTask方法建立一個RTSPSession,並把相應的套介面加入偵聽佇列,等待RTSP請求。然後還需呼叫this->RequestEvent(EV_RE)把建立RTSP連線的請求加入到偵聽佇列。

對於RTSP連線上的RTSP請求事件,呼叫的是EventContext::ProcessEvent方法,通過Task的Signal把對應的RTSPSession型別的Task加入到TaskThread::fTaskQueue中等待TaskThread處理。

TaskThread與Task

TaskThread::Entry呼叫TaskThread::WaitForTask()方法獲得下一個需要處理的Task。TaskThread::WaitForTask()首先從TaskThread::fHeap中獲得Task,如果TaskThread::fHeap中沒有滿足條件的Task,則從TaskThread::fTaskQueue中獲得Task。

TaskThread::Entry呼叫Task::Run方法來完成對應的Task,Task::Run方法的返回值型別是SInt64,也即signedlong long int型別。TaskThread::Entry根據Task::Run方法的返回值進行不同的處理。對於小於0的返回值,需delete這個Task;對於大於0的返回值,返回值代表了下次處理這個Task需等待的時間,TaskThread::Entry呼叫fHeap.Insert(&theTask->fTimerHeapElem)把Task插入到堆裡,並設定等待時間。對於等於0的返回值,TaskThread::Entry不再理會該Task。

TimeoutTask

從程式碼中看,TimeoutTaskThread是IdleTask的派生類,分析後發現從TimeoutTaskThread與IdleTask沒有任何關係,完全可以從Task派生,修改程式碼後驗證了這個想法。因此TimeoutTaskThread就是一個普通的Task,TimeoutTaskThread通過其Run方法監控一組超時任務,具體的比如RTSP協議或者RTP協議超時。

在系統啟動的時候TimeoutTaskThread被加入到TaskThread的佇列中,這是通過在StartServer函式中呼叫TimeoutTask::Initialize()來實現的。TimeoutTaskThread::Run函式的返回值是intervalMilli= kIntervalSeconds * 1000,也就是一個正數,於是TimeoutTaskThread這個Task會加入到TaskThread::fHeap中被週期性的呼叫。

TimeoutTaskThread::Run方法發現有超時的任務,則通過Signal方法排程這個Task,event為Task::kTimeoutEvent。被管理的這組任務,要有RefreshTimeout的機制。

一次點播請求的處理

為了更好的理解DarwinStreaming Server的架構,我們從客戶端發起點播,觸發伺服器的建立RTSP連線事件的發生開始,看看DSS的工作流程是什麼樣的。

針對RTSP協議,DarwingStreaming Server在554埠上偵聽,當有連線請求到達時,通過accept呼叫返回一個socket,對應的後續RTSP請求都是通過這個socket來傳送的。我們把RTSP相關的事件分成兩類,一類是RTSP連線請求,一類是RTSP請求。先來看RTSP連線請求的過程:

1.    RTSP連線請求到達後,被select_waitevent函式捕獲,程式碼在EventContext.cpp的EventThread::Entry中232行。

2.    查詢EventThread::fRefTable,獲取對應的EventContext。得到的是EventContext型別的派生類RTSPListenerSocket。相應的程式碼在EventContext.cpp中的249到253行。

3.    呼叫ProcessEvent,處理事件。相應的程式碼在EventContext.cpp中的257行。注意,由於對應的EventContext類其實是RTSPListenerSocket,因此呼叫的應該是TCPListenerSocket::ProcessEvent。

4.    在TCPListenerSocket.cpp的106行TCPListenerSocket::ProcessEvent方法中,呼叫accept得到socket,在160行呼叫了GetSessionTask方法,對應的是RTSPListenerSocket::GetSessionTask,在QTSServer.cpp中定義。

5.    在RTSPListenerSocket::GetSessionTask方法中,QTSServer.cpp的1077行,呼叫NEWRTSPSession建立了一個新的RTSPSession。

6.    回到TCPListenerSocket.cpp檔案中的TCPListenerSocket::ProcessEvent方法,注意189行,把剛剛建立好的RTSP連線加入到偵聽佇列,等待RTSP請求的到來。

RTSP請求的處理流程步驟如下,注意前面第一步是一樣的:

1.    RTSP連線請求到達後,被select_waitevent函式捕獲,程式碼在EventContext.cpp的EventThread::Entry中232行。

2.    查詢EventThread::fRefTable,獲取對應的EventContext。得到的是EventContext類。相應的程式碼在EventContext.cpp中的249到253行。

3.    呼叫ProcessEvent,處理事件。相應的程式碼在EventContext.cpp中的257行。注意,此時呼叫的是EventContext::ProcessEvent。

4.    EventContext::ProcessEvent方法在EventContext.h中實現,在127行。在138行呼叫了fTask->Signal(Task::kReadEvent),fTask就是RTSPSession類。把RTSPSession加入到TaskThread的佇列等待RTSPSession::Run()被呼叫。

5.    後續就是RTSPSession::Run()對RTSP請求的具體的處理。

(全文完)