1. 程式人生 > >Boost.asio學習之Tcp非同步通訊Demo

Boost.asio學習之Tcp非同步通訊Demo

    現在處於剛剛學了C++基礎語法,看別人的程式碼縷邏輯縷的慢,按自己的邏輯寫程式碼構思不太清楚漏洞百出。雖然boost asio程式設計已經學了一週,邏輯也差不多清楚,不過親手寫一個Demo會發現很多平時注意不到的小細節,往往這些小細節就是以後耽誤時間的主要原因。所以強行逼自己寫了這個Demo,寫了1個小時,改bug改了好幾天,終於在週末前把程式調通了,只是簡單的服務端接收客戶端訊息,就這麼艱難,看來未來的路還很長。現在將這個程式碼記錄在這裡。

下面是這個程式碼的主要流程與自己覺得需要注意的地方:

一、服務端

1.伺服器自己建立io_service,endpoint,設定ip與port。

2.建立兩個類,一個接受連線,其中acceptor使用io_service與endpoint兩個引數初始化,類成員變數io_service需要用引用。

      開始async_accept後會阻塞在這裡,其中socket引數由session類getSocket獲取,當有客戶端連線後啟用回撥函式,回撥函式判斷是否連線成功,成功則開始處理會話部分。是否成功都會繼續呼叫async_accept,迴圈接受連線。

3.另一個處理連線後的會話,繼承自基類boost::enable_shared_from_this<class>,成員變數socket由io_service初始化,並將新的客戶端存入Server的列表中,如果斷開了連線,則根據Session裡記錄的編號從List中刪除。開始async_read讀取客戶端發來的訊息,這裡如果buffer設定的長度小於等於訊息長度會讀取設定長度的訊息,否則會一直阻塞在這裡,這個解決方法有待進一步學習。關閉socket後執行析構。

4.接受分為兩部分,第一部分讀取報頭,固定長度5位元組,裡面存放即將發來的資料報長度,第二部分根據報頭提供的長度資訊讀取指定長度的報文。這裡注意,async_read只會在讀完或者出錯是才返回,如果設定buffer內的長度大於報文,將一直阻塞,我改用了async_read_some,這樣讀完或者讀取到指定的長度後即返回。

5.服務端總結:服務端分為兩個部分,一部分負責建立服務端,另一部分負責資料接受與傳送。

二、客戶端

1.建立io_service。

2.socket由io_service初始化,建立resolver與query,query請求服務端地址和ip,resolver將請求到的query解析成endpoint_iterator格式,以建立與服務端的連線。通過socket與endpoint_iterator利用async_connect與服務端建立連線,並呼叫回撥函式。

3.連線成功後,非同步傳送報頭與報文。

程式碼如下:

標頭檔案

// TODO:  在此處引用程式需要的其他標頭檔案
#include <boost\asio.hpp>
#include <boost\shared_ptr.hpp>
#include <boost\bind.hpp>
#include <boost\lexical_cast.hpp>
#include <boost\enable_shared_from_this.hpp>
#include <iostream>

#define	Asio	boost::asio
#define Tcp	boost::asio::ip::tcp

MyServer.h

#pragma once
#include <unordered_map>

class CMySession;

typedef boost::shared_ptr<CMySession> session_ptr;

class CMyServer
{
public:
	CMyServer(Asio::io_service &io_service,Tcp::endpoint &ep)
		:m_io_service(io_service), m_acceptor(io_service, ep), m_nSessionCount(0)
	{
		StartServer();
	}
	~CMyServer();
/* 公有成員函式 */
public:
	void StartServer();
	
/* 私有成員函式 */
private:
	void handle_accept(session_ptr new_session,const boost::system::error_code &error);
/* 私有成員變數 */
private:
	Asio::io_service &m_io_service;
	Tcp::acceptor m_acceptor;

	int m_nSessionCount;
	std::unordered_map<int, session_ptr> m_mapSessionList;

};

class CMySession
	:public boost::enable_shared_from_this<CMySession>
{
public:
	/* 建構函式中,io_service需要傳引用 */
	CMySession(Asio::io_service &io_service, std::unordered_map<int, session_ptr> &mapSessionList,int nCount)
		:m_socket(io_service), m_mapSessionList(mapSessionList), m_nCount(nCount)
	{

	}
	~CMySession();
public:
	Tcp::socket& getSocket();
	void start();
private:
	void handle_read_head(const boost::system::error_code &error);
	void handle_read_body(const boost::system::error_code &error);
private:
	char m_cHead[5];
	char m_cReciv[512];
	Tcp::socket m_socket;
	std::unordered_map<int, session_ptr> &m_mapSessionList;
	int m_nCount;
};

MyServer.cpp

#include "stdafx.h"
#include "MyServer.h"

CMyServer::~CMyServer()
{
}

void CMyServer::StartServer()
{
	/* 將表的引用傳遞給Session,儲存資訊 */
	session_ptr new_session(new CMySession(m_io_service, m_mapSessionList, m_nSessionCount));
	++m_nSessionCount;
	m_acceptor.async_accept(new_session->getSocket(),
		boost::bind(&CMyServer::handle_accept, this, new_session, Asio::placeholders::error
		));
}

void CMyServer::handle_accept(session_ptr session,const boost::system::error_code &error)
{
	if (!error)
	{
		session->start();
	}
	/* 此處不管是否成功,都會繼續監聽 */
	StartServer();
}



CMySession::~CMySession()
{
// 	auto it = m_mapSessionList.find(m_nCount);
// 	m_mapSessionList.erase(it);
	m_mapSessionList;
	std::cout << m_nCount << ":~SessionComplete" << std::endl;
}

Tcp::socket& CMySession::getSocket()
{
	return m_socket;
}

void CMySession::start()
{
	/* 成功建立連線 加入List */
	m_mapSessionList.insert(std::make_pair(m_nCount, shared_from_this()));
	Asio::async_read(m_socket, Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
}



void CMySession::handle_read_head(const boost::system::error_code &error)
{
	if (!error)
	{
		/* 獲得資料報長度 */
		int nLenth = atoi(m_cHead);
		memset(m_cHead, 0, 5);
		//std::cout << nLenth << std::endl;
		//Asio::async_read(m_socket, Asio::buffer(m_cReciv, 20), boost::bind(&CMySession::handle_read_body, shared_from_this(), Asio::placeholders::error));
		m_socket.async_read_some(Asio::buffer(m_cReciv, nLenth + 1), boost::bind(&CMySession::handle_read_body, shared_from_this(), Asio::placeholders::error));
	}
	else
	{
		auto it = m_mapSessionList.find(m_nCount);
		m_mapSessionList.erase(it);
		m_socket.shutdown(Tcp::socket::shutdown_both);
		m_socket.close();
	}
		
}
void CMySession::handle_read_body(const boost::system::error_code &error)
{
	if (!error)
	{
		std::cout << m_cReciv << std::endl;
		//Asio::async_read(m_socket, Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
		m_socket.async_read_some(Asio::buffer(m_cHead, 5), boost::bind(&CMySession::handle_read_head, shared_from_this(), Asio::placeholders::error));
	}
	else
	{
		auto it = m_mapSessionList.find(m_nCount);
		m_mapSessionList.erase(it);
		m_socket.shutdown(Tcp::socket::shutdown_both);
		m_socket.close();
	}
}

服務端main函式

// Server.cpp : 定義控制檯應用程式的入口點。
//

#include "stdafx.h"
#include "MyServer.h"
int _tmain(int argc, _TCHAR* argv[])
{
	Asio::io_service io_service;
	Tcp::endpoint ep(Tcp::v4(), 8899);

	CMyServer a(io_service, ep);

	io_service.run();
	system("pause");
	return 0;
}

標頭檔案

#include <boost\asio.hpp>
#include <boost\shared_ptr.hpp>
#include <boost\bind.hpp>
#include <boost\enable_shared_from_this.hpp>
#include <iostream>

#define ASIO	boost::asio
#define TCP		boost::asio::ip::tcp

Client.h

#pragma once

class CClient:
	public boost::enable_shared_from_this<CClient>
{
public:
	CClient(ASIO::io_service&io_service)
		:m_io_service(io_service), m_socket(io_service)
	{}
	~CClient();
public:
	void connectServer(std::string strIP,int nPort);

private:
	void handle_connect(const boost::system::error_code &error);
	void handle_write(const boost::system::error_code &error);
	void handle_write_stop(const boost::system::error_code &error);

private:
	ASIO::io_service &m_io_service;
	TCP::resolver::iterator m_it_ep;
	TCP::socket	m_socket;
};

Client.cpp

#include "stdafx.h"
#include "Client.h"
#define testStringB	"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
#pragma disable(error:4996)
CClient::~CClient()
{
}

void CClient::connectServer(std::string strIP, int nPort)
{
	

	TCP::resolver cResolver(m_io_service);
	TCP::resolver::query cQuery(strIP, std::to_string(nPort));	/* 接受兩個string型別引數 */
	m_it_ep = cResolver.resolve(cQuery);

	ASIO::async_connect(m_socket, m_it_ep, boost::bind(&CClient::handle_connect, shared_from_this(), ASIO::placeholders::error));
}

void CClient::handle_connect(const boost::system::error_code &error)
{
	if (!error)
	{
		int n = strlen(testStringB);
		char tmpHead[5 + 1] = "";
		/* 將長度資訊由int轉換為字串,作為報頭髮送給服務端 */
		sprintf_s(tmpHead, "%5d", n);

		ASIO::async_write(m_socket, ASIO::buffer(tmpHead,5), boost::bind(&CClient::handle_write, shared_from_this(), ASIO::placeholders::error));
	}
	
}

void CClient::handle_write(const boost::system::error_code &error)
{
	if (!error)
	{		
		std::cout << "報頭髮送成功!" << std::endl;
		char ctmp[] = testStringB;
		int nLenth = strlen(ctmp);

		ASIO::async_write(m_socket, ASIO::buffer(ctmp,nLenth), boost::bind(&CClient::handle_write_stop, shared_from_this(), ASIO::placeholders::error));
	}

}

void CClient::handle_write_stop(const boost::system::error_code &error)
{
	return;
}

客戶端main函式

//Practice0530.cpp : 定義控制檯應用程式的入口點。


#include "stdafx.h"
#include "Client.h"

int _tmain(int argc, _TCHAR* argv[])
{
	ASIO::io_service io_service;
	ASIO::io_service::work work(io_service);
 	int i = 0;
	boost::shared_ptr<CClient> a(new CClient(io_service));
	a->connectServer("127.0.0.1", 8899);

	io_service.run(); 
	return 0;
}

下午的新發現

今天下午寫另外一個程式不小心點錯了 把這個程式與上一個程式進行了通訊,結果發現:

客戶端連續兩次ansyc_write,其實是一個數據流過去的,這就說明服務端那邊接收的時候,如果指定的read長度小於整個流的長度,那麼可以讀指定長度,如果大於則會阻塞住,相當於沒讀完,不會向下走。而用ansyc_read_some,則讀取到指定長度或者讀完了就不會阻塞住。