boost asio 非同步實現tcp通訊
一、前言
boost asio可算是一個簡單易用,功能又強大可跨平臺的C++通訊庫,效率也表現的不錯,linux環境是epoll實現的,而windows環境是iocp實現的。而tcp通訊是專案當中經常用到通訊方式之一,實現的方法有各式各樣,因此總結一套適用於自己專案的方法是很有必要,很可能下一個專案直接套上去就可以用了。
二、實現思路
1.通訊包資料結構
Tag:檢查資料包是否合法,具體會在下面講解;
Length:描述Body的長度;
Command:表示資料包的型別,0表示心跳包(長連線需要心跳來檢測連線是否正常),1表示註冊包(客戶端連線上伺服器之後要將相關資訊註冊給伺服器),2表示業務訊息包;
business_type:業務訊息包型別,伺服器會根據業務訊息包型別將資料路由到對應的客戶端(客戶端是有業務型別分類的);
app_id:客戶端唯一識別符號;
Data:訊息資料;
2.連線物件
客戶端連線上伺服器之後,雙方都會產生一個socket連線物件,通過這個物件可以收發資料,因此我定義為socket_session。
//socket_session.h
#pragma once #include <iostream> #include <list> #include <hash_map> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <boost/shared_ptr.hpp> #include <boost/make_shared.hpp> #include <boost/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/enable_shared_from_this.hpp> #include <firebird/log/logger_log4.hpp> #include <firebird/detail/config.hpp> #include <firebird/socket_utils/message_archive.hpp> using boost::asio::ip::tcp; namespace firebird{ enum command{ heartbeat = 0, regist, normal}; const std::string tag = "KDS"; class FIREBIRD_DECL socket_session; typedef boost::shared_ptr<socket_session> socket_session_ptr; class FIREBIRD_DECL socket_session: public boost::enable_shared_from_this<socket_session>, private boost::noncopyable { public: typedef boost::function<void(socket_session_ptr)> close_callback; typedef boost::function<void( const boost::system::error_code&, socket_session_ptr, message&)> read_data_callback; socket_session(boost::asio::io_service& io_service); ~socket_session(void); DWORD id() { return m_id; } WORD get_business_type(){ return m_business_type; } void set_business_type(WORD type) { m_business_type = type; } DWORD get_app_id(){ return m_app_id; } void set_app_id(DWORD app_id) { m_app_id = app_id; } std::string& get_remote_addr() { return m_name; } void set_remote_addr(std::string& name) { m_name = name; } tcp::socket& socket() { return m_socket; } void installCloseCallBack(close_callback cb){ close_cb = cb; } void installReadDataCallBack(read_data_callback cb) { read_data_cb = cb; } void start(); void close(); void async_write(const std::string& sMsg); void async_write(message& msg); bool is_timeout(); void set_op_time(){std::time(&m_last_op_time);} private: static boost::detail::atomic_count m_last_id; DWORD m_id; WORD m_business_type; DWORD m_app_id; std::string m_name; boost::array<char, 7> sHeader; std::string sBody; tcp::socket m_socket; boost::asio::io_service& m_io_service; std::time_t m_last_op_time; 
close_callback close_cb; read_data_callback read_data_cb; //傳送訊息 void handle_write(const boost::system::error_code& e, std::size_t bytes_transferred, std::string* pmsg); //讀訊息頭 void handle_read_header(const boost::system::error_code& error); //讀訊息體 void handle_read_body(const boost::system::error_code& error); void handle_close(); }; }
這裡要注意的是,程式定義了一個tag="KDS",目的是檢查收到的資料包是否有效:如果一個資料包的前3個位元組不是“KDS”,就認為它是非法的請求包。你也可以把tag定義為其它字串,只要雙方按協議發包即可。當然,這只是比較簡單的資料包檢查方法;比較嚴謹的做法是雙方使用雜湊演算法來校驗,具體怎麼做,這裡先不做詳解。
//socket_session.cpp
#include "socket_session.h"

namespace firebird{

    boost::detail::atomic_count socket_session::m_last_id(0);

    // Idle time, in seconds, after which is_timeout() reports true.
    static int connection_timeout = 60;

    // Largest body length accepted from a peer. The length field comes straight
    // from the network, so it is bounded before it is used as an allocation
    // size. NOTE(review): 64 MiB is a generous default; tune to the protocol.
    static const DWORD max_body_length = 64 * 1024 * 1024;

    socket_session::socket_session(boost::asio::io_service& io_srv)
        :m_business_type(0), m_app_id(0), m_socket(io_srv), m_io_service(io_srv)
    {
        m_id = ++socket_session::m_last_id;
        // Give is_timeout() a defined value even if it is called before start().
        std::time(&m_last_op_time);
    }

    socket_session::~socket_session(void)
    {
        // Use the non-throwing overload: a destructor must not throw.
        boost::system::error_code ignored;
        m_socket.close(ignored);
    }

    // Configures the socket and enters the read loop.
    void socket_session::start()
    {
        m_socket.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));
        m_socket.set_option(boost::asio::socket_base::keep_alive(true));
        std::time(&m_last_op_time);
        // A default-constructed error_code means "no error"; this issues the
        // first header read.
        const boost::system::error_code error;
        handle_read_header(error);
    }

    // Closes the socket and notifies the owner through close_cb.
    void socket_session::handle_close()
    {
        try{
            m_socket.close();
            if (!close_cb.empty())
            {
                close_cb(shared_from_this());
            }
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[未知異常]");
        }
    }

    void socket_session::close()
    {
        // The close callback may take locks, so it is posted to the io_service
        // instead of being run inline; running it inline could deadlock.
        m_io_service.post(boost::bind(&socket_session::handle_close, shared_from_this()));
    }

    bool socket_session::is_timeout()
    {
        std::time_t now;
        std::time(&now);
        return now - m_last_op_time > connection_timeout;
    }

    // Runs after a body has been read (or once from start()). Takes the
    // buffered body, starts reading the next header, then delivers the body.
    void socket_session::handle_read_header(const boost::system::error_code& error)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");
        try{
            if(error)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");
                close();
                return;
            }

            // Move the body out before the next read can overwrite sBody.
            std::string data;
            data.swap(sBody);

            boost::asio::async_read(m_socket,
                boost::asio::buffer(sHeader),
                boost::bind(&socket_session::handle_read_body, shared_from_this(),
                boost::asio::placeholders::error));

            if (!data.empty())
            {
                // Hand the decoded message to the registered read callback.
                message msg;
                message_iarchive(msg, data);
                read_data_cb(error, shared_from_this(), msg);
            }
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[未知異常]");
            close();
        }
    }

    // Runs after a header has been read: checks the tag, extracts the body
    // length and starts reading the body.
    void socket_session::handle_read_body(const boost::system::error_code& error)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");
        try{
            if(error)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");
                close();
                return;
            }

            // Compare against exactly tag.length() bytes of the header. The
            // header is raw binary and not NUL-terminated, so it must not be
            // converted to a std::string through a bare char pointer.
            if (tag.compare(0, tag.length(), sHeader.data(), tag.length()) != 0)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[這是個非法連線!]");
                close();
                return;
            }

            // The length is transmitted as the sender's in-memory DWORD.
            DWORD dwLength = 0;
            memcpy(&dwLength, &sHeader[tag.length()], sizeof(dwLength));

            if (dwLength > max_body_length)
            {
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[資料包長度超出上限]");
                close();
                return;
            }

            sBody.resize(dwLength);
            // Do not index into an empty string when the body length is zero.
            char* pBody = dwLength > 0 ? &sBody[0] : NULL;

            boost::asio::async_read(m_socket,
                boost::asio::buffer(pBody, dwLength),
                boost::bind(&socket_session::handle_read_header, shared_from_this(),
                boost::asio::placeholders::error));
        }
        catch(std::exception& e)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[未知異常]");
            close();
        }
    }

    // Completion handler of async_write: frees the send buffer and closes the
    // session on error.
    void socket_session::handle_write(const boost::system::error_code& error,
        std::size_t bytes_transferred, std::string* pmsg)
    {
        // The buffer is no longer needed once the write has completed.
        if (pmsg != NULL)
        {
            delete pmsg;
        }

        if(error)
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << error.message().c_str() << "]");
            close();
            return;
        }
    }

    // Sends tag + length + sMsg asynchronously.
    void socket_session::async_write(const std::string& sMsg)
    {
        LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");

        // The buffer must outlive this call because the write is asynchronous;
        // handle_write() deletes it. Until the write has been initiated this
        // function still owns it and frees it on failure.
        std::string* msg = NULL;
        try
        {
            DWORD dwLength = static_cast<DWORD>(sMsg.size());
            const char* pLen = reinterpret_cast<const char*>(&dwLength);

            msg = new std::string();
            msg->reserve(tag.size() + sizeof(dwLength) + sMsg.size());
            msg->append(tag);
            msg->append(pLen, sizeof(dwLength));
            msg->append(sMsg);

            boost::asio::async_write(m_socket,
                boost::asio::buffer(*msg, msg->size()),
                boost::bind(&socket_session::handle_write, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred,
                msg));
            msg = NULL;  // ownership has passed to handle_write()
        }
        catch(std::exception& e)
        {
            delete msg;
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[" << e.what() << "]");
            close();
        }
        catch(...)
        {
            delete msg;
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << get_remote_addr() << "],socket異常:[未知異常]");
            close();
        }
    }

    // Serializes msg and sends the resulting bytes.
    void socket_session::async_write(message& msg)
    {
        std::string data;
        message_oarchive(data, msg);
        async_write(data);
    }
}
接收資料時,socket_session會先讀取7個位元組的head,比較前3個位元組是否為“KDS”,然後取得4個位元組的Length,再讀出Length長度的資料,最後將該資料傳給read_data_cb回撥函式處理,read_data_cb回撥函式是在外部註冊的。
3.連線管理器
對於伺服器來說,它同時服務多個客戶端,為了有效的管理,因此需要一個連線管理器,我定義為session_manager。session_manager主要是對socket_session的增刪改查,和有效性檢查。
//session_manager.h
#pragma once
#include "socket_session.h"
#include "filter_container.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/typeof/typeof.hpp>
#include <boost/random.hpp>
#include <boost/pool/detail/singleton.hpp>
namespace firebird{
template<typename T>
class var_gen_wraper
{
public:
var_gen_wraper(): gen(boost::mt19937((boost::int32_t)std::time(0)),
boost::uniform_smallint<>(1, 100)) {}
typename T::result_type operator() () { return gen(); }
private:
T gen;
};
// Element type stored in session_set. add_session() fills the fields from the
// session at insertion time; the id, business_type, address and app_id members
// are the keys of the container's indices.
struct session_stu
{
DWORD id; // copied from socket_session::id()
WORD business_type; // business type recorded for the session
std::string address; // copied from socket_session::get_remote_addr()
DWORD app_id; // copied from socket_session::get_app_id()
socket_session_ptr session; // the session this entry describes
};
// Empty tag types naming the indices of session_set (used with
// boost::multi_index::tag): by id, by business type, by address and by app id.
struct sid{};
struct sbusiness_type{};
struct saddress{};
struct sapp_id{};
// Enumerators listed in the same order as the session_set indices.
// NOTE(review): presumably meant as positional index numbers; not referenced
// in the code shown here.
enum session_idx_member{ session_id = 0, session_business_type, session_address, app_id};
// Manager modes passed as the "type" argument of session_manager and compared
// against m_type in check_connection().
#define CLIENT 0
#define SERVER 1
// Container of session_stu with four ordered indices: a unique index on id
// (tag sid) and non-unique indices on business_type (tag sbusiness_type),
// address (tag saddress) and app_id (tag sapp_id).
typedef boost::multi_index::multi_index_container<
session_stu,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<
boost::multi_index::tag<sid>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, id)>,
boost::multi_index::ordered_non_unique<
boost::multi_index::tag<sbusiness_type>, BOOST_MULTI_INDEX_MEMBER(session_stu, WORD, business_type)>,
boost::multi_index::ordered_non_unique<
boost::multi_index::tag<saddress>, BOOST_MULTI_INDEX_MEMBER(session_stu, std::string, address)>,
boost::multi_index::ordered_non_unique<
boost::multi_index::tag<sapp_id>, BOOST_MULTI_INDEX_MEMBER(session_stu, DWORD, app_id)>
>
> session_set;
// MULTI_MEMBER_CON(Tag): reference type of the session_set index named by Tag.
// MULTI_MEMBER_ITR(Tag): iterator type of the session_set index named by Tag.
// When Tag is a template parameter these expand to dependent type names.
#define MULTI_MEMBER_CON(Tag) boost::multi_index::index<session_set,Tag>::type&
#define MULTI_MEMBER_ITR(Tag) boost::multi_index::index<session_set,Tag>::type::iterator
// Predicate that accepts session_stu entries whose business_type equals the
// value given at construction. Used as the filter of filter_container.
struct is_business_type {
    is_business_type(WORD type)
        :m_type(type)
    {
    }

    // const-qualified so the predicate can also be invoked through a const
    // object or reference; it only reads m_type.
    bool operator()(const session_stu& s) const
    {
        return (s.business_type == m_type);
    }

    WORD m_type;
};
class session_manager
{
public:
typedef boost::shared_lock<boost::shared_mutex> readLock;
typedef boost:: unique_lock<boost::shared_mutex> writeLock;
session_manager(boost::asio::io_service& io_srv, int type, int expires_time);
~session_manager();
void add_session(socket_session_ptr p);
void update_session(socket_session_ptr p);
template<typename Tag, typename Member>
void del_session(Member m)
{
writeLock lock(m_mutex);
if (m_sessions.empty())
{
return ;
}
MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
//BOOST_AUTO(idx, boost::multi_index::get<Tag>(m_sessions));
BOOST_AUTO(iter, idx.find(m));
if (iter != idx.end())
{
idx.erase(iter);
}
}
//獲取容器中的第一個session
template<typename Tag, typename Member>
socket_session_ptr get_session(Member m)
{
readLock lock(m_mutex);
if (m_sessions.empty())
{
return socket_session_ptr();
}
MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
BOOST_AUTO(iter, idx.find(m));
return iter != boost::end(idx) ? iter->session : socket_session_ptr();
}
//隨機獲取容器中的session
template<typename Tag>
socket_session_ptr get_session_by_business_type(WORD m)
{
typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;
readLock lock(m_mutex);
if (m_sessions.empty())
{
return socket_session_ptr();
}
MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
//對容器的元素條件過濾
is_business_type predicate(m);
FilterContainer fc(predicate, idx.begin(), idx.end());
FilterContainer::FilterIter iter = fc.begin();
if (fc.begin() == fc.end())
{
return socket_session_ptr();
}
//typedef boost::variate_generator<boost::mt19937, boost::uniform_smallint<>> var_gen;
//typedef boost::details::pool::singleton_default<var_gen_wraper<var_gen>> s_var_gen;
////根據隨機數產生session
//s_var_gen::object_type &gen = s_var_gen::instance();
//int step = gen() % fc.szie();
int step = m_next_session % fc.szie();
++m_next_session;
for (int i = 0; i < step; ++i)
{
iter++;
}
return iter != fc.end() ? iter->session : socket_session_ptr();
}
//根據型別和地址取session
template<typename Tag>
socket_session_ptr get_session_by_type_ip(WORD m, std::string& ip)
{
typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;
readLock lock(m_mutex);
if (m_sessions.empty())
{
return socket_session_ptr();
}
MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
//對容器的元素條件過濾
is_business_type predicate(m);
FilterContainer fc(predicate, idx.begin(), idx.end());
FilterContainer::FilterIter iter = fc.begin();
if (fc.begin() == fc.end())
{
return socket_session_ptr();
}
while (iter != fc.end())
{
if (iter->session->get_remote_addr().find(ip) != std::string::npos)
{
break;
}
iter++;
}
return iter != fc.end() ? iter->session : socket_session_ptr();
}
//根據型別和app_id取session
template<typename Tag>
socket_session_ptr get_session_by_type_appid(WORD m, DWORD app_id)
{
typedef filter_container<is_business_type, MULTI_MEMBER_ITR(Tag)> FilterContainer;
readLock lock(m_mutex);
if (m_sessions.empty())
{
return socket_session_ptr();
}
MULTI_MEMBER_CON(Tag) idx = boost::multi_index::get<Tag>(m_sessions);
//對容器的元素條件過濾
is_business_type predicate(m);
FilterContainer fc(predicate, idx.begin(), idx.end());
FilterContainer::FilterIter iter = fc.begin();
if (fc.begin() == fc.end())
{
return socket_session_ptr();
}
while (iter != fc.end())
{
if (iter->session->get_app_id() == app_id)
{
break;
}
iter++;
}
return iter != fc.end() ? iter->session : socket_session_ptr();
}
private:
int m_type;
int m_expires_time;
boost::asio::io_service& m_io_srv;
boost::asio::deadline_timer m_check_tick;
boost::shared_mutex m_mutex;
unsigned short m_next_session;
session_set m_sessions;
void check_connection();
};
}
這裡主要用到了boost的multi_index容器,這是一個非常有用方便的容器,可實現容器的多列索引,具體的使用方法,在這裡不多做詳解。
//session_manager.cpp
#include "session_manager.h"
namespace firebird{
// Stores the configuration, binds the timer to io_srv and immediately runs the
// first connection check (which also arms the periodic timer).
session_manager::session_manager(boost::asio::io_service& io_srv, int type, int expires_time)
    : m_type(type),
      m_expires_time(expires_time),
      m_io_srv(io_srv),
      m_check_tick(io_srv),
      m_next_session(0)
{
    check_connection();
}
// No explicit teardown is performed here.
session_manager::~session_manager()
{
}
// Periodic health check over every registered session.
//
// CLIENT mode: a session whose socket is not open is closed (the close
// callback triggers a reconnect); an open one is sent a heartbeat.
// SERVER mode: entries whose socket is not open are erased; open sessions that
// report is_timeout() are closed.
// The timer is re-armed on every run, including runs in which the scan threw.
void session_manager::check_connection()
{
    try{
        writeLock lock(m_mutex);
        session_set::iterator iter = m_sessions.begin();
        while (iter != m_sessions.end())
        {
            LOG4CXX_DEBUG(firebird_log, "迴圈");
            if (CLIENT == m_type)
            {
                if (!iter->session->socket().is_open())
                {
                    LOG4CXX_INFO(firebird_log, "重新連線[" << iter->address << "]");
                    iter->session->close(); // closing triggers the client reconnect
                }
                else{
                    // Connection is up: send a heartbeat.
                    message msg;
                    msg.command = heartbeat;
                    msg.business_type = iter->session->get_business_type();
                    msg.app_id = iter->session->get_app_id();
                    msg.data() = "H";
                    iter->session->async_write(msg);
                    iter->session->set_op_time();
                }
            }
            else if (SERVER == m_type)
            {
                if (!iter->session->socket().is_open())
                {
                    // Socket already closed: drop the entry.
                    LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "刪除已關閉的session:[" << iter->session->get_remote_addr() << "]");
                    iter = m_sessions.erase(iter);
                    continue;
                }
                else{
                    if (iter->session->is_timeout())
                    {
                        // Idle for too long: close it; the close callback
                        // removes the entry.
                        LOG4CXX_INFO(firebird_log, KDS_CODE_INFO << "刪除已超時的session:[" << iter->session->get_remote_addr() << "]");
                        iter->session->close();
                    }
                }
                // NOTE(review): this refreshes the operation time on every
                // tick, so is_timeout() above can only fire when the check
                // interval exceeds the timeout. Kept as-is because nothing
                // visible here refreshes the time when data arrives; removing
                // it alone would disconnect healthy peers.
                iter->session->set_op_time();
            }
            else{
                LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown manager_type");
            }
            ++iter;
        }
        LOG4CXX_DEBUG(firebird_log, "定時檢查");
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");
    }

    // Re-arm outside the scan's try block: a failure while scanning must not
    // stop the periodic check for good.
    try{
        m_check_tick.expires_from_now(boost::posix_time::seconds(m_expires_time));
        m_check_tick.async_wait(boost::bind(&session_manager::check_connection, this));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "unknown exception.");
    }
}
// Registers p in the session table. The indexed fields are snapshots of the
// session's current values; update_session() refreshes business_type and
// app_id later. A null pointer is ignored.
void session_manager::add_session(socket_session_ptr p)
{
    if (!p)
    {
        return;
    }
    writeLock lock(m_mutex);
    session_stu stuSession;
    stuSession.id = p->id();
    // Take the session's own business type, as is done for app_id, so that a
    // session configured before registration is indexed under its real type.
    stuSession.business_type = p->get_business_type();
    stuSession.address = p->get_remote_addr();
    stuSession.app_id = p->get_app_id();
    stuSession.session = p;
    m_sessions.insert(stuSession);
}
// Refreshes the indexed business_type and app_id of the entry for p (looked up
// by id). Does nothing when the session is not registered.
void session_manager::update_session(socket_session_ptr p)
{
    writeLock lock(m_mutex);
    if (m_sessions.empty())
    {
        return ;
    }
    MULTI_MEMBER_CON(sid) idx = boost::multi_index::get<sid>(m_sessions);
    BOOST_AUTO(iter, idx.find(p->id()));
    if (iter != idx.end())
    {
        // business_type and app_id are keys of ordered indices. Writing to
        // them in place (through const_cast) would leave those indices out of
        // order; replace() lets the container re-position the element.
        session_stu updated = *iter;
        updated.business_type = p->get_business_type();
        updated.app_id = p->get_app_id();
        idx.replace(iter, updated);
    }
}
}
這個時候,我就可以使用id、business_type、address、app_id當做key來索引socket_session了,單使用map容器是做不到的。
還有索引時,需要的一個條件過濾器
//filter_container.h
#pragma once
#include <boost/iterator/filter_iterator.hpp>
namespace firebird{
template <class Predicate, class Iterator>
class filter_container
{
public:
typedef boost::filter_iterator<Predicate, Iterator> FilterIter;
filter_container(Predicate p, Iterator begin, Iterator end)
:m_begin(p, begin, end),
m_end(p, end, end)
{
}
~filter_container() {}
FilterIter begin() { return m_begin; }
FilterIter end() { return m_end; }
int szie() {
int i = 0;
FilterIter fi = m_begin;
while(fi != m_end)
{
++i;
++fi;
}
return i;
}
private:
FilterIter m_begin;
FilterIter m_end;
};
}
4.伺服器端的實現
伺服器我定義為server_socket_utils,擁有一個session_manager,每當accept成功得到一個socket_session時,都會將其增加到session_manager去管理,註冊相關回調函式。
read_data_callback 接收到資料的回撥函式
收到資料之後,也就是資料包的body部分,反序列化出command、business_type、app_id和data(我使用到了thrift),如果command==normal正常的業務包,會呼叫handle_read_data傳入data。
close_callback關閉socket_session觸發的回撥函式
根據id將該連線從session_manager中刪除掉
//server_socket_utils.h
#pragma once
#include "socket_session.h"
#include "session_manager.h"
#include <boost/format.hpp>
#include <firebird/message/message.hpp>
namespace firebird{
using boost::asio::ip::tcp;
// TCP server base class: accepts connections, registers each accepted session
// with m_manager and dispatches decoded messages. Derived classes implement
// handle_read_data() to process business messages.
class FIREBIRD_DECL server_socket_utils
{
private:
    boost::asio::io_service m_io_srv;
    boost::asio::io_service::work m_work;
    tcp::acceptor m_acceptor;

    void handle_accept(socket_session_ptr session, const boost::system::error_code& error);
    void close_callback(socket_session_ptr session);
    void read_data_callback(const boost::system::error_code& e,
        socket_session_ptr session, message& msg);

protected:
    // Called for every message whose command is "normal".
    virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

public:
    server_socket_utils(int port);
    // Virtual because the class is a polymorphic base: deleting a derived
    // object through a base pointer must run the derived destructor.
    virtual ~server_socket_utils(void);

    // Starts accepting connections.
    void start();
    boost::asio::io_service& get_io_service() { return m_io_srv; }

    session_manager m_manager;
};
}
//server_socket_utils.cpp
#include "server_socket_utils.h"
namespace firebird{
// Opens the acceptor on the given IPv4 port and creates the session manager in
// SERVER mode with a 3-second check interval.
server_socket_utils::server_socket_utils(int port)
    : m_work(m_io_srv),
      m_acceptor(m_io_srv, tcp::endpoint(tcp::v4(), static_cast<unsigned short>(port))),
      m_manager(m_io_srv, SERVER, 3)
{
    // Socket options the original author left disabled:
    //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    //// leave 0 seconds for the peer to receive data before the connection closes
    //m_acceptor.set_option(boost::asio::ip::tcp::acceptor::linger(true, 0));
    //m_acceptor.set_option(boost::asio::ip::tcp::no_delay(true));
    //m_acceptor.set_option(boost::asio::socket_base::keep_alive(true));
    //m_acceptor.set_option(boost::asio::socket_base::receive_buffer_size(16384));
}
// No explicit teardown is performed here.
server_socket_utils::~server_socket_utils(void)
{
}
void server_socket_utils::start()
{
try{
socket_session_ptr new_session(new socket_session(m_io_srv));
m_acceptor.async_accept(new_session->socket(),
boost::bind(&server_socket_utils::handle_accept, this, new_session,
boost::asio::placeholders::error));
}
catch(std::exception& e)
{
LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");
}
catch(...)
{
LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");
}
}
// Completion handler of async_accept.
//
// Unless the acceptor was cancelled or closed, the next accept is always
// queued first, so a failed accept does not stop the server from accepting.
// On success the session gets its callbacks and remote address, is started
// and is registered with the manager.
void server_socket_utils::handle_accept(socket_session_ptr session, const boost::system::error_code& error)
{
    if (error == boost::asio::error::operation_aborted || !m_acceptor.is_open())
    {
        // The acceptor is shutting down: stop the accept loop.
        return;
    }

    // Queue the next accept (start() logs and swallows its own exceptions).
    start();

    if (error)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << error.message() << "]");
        return;
    }

    try{
        if (session)
        {
            // Install the close callback.
            session->installCloseCallBack(boost::bind(&server_socket_utils::close_callback, this, _1));
            // Install the data callback.
            session->installReadDataCallBack(boost::bind(&server_socket_utils::read_data_callback, this, _1, _2, _3));

            boost::format fmt("%1%:%2%");
            fmt % session->socket().remote_endpoint().address().to_string();
            fmt % session->socket().remote_endpoint().port();
            // Named local: the setter takes a reference, and a temporary must
            // not be bound to a non-const reference.
            std::string remote_addr = fmt.str();
            session->set_remote_addr(remote_addr);

            session->start();
            m_manager.add_session(session);
        }
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");
    }
}
// Close callback installed on every accepted session: removes the session from
// the manager, looked up by its id. Exceptions are logged and swallowed.
void server_socket_utils::close_callback(socket_session_ptr session)
{
    LOG4CXX_DEBUG(firebird_log, "close_callback");
    try{
        const DWORD closed_id = session->id();
        m_manager.del_session<sid>(closed_id);
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");
    }
}
// Data callback installed on every accepted session. Dispatches on the
// message command: heartbeats are echoed back, registrations update the
// session and the manager and are echoed back, normal messages go to
// handle_read_data(), anything else is logged as illegal.
void server_socket_utils::read_data_callback(const boost::system::error_code& e,
    socket_session_ptr session, message& msg)
{
    try{
        LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["
            << msg.business_type << "],[" << msg.data() << "]");

        if (msg.command == heartbeat)
        {
            // Heartbeat: answer with the same message.
            session->async_write(msg);
            return;
        }

        if (msg.command == regist)
        {
            // Registration: record the peer's identity, then acknowledge.
            session->set_business_type(msg.business_type);
            session->set_app_id(msg.app_id);
            m_manager.update_session(session);
            session->async_write(msg);
            LOG4CXX_FATAL(firebird_log, "遠端地址:[" << session->get_remote_addr() << "],伺服器型別:[" <<
                session->get_business_type() << "],伺服器ID:[" << session->get_app_id() << "]註冊成功!");
            return;
        }

        if (msg.command == normal)
        {
            // Business payload: let the derived class handle it.
            handle_read_data(msg, session);
            return;
        }

        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法訊息包!");
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "socket異常:[未知異常]");
    }
}
}
5.客戶端
客戶端與伺服器的邏輯也差不多,區別就是在於客戶端通過connect得到socket_session,而伺服器是通過accept得到socket_session。
//client_socket_utils.h
#pragma once
#include "socket_session.h"
#include "session_manager.h"
#include <boost/algorithm/string.hpp>
#include <firebird/message/message.hpp>
namespace firebird{
// TCP client base class: connects sessions to their remote address, registers
// them with the server and reconnects when a session is closed. Derived
// classes implement handle_read_data() to process business messages.
class FIREBIRD_DECL client_socket_utils
{
public:
    client_socket_utils();
    // Virtual because the class is a polymorphic base: deleting a derived
    // object through a base pointer must run the derived destructor.
    virtual ~client_socket_utils();

    // Connects every session in vSession.
    void session_connect(std::vector<socket_session_ptr>& vSession);
    // Connects pSession to the "host:port" stored as its remote address.
    void session_connect(socket_session_ptr pSession);
    //socket_session_ptr get_session(std::string& addr);
    boost::asio::io_service& get_io_service() { return m_io_srv; }

protected:
    // Called for every message whose command is "normal".
    virtual void handle_read_data(message& msg, socket_session_ptr pSession) = 0;

private:
    boost::asio::io_service m_io_srv;
    boost::asio::io_service::work m_work;
    session_manager m_manager;

    void handle_connect(const boost::system::error_code& error,
        tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession);
    void close_callback(socket_session_ptr session);
    void read_data_callback(const boost::system::error_code& e,
        socket_session_ptr session, message& msg);
};
}
//client_socket_utils.cpp
#include "client_socket_utils.h"
namespace firebird{
// Keeps the io_service alive through m_work and creates the session manager in
// CLIENT mode with a 3-second check interval.
client_socket_utils::client_socket_utils()
    : m_work(m_io_srv),
      m_manager(m_io_srv, CLIENT, 3)
{
}
// No explicit teardown is performed here.
client_socket_utils::~client_socket_utils()
{
}
void client_socket_utils::session_connect(std::vector<socket_session_ptr>& vSession)
{
for (int i = 0; i < vSession.size(); ++i)
{
session_connect(vSession[i]);
}
}
// Connects one session. The session's remote address must have the form
// "host:port"; it is resolved synchronously, the session is registered with
// the manager and an asynchronous connect to the first resolved endpoint is
// started. Exceptions are logged and swallowed.
void client_socket_utils::session_connect(socket_session_ptr pSession)
{
    std::string& addr = pSession->get_remote_addr();
    try{
        // Install the close and data callbacks.
        pSession->installCloseCallBack(boost::bind(&client_socket_utils::close_callback, this, _1));
        pSession->installReadDataCallBack(boost::bind(&client_socket_utils::read_data_callback, this, _1, _2, _3));

        std::vector<std::string> host_and_port;
        boost::split(host_and_port, addr, boost::is_any_of(":"));
        if (host_and_port.size() < 2)
        {
            //throw std::runtime_error("ip 格式不正確!");
            LOG4CXX_ERROR(firebird_log, "[" << addr << "] ip 格式不正確!");
            return;
        }

        tcp::resolver resolver(pSession->socket().get_io_service());
        tcp::resolver::query query(host_and_port[0], host_and_port[1]);
        tcp::resolver::iterator next_endpoint = resolver.resolve(query);
        //pSession->set_begin_endpoint(endpoint_iterator); // remember the first endpoint for reconnects

        // The client keeps reconnecting, so the session is stored even though
        // it is not connected yet.
        m_manager.add_session(pSession);

        const tcp::endpoint first_endpoint = *next_endpoint;
        ++next_endpoint;
        pSession->socket().async_connect(
            first_endpoint,
            boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, next_endpoint, pSession));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << addr << "],socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << addr << "],socket異常:[未知異常]");
    }
}
// Completion handler of async_connect.
//
// Success: starts the session and sends a registration message.
// Failure with endpoints left: closes the socket and tries the next endpoint.
// Failure with no endpoints left: closes the socket and gives up for now.
// Exceptions are logged and swallowed.
void client_socket_utils::handle_connect(const boost::system::error_code& error,
    tcp::resolver::iterator endpoint_iterator, socket_session_ptr pSession)
{
    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << " enter.");
    try{
        if (!error)
        {
            LOG4CXX_FATAL(firebird_log, "伺服器:[" << pSession->get_business_type() <<"],連線遠端地址:[" << pSession->get_remote_addr().c_str() << "]成功!");
            pSession->start();
            // Register this client's business type and app id with the server.
            message msg;
            msg.command = regist;
            msg.business_type = pSession->get_business_type();
            msg.app_id = pSession->get_app_id();
            msg.data() = "R";
            pSession->async_write(msg);
        }
        else if (endpoint_iterator != tcp::resolver::iterator())
        {
            LOG4CXX_ERROR(firebird_log, "連線遠端地址:[" << pSession->get_remote_addr().c_str() << "]失敗,試圖重連下一個地址。");
            // Close the socket directly: the session's close() would invoke the
            // close callback, which reconnects, and so would loop forever.
            pSession->socket().close();
            tcp::endpoint endpoint = *endpoint_iterator;
            pSession->socket().async_connect(endpoint,
                boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, ++endpoint_iterator, pSession));
        }
        else
        {
            LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << pSession->get_remote_addr().c_str() << "]失敗!");
            // Same reason as above for closing the socket directly.
            pSession->socket().close();
        }
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << pSession->get_remote_addr().c_str() <<"],socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << pSession->get_remote_addr().c_str() <<"],socket異常:[未知異常]");
    }
}
// Data callback installed on every client session. Heartbeat replies are
// ignored, registration replies are logged, normal messages go to
// handle_read_data(), anything else is logged as illegal.
void client_socket_utils::read_data_callback(const boost::system::error_code& e,
    socket_session_ptr session, message& msg)
{
    LOG4CXX_DEBUG(firebird_log, "command =[" << msg.command << "],["
        << msg.business_type << "],[" << msg.data() << "]");

    if (msg.command == heartbeat)
    {
        // Heartbeat echo: nothing to do.
        return;
    }

    if (msg.command == regist)
    {
        // Registration acknowledged by the server.
        LOG4CXX_FATAL(firebird_log, "伺服器:[" << session->get_business_type() <<"]註冊成功。");
        return;
    }

    if (msg.command == normal)
    {
        // Business payload: let the derived class handle it.
        handle_read_data(msg, session);
        return;
    }

    LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "收到非法訊息包!");
}
// Close callback installed on every client session: a closed session is
// reconnected. The session's "host:port" address is resolved again and an
// asynchronous connect to the first resolved endpoint is started. Exceptions
// are logged and swallowed.
void client_socket_utils::close_callback(socket_session_ptr session)
{
    LOG4CXX_DEBUG(firebird_log, KDS_CODE_INFO << "enter.");
    try{
        //tcp::resolver::iterator endpoint_iterator = context.session->get_begin_endpoint();
        std::string& addr = session->get_remote_addr();

        std::vector<std::string> host_and_port;
        boost::split(host_and_port, addr, boost::is_any_of(":"));
        if (host_and_port.size() < 2)
        {
            LOG4CXX_ERROR(firebird_log, "[" << addr << "] ip 格式不正確!");
            return;
        }

        tcp::resolver resolver(session->socket().get_io_service());
        tcp::resolver::query query(host_and_port[0], host_and_port[1]);
        tcp::resolver::iterator next_endpoint = resolver.resolve(query);

        const tcp::endpoint first_endpoint = *next_endpoint;
        ++next_endpoint;
        session->socket().async_connect(
            first_endpoint,
            boost::bind(&client_socket_utils::handle_connect, this,
                boost::asio::placeholders::error, next_endpoint, session));
    }
    catch(std::exception& e)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << session->get_remote_addr().c_str() <<"],socket異常:[" << e.what() << "]");
    }
    catch(...)
    {
        LOG4CXX_ERROR(firebird_log, KDS_CODE_INFO << "連線遠端地址:[" << session->get_remote_addr().c_str() <<"],socket異常:[未知異常]");
    }
}
}
5.物件序列化
socket_session傳送和接收資料包的時候使用到了物件序列化,我這裡是通過thrift實現的。其實boost的serialization庫也提供了這樣的功能,使用起來更為方便,但在我的測試過程中,thrift的效能相比之下高很多,因此就堅持使用thrift了。感興趣的話,可以看我之前寫的thrift相關文章和《輕量級序列化庫boost serialization》。
5.1字串與thrift物件的相互轉換
#pragma once
#include <boost/shared_ptr.hpp>
#include <transport/TBufferTransports.h>
#include <protocol/TProtocol.h>
#include <protocol/TBinaryProtocol.h>
namespace firebird{
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
template<typename T>
void thrift_iserialize(T& stu, std::string& s)
{
boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer((uint8_t*)&s[0], s.size()));
boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));
stu.read(proto.get());
}
template<typename T>
void thrift_oserialize(T& stu, std::string& s)
{
boost::shared_ptr<TMemoryBuffer> trans(new TMemoryBuffer());
boost::shared_ptr<TProtocol> proto(new TBinaryProtocol(trans));
stu.write(proto.get());
s = trans->getBufferAsString();
}
}
5.2通過thrift物件,普通的物件與字串的相互轉換
#pragma once
#include "message_archive.hpp"
#include <firebird/archive/thrift_archive.hpp>
#include <firebird/message/TMessage_types.h>
namespace firebird
{
/*** message to ThriftMessage ***/
// Copies every field of msg into tmsg. The source message is left unchanged,
// so the same message can be serialized again (for example when it is sent to
// several sessions).
void msg_to_tmsg(TMessage& tmsg, message& msg)
{
    // Routing fields.
    tmsg.command = msg.command;
    tmsg.business_type = msg.business_type;
    tmsg.app_id = msg.app_id;

    // Context. cpid and remote_ip are copied, not swapped: swapping would
    // leave the caller's message with empty strings after serialization.
    tmsg.context.cmdVersion = msg.context().cmdVersion;
    tmsg.context.cpid = msg.context().cpid;
    tmsg.context.remote_ip = msg.context().remote_ip;
    tmsg.context.wSerialNumber = msg.context().wSerialNumber;
    tmsg.context.session_id = msg.context().session_id;

    // Source list.
    const std::size_t source_count = static_cast<std::size_t>(msg.source().size());
    for (std::size_t i = 0; i < source_count; ++i)
    {
        tmsg.source.push_back(msg.source()[i]);
    }

    // Destination list.
    const std::size_t destination_count = static_cast<std::size_t>(msg.destination().size());
    for (std::size_t i = 0; i < destination_count; ++i)
    {
        tmsg.destination.push_back(msg.destination()[i]);
    }

    // Payload.
    tmsg.data = msg.data();
}
/*** ThriftMessage to message ***/
// Copies every field of tmsg into msg; source and destination entries are
// appended to msg one by one with operator<<.
void tmsg_to_msg(message& msg, TMessage& tmsg)
{
    // Routing fields.
    msg.command = tmsg.command;
    msg.business_type = tmsg.business_type;
    msg.app_id = tmsg.app_id;

    // Context.
    msg.context().cmdVersion = tmsg.context.cmdVersion;
    msg.context().cpid = tmsg.context.cpid;
    msg.context().remote_ip = tmsg.context.remote_ip;
    msg.context().wSerialNumber = tmsg.context.wSerialNumber;
    msg.context().session_id = tmsg.context.session_id;

    // Source list.
    int source_pos = 0;
    while (source_pos < tmsg.source.size())
    {
        msg.source() << tmsg.source[source_pos];
        ++source_pos;
    }

    // Destination list.
    int destination_pos = 0;
    while (destination_pos < tmsg.destination.size())
    {
        msg.destination() << tmsg.destination[destination_pos];
        ++destination_pos;
    }

    // Payload.
    msg.data() = tmsg.data;
}
// Decodes the serialized bytes in s into msg: Thrift-deserializes into a
// TMessage, then converts that into the project's message type.
void message_iarchive(message& msg, std::string& s)
{
    TMessage decoded;
    thrift_iserialize(decoded, s);
    tmsg_to_msg(msg, decoded);
}
void message_oarchive(std::string& s, message& msg)
{
TMessage tmsg;
msg_to_tmsg(tmsg, msg);
thrift_oserialize(tmsg, s);
}
}