QT 執行緒池 + TCP 小試(二)實現通訊功能
阿新 • • 發佈:2019-02-04
*免分資源連結點選開啟連結http://download.csdn.net/detail/goldenhawking/4492378
有了執行緒池,我們下一步就利用 QTcpServer 搭建一個伺服器,接受客戶端的連線,並把資料傳送到執行緒池上。由於 QTcpServer 資料太多了,這裡不在贅述。唯一值得注意的是,當客戶端退出時,如果執行緒池佇列中還有該客戶的資訊,這個資訊還會被處理,只是無法再發送回去而已。其實,還可實現成客戶端退出,就發一個訊號到執行緒池,刪除自己的所有任務。這個也很簡單,但之所以沒有做,因為這些資料的處理結果可能還會被其他消費者(而非生產者自己)使用,最典型的例子是從工業感測器上採集的資料,其生成的影象需要儲存到裝置中去。
QTcpSocket的 Write 方法預設是支援大體積資料的,即使一次發了500MB的資料,只要硬體資源可以承受,呼叫也會成功並立刻返回。接受者會以一定的載荷大小不停的觸發readyRead,直到傳送全部成功。但是,為了能夠觀察到並控制收發佇列中的資料包的大小、體積,我們在外層實現了一個傳送佇列,每次以 payLoad為大小發送資料包。這是從MFC中帶來的習慣,很難說好壞。
qghtcpserver.h
#ifndef QGHTCPSERVER_H #define QGHTCPSERVER_H #include <QTcpServer> #include <QMap> #include <QList> class QGHTcpServer : public QTcpServer { Q_OBJECT public: QGHTcpServer(QObject *parent,int nPayLoad = 4096); ~QGHTcpServer(); //踢出所有客戶 void KickAllClients(); QList <QObject *> clientsList(); void SetPayload(int nPayload); private: QMap<QObject *,QList<QByteArray> > m_buffer_sending; QMap<QObject *,QList<qint64> > m_buffer_sending_offset; QMap<QObject*,int> m_clientList; int m_nPayLoad; public slots: //新的客戶連線到來 void new_client_recieved(); //客戶連線被關閉 void client_closed(); //新的資料到來 void new_data_recieved(); //一批資料傳送完畢 void some_data_sended(qint64); //客戶端錯誤 void displayError(QAbstractSocket::SocketError socketError); //向客戶端傳送資料 void SendDataToClient(QObject * objClient,const QByteArray & dtarray); //向客戶端廣播資料,不包括 objFromClient void BroadcastData(QObject * objFromClient,const QByteArray & dtarray); signals: //錯誤資訊 void evt_SocketError(QObject * senderSock ,QAbstractSocket::SocketError socketError); //新的客戶端連線 void evt_NewClientConnected(QObject * client); //客戶端退出 void evt_ClientDisconnected(QObject * client); //收到一批資料 void evt_Data_recieved(QObject * ,const QByteArray & ); //一批資料被髮送 void evt_Data_transferred(QObject * client,qint64); }; #endif // QGHTCPSERVER_H
qghtcpserver.cpp
下一次,我會介紹最後的實現功能。#include "qghtcpserver.h" #include <assert.h> #include <QTcpSocket> QGHTcpServer::QGHTcpServer(QObject *parent,int nPayLoad ) : QTcpServer(parent), m_nPayLoad(nPayLoad) { assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024); connect(this, SIGNAL(newConnection()), this, SLOT(new_client_recieved())); } QGHTcpServer::~QGHTcpServer() { } QList <QObject *> QGHTcpServer::clientsList() { return m_clientList.keys(); } void QGHTcpServer::SetPayload(int nPayload) { m_nPayLoad = nPayload; assert(m_nPayLoad>=256 && m_nPayLoad<=16*1024*1024); } void QGHTcpServer::new_client_recieved() { QTcpSocket * sock_client = nextPendingConnection(); while (sock_client) { connect(sock_client, SIGNAL(readyRead()),this, SLOT(new_data_recieved())); connect(sock_client, SIGNAL(disconnected()),this,SLOT(client_closed())); connect(sock_client, SIGNAL(error(QAbstractSocket::SocketError)),this, SLOT(displayError(QAbstractSocket::SocketError))); connect(sock_client, SIGNAL(bytesWritten(qint64)), this, SLOT(some_data_sended(qint64))); m_clientList[sock_client] = 0; emit evt_NewClientConnected(sock_client); sock_client = nextPendingConnection(); } } void QGHTcpServer::client_closed() { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) { emit evt_ClientDisconnected(pSock); m_buffer_sending.remove(pSock); m_buffer_sending_offset.remove(pSock); m_clientList.remove(pSock); pSock->deleteLater(); } } void QGHTcpServer::new_data_recieved() { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) emit evt_Data_recieved(pSock,pSock->readAll()); } void QGHTcpServer::some_data_sended(qint64 wsended) { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) { emit evt_Data_transferred(pSock,wsended); QList<QByteArray> & list_sock_data = m_buffer_sending[pSock]; QList<qint64> & list_offset = m_buffer_sending_offset[pSock]; while (list_sock_data.empty()==false) { QByteArray & arraySending = *list_sock_data.begin(); qint64 & currentOffset = *list_offset.begin(); qint64 nTotalBytes = arraySending.size(); assert(nTotalBytes>=currentOffset); qint64 nBytesWritten = pSock->write(arraySending.constData()+currentOffset,qMin((int)(nTotalBytes-currentOffset),m_nPayLoad)); currentOffset += nBytesWritten; if (currentOffset>=nTotalBytes) { list_offset.pop_front(); list_sock_data.pop_front(); } else break; } } } void QGHTcpServer::displayError(QAbstractSocket::SocketError socketError) { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(sender()); if (pSock) { emit evt_SocketError(pSock,socketError); pSock->disconnectFromHost(); } } void QGHTcpServer::SendDataToClient(QObject * objClient,const QByteArray & dtarray) { if (m_clientList.find(objClient)==m_clientList.end()) return; QTcpSocket * pSock = qobject_cast<QTcpSocket*>(objClient); if (pSock&&dtarray.size()) { QList<QByteArray> & list_sock_data = m_buffer_sending[pSock]; QList<qint64> & list_offset = m_buffer_sending_offset[pSock]; if (list_sock_data.empty()==true) { qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad)); if (bytesWritten < dtarray.size()) { list_sock_data.push_back(dtarray); list_offset.push_back(bytesWritten); } } else { list_sock_data.push_back(dtarray); list_offset.push_back(0); } } } void QGHTcpServer::BroadcastData(QObject * objClient,const QByteArray & dtarray) { for(QMap<QObject *,int>::iterator p = m_clientList.begin();p!=m_clientList.end();p++) { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(p.key()); if (pSock&&dtarray.size()&&pSock!=objClient) { QList<QByteArray> & list_sock_data = m_buffer_sending[pSock]; QList<qint64> & list_offset = m_buffer_sending_offset[pSock]; if (list_sock_data.empty()==true) { qint64 bytesWritten = pSock->write(dtarray.constData(),qMin(dtarray.size(),m_nPayLoad)); if (bytesWritten < dtarray.size()) { list_sock_data.push_back(dtarray); list_offset.push_back(bytesWritten); } else { list_sock_data.push_back(dtarray); list_offset.push_back(0); } } } } } void QGHTcpServer::KickAllClients() { QList<QObject *> clientList = m_clientList.keys(); foreach(QObject * obj,clientList) { QTcpSocket * pSock = qobject_cast<QTcpSocket*>(obj); if (pSock) { pSock->disconnectFromHost(); } } }