MongoDB連線池的實現
幾乎每一種資料庫都會有連線池, 來減少頻繁的建立刪除連線的開銷, 在MongoDB裡面是通過訊號量執行緒同步方式來對建立、銷燬進行管理。
訊號量基礎
int sem_init(sem_t *sem, int pshared, unsigned int value)
sem是要初始化的訊號量,pshared表示此訊號量是在程序間共享(=1)還是執行緒間共享(=0),value是訊號量的初始值。
int sem_destroy(sem_t *sem);
其中sem是要銷燬的訊號量。只有用sem_init初始化的訊號量才能用sem_destroy銷燬。
int sem_wait(sem_t *sem); int sem_trywait(sem_t *sem); int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
sem_wait:等待訊號量,如果訊號量的值大於0, 將訊號量的值減1,立即返回。如果訊號量的值為0,則執行緒阻塞。相當於P操作。成功返回0,失敗返回-1。
sem_trywait:與sem_wait類似, 只是如果遞減不能立即執行,呼叫將返回錯誤(errno 設定為EAGAIN)而不是阻塞。
sem_timewait:sem_wait() 類似,只不過 abs_timeout 指定一個阻塞的時間上限,如果呼叫因不能立即執行遞減而要阻塞。
int sem_post(sem_t *sem);
釋放訊號量,讓訊號量的值加1。相當於V操作。
連線池的實現
使用TicketHoder來封裝類執行緒控制訊號量函式:
TicketHolder::TicketHolder(int num) : _outof(num) { _check(sem_init(&_sem, 0, num)); } TicketHolder::~TicketHolder() { _check(sem_destroy(&_sem)); } bool TicketHolder::tryAcquire() { while (0 != sem_trywait(&_sem)) { switch (errno) { case EAGAIN: return false; case EINTR: break; default: _check(-1); } } return true; } void TicketHolder::waitForTicket() { while (0 != sem_wait(&_sem)) { switch (errno) { case EINTR: break; default: _check(-1); } } } void TicketHolder::release() { _check(sem_post(&_sem)); }
在MongoDB啟動的時候, 會先建立一個PortMessageServer,並在裡面指定一個有配置引數或者命令列引數指定的最大的連線數, 然後通過setupSockets建立一個socket並繫結, 並將其加入到Listener裡面的std::vector _socks;
static void _initAndListen(int listenPort) {
...
auto handler = std::make_shared<MyMessageHandler>();
MessageServer* server = createServer(options, std::move(handler));
server->setAsTimeTracker();
if (!server->setupSockets()) {
error() << "Failed to set up sockets during startup.";
return;
}
...
server->run();
}
MessagePortServer裡面, 通過Listener::initAndListen函式,最終在 PortMessageServer::accepted裡面來建立新的執行緒處理本次的操作。
這裡可以看到, 每一次新的操作, TicketHoder::tryAcquire會試圖進入訊號量的程式碼區, 如果訊號量的連線數大於1, 就會獲得訊號量鎖, 並且將連線數減少1; 如果此時的連線數為0 , tryAcquire會返回失敗, 表明連線數已滿, 無法連線。
獲得訊號量鎖之後, 會建立一下新的處理執行緒, 指定其處理函式為:
PortMessageServer::handleIncomingMsg.
class PortMessageServer : public MessageServer, public Listener {
virtual void accepted(std::shared_ptr<Socket> psocket, long long connectionId) {
ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
std::unique_ptr<MessagingPortWithHandler> portWithHandler(
new MessagingPortWithHandler(psocket, _handler, connectionId));
if (!Listener::globalTicketHolder.tryAcquire()) {
log() << "connection refused because too many open connections: "
<< Listener::globalTicketHolder.used() << endl;
return;
}
{
stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get()));
thr.detach();
}
}