1. 程式人生 > >Live555-基類研究二

Live555-基類研究二

TaskSecheduler類是一個任務排程器,它是整個Live555的任務排程中心,程式在任何時刻的任何動作,均由該類完成。其核心函式是SingleStep。Live555的任務主要分為Delayed Event、Socket Event以及Trigger Event。其類層次結構如下圖所示:

從類圖中可以看到,其類層次結構和第二節中的UsageEnvironment類極其相似。下面讓我們對每個類進行一個簡單的認識。

TaskSecheduler定義了一些介面,用來對各種任務進行管理。該類中最為重要的一個函式莫過於doEventLoop了,整個任務的排程便是通過呼叫該函式實現的。每一次迴圈,該函式都會執行如下的幾步:

1.       首先處理IO事件。程式通過select函式選擇那些已經準備好的IO檔案描述符,對其進行讀、寫或者異常處理。與該任務相關的介面為setBackgroundHandling、disableBackgroundHandling以及moveSocketHandling。IO事件會在任務中反覆執行。

2.       然後將處理觸發事件。由於作者採用了一個機器位元組中的位來儲存觸發事件的數量,所以觸發事件的數量受機器限制。對X86系統則32個觸發事件。這樣做的好處是效率高,但缺點就是數量受限制。與該任務相關的介面為createEventTrigger、deleteEventTrigger和triggerEvent。事件一旦被觸發後,將會立即刪除,避免事件再次觸發。

3.       最後將執行延遲任務。延遲任務儲存在一個延遲佇列中,通過時間來指定何時執行。有關延遲任務,我們放在後面的小節中來專門學習。延遲任務執行後也將從延遲佇列事刪除。

BasicTaskScheduler0實現了延遲任務和觸發事件。觸發事件是通過一個機器字長所能表示的位來處理的,在內部被定義為fTriggersAwaitingHandling,是int型別。從最高位開始儲存。比如,假設系統是32位機,並且只有一個待觸發事件,那麼fTriggersAwaitingHandling的值為0x80000000(對應二進位制為10000000 0000 0000 0000 0000 0000 0000)。該類中還儲存了上一次觸發事件的ID以及mask,作為下一次排程的起點。這樣保證了排程程式能夠有序的執行所有待觸發事件,而不是每次都從頭開始掃描。它們的關係如下圖所示:

至於延遲任務的實現,我們放在單獨的小節中來學習。

BasicTaskScheduler實現了最後一個任務,即IO事件和核心排程程式SingleStep。IO任務的實現也很簡單,它被定義為一個雙向迴圈連結串列,連結串列的節點為HandlerDiscriptor。其實現類為HandlerSet,該類實現了對連結串列的增刪改查操作。與之對應的,作者還定義了一個迭代器類HandlerIterator,用於遍歷連結串列。而排程程式則實現上面所提到的三步操作,來依次執行各類任務。現在給出該函式的實現(為了幫助你的理解,我添加了一些中文註釋):

void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) {
	fd_set readSet = fReadSet;
	fd_set writeSet = fWriteSet;
	fd_set exceptionSet = fExceptionSet;

	DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();
	struct timeval tv_timeToDelay;

	tv_timeToDelay.tv_sec = timeToDelay.seconds();
	tv_timeToDelay.tv_usec = timeToDelay.useconds();

	/**
	 * 一個太大的tv_sec將導致select函式失敗
	 * 因此,確保它不大於一萬秒,即11.5天
	 */
	const long MAX_TV_SEC = MILLION;

	if (tv_timeToDelay.tv_sec > MAX_TV_SEC) {
		tv_timeToDelay.tv_sec = MAX_TV_SEC;
	}

	/**
	 * 檢查最大延遲時間是否大於一萬秒
	 * 以及微秒數是否大於一百萬微秒,即1秒
	 */
	if (maxDelayTime > 0 && (tv_timeToDelay.tv_sec > (long) maxDelayTime / MILLION
			|| (tv_timeToDelay.tv_sec == (long) maxDelayTime / MILLION
					&& tv_timeToDelay.tv_usec > (long) maxDelayTime % MILLION))) {
		tv_timeToDelay.tv_sec = maxDelayTime / MILLION;
		tv_timeToDelay.tv_usec = maxDelayTime % MILLION;
	}

	int selectResult = select(fMaxNumSockets, &readSet, &writeSet,
			&exceptionSet, &tv_timeToDelay);
	if (selectResult < 0) {
		if (errno != EINTR && errno != EAGAIN) {
			/**
			 * 哭了,錯誤
			 */
			internalError();
		}
	}

	/**
	 * 迭代器
	 */
	HandlerIterator iter(*fHandlers);
	HandlerDescriptor* handler;

	/**
	 * 找到上次執行後的處理程式佇列中的下一個
	 * 這裡先找到上次執行時的socket號
	 */
	if (fLastHandledSocketNum >= 0) {
		while ((handler = iter.next()) != NULL) {
			if (handler->socketNum == fLastHandledSocketNum)
				break;
		}
		/**
		 * 沒有找到,可能已經被移除,重置延時佇列
		 */
		if (handler == NULL) {
			fLastHandledSocketNum = -1;
			iter.reset();
		}
	}

	/**
	 * 從找到的handler開始,執行其下一個,不管其狀態是什麼,皆執行
	 * 當然,也可能是從佇列頭開始執行的
	 */
	while ((handler = iter.next()) != NULL) {
		int sock = handler->socketNum;
		int resultConditionSet = 0;

		/**
		 * 檢查
		 */
		if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet))
			resultConditionSet |= SOCKET_READABLE;

		if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet))
			resultConditionSet |= SOCKET_WRITABLE;

		if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet))
			resultConditionSet |= SOCKET_EXCEPTION;

		if ((resultConditionSet & handler->conditionSet) != 0
				&& handler->handlerProc != NULL) {
			/**
			 * 儲存sock號,排程程式下次將從該位置繼續執行,下同
			 */
			fLastHandledSocketNum = sock;

			/**
			 * 呼叫事件處理函式
			 */
			(*handler->handlerProc)(handler->clientData, resultConditionSet);
			break;
		}
	}

	/**
	 * 我們沒有呼叫處理程式
	 * 因此,從重再來一次
	 * 造成這樣的原因可能是從上一次執行處理程式的位置向後沒有找到任何可執行的處理程式了
	 * 於是從頭開始尋找處理程式
	 */
	if (handler == NULL && fLastHandledSocketNum >= 0) {
		iter.reset();

		while ((handler = iter.next()) != NULL) {
			int sock = handler->socketNum;
			int resultConditionSet = 0;

			if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet))
				resultConditionSet |= SOCKET_READABLE;

			if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet))
				resultConditionSet |= SOCKET_WRITABLE;

			if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet))
				resultConditionSet |= SOCKET_EXCEPTION;

			if ((resultConditionSet & handler->conditionSet) != 0
					&& handler->handlerProc != NULL) {
				fLastHandledSocketNum = sock;

				(*handler->handlerProc)(handler->clientData, resultConditionSet);
				break;
			}
		}

		/**
		 * 依然沒有找到任務何執行的handler
		 * 將其值置為-1
		 * 以告訴處理程式,下次應該從頭開始尋找handler
		 */
		if (handler == NULL)
			fLastHandledSocketNum = -1;
	}

	/**
	 * 響應新觸發的事件
	 */
	if (fTriggersAwaitingHandling != 0) {
		/**
		 * 首先檢查是否只有一個待觸發事件
		 */
		if (fTriggersAwaitingHandling == fLastUsedTriggerMask) {
			fTriggersAwaitingHandling = 0;
			if (fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL) {
				/**
				 * 執行事件處理函式
				 */
				(*fTriggeredEventHandlers[fLastUsedTriggerNum])(
						fTriggeredEventClientDatas[fLastUsedTriggerNum]);
			}
		} else {
			/**
			 * 尋找待執行的觸發事件
			 */
			unsigned i = fLastUsedTriggerNum;
			EventTriggerId mask = fLastUsedTriggerMask;

			do {
				i = (i + 1) % MAX_NUM_EVENT_TRIGGERS;
				mask >>= 1;
				if (mask == 0)
					mask = 0x80000000;

				if ((fTriggersAwaitingHandling & mask) != 0) {
					fTriggersAwaitingHandling &= ~mask;
					if (fTriggeredEventHandlers[i] != NULL) {
						/**
						 * 響應事件
						 */
						(*fTriggeredEventHandlers[i])(
								fTriggeredEventClientDatas[i]);
					}

					fLastUsedTriggerMask = mask;
					fLastUsedTriggerNum = i;
					break;
				}
			} while (i != fLastUsedTriggerNum);
		}
	}

	/**
	 * 執行一個最迫切的延遲任務
	 */
	fDelayQueue.handleAlarm();
}

好了,到目前為止,我們已經瞭解了這三個類的作用及其工作原理,下面讓我們來寫一段測試程式碼,分別對三類任務進行測試。下面給出我的實現:
/*
 * TestTaskScheduler.cpp
 *
 *  Created on: 2012-4-10
 *      Author: lovey599
 *
 *  通過以下測試程式,我們可以看到:
 *  	除了IO任務以外的其它任務
 *  	執行後將從佇列中刪除
 *  	不再執行
 *  通過以下測試程式碼,你是否對doEventLoop函式有了更加清晰的認識?
 *  對整個框架的啟動機制有了清晰的瞭解?
 *  同時,我們也可以看到,排程中心就是一個永真迴圈,並未對多執行緒提供更多的支援
 */

#include <iostream>

#include "BasicUsageEnvironment.hh"

using namespace std;

/**
 * 後臺IO任務處理函式(包括socket)
 * 此處將資料輸出到控制檯
 */
void taskFun(void* clientData, int mask) {
	/**
	 * 檢查
	 */
	do {
		if (mask & SOCKET_EXCEPTION) {
			cout << "IO Event:Oh,my god!" << endl;
			break;
		}

		if (mask & SOCKET_WRITABLE) {
			cout << "IO Event(Writable):" <<  (char*) clientData << endl;
		}

		if (mask & SOCKET_READABLE) {
			cout << "IO Event(Readable):" <<  (char*) clientData << endl;
		}

		sleep(1);
	} while (0);
}

/**
 * 觸發事件回撥函式
 */
void eventFun(void* clientData) {
	cout << "Event Trigger:" << (char*) clientData << endl;
	sleep(1);
}

/**
 * 延遲任務回撥函式
 */
void delayedFun(void* clientData) {
	cout << "Delayed Task:" << (char*) clientData << endl;
	sleep(1);
}

int main(int argc, char* argv[]) {
	TaskScheduler* scheduler = BasicTaskScheduler::createNew();
	UsageEnvironment* env = BasicUsageEnvironment::createNew(*scheduler);

	char taskFunData[] = "do task...";

	/**
	 * 觸發事件測試
	 */
	EventTriggerId id = scheduler->createEventTrigger(&eventFun);

	if ((id & ~0) == 0) {
		cout << "Create Event Trigger Failed.\n";
	} else {
		/**
		 * 將其加入排程佇列
		 * 不要被名字迷惑了
		 * 這裡僅僅是使其成為可排程狀態,並不是立即執行
		 * 以便doEventLoop()執行它
		 */
		scheduler->triggerEvent(id, (void*) taskFunData);
	}

	/**
	 * 測試後臺IO任務
	 */
	scheduler->setBackgroundHandling(STDOUT_FILENO, SOCKET_WRITABLE, &taskFun,
			(void*) taskFunData);

	/**
	 * 延遲任務測試
	 * 延遲5秒後執行
	 */
	scheduler->scheduleDelayedTask(5000000, delayedFun, (void*) taskFunData);

	/**
	 * 啟動任務排程
	 */
	env->taskScheduler().doEventLoop();

	return 0;
}