Boost.asio學習之Tcp非同步通訊Demo
現在處於剛剛學了C++基礎語法,看別人的程式碼縷邏輯縷的慢,按自己的邏輯寫程式碼構思不太清楚漏洞百出。雖然boost asio程式設計已經學了一週,邏輯也差不多清楚,不過親手寫一個Demo會發現很多平時注意不到的小細節,往往這些小細節就是以後耽誤時間的主要原因。所以強行逼自己寫了這個Demo,寫了1個小時,改bug改了好幾天,終於在週末前把程式調通了,只是簡單的服務端接收客戶端訊息,就這麼艱難,看來未來的路還很長。現在將這個程式碼記錄在這裡。
下面是這個程式碼的主要流程與自己覺得需要注意的地方:
一、服務端
1.伺服器自己建立io_service,endpoint,設定ip與port。
2.建立兩個類,一個接受連線,其中acceptor使用io_service與endpoint兩個引數初始化,類成員變數io_service需要用引用。
開始async_accept後會阻塞在這裡,其中socket引數由session類getSocket獲取,當有客戶端連線後啟用回撥函式,回撥函式判斷是否連線成功,成功則開始處理會話部分。是否成功都會繼續呼叫async_accept,迴圈接受連線。
3.另一個處理連線後的會話,繼承自基類boost::enable_shared_from_this<class>,成員變數socket由io_service初始化,並將新的客戶端存入Server的列表中,如果斷開了連線,則根據Session裡記錄的編號從List中刪除。開始async_read讀取客戶端發來的訊息,這裡如果buffer設定的長度小於等於訊息長度會讀取設定長度的訊息,否則會一直阻塞在這裡,這個解決方法有待進一步學習。關閉socket後執行析構。
4.接受分為兩部分,第一部分讀取報頭,固定長度5位元組,裡面存放即將發來的資料報長度,第二部分根據報頭提供的長度資訊讀取指定長度的報文。這裡注意,async_read只會在讀完或者出錯是才返回,如果設定buffer內的長度大於報文,將一直阻塞,我改用了async_read_some,這樣讀完或者讀取到指定的長度後即返回。
5.服務端總結:服務端分為兩個部分,一部分負責建立服務端,另一部分負責資料接受與傳送。
二、客戶端
1.建立io_service。
2.socket由io_service初始化,建立resolver與query,query請求服務端地址和ip,resolver將請求到的query解析成endpoint_iterator格式,以建立與服務端的連線。通過socket與endpoint_iterator利用async_connect與服務端建立連線,並呼叫回撥函式。
3.連線成功後,非同步傳送報頭與報文。
程式碼如下:
標頭檔案
// TODO: 在此處引用程式需要的其他標頭檔案
#include <boost\asio.hpp>
#include <boost\shared_ptr.hpp>
#include <boost\bind.hpp>
#include <boost\lexical_cast.hpp>
#include <boost\enable_shared_from_this.hpp>
#include <iostream>
#define Asio boost::asio
#define Tcp boost::asio::ip::tcp
MyServer.h
#pragma once
#include <unordered_map>
class CMySession;
typedef boost::shared_ptr<CMySession> session_ptr;
class CMyServer
{
public:
CMyServer(Asio::io_service &io_service,Tcp::endpoint &ep)
:m_io_service(io_service), m_acceptor(io_service, ep), m_nSessionCount(0)
{
StartServer();
}
~CMyServer();
/* 公有成員函式 */
public:
void StartServer();
/* 私有成員函式 */
private:
void handle_accept(session_ptr new_session,const boost::system::error_code &error);
/* 私有成員變數 */
private:
Asio::io_service &m_io_service;
Tcp::acceptor m_acceptor;
int m_nSessionCount;
std::unordered_map<int, session_ptr> m_mapSessionList;
};
class CMySession
:public boost::enable_shared_from_this<CMySession>
{
public:
/* 建構函式中,io_service需要傳引用 */
CMySession(Asio::io_service &io_service, std::unordered_map<int, session_ptr> &mapSessionList,int nCount)
:m_socket(io_service), m_mapSessionList(mapSessionList), m_nCount(nCount)
{
}
~CMySession();
public:
Tcp::socket& getSocket();
void start();
private:
void handle_read_head(const boost::system::error_code &error);
void handle_read_body(const boost::system::error_code &error);
private:
char m_cHead[5];
char m_cReciv[512];
Tcp::socket m_socket;
std::unordered_map<int, session_ptr> &m_mapSessionList;
int m_nCount;
};
MyServer.cpp
#include "stdafx.h"
#include "MyServer.h"
CMyServer::~CMyServer()
{
}
void CMyServer::StartServer()
{
/* 將表的引用傳遞給Session,儲存資訊 */
session_ptr new_session(new CMySession(m_io_service, m_mapSessionList, m_nSessionCount));
++m_nSessionCount;
m_acceptor.async_accept(new_session->getSocket(),
boost::bind(&CMyServer::handle_accept, this, new_session, Asio::placeholders::error
));
}
void CMyServer::handle_accept(session_ptr session,const boost::system::error_code &error)
{
if (!error)
{
session->start();
}
/* 此處不管是否成功,都會繼續監聽 */
StartServer();
}
CMySession::~CMySession()
{
// auto it = m_mapSessionList.find(m_nCount);
// m_mapSessionList.erase(it);
m_mapSessionList;
std::cout << m_nCount << ":~SessionComplete" << std::endl;
}
Tcp::socket& CMySession::getSocket()
{
return m_socket;
}
void CMySession::start()
{
/* 成功建立連線 加入List */
m_mapSessionList.insert(std::make_pair(m_nCount, shared_from_this()));
Asio::async_read(m_socket, Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
}
void CMySession::handle_read_head(const boost::system::error_code &error)
{
if (!error)
{
/* 獲得資料報長度 */
int nLenth = atoi(m_cHead);
memset(m_cHead, 0, 5);
//std::cout << nLenth << std::endl;
//Asio::async_read(m_socket, Asio::buffer(m_cReciv, 20), boost::bind(&CMySession::handle_read_body, shared_from_this(), Asio::placeholders::error));
m_socket.async_read_some(Asio::buffer(m_cReciv, nLenth + 1), boost::bind(&CMySession::handle_read_body, shared_from_this(), Asio::placeholders::error));
}
else
{
auto it = m_mapSessionList.find(m_nCount);
m_mapSessionList.erase(it);
m_socket.shutdown(Tcp::socket::shutdown_both);
m_socket.close();
}
}
void CMySession::handle_read_body(const boost::system::error_code &error)
{
if (!error)
{
std::cout << m_cReciv << std::endl;
//Asio::async_read(m_socket, Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
m_socket.async_read_some(Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
}
else
{
auto it = m_mapSessionList.find(m_nCount);
m_mapSessionList.erase(it);
m_socket.shutdown(Tcp::socket::shutdown_both);
m_socket.close();
}
}
服務端main函式
// Server.cpp : 定義控制檯應用程式的入口點。
//
#include "stdafx.h"
#include "MyServer.h"
int _tmain(int argc, _TCHAR* argv[])
{
Asio::io_service io_service;
Tcp::endpoint ep(Tcp::v4(), 8899);
CMyServer a(io_service, ep);
io_service.run();
system("pause");
return 0;
}
標頭檔案
#include <boost\asio.hpp>
#include <boost\shared_ptr.hpp>
#include <boost\bind.hpp>
#include <boost\enable_shared_from_this.hpp>
#include <iostream>
#define ASIO boost::asio
#define TCP boost::asio::ip::tcp
Client.h
#pragma once
class CClient:
public boost::enable_shared_from_this<CClient>
{
public:
CClient(ASIO::io_service&io_service)
:m_io_service(io_service), m_socket(io_service)
{}
~CClient();
public:
void connectServer(std::string strIP,int nPort);
private:
void handle_connect(const boost::system::error_code &error);
void handle_write(const boost::system::error_code &error);
void handle_write_stop(const boost::system::error_code &error);
private:
ASIO::io_service &m_io_service;
TCP::resolver::iterator m_it_ep;
TCP::socket m_socket;
};
Client.cpp
#include "stdafx.h"
#include "Client.h"
#define testStringB "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
#pragma disable(error:4996)
CClient::~CClient()
{
}
void CClient::connectServer(std::string strIP, int nPort)
{
TCP::resolver cResolver(m_io_service);
TCP::resolver::query cQuery(strIP, std::to_string(nPort)); /* 接受兩個string型別引數 */
m_it_ep = cResolver.resolve(cQuery);
ASIO::async_connect(m_socket, m_it_ep, boost::bind(&CClient::handle_connect, shared_from_this(), ASIO::placeholders::error));
}
void CClient::handle_connect(const boost::system::error_code &error)
{
if (!error)
{
int n = strlen(testStringB);
char tmpHead[5 + 1] = "";
/* 將長度資訊由int轉換為字串,作為報頭髮送給服務端 */
sprintf_s(tmpHead, "%5d", n);
ASIO::async_write(m_socket, ASIO::buffer(tmpHead,5), boost::bind(&CClient::handle_write, shared_from_this(), ASIO::placeholders::error));
}
}
void CClient::handle_write(const boost::system::error_code &error)
{
if (!error)
{
std::cout << "報頭髮送成功!" << std::endl;
char ctmp[] = testStringB;
int nLenth = strlen(ctmp);
ASIO::async_write(m_socket, ASIO::buffer(ctmp,nLenth), boost::bind(&CClient::handle_write_stop, shared_from_this(), ASIO::placeholders::error));
}
}
void CClient::handle_write_stop(const boost::system::error_code &error)
{
return;
}
客戶端main函式
//Practice0530.cpp : 定義控制檯應用程式的入口點。
#include "stdafx.h"
#include "Client.h"
int _tmain(int argc, _TCHAR* argv[])
{
ASIO::io_service io_service;
ASIO::io_service::work work(io_service);
int i = 0;
boost::shared_ptr<CClient> a(new CClient(io_service));
a->connectServer("127.0.0.1", 8899);
io_service.run();
return 0;
}
下午的新發現
今天下午寫另外一個程式不小心點錯了 把這個程式與上一個程式進行了通訊,結果發現:
客戶端連續兩次ansyc_write,其實是一個數據流過去的,這就說明服務端那邊接收的時候,如果指定的read長度小於整個流的長度,那麼可以讀指定長度,如果大於則會阻塞住,相當於沒讀完,不會向下走。而用ansyc_read_some,則讀取到指定長度或者讀完了就不會阻塞住。