1. 程式人生 > >伺服器端程式設計心得(三)—— 一個伺服器程式的架構介紹

伺服器端程式設計心得(三)—— 一個伺服器程式的架構介紹

本文將介紹我曾經做過的一個專案的伺服器架構和伺服器程式設計的一些重要細節。

一、程式執行環境

作業系統:centos 7.0

編譯器:gcc/g++ 4.8.3     cmake 2.8.11

mysql資料庫:5.5.47

專案程式碼管理工具:VS2013

一、程式結構

該程式總共有17個執行緒,其中分為9個數據庫工作執行緒D和一個日誌執行緒L,6個普通工作執行緒W,一個主執行緒M。(以下會用這些字母來代指這些執行緒)

(一)、資料庫工作執行緒的用途

 9個數據庫工作執行緒線上程啟動之初,與mysql建立連線,也就是說每個執行緒都與mysql保持一路連線,共9個數據庫連線。

 每個資料庫工作執行緒同時存在兩個任務佇列,第一個佇列A存放需要執行資料庫增刪查改操作的任務sqlTask,第二個佇列B存放sqlTask執行完成後的結果。sqlTask執行完成後立即放入結果佇列中,因而結果佇列中任務也是一個個的需要執行的任務。大致虛擬碼如下:

void db_thread_func()
{
	while (!m_bExit)
	{
		if (NULL != (pTask = m_sqlTask.Pop()))
		{
			//從m_sqlTask中取出的任務先執行完成後,pTask將攜帶結果資料
			pTask->Execute();			
			//得到結果後,立刻將該任務放入結果任務佇列
			m_resultTask.Push(pTask);
			continue;
		}

		sleep(1000);
	}
}

現在的問題來了:

1. 任務佇列A中的任務從何而來,目前只有消費者,沒有生產者,那麼生產者是誰?

2. 任務佇列B中的任務將去何方,目前只有生產者沒有消費者。

這兩個問題先放一會兒,等到後面我再來回答。

(二)工作執行緒和主執行緒

在介紹主執行緒和工作執行緒具體做什麼時,我們介紹下伺服器程式設計中常常抽象出來的幾個概念(這裡以tcp連線為例):

1.  TcpServer 即Tcp服務,伺服器需要繫結ip地址和埠號,並在該埠號上偵聽客戶端的連線(往往由一個成員變數TcpListener來管理偵聽細節)。所以一個TcpServer要做的就是這些工作。除此之外,每當有新連線到來時,TcpServer需要接收新連線,當多個新連線存在時,TcpServer需要有條不紊地管理這些連線:連線的建立、斷開等,即產生和管理下文中說的TcpConnection物件。

2.一個連線對應一個TcpConnection物件,TcpConnection物件管理著這個連線的一些資訊:如連線狀態、本端和對端的ip地址和埠號等。

3.資料通道物件Channel,Channel記錄了socket的控制代碼,因而是一個連線上執行資料收發的真正執行者,Channel物件一般作為TcpConnection的成員變數。

4. TcpSession物件,是將Channel收取的資料進行解包,或者對準備好的資料進行裝包,並傳給Channel傳送。

歸納起來:一個TcpServer依靠TcpListener對新連線的偵聽和處理,依靠TcpConnection物件對連線上的資料進行管理,TcpConnection實際依靠Channel對資料進行收發,依靠TcpSession對資料進行裝包和解包。也就是說一個TcpServer存在一個TcpListener,對應多個TcpConnection,有幾個TcpConnection就有幾個TcpSession,同時也就有幾個Channel。

以上說的TcpServer、TcpListener、TcpConnection、Channel和TcpSession是伺服器框架的網路層。一個好的網路框架,應該做到與業務程式碼脫耦。即上層程式碼只需要拿到資料,執行業務邏輯,而不用關注資料的收發和網路資料包的封包和解包以及網路狀態的變化(比如網路斷開與重連)。

拿資料的傳送來說:

當業務邏輯將資料交給TcpSession,TcpSession將資料裝好包後(裝包過程後可以有一些加密或壓縮操作),交給TcpConnection::SendData(),而TcpConnection::SendData()實際是呼叫Channel::SendData(),因為Channel含有socket控制代碼,所以Channel::SendData()真正呼叫send()/sendto()/write()方法將資料發出去。

對於資料的接收,稍微有一點不同:

通過select()/poll()/epoll()等IO multiplex技術,確定好了哪些TcpConnection上有資料到來後,啟用該TcpConnection的Channel物件去呼叫recv()/recvfrom()/read()來收取資料。資料收到以後,將資料交由TcpSession來處理,最終交給業務層。注意資料收取、解包乃至交給業務層是一定要分開的。我的意思是:最好不要解包並交給業務層和資料收取的邏輯放在一起。因為資料收取是IO操作,而解包和交給業務層是邏輯計算操作。IO操作一般比邏輯計算要慢。到底如何安排要根據伺服器業務來取捨,也就是說你要想好你的伺服器程式的效能瓶頸在網路IO還是邏輯計算,即使是網路IO,也可以分為上行操作和下行操作,上行操作即客戶端發資料給伺服器,下行即伺服器發資料給客戶端。有時候資料上行少,下行大。(如遊戲伺服器,一個npc移動了位置,上行是該客戶端通知伺服器自己最新位置,而下行確是伺服器要告訴在場的每個客戶端)。

while (!m_bQuit)
{
    epoll_or_select_func();

    handle_io_events();

    handle_other_things();
}

其中epoll_or_select_func()即是上文所說的通過select()/poll()/epoll()等IO multiplex技術,確定好了哪些TcpConnection上有資料到來。我的伺服器程式碼中一般只會監測socket可讀事件,而不會監測socket可寫事件。至於如何發資料,文章後面會介紹。所以對於可讀事件,以epoll為例,這裡需要設定的標識位是:

EPOLLIN 普通可讀事件(當連線正常時,產生這個事件,recv()/read()函式返回收到的位元組數;當連線關閉,這兩個函式返回0,也就是說我們設定這個標識已經可以監測到新來資料和對端關閉事件)

EPOLLRDHUP 對端關閉事件(linux man手冊上說這個事件可以監測對端關閉,但我實際除錯時傳送即使對端關閉也沒觸發這個事件,仍然是EPOLLIN,只不過此時呼叫recv()/read()函式,返回值會為0,所以實際專案中是否可以通過設定這個標識來監測對端關閉,仍然待考證)

EPOLLPRI 帶外資料

muduo裡面將epoll_wait的超時事件設定為1毫秒,我的另一個專案將epoll_wait超時時間設定為10毫秒。這兩個數值供大家參考。

這個專案中,工作執行緒和主執行緒都是上文程式碼中的邏輯,主執行緒監聽偵聽socket上的可讀事件,也就是監測是否有新連線來了。主執行緒和每個工作執行緒上都存在一個epollfd。如果新連線來了,則在主執行緒的handle_io_events()中接收新連線。產生的新連線的socket控制代碼掛接到哪個執行緒的epollfd上呢?這裡採取的做法是round-robin演算法,即存在一個物件CWorkerThreadManager記錄了各個工作執行緒上工作狀態。偽碼大致如下:

void attach_new_fd(int newsocketfd)
{
	workerthread = get_next_worker_thread(next);
	workerthread.attach_to_epollfd(newsocketfd);
	++next;
	if (next > max_worker_thread_num)
		next = 0;
}

即先從第一個工作執行緒的epollfd開始掛接新來socket,接著累加索引,這樣下次就是第二個工作執行緒了。如果所以超出工作執行緒數目,則從第一個工作重新開始。這裡解決了新連線socket“負載均衡”的問題。在實際程式碼中還有個需要注意的細節就是:epoll_wait的函式中的struct epoll_event 數量開始到底要設定多少個才合理?存在的顧慮是,多了浪費,少了不夠用,我在曾經一個專案中直接用的是4096:
const int EPOLL_MAX_EVENTS = 4096;
const int dwSelectTimeout = 10000;
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeout / 1000);

我在陳碩的muduo網路庫中發現作者才用了一個比較好的思路,即動態擴張數量:開始是n個,當發現有事件的fd數量已經到達n個後,將struct epoll_event數量調整成2n個,下次如果還不夠,則變成4n個,以此類推,作者巧妙地利用stl::vector在記憶體中的連續性來實現了這種思路:
//初始化程式碼
std::vector<struct epoll_event> events_(16);

//執行緒迴圈裡面的程式碼
while (m_bExit)
{
	int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), 1);
	if (numEvents > 0)
	{
		if (static_cast<size_t>(numEvents) == events_.size())
		{
			events_.resize(events_.size() * 2);
		}
	}
}

讀到這裡,你可能覺得工作執行緒所做的工作也不過就是呼叫handle_io_events()來接收網路資料,其實不然,工作執行緒也可以做程式業務邏輯上的一些工作。也就是在handle_other_things()裡面。那如何將這些工作加到handle_other_things()中去做呢?寫一個佇列,任務先放入佇列,再讓handle_other_things()從佇列中取出來做?我在該專案中也借鑑了muduo庫的做法。即handle_other_things()中呼叫一系列函式指標,偽碼如下:
void do_other_things()
{
	somefunc();
}


//m_functors是一個stl::vector,其中每一個元素為一個函式指標
void somefunc()
{
	for (size_t i = 0; i < m_functors.size(); ++i)
	{
		m_functors[i]();
	}

	m_functors.clear();
}

當任務產生時,只要我們將執行任務的函式push_back到m_functors這個stl::vector物件中即可。但是問題來了,如果是其他執行緒產生的任務,兩個執行緒同時操作m_functors,必然要加鎖,這也會影響效率。muduo是這樣做的:
void add_task(const Functor& cb)
{
	std::unique_lock<std::mutex> lock(mutex_);
	m_functors.push_back(cb);	
}

void do_task()
{
	std::vector<Functor> functors;
	{
		std::unique_lock<std::mutex> lock(mutex_);
		functors.swap(m_functors);
	}

	for (size_t i = 0; i < functors.size(); ++i)
	{
		functors[i]();
	}
}
看到沒有,利用一個棧變數functors將m_functors中的任務函式指標倒換(swap)過來了,這樣大大減小了對m_functors操作時的加鎖粒度。前後變化:變化前,相當於原來A給B多少東西,B消耗多少,A給的時候,B不能消耗;B消耗的時候A不能給。現在變成A將東西放到籃子裡面去,B從籃子裡面拿,B如果拿去一部分後,只有消耗完了才會來拿,或者A通知B去籃子裡面拿,而B忙碌時,A是不會通知B來拿,這個時候A只管將東西放在籃子裡面就可以了。
bool bBusy = false;
void add_task(const Functor& cb)
{
	std::unique_lock<std::mutex> lock(mutex_);
	m_functors_.push_back(cb);

	//B不忙碌時只管往籃子裡面加,不要通知B
	if (!bBusy)
	{
		wakeup_to_do_task();
	}
}

void do_task()
{
	bBusy = true;
	std::vector<Functor> functors;
	{
		std::unique_lock<std::mutex> lock(mutex_);
		functors.swap(pendingFunctors_);
	}

	for (size_t i = 0; i < functors.size(); ++i)
	{
		functors[i]();
	}

	bBusy = false;
}

看,多巧妙的做法!

因為每個工作執行緒都存在一個m_functors,現在問題來了,如何將產生的任務均衡地分配給每個工作執行緒。這個做法類似上文中如何將新連線的socket控制代碼掛載到工作執行緒的epollfd上,也是round-robin演算法。上文已經描述,此處不再贅述。

還有種情況,就是希望任務產生時,工作執行緒能夠立馬執行這些任務,而不是等epoll_wait超時返回之後。這個時候的做法,就是使用一些技巧喚醒epoll_wait,linux系統可以使用socketpair或timerevent、eventfd等技巧(這個細節在我的博文《伺服器端程式設計心得(一)—— 主執行緒與工作執行緒的分工》已經詳細介紹過了)。

上文中留下三個問題:

1. 資料庫執行緒任務佇列A中的任務從何而來,目前只有消費者,沒有生產者,那麼生產者是誰?

2.資料庫執行緒任務佇列B中的任務將去何方,目前只有生產者沒有消費者。

3.業務層的資料如何傳送出去?

問題1的答案是:業務層產生任務可能會交給資料庫任務佇列A,這裡的業務層程式碼可能就是工作執行緒中do_other_things()函式執行體中的呼叫。至於交給這個9個數據庫執行緒的哪一個的任務佇列,同樣採用了round-robin演算法。所以就存在一個物件CDbThreadManager來管理這九個資料庫執行緒。下面的偽碼是向資料庫工作執行緒中加入任務:

bool CDbThreadManager::AddTask(IMysqlTask* poTask )
{
    if (m_index >= m_dwThreadsCount)
    {
        m_index = 0;
    }

    return m_aoMysqlThreads[m_index++].AddTask(poTask);
}


同理問題2中的消費者也可能就是do_other_things()函式執行體中的呼叫。

現在來說問題3,業務層的資料產生後,經過TcpSession裝包後,需要傳送的話,產生任務丟給工作執行緒的do_other_things(),然後在相關的Channel裡面傳送,因為沒有監測該socket上的可寫事件,所以該資料可能呼叫send()或者write()時會阻塞,沒關係,sleep()一會兒,繼續傳送,一直嘗試,到資料發出去。偽碼如下:

bool Channel::Send()
{
	int offset = 0;
	while (true)
	{
		int n = ::send(socketfd, buf + offset, length - offset);
		if (n == -1)
		{
			if (errno == EWOULDBLOCK)
			{
				::sleep(100);
				continue;
			}
		}
		//對方關閉了socket,這端建議也關閉
		else if (n == 0)
		{
			close(socketfd);
			return false;
		}

		offset += n;
		if (offset >= length)
			break;

	}

	return true;	
}

最後,還有一個日誌執行緒沒有介紹,高效能的日誌實現方案目前並不常見。限於文章篇幅,下次再介紹。

zhangyl 2016.12.02晚12:35