1. 程式人生 > >Qt一步步搭建TcpServer2——執行緒池

Qt一步步搭建TcpServer2——執行緒池

本章將在TcpServer的基礎上新增執行緒池,管理Session。

1、封裝TcpThread

正所謂工欲善其事,必先利其器。要實現一個執行緒池,按照Qt的執行緒用法(如果不懂用法的同學,可以參看我這篇部落格:效能特性測試系列4——QT執行緒與std::thread(下)之QThread)。要想在這裡好好用上去,還是得自己重寫一個執行緒類:
標頭檔案:

#ifndef TCPTHREAD_H
#define TCPTHREAD_H
#include <QThread>
#include <atomic>

class TcpThread : public
QThread { public: TcpThread(); ~TcpThread(); virtual void run()override; //會話數 std::atomic_uint32_t SessionCount = 0; }; #endif // TCPTHREAD_H

cpp:

#include "TcpThread.h"

TcpThread::TcpThread()
{

}

TcpThread::~TcpThread()
{

}

void TcpThread::run()
{
    exec();
}

又是個看起來很簡單的類,彆著急,以後還要加東西的。
關於run裡的exec()解釋下:
這裡寫圖片描述


新增進事件迴圈,這樣就能確保執行緒不執行完,且加進來的Session能夠順利接收自己的事件。

2、修改TcpSession

既然QThread都出來了,那麼TcpSession也要相應的調整調整了:
標頭檔案:
新增TcpThread的指標:

private:
    TcpThread *Thread_ = nullptr;

建構函式修改下:

TcpSession(TcpThread *thread);

添加個訊號函式和槽函式:

void SignalDisConnected(void *);
void SlotDisConnected();

cpp:

TcpSession::TcpSession(TcpThread *thread)
{
    this->Thread_ = thread;
    connect(this, &TcpSession::readyRead,
            this, &TcpSession::SlotStartRead);
    connect(this, &TcpSession::disconnected,
            this, &TcpSession::SlotDisConnected);
}

TcpSession::~TcpSession()
{
    disconnect(this, &TcpSession::readyRead,
               this, &TcpSession::SlotStartRead);
    disconnect(this, &TcpSession::disconnected,
               this, &TcpSession::SlotDisConnected);
}

void TcpSession::Disconnect()
{
    qDebug() << "TcpSession::Disconnect threadID:"<< QThread::currentThreadId();
    this->disconnectFromHost();
}

qint64 TcpSession::Write(const char *data, qint64 len)
{
    return this->write(data, len);
}

qint64 TcpSession::Write(const char *data)
{
    return this->write(data);
}

void TcpSession::SlotStartRead()
{
    qDebug() << "TcpSession::SlotStartRead threadID:"<< QThread::currentThreadId();
    QByteArray buffer;
    buffer = this->readAll();
    emit this->SignalRead(buffer.toStdString().c_str(), buffer.length());
}

void TcpSession::SlotDisConnected()
{
    if(Thread_)
        --Thread_->SessionCount;
    //通知會話斷開連線
    if(OnDisConnected)
        OnDisConnected(this);
    emit this->SignalDisConnected(this);
}

簡單來說,就是添加了TcpThread的指標,並且在斷開連線時,減少TcpThread上的會話計數。同時發個訊號,通知斷開。

3、新增SessionThreads

準備材料已經弄完,現在就可以正式新增執行緒池了:
標頭檔案:

#ifndef SESSIONTHREADS_H
#define SESSIONTHREADS_H
#include <vector>
#include <unordered_map>
#include <stdint.h>
#include <memory>
#include <mutex>
#include "TcpThread.h"
#include "TcpSession.h"

class SessionThreads : public QObject
{
    Q_OBJECT
public:
    SessionThreads();
    ~SessionThreads();

    //啟動執行緒池
    bool Start(uint32_t threadNum);
    //關閉
    void Stop();
    //獲取最小會話數執行緒
    TcpThread *PickMinThread();
    //獲取會話數
    std::vector<uint32_t> GetSessionSize()const;
    //新增會話
    std::shared_ptr<TcpSession> CreateSession(qintptr handle);

private slots:
    //會話斷開
    void SlotSessionDisConnected(void *id);

private:
    std::vector<TcpThread*> ThreadList_;
    //互斥量
    std::mutex Lock_;
    //會話列表
    std::unordered_map<void*, std::shared_ptr<TcpSession>> SessionList_;

    bool IsRunning_ = false;

};

#endif // SESSIONTHREADS_H

cpp檔案:

#include "SessionThreads.h"

SessionThreads::SessionThreads()
{

}

SessionThreads::~SessionThreads()
{
    this->Stop();
}


bool SessionThreads::Start(uint32_t threadNum)
{
    if(IsRunning_)
        return true;
    for(uint32_t i = 0; i < threadNum; ++i)
    {
        TcpThread *thread = new TcpThread();
        ThreadList_.push_back(thread);
        thread->start();
    }
    IsRunning_ = true;
    return true;
}

void SessionThreads::Stop()
{
    if(!IsRunning_)
        return;
    //TODO  待完善
    for(TcpThread *thread : this->ThreadList_)
    {
        thread->exit();
        thread->wait();
    }
    for(TcpThread *thread : this->ThreadList_)
        delete thread;
    this->ThreadList_.clear();
    std::unordered_map<void*, std::shared_ptr<TcpSession>>::iterator itor = SessionList_.begin();
    //關閉連線
    for(itor = SessionList_.begin(); itor != SessionList_.end(); ++itor)
    {
        std::shared_ptr<TcpSession> session = itor->second;
        if(session.get())
            session.get()->Disconnect();
    }
    this->SessionList_.clear();
    IsRunning_ = false;
}

TcpThread *SessionThreads::PickMinThread()
{
    TcpThread *thread = nullptr;
    uint32_t mincount = 0;
    for(TcpThread *tmpthread : this->ThreadList_)
    {
        uint32_t tmpcount = tmpthread->SessionCount;
        if(mincount == 0 || tmpcount < mincount)
        {
            mincount = tmpcount;
            thread = tmpthread;
        }
    }
    return thread;
}

std::vector<uint32_t> SessionThreads::GetSessionSize() const
{
    std::vector<uint32_t> vec;
    for(TcpThread *thread : this->ThreadList_)
        vec.push_back(thread->SessionCount);
    return vec;
}

std::shared_ptr<TcpSession> SessionThreads::CreateSession(qintptr handle)
{
    TcpThread *thread = this->PickMinThread();
    std::shared_ptr<TcpSession> session = std::make_shared<TcpSession>(thread);
    connect(session.get(), &TcpSession::SignalDisConnected,
            this, &SessionThreads::SlotSessionDisConnected);
    session->setSocketDescriptor(handle);
    session->moveToThread(thread);
    ++thread->SessionCount;
    //加鎖
    std::lock_guard<std::mutex> locker(this->Lock_);
    this->SessionList_[session.get()] = session;
    return session;
}

//會話斷開
void SessionThreads::SlotSessionDisConnected(void *id)
{
    //加鎖
    std::lock_guard<std::mutex> locker(this->Lock_);
    std::unordered_map<void*, std::shared_ptr<TcpSession>>::iterator itor = SessionList_.begin();
    itor = SessionList_.find(id);
    if(itor != SessionList_.end())
    {
        SessionList_.erase(itor);
        qDebug()<< "SessionThreads::SlotSessionDisConnected" << QThread::currentThread();
    }
}

其實這裡不應該叫執行緒池,準確說,應該是會話執行緒池,主要職責就是負責管理會話,和執行緒。

而且我們可以看到,這個結構其實和TcpServer很像,一樣啟動,關閉,這裡啟動需要的引數是執行緒數量。因為標頭檔案的註釋比較詳細了,我相信多閱讀的話,應該都能看懂,我就不費太多的筆墨了。

4、修改TcpServer

執行緒池都好了,我們也就可以把TcpServer裡的雜七雜八的東西刪掉了,讓他專心負責監聽埠,接受連線就行了:
標頭檔案:

class TcpServer : public QTcpServer
{
    Q_OBJECT
public:
    TcpServer();
    ~TcpServer();

    bool Start(int port, int threadnum);
    void Stop();

    std::vector<uint32_t> GetSessionSize() const ;

public:
    //新連接回調
    std::function<void(std::shared_ptr<TcpSession> &)> OnAccepted = nullptr;

protected:
    virtual void incomingConnection(qintptr handle);

private:
    bool IsRunning_ = false;
    SessionThreads SessionThreads_;
};

這一下就很清爽了,TcpServer的事情很簡單,至於執行緒,會話這些東西,統統都丟給執行緒池去做了。
cpp:

#include "TcpServer.h"

TcpServer::TcpServer()
{
}

TcpServer::~TcpServer()
{
    this->Stop();
}

bool TcpServer::Start(int port, int threadnum)
{
    if(IsRunning_)
        return true;
     //啟動執行緒池
    SessionThreads_.Start(threadnum);
    //監聽埠
    if(!this->listen(QHostAddress::Any, port))
        return false;

    IsRunning_ = true;
    qDebug() << "TcpServer::Start threadID:"<< QThread::currentThreadId();
    return true;
}

void TcpServer::Stop()
{
    if(!IsRunning_)
        return;
    //關閉監聽
    this->close();
    //關閉執行緒池
    SessionThreads_.Stop();
    IsRunning_ = false;
}

std::vector<uint32_t> TcpServer::GetSessionSize() const
{
    return this->SessionThreads_.GetSessionSize();
}

void TcpServer::incomingConnection(qintptr handle)
{
    qDebug() << "TcpServer::incomingConnection threadID:"<< QThread::currentThreadId();
    std::shared_ptr<TcpSession> session = SessionThreads_.CreateSession(handle);
    if(this->OnAccepted)
        this->OnAccepted(session);
}

當然 ,這裡少了port和執行緒數的合法性驗證,下章會講到。

5、總結

執行緒池完成了,那麼整個Server端也基本完成了,可以直接使用並接收連線了整個流程大致是沒什麼問題了。

當然,細心的話,你可能會發現,我在關服的時候順序是:
關閉執行緒池裡所有執行緒-》刪除執行緒池裡所有執行緒-》斷開所有會話連線-》清空會話。
這樣做的話,斷開所有會話連線的時候,都是在TcpServer所線上程了,甚至此時的讀取資料也在這個執行緒。

有經驗的同學應該知道,關服的時候,為防止正在讀寫資料產生可能存在的問題,亦或者說為了可讀性,體現正常思維,正確的關服順序應該是:
斷開所有會話連線-》關閉執行緒池裡所有執行緒-》刪除執行緒池裡所有執行緒-》清空會話。

那麼問題來了,既然我知道正確的順序,為什麼我不按照正常的邏輯順序來幹呢?

原因:

因為直接按照這個順序的話,QTcpSocket不提供非同步斷開連線方法(即將斷開連線操作丟到所線上程佇列裡去),我們顯然不能在TcpServer所線上程裡這麼直接disconnect,邏輯上不對,並且這樣幹qt也不讓(直接給你報錯)。

所以這裡我採取了一個投機的方法(這種行為不可取,我是為了寫部落格,一步步走,才這樣乾的)。

那麼要實現正確的關服邏輯怎麼辦呢?
很簡單,請看下篇~~~~