1. 程式人生 > >MongoDB連線池的實現

MongoDB連線池的實現

幾乎每一種資料庫都會有連線池, 來減少頻繁的建立刪除連線的開銷, 在MongoDB裡面是通過訊號量執行緒同步方式來對建立、銷燬進行管理。

訊號量基礎

int sem_init(sem_t *sem, int pshared, unsigned int value)

sem是要初始化的訊號量,pshared表示此訊號量是在程序間共享(=1)還是執行緒間共享(=0),value是訊號量的初始值。

int sem_destroy(sem_t *sem);

其中sem是要銷燬的訊號量。只有用sem_init初始化的訊號量才能用sem_destroy銷燬。

int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

sem_wait:等待訊號量,如果訊號量的值大於0, 將訊號量的值減1,立即返回。如果訊號量的值為0,則執行緒阻塞。相當於P操作。成功返回0,失敗返回-1。
sem_trywait:與sem_wait類似, 只是如果遞減不能立即執行,呼叫將返回錯誤(errno 設定為EAGAIN)而不是阻塞。
sem_timewait:sem_wait() 類似,只不過 abs_timeout 指定一個阻塞的時間上限,如果呼叫因不能立即執行遞減而要阻塞。

int sem_post(sem_t *sem); 

釋放訊號量,讓訊號量的值加1。相當於V操作。

連線池的實現

使用TicketHoder來封裝類執行緒控制訊號量函式:

TicketHolder::TicketHolder(int num) : _outof(num) {
    _check(sem_init(&_sem, 0, num));
}

TicketHolder::~TicketHolder() {
    _check(sem_destroy(&_sem));
}

bool TicketHolder::tryAcquire() {
    while (0 != sem_trywait(&_sem)) {
        switch (errno) {
            case EAGAIN:
                return false;
            case EINTR:
                break;
            default:
                _check(-1);
        }
    }
    return true;
}

void TicketHolder::waitForTicket() {
    while (0 != sem_wait(&_sem)) {
        switch (errno) {
            case EINTR:
                break;
            default:
                _check(-1);
        }
    }
}

void TicketHolder::release() {
    _check(sem_post(&_sem));
}

在MongoDB啟動的時候, 會先建立一個PortMessageServer,並在裡面指定一個有配置引數或者命令列引數指定的最大的連線數, 然後通過setupSockets建立一個socket並繫結, 並將其加入到Listener裡面的std::vector _socks;

static void _initAndListen(int listenPort) { 
    ...
    auto handler = std::make_shared<MyMessageHandler>();
    MessageServer* server = createServer(options, std::move(handler));
    server->setAsTimeTracker();

    if (!server->setupSockets()) {
        error() << "Failed to set up sockets during startup.";
        return;
    }
    ...
    server->run();
 }

MessagePortServer裡面, 通過Listener::initAndListen函式,最終在 PortMessageServer::accepted裡面來建立新的執行緒處理本次的操作。
這裡可以看到, 每一次新的操作, TicketHoder::tryAcquire會試圖進入訊號量的程式碼區, 如果訊號量的連線數大於1, 就會獲得訊號量鎖, 並且將連線數減少1; 如果此時的連線數為0 , tryAcquire會返回失敗, 表明連線數已滿, 無法連線。

獲得訊號量鎖之後, 會建立一下新的處理執行緒, 指定其處理函式為:
PortMessageServer::handleIncomingMsg.

class PortMessageServer : public MessageServer, public Listener {
    virtual void accepted(std::shared_ptr<Socket> psocket, long long connectionId) {
        ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
        std::unique_ptr<MessagingPortWithHandler> portWithHandler(
            new MessagingPortWithHandler(psocket, _handler, connectionId));

        if (!Listener::globalTicketHolder.tryAcquire()) {
            log() << "connection refused because too many open connections: "
                  << Listener::globalTicketHolder.used() << endl;
            return;
        }

        {
                stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get()));
                thr.detach();
        }
  }