boost庫之tcp例項(非同步方式)
阿新 • • 發佈:2019-01-11
//服務端
#define TCP_RECV_DATA_PACKAGE_MAX_LENGTH 2048 #define TCP_SEND_DATA_PACKAGE_MAX_LENGTH 2048 #include <iostream> #include <boost/function.hpp> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/shared_ptr.hpp> using namespace boost::asio; std::string make_daytime_string() { using namespace std; time_t now = std::time(NULL); return ctime(&now); } //發生資料回撥函式 typedef void (CALLBACK *SendDataCallback)(const boost::system::error_code& error,std::size_t bytes_transferred,DWORD dwUserData1,DWORD dwUserData2); //接收資料回撥函式 typedef void (CALLBACK *RecvDataCallback)(const boost::system::error_code& error,char *pData,int nDataSize,DWORD dwUserData1,DWORD dwUserData2); //tcp connection class TcpConnection { public: static TcpConnection * create(io_service& ioService) { return new TcpConnection(ioService); } virtual ~TcpConnection() { m_bDisconnect = true; m_socket.close(); } ip::tcp::socket& socket() { return m_socket; } //傳送資料 int sendData(char *pData,int nDataSize,SendDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2) { if(fnCallback == NULL) { //同步 if(!m_socket.is_open()) { return 0; } std::size_t nSendedSize = boost::asio::write(m_socket,boost::asio::buffer(pData,nDataSize)); if(nDataSize == nSendedSize) { return 0; } else { return nSendedSize; } } else { //非同步 if(!m_socket.is_open()) { return 0; } memcpy(m_sendBuf.data(),pData,nDataSize); boost::asio::async_write( m_socket, boost::asio::buffer(m_sendBuf.data(),nDataSize), boost::bind(&TcpConnection::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, fnCallback,dwUserData1,dwUserData2)); } return 0; } //接收資料(同步) int recvDataBySync() { if(!m_socket.is_open()) { return 0; } boost::system::error_code error; std::size_t nSize = m_socket.read_some(boost::asio::buffer(m_recvBuf),error); if(error != NULL) { //錯誤 return 0; } return nSize; } //接收資料(非同步) int recvDataByAsync(RecvDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2) { m_socket.async_read_some(boost::asio::buffer(m_recvBuf), boost::bind(&TcpConnection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred, fnCallback,dwUserData1,dwUserData2)); return 0; } private: TcpConnection(io_service& ioService) : m_socket(ioService) { m_bDisconnect = false; } void handle_write(const boost::system::error_code& error,size_t bytes_transferred,SendDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2) { if(fnCallback != NULL) { fnCallback(error,bytes_transferred,dwUserData1,dwUserData2); } if(error != NULL) { m_bDisconnect = true; if(m_socket.is_open()) { m_socket.close(); } if(!m_bDisconnect) { printf("close connect \n"); } //傳送資料失敗 return; } printf("write data!!!"); } void handle_read(const boost::system::error_code& error,size_t bytes_transferred,RecvDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2) { if(fnCallback != NULL) { fnCallback(error,m_recvBuf.data(),bytes_transferred,dwUserData1,dwUserData2); } if(error != NULL) { if (error == boost::asio::error::eof || error == boost::asio::error::connection_reset) { //boost::asio::error::eof --對端方關閉連線(正常關閉套接字) //boost::asio::error::connection_reset --對端方關閉連線(暴力關閉套接字) //對端方關閉連線 if(m_socket.is_open()) { m_socket.close(); } printf("close connect \n"); } else { if(m_socket.is_open()) { m_socket.close(); } } return; } char szMsg[128] = {0}; memcpy(szMsg,m_recvBuf.data(),bytes_transferred); printf("%s \n",szMsg); } bool m_bDisconnect; //是否斷開連線 ip::tcp::socket m_socket; boost::array<char,TCP_RECV_DATA_PACKAGE_MAX_LENGTH> m_recvBuf; //接收資料緩衝區 boost::array<char,TCP_SEND_DATA_PACKAGE_MAX_LENGTH> m_sendBuf; //傳送資料緩衝區 }; class TcpServer { public: TcpServer(io_service& ioService) : m_acceptor(ioService, ip::tcp::endpoint(ip::tcp::v4(), 10005)) { m_funcConnectionHandler = NULL; m_nUserData = 0; m_bStop = false; } //新建連接回調函式 typedef boost::function<void (TcpConnection * new_connection,int nUserData)> CreateConnnectionCallbackHandler; //設定新建連接回調函式 void setNewConnectionCallback(CreateConnnectionCallbackHandler fnHandler,int nUserData) { m_funcConnectionHandler = fnHandler; m_nUserData = nUserData; } //開始工作 void startWork() { m_bStop = false; start_accept(); } //停止工作 void stopWork() { m_bStop = true; m_acceptor.close(); } private: void start_accept() { if(m_bStop) { return; } TcpConnection *new_connection = TcpConnection::create(m_acceptor.get_io_service()); m_acceptor.async_accept( new_connection->socket(), boost::bind(&TcpServer::handle_accept, this, new_connection, boost::asio::placeholders::error)); } void handle_accept(TcpConnection * new_connection, const boost::system::error_code& error) { if (!error) { if(m_funcConnectionHandler != NULL) { m_funcConnectionHandler(new_connection,m_nUserData); } new_connection->sendData("abcdefg",strlen("abcdefg"),NULL,0,0); start_accept(); } } ip::tcp::acceptor m_acceptor; CreateConnnectionCallbackHandler m_funcConnectionHandler; int m_nUserData; bool m_bStop; }; //所有連線 std::list<TcpConnection *> g_listConnection; //無效連線 std::list<TcpConnection *> g_listNoEffectConnection; //插入無效連線 void InsertNoEffectConnection(TcpConnection * pConnnection) { bool bFind = false; TcpConnection * pTcpConnectionTmpObject = NULL; std::list<TcpConnection *>::iterator iter,iterEnd; iter = g_listNoEffectConnection.begin(); iterEnd = g_listNoEffectConnection.end(); for(iter; iter!=iterEnd; iter++) { pTcpConnectionTmpObject = *iter; if(pTcpConnectionTmpObject == pConnnection) { bFind = true; break; } } //未找到,插入 if(!bFind) { g_listNoEffectConnection.push_back(pTcpConnectionTmpObject); } } //刪除無效連線 void DeleteNoEffectConnection() { std::list<TcpConnection *>::iterator iter; if(g_listNoEffectConnection.size() > 0) { TcpConnection * pTcpConnectionTmpObject = NULL; iter = g_listNoEffectConnection.begin(); while(iter != g_listNoEffectConnection.end()) { pTcpConnectionTmpObject = *iter; if(pTcpConnectionTmpObject != NULL) { g_listConnection.remove(pTcpConnectionTmpObject); delete pTcpConnectionTmpObject; } iter++; } g_listNoEffectConnection.clear(); } } //新連接回調處理 void NewConnectionCallbackProcess(TcpConnection * new_connection,int nUserData) { g_listConnection.push_back(new_connection); } void WINAPI RecvDataCallbackProcess(const boost::system::error_code& error,char *pData,int nDataSize,DWORD dwUserData1,DWORD dwUserData2) { if (error == boost::asio::error::eof) { //對端方關閉連線 TcpConnection * pTcpConnectionTmpObject = (TcpConnection *)dwUserData1; if(pTcpConnectionTmpObject != NULL) { InsertNoEffectConnection(pTcpConnectionTmpObject); } return; } } int main(int argc, char* argv[]) { try { io_service ioService; TcpServer server(ioService); server.setNewConnectionCallback(NewConnectionCallbackProcess,0); server.startWork(); TcpConnection * pTcpConnectionObject = NULL; std::list<TcpConnection *>::iterator iter,iterEnd; int n = 0; while(true) { //刪除無效連線 DeleteNoEffectConnection(); //遍歷 iter = g_listConnection.begin(); iterEnd = g_listConnection.end(); for(iter; iter!=iterEnd; iter++) { pTcpConnectionObject = *iter; pTcpConnectionObject->sendData("111",3,NULL,0,0); pTcpConnectionObject->recvDataByAsync(RecvDataCallbackProcess,(int)pTcpConnectionObject,0); } ioService.poll(); Sleep(200); n++; if(n > 1000) { break; } } // 只有io_service類的run()方法執行之後回撥物件才會被呼叫 //ioService.run(); //釋放空間 iter = g_listConnection.begin(); iterEnd = g_listConnection.end(); for(iter; iter!=iterEnd; iter++) { pTcpConnectionObject = *iter; if(pTcpConnectionObject != NULL) { delete pTcpConnectionObject; } } g_listConnection.clear(); server.stopWork(); } catch (std::exception& e) { std::cout << e.what() << std::endl; } return 0; }
//客戶端
#include <iostream> #include <boost/asio.hpp> using namespace boost::asio; int main(int argc, char* argv[]) { io_service ioService; boost::system::error_code error; try { //獲取ip(用直譯器的方法來解析域名) /* ip::tcp::resolver resolver(ioService); ip::tcp::resolver::query query("www.yahoo.com", "80"); ip::tcp::resolver::iterator iter = resolver.resolve( query); ip::tcp::endpoint ep = *iter; std::cout << ep.address().to_string() << std::endl; */ ip::tcp::socket socket(ioService); ip::tcp::endpoint endpoint(boost::asio::ip::address_v4::from_string("127.0.0.1"), 10005); socket.connect(endpoint, error); //是否出錯 if (error) { throw boost::system::system_error(error); } while(true) { boost::array<char,128> buf; size_t len = socket.read_some(boost::asio::buffer(buf), error); //服務端執行斷開. if(error != NULL) { if (error == boost::asio::error::eof) { break; // 對端方關閉連線(正常關閉套接字) } else if (error == boost::asio::error::connection_reset) { break;//對端方關閉連線(暴力關閉套接字) } else { throw boost::system::system_error(error); // Some other error. } } char szMsg[128] = {0}; memcpy(szMsg,buf.data(),len); printf("%s\n",szMsg); //傳送資料 socket.write_some(boost::asio::buffer(buf, len), error); } } catch (std::exception& e) { printf(e.what()); } return 0; }