十五ACE編寫簡單的通訊程式
該程式將演示如何將一個簡單結構序列化後傳送到網路上,如何從網路上接收到資料後反序列化回結構。
ACE的C++ WRAPPER FACADE層將網路通訊分成三種角色:連線者(ACE_SOCK_Connector)、等待者(ACE_SOCK_Acceptor)和傳輸者(ACE_SOCK_Stream)。
建立連線
首先使用ACE_SOCK_Connector::connect連線某個伺服器(使用ip地址和埠號),該伺服器上使用ACE_SOCK_Acceptor::accept等待外部的連線請求。ACE_INET_Addr類進行管理SOCKET通訊使用的IP地址和埠號。
下面就是連線者如何連線本機的7777埠的服務程式程式碼:
#include <iostream>
using namespace std;
#include "ace/INET_Addr.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
int main(void)
{
ACE_INET_Addr address("127.0.0.1:7777");
ACE_SOCK_Connector connector;
ACE_SOCK_Stream stream;
if(connector.connect(stream,address)==-1)
{
cout<<strerror(errno)<<endl;
}
}
如果連線成功,connect方法返回0,如果連線失敗,返回-1,執行緒專有的errno變數將被設定對應的錯誤碼,你可以通過strerror函式獲取錯誤資訊描述字串,也可以使用執行緒安全的版本strerror_r。ACE不使用異常報錯,原因之一是早些時候異常並不被所有的C++編譯器支援,原因之二是異常對效能仍然有影響,作為高效能底層庫ACE仍然採用了C風格進行錯誤處理。但是你仍然可以在自己的應用邏輯中使用異常,並不會和
下面是等待者的示例:
#include <iostream>
using namespace std;
#include "ace/INET_Addr.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"
int main(void)
{
ACE_SOCK_Acceptor acceptor;
//本地埠7777的ACE_INET_Addr物件
ACE_INET_Addr address;
address.set(7777);
//繫結本地埠,並且設定為監聽狀態
if(acceptor.open(address)==-1)
{
cout<<strerror(errno)<<endl;
}
ACE_SOCK_Stream stream;
if(acceptor.accept(stream)==-1)
{
cout<<strerror(errno)<<endl;
}
}
注意,ACE_SOCK_Acceptor::accept和ACE_SOCK_Connector::connect方法都可以接收一個ACE_TIME_Value*引數。該引數預設直為NULL,就像上面的兩個示例,表示除非建立連線,否則不會返回;如果我們建立ACE_TIME_Value time(0,0)物件作為引數,則表示方法不會阻塞,如果不能立刻建立連線,就返回-1,並且errno為EWOULDBLOCK;如果我們建立ACE_TIME_Value time(5,0)物件作為引數,就表示方法會最多等待5秒鐘,如果5秒鐘內還沒有建立連線,就返回-1,並且errno為ETIME.
ACE_SOCK_Acceptor物件沒有狀態,因此多執行緒可以在不鎖定的情況下共享該物件。
資料傳輸
通常資料傳輸的過程是將物件中的資料按照某種格式序列化成連續的位元組流,然後傳送到網路上,當另一端接收到位元組流後,按照此格式反序列化成物件。當連線建立好後,通訊雙方都有兩個可以傳送和接收資料的ACE_SOCK_Stream物件。該物件提供了傳送和接收的方法。send_n/recv_n用於傳送和接收確定數量的位元組流,如果沒有傳送或者接收完,該方法將阻塞。而send/recv就不保證這一點,可能實際傳送或者接收的資料比引數指定的少,該方法不會阻塞,而是返回實際傳送或者接收的資料大小。send/recv方法實際是從父類ACE_SOCK_IO繼承而來的。
網路傳輸的一種高效的方法是集中寫和分散讀。不同緩衝區的資料沒有必要拷貝到一起,就可以直接按照次序一次型的傳送出去。從網路另一端收到後,有可以分散的寫到不同的緩衝區中。這就避免了資料複製的開銷。ACE_SOCK_Stream的方法recvv_n/sendv_n方法就提供了這個機制。我們後面的示例將演示這個方法的使用。
如果我們使用TCP/IP協議傳送資料,TCP/IP協議有一個Nagle演算法。該演算法將快取小資料,減少網路傳送的次數,從而避免過多通訊的開銷。在某些情況下,我們需要關閉該演算法,讓我們的資料能夠立刻傳送出去。ACE_SOCK_Stream的set_option方法使用引數TCP_NODELAY可以關閉這個演算法。另一個方法是當我們使用sendv_n方法時,也會強制資料立刻傳送。
下面的示例將一個結構SHMRecord初始化,並序列化到ACE_OutputCDR物件中。然後使用sendv_n方法將資料發出。
#include <iostream>
using namespace std;
#include "ace/INET_Addr.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Connector.h"
#include "ace/CDR_Stream.h"
class SHMRecord
{
public:
SHMRecord():pData_(NULL){}
ACE_UINT16 type_;
ACE_UINT32 offset_;
void* pData_;
ACE_UINT32 dataLength_;
size_t size() const
{
return 2+4+4+dataLength_;
}
~SHMRecord()
{
if(pData_!=NULL)
delete[] static_cast<char*>(pData_);
}
};
int operator<<(ACE_OutputCDR & cdr,SHMRecord const& record)
{
cdr<<record.type_;
cdr<<record.offset_;
cdr<<record.dataLength_;
cdr.write_char_array(static_cast<char*>(record.pData_),record.dataLength_);
return cdr.good_bit();
}
int operator>>(ACE_InputCDR & cdr,SHMRecord & record)
{
cdr>>record.type_;
cdr>>record.offset_;
cdr>>record.dataLength_;
record.pData_=new char[record.dataLength_]();
cdr.read_char_array(static_cast<char*>(record.pData_),record.dataLength_);
return cdr.good_bit();
}
int main(void)
{
ACE_INET_Addr address("127.0.0.1:7777");
ACE_SOCK_Connector connector;
ACE_SOCK_Stream stream;
if(connector.connect(stream,address)==-1)
{
cout<<strerror(errno)<<endl;
}
SHMRecord record;
record.type_=1;
record.offset_=2;
record.pData_=new char[4]();
record.dataLength_=4;
strcpy(static_cast<char*>(record.pData_),"hih");
const size_t size=record.size()+ACE_CDR::MAX_ALIGNMENT;
ACE_OutputCDR payload(size);
payload<<record;
//create cdr header for this data
ACE_OutputCDR header(ACE_CDR::MAX_ALIGNMENT+8);
header<<ACE_OutputCDR::from_boolean(ACE_CDR_BYTE_ORDER);
header<<ACE_CDR::ULong(size);
iovec iov[2];
iov[0].iov_base=header.begin()->rd_ptr();
iov[0].iov_len=8;//如果使用ACE_LACKS_CDR_ALIGNMENT巨集,8要被替換成實際的大小
iov[1].iov_base=payload.begin()->rd_ptr();
iov[1].iov_len=size;
stream.sendv_n(iov,2);
cout<<record.type_<<endl;
cout<<record.offset_<<endl;
cout<<static_cast<char*>(record.pData_)<<endl;
cout<<record.dataLength_<<endl;
}
ACE提供了ACE_OutputCDR和ACE_InputCDR類,是針對網路程式經常遇到的將物件資料序列化到位元組流和從位元組流中反序列化到物件的情況。你可以提供自己的operator<<和operator>>操作,就像上面的例子一樣。 為什麼我在構造ACE_OutputCDR物件的時候總是在實際緩衝區大小上加上ACE_CDR::MAX_ALIGNMENT呢?因為如果我在編譯時沒有在config.h中用巨集#defineACE_CDR_IGNORE_ALIGNMENT,那麼ACE會自動進行邊界調整,使得變數按照四位元組對齊(32位處理器)。如果緩衝區中的起始位置沒有對齊,第一個變數值通常都會往緩衝區頭部的後面若干位元組開始寫入。這時候,緩衝區如果沒有足夠的空間,寫資料就會越界。因此ACE_CDR::MAX_ALIGNMENT代表的8位元組,能夠保證我們建立足夠大小的緩衝區應付這種情況。這種方式和支援標準C++流的方式是一樣的。那麼,為什麼不直接使用標準C++流呢?因為ACE所支援的平臺很多,有些編譯器不支援標準C++流。並且據我個人的體驗,標準C++流在記憶體管理上是封裝的,你不可能通過公有方法獲得內部關裡的緩衝區的指標,除非自己定義自己的派生類,這並不容易。還有一個原因是不同編譯器和不同的硬體使用了不同的位元組順序(大尾數法和小尾數法)。正確的使用ACE的cdr類就可以保證各種環境下都能使用,因為它在內部使用了CORBA公共資料表示的格式。在這個示例程式裡,我們實際上建立了兩個ACE_OutputCDR物件,一個用來表示資料頭,一個存放實際結構中的資料。資料頭中前4個位元組存放了一個布林值,表示本機的位元組順序,後面四個位元組表示第二個物件的實際長度。這裡我沒有使用巨集#define ACE_LACKS_CDR_ALIGNMENT,否則資料會被緊縮。布林值將只有一個位元組大小。
還有一種方法,本例沒有提供,這裡說明一下。首先在config.h檔案中加上巨集#define ACE_ENABLE_SWAP_ON_WRITE,然後重新編譯ACE。然後建立CDR物件時明確就是通過網路位元組順序(大尾數法):
ACE_OutputCDR ocdr(&mb, ACE_CDR::BYTE_ORDER_BIG_ENDIAN);
ACE_InputCDR icdr(&mb, ACE_CDR::BYTE_ORDER_BIG_ENDIAN);
對於基本的數值型別,各個平臺也有可能有長度的差異,比如int究竟是16,32還是64。所以這裡使用了ACE提供的基本數值型別,比如ACE_UINT32。
本例中,接收資料時首先接收固定長度的頭物件,取得位元組順序標誌後,調整位元組順序,然後獲取實際長度,根據該長度接收第二個ACE_OutputCDR物件存放的實際資料。
下面的例子演示瞭如何接收發送來的資料。
int main(void)
{
ACE_SOCK_Acceptor acceptor;
ACE_INET_Addr address;
address.set(7777);
if(acceptor.open(address)==-1)
{
cout<<strerror(errno)<<endl;
}
ACE_SOCK_Stream stream;
if(acceptor.accept(stream)==-1)
{
cout<<strerror(errno)<<endl;
}
auto_ptr<ACE_Message_Block> spBlock(new ACE_Message_Block(ACE_DEFAULT_CDR_BUFSIZE));
ACE_CDR::mb_align(spBlock.get());
if(stream.recv_n(spBlock->wr_ptr(),8)==8)//receive the header of CDR
{
//parse the cdr header
spBlock->wr_ptr(8);
ACE_InputCDR cdr(spBlock.get());
ACE_CDR::Boolean byte_order;
cdr>>ACE_InputCDR::to_boolean(byte_order);
cdr.reset_byte_order(byte_order);
ACE_CDR::ULong length;
cdr>>length;
//receive the data from master
spBlock->size(length+8+ACE_CDR::MAX_ALIGNMENT);
if(stream.recv_n(spBlock->wr_ptr(),length)==length)
{
spBlock->wr_ptr(length);
//必須重新建立一個CDR物件,否則解析不正確
ACE_InputCDR cdr2(spBlock.get());
ACE_CDR::Boolean byte_order;
cdr2>>ACE_InputCDR::to_boolean(byte_order);
cdr2.reset_byte_order(byte_order);
ACE_CDR::ULong length;
cdr2>>length;
auto_ptr<SHMRecord> spRecord(new SHMRecord);
cdr2>>*spRecord;
cout<<spRecord->type_<<endl;
cout<<spRecord->offset_<<endl;
cout<<static_cast<char*>(spRecord->pData_)<<endl;
cout<<spRecord->dataLength_<<endl;
}
}
}
ACE_Message_Block類用來管理資料,內部有一個指向ACE_Data_Block物件的指標,ACE_Data_Block類管理實際的緩衝區資料。這種設計允許多個ACE_Message_Block物件共享同一個ACE_Data_Block物件,對於效率的提高很有幫助。多個ACE_Message_Block物件可以組成一個連結串列(雙向或者單向)。
在上面的例子中,我們 建立了一個預設大小的ACE_Message_Block物件,然後將接收的資料寫入ACE_Data_Block的緩衝區中,並且移動寫指標的位置。ACE_InputCDR通過和ACE_Message_Block物件關聯來讀取緩衝區的資料。