live555 RTP資料讀取流程
Source:資料來源、獲取資料資料傳遞功能
Sink:消耗資料,傳送資料
Source->Sink:Source獲取網路資料,Sink消耗資料寫入檔案
注意:live555兩個迴圈一個是傳送RTP資料,一個就是讀取RTP即 SingleStep();
- void BasicTaskScheduler0::doEventLoop(char* watchVariable)
- {
- // Repeatedly loop, handling readble sockets and timed events:
- while (1)
- {
- if (watchVariable != NULL && *watchVariable != 0)
- SingleStep();
- }
- }
從這裡可知,live555在客戶端處理資料實際上是單執行緒的程式,不斷執行SingleStep()函式中的程式碼。通過檢視該函式程式碼裡,下面一句程式碼為重點
- (*handler->handlerProc)(handler->clientData, resultConditionSet);
其中該條程式碼出現了兩次,通過除錯跟蹤它的執行軌跡,第一次出現呼叫的函式是為了處理和RTSP伺服器的通訊協議的商定,而第二次出現呼叫的函式才是處理真正的視訊和音訊資料。對於RTSP通訊協議的分析我們暫且不討論,而直接進入第二次呼叫該函式的部分。
在我們的除錯過程中在執行到上面的函式時就直接呼叫到livemedia目錄下的如下函式
- void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int/*mask*/)
- {
- source->networkReadHandler1();
- }
//下面這個函式實現的主要功能就是從socket端讀取資料並存儲資料
- void MultiFramedRTPSource::networkReadHandler1()
- {
- BufferedPacket* bPacket = fPacketReadInProgress;
- if (bPacket == NULL)
- {
- // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:
- //分配一塊新的儲存空間來儲存從socket端讀取的資料
- bPacket = fReorderingBuffer->getFreePacket(this);
- }
- // Read the network packet, and perform sanity checks on the RTP header:
- Boolean readSuccess = False;
- do
- {
- Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;
- //fillInData()函式封裝了從socket端獲取資料的過程,到此函式執行完已經將資料儲存到了bPacket物件中
- if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete))
- {
- if (bPacket->bytesAvailable() == 0)
- {
- envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase \"MAX_PACKET_SIZE\"\n";
- }
- break;
- }
- if (packetReadWasIncomplete)
- {
- // We need additional read(s) before we can process the incoming packet:
- fPacketReadInProgress = bPacket;
- return;
- } else
- {
- fPacketReadInProgress = NULL;
- }
- //省略關於RTP包的處理
- ...
- ...
- ...
- //fReorderingBuffer為MultiFramedRTPSource類中的物件,該物件建立了一個儲存Packet資料包物件的連結串列
- //下面的storePacket()函式即將上面獲取的資料包儲存在連結串列中
- if (!fReorderingBuffer->storePacket(bPacket)) break;
- readSuccess = True;
- } while (0);
- if (!readSuccess) fReorderingBuffer->freePacket(bPacket);
- doGetNextFrame1();
- // If we didn't get proper data this time, we'll get another chance
- }
//下面的這個函式則實現從上面函式中介紹的儲存資料包連結串列的物件(即fReorderingBuffer)中取出資料包並呼叫相應函式使用它
//程式碼1.1
- void MultiFramedRTPSource::doGetNextFrame1()
- {
- while (fNeedDelivery)
- {
- // If we already have packet data available, then deliver it now.
- Boolean packetLossPrecededThis;
- //從fReorderingBuffer物件中取出一個數據包
- BufferedPacket* nextPacket
- = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);
- if (nextPacket == NULL) break;
- fNeedDelivery = False;
- if (nextPacket->useCount() == 0)
- {
- // Before using the packet, check whether it has a special header
- // that needs to be processed:
- unsigned specialHeaderSize;
- if (!processSpecialHeader(nextPacket, specialHeaderSize))
- {
- // Something's wrong with the header; reject the packet:
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- fNeedDelivery = True;
- break;
- }
- nextPacket->skip(specialHeaderSize);
- }
- // Check whether we're part of a multi-packet frame, and whether
- // there was packet loss that would render this packet unusable:
- if (fCurrentPacketBeginsFrame)
- {
- if (packetLossPrecededThis || fPacketLossInFragmentedFrame)
- {
- // We didn't get all of the previous frame.
- // Forget any data that we used from it:
- fTo = fSavedTo; fMaxSize = fSavedMaxSize;
- fFrameSize = 0;
- }
- fPacketLossInFragmentedFrame = False;
- } elseif (packetLossPrecededThis)
- {
- // We're in a multi-packet frame, with preceding packet loss
- fPacketLossInFragmentedFrame = True;
- }
- if (fPacketLossInFragmentedFrame)
- {
- // This packet is unusable; reject it:
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- fNeedDelivery = True;
- break;
- }
- // The packet is usable. Deliver all or part of it to our caller:
- unsigned frameSize;
- //將上面取出的資料包拷貝到fTo指標所指向的地址
- nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
- fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
- fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
- fCurPacketMarkerBit);
- fFrameSize += frameSize;
- if (!nextPacket->hasUsableData())
- {
- // We're completely done with this packet now
- fReorderingBuffer->releaseUsedPacket(nextPacket);
- }
- if (fCurrentPacketCompletesFrame) //如果完整的取出了一幀資料,則可呼叫需要該幀資料的函式去處理它
- {
- // We have all the data that the client wants.
- if (fNumTruncatedBytes > 0)
- {
- envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("
- << fSavedMaxSize << "). "
- << fNumTruncatedBytes << " bytes of trailing data will be dropped!\n";
- }
- // Call our own 'after getting' function, so that the downstream object can consume the data:
- if (fReorderingBuffer->isEmpty())
- {
- // Common case optimization: There are no more queued incoming packets, so this code will not get
- // executed again without having first returned to the event loop. Call our 'after getting' function
- // directly, because there's no risk of a long chain of recursion (and thus stack overflow):
- afterGetting(this); //呼叫函式去處理取出的資料幀
- } else
- {
- // Special case: Call our 'after getting' function via the event loop.
- nextTask() = envir().taskScheduler().scheduleDelayedTask(0,
- (TaskFunc*)FramedSource::afterGetting, this);
- }
- }
- else
- {
- // This packet contained fragmented data, and does not complete
- // the data that the client wants. Keep getting data:
- fTo += frameSize; fMaxSize -= frameSize;
- fNeedDelivery = True;
- }
- }
- }
//下面這個函式即開始呼叫執行需要該幀資料的函式
- void FramedSource::afterGetting(FramedSource* source)
- {
- source->fIsCurrentlyAwaitingData = False;
- // indicates that we can be read again
- // Note that this needs to be done here, in case the "fAfterFunc"
- // called below tries to read another frame (which it usually will)
- if (source->fAfterGettingFunc != NULL)
- {
- (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
- source->fFrameSize, source->fNumTruncatedBytes,
- source->fPresentationTime,
- source->fDurationInMicroseconds);
- }
- }
上面的fAfterGettingFunc為我們自己註冊的函式,如果執行的是testProgs中的openRTSP例項,則該函式指向下列程式碼中通過呼叫getNextFrame()註冊的afterGettingFrame()函式
- Boolean FileSink::continuePlaying()
- {
- if (fSource == NULL) return False;
- fSource->getNextFrame(fBuffer, fBufferSize,
- afterGettingFrame, this,
- onSourceClosure, this);
- return True;
- }
如果執行的是testProgs中的testRTSPClient中的例項,則該函式指向這裡註冊的afterGettingFrame()函式
- Boolean DummySink::continuePlaying()
- {
- if (fSource == NULL) return False; // sanity check (should not happen)
- // Request the next frame of data from our input source. "afterGettingFrame()" will get called later, when it arrives:
- fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
- afterGettingFrame, this,
- onSourceClosure, this);
- return True;
- }
從上面的程式碼中可以看到getNextFrame()函式的第一個引數為分別在各自類中定義的buffer,我們繼續以openRTSP為執行程式來分析,fBuffer為FileSink類裡定義的指標:unsigned char* fBuffer;
這裡我們先繞一個彎,看看getNextFrame()函式裡做了什麼
- void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
- afterGettingFunc* afterGettingFunc,
- void* afterGettingClientData,
- onCloseFunc* onCloseFunc,
- void* onCloseClientData)
- {
- // Make sure we're not already being read:
- if (fIsCurrentlyAwaitingData)
- {
- envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
- envir().internalError();
- }
- fTo = to;
- fMaxSize = maxSize;
- fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
- fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
- fAfterGettingFunc = afterGettingFunc;
- fAfterGettingClientData = afterGettingClientData;
- fOnCloseFunc = onCloseFunc;
- fOnCloseClientData = onCloseClientData;
- fIsCurrentlyAwaitingData = True;
- doGetNextFrame();
- }
從程式碼可以知道上面getNextFrame()中傳入的第一個引數fBuffer指向了指標fTo,而我們在前面分析程式碼1.1中的void MultiFramedRTPSource::doGetNextFrame1()函式中有下面一段程式碼:
- //將上面取出的資料包拷貝到fTo指標所指向的地址
- nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,
- fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,
- fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,
- fCurPacketMarkerBit);
實際上現在應該明白了,從getNextFrame()函式中傳入的第一個引數fBuffer最終儲存的即是從資料包連結串列物件中取出的資料,並且在呼叫上面的use()函式後就可以使用了。
而在void MultiFramedRTPSource::doGetNextFrame1()函式中程式碼顯示的最終呼叫我們註冊的void FileSink::afterGettingFrame()正好是在use()函式呼叫之後的afterGetting(this)中呼叫。我們再看看afterGettingFrame()做了什麼處理:
- void FileSink::afterGettingFrame(void* clientData, unsigned frameSize,
- unsigned numTruncatedBytes,
- struct timeval presentationTime,
- unsigned /*durationInMicroseconds*/)
- {
- FileSink* sink = (FileSink*)clientData;
- sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime);
- }
- void FileSink::afterGettingFrame(unsigned frameSize,
- unsigned numTruncatedBytes,
- struct timeval presentationTime)
- {
- if (numTruncatedBytes > 0)
- {
- envir() << "FileSink::afterGettingFrame(): The input frame data was too large for our buffer size ("
- << fBufferSize << "). "
- << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing the \"bufferSize\" parameter in the \"createNew()\" call to at least "
- << fBufferSize + numTruncatedBytes << "\n";
- }
- addData(fBuffer, frameSize, presentationTime);
- if (fOutFid == NULL || fflush(fOutFid) == EOF)
- {
- // The output file has closed. Handle this the same way as if the
- // input source had closed:
- onSourceClosure(this);
- stopPlaying();
- return;
- }
- if (fPerFrameFileNameBuffer != NULL)
- {
- if (fOutFid != NULL) { fclose(fOutFid); fOutFid = NULL; }
- }
- // Then try getting the next frame:
- continuePlaying();
- }
從上面程式碼可以看到呼叫了addData()函式將資料儲存到檔案中,然後繼續continuePlaying()又去獲取下一幀資料然後處理,直到遇到迴圈結束然後依次退出呼叫函式。最後看看addData()函式的實現即可知:
- void FileSink::addData(unsigned charconst* data, unsigned dataSize,
- struct timeval presentationTime)
- {
- if (fPerFrameFileNameBuffer != NULL)
- {
- // Special case: Open a new file on-the-fly for this frame
- sprintf(fPerFrameFileNameBuffer, "%s-%lu.%06lu", fPerFrameFileNamePrefix,
- presentationTime.tv_sec, presentationTime.tv_usec);
- fOutFid = OpenOutputFile(envir(), fPerFrameFileNameBuffer);
- }
- // Write to our file:
- #ifdef TEST_LOSS
- static unsigned const framesPerPacket = 10;
- static unsigned const frameCount = 0;
- static Boolean const packetIsLost;
- if ((frameCount++)%framesPerPacket == 0)
- {
- packetIsLost = (our_random()%10 == 0); // simulate 10% packet loss #####
- }
- if (!packetIsLost)
- #endif
- if (fOutFid != NULL && data != NULL)
- {
- fwrite(data, 1, dataSize, fOutFid);
- }
- }
最後呼叫系統函式fwrite()實現寫入檔案功能。
總結:從上面的分析可知,如果要取得從RTSP伺服器端接收並儲存的資料幀,我們只需要定義一個類並實現如下格式兩個的函式,並宣告一個指標地址buffer用於指向資料幀,再在continuePlaying()函式中呼叫getNextFrame(buffer,...)即可。
- typedefvoid (afterGettingFunc)(void* clientData, unsigned frameSize,
- unsigned numTruncatedBytes,
- struct timeval presentationTime,
- unsigned durationInMicroseconds);
- typedefvoid (onCloseFunc)(void* clientData);
然後再在afterGettingFunc的函式中即可使用buffer。.