網路通訊協議之-進階學習
阿新 • • 發佈:2022-03-07
一、高速網路通訊基礎
-
繼上一章節介紹過網路程式設計粘包問題之後,一般的網路傳輸過程中,大都為以資料包組織的方式進行傳輸。因此,對於一般的資料包都包括了包頭包尾,包長度,包中的具體資料等資訊。對於校驗碼等相關內容無需新增,TCP網路在接收之後會對資料自行校驗,具體參見這裡。
-
對於之前介紹的Linux基礎TCP通訊的建立是不完備的,主要體現在一下幾點:
- 建立socket過程中並未處理假如某些服務已經佔用了當前主機埠Port而導致衝突的情況。
- 沒有對客戶端Client異常掉線失去連線的情況進行異常處理,這樣可能導致Server端Crash崩潰!
- 對於待進行傳輸的資料頻寬需要進行基本的計算,考慮裝置硬體的基本傳輸能力,避免導致Linux Socket底層資料緩衝空間溢位,send或recv函數出現阻塞等。
- 對於不希望accept阻塞等待的情況,還需要考慮使用多執行緒或者select非阻塞等待函式來實現。
二、基本異常處理
1. 資料包結構體組織形式
typedef struct ProfileData{ int iHeader; // 0xFFFF FFFE int iReserveS1; // 0x0000 0000 int iProfileSEQ; // 資料序號 int iReserveS2; // 0x0000 0000 int iEncoderVal; // 編碼資料 int iReserveS3; // 0x0000 0000 int *piProfileData; // 資料指標 int piProfileDataLen; // 資料總長 int iPacketEnd; // 0x0000 0000 }T_StructProfileData;
2. Client客戶端失聯訊號處理
static void socket_sig_deal(void) { struct sigaction sa; sigemptyset(&sa.sa_mask); // 設定訊號處理過程中被遮蔽的阻塞訊號 sa.sa_handler = handle_pipe_lt100; // 設定訊號處理函式地址, 一般情況下sa_handler只傳入訊號量值, 不攜帶其他訊號相關資訊 sa.sa_flags = 0; // 標誌位設定, 配合sa_handler/sa_sigaction不同的處理函式使用, 需要傳入其他引數進入處理函式時需要選擇sa.sa_sigaction + SA_SIGINFO sigaction(SIGPIPE,&sa,NULL); // SIGPIPE 管道斷開訊號, 產生的 error = EPIPE, 不做捕獲處理會導致程式奔潰 }
3. 主動判斷socket連線狀態
static int net__socket_established(int iSock)
{
struct tcp_info info; // 建立tcp_info結構體用來儲存tcp狀態資訊
int len = sizeof(info);
getsockopt(iSock, IPPROTO_TCP, TCP_INFO, &info, (socklen_t *)&len); // 獲取TCP當前狀態資訊
if ((info.tcpi_state == TCP_ESTABLISHED)) // 判斷當前TCP是否處在建立連線狀態
{
return SUCCESS; // 返回連線狀態
}
else
{
return ERR_CONN_LOST; // 返回失聯狀態
}
}
三、高速通訊基本程式碼實現
1. 服務端程式碼(Server.c)
#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h> // SOL_SOCKET,SO_RCVTIMO,SO_SNDTIMEO,IPPOTO_TCP,TCP_NODELAY
typedef struct ProfileData{
int iHeader; // 0xFFFF FFFE
int iReserveS1; // 0x0000 0000
int iProfileSEQ; // 資料序號
int iReserveS2; // 0x0000 0000
int iEncoderVal; // 編碼資料
int iReserveS3; // 0x0000 0000
int *piProfileData; // 資料指標
int piProfileDataLen; // 資料總長
int iPacketEnd; // 0x0000 0000
}T_StructProfileData;
static void socket_sig_deal(void);
static int net__socket_established(int iSock);
int highSpeedCommunicateSetup(int *ipState);
int highSpeedCommunicateTransimit(int iSock, const T_StructProfileData t_StructDataPacket);
#define host "0.0.0.0"
#define port 50000
#define SERVICE "50000"
#define socket_domain 1
#define SUCCESS 1
#define INVALID_SOCKET -1
#define ERR_CONN_LOST -2
#define ERR_ERRNO -3
/*
TCP/IP協議中針對TCP預設開啟了Nagle演算法。Nagle演算法通過減少需要傳輸的資料包,來優化網路。在核心實現中,資料包的傳送和接受會先做快取,分別對應於寫快取和讀快取。
啟動TCP_NODELAY,就意味著禁用了Nagle演算法,允許小包的傳送。對於延時敏感型,同時資料傳輸量比較小的應用,開啟TCP_NODELAY選項無疑是一個正確的選擇。
比如,對於SSH會話,使用者在遠端敲擊鍵盤發出指令的速度相對於網路頻寬能力來說,絕對不是在一個量級上的,所以資料傳輸非常少;而又要求使用者的輸入能夠及時獲得返回,
有較低的延時。如果開啟了Nagle演算法,就很可能出現頻繁的延時,導致使用者體驗極差。當然,你也可以選擇在應用層進行buffer,比如使用java中的buffered stream,
儘可能地將大包寫入到核心的寫快取進行傳送;vectored I/O(writev介面)也是個不錯的選擇。
對於關閉TCP_NODELAY,則是應用了Nagle演算法。資料只有在寫快取中累積到一定量之後,才會被髮送出去,這樣明顯提高了網路利用率(實際傳輸資料payload與協議頭的比例大大提高)。
但是這又不可避免地增加了延時;與TCP delayed ack這個特性結合,這個問題會更加顯著,延時基本在40ms左右。當然這個問題只有在連續進行兩次寫操作的時候,才會暴露出來。
連續進行多次對小資料包的寫操作,然後進行讀操作,本身就不是一個好的網路程式設計模式;在應用層就應該進行優化。
*/
#define set_tcp_nodelay 0 //
#define DATA_LEN 3200
int net__socket_listen(void);
int net__socket_accept(int listensock);
ssize_t net__write(int sock,const void *buf, size_t count);
int packet__write(int sock,const void *buf, size_t count);
int main(void)
{
int flag = 0;
T_StructProfileData ProfileData;
memset(&ProfileData,0,sizeof(T_StructProfileData));
ProfileData.iHeader = 0xFAFFF6F8;
ProfileData.iReserveS1 = ProfileData.iReserveS2 = ProfileData.iReserveS3 = ProfileData.iPacketEnd = 0x00000000;
ProfileData.iProfileSEQ = 0x00000000;
ProfileData.iEncoderVal = 0xEFFFD2E3;
ProfileData.piProfileDataLen = 3200;
ProfileData.piProfileData = (int *)malloc(sizeof(int)*ProfileData.piProfileDataLen);
memset(ProfileData.piProfileData, 21,sizeof(int)*ProfileData.piProfileDataLen);
int sock_ok = highSpeedCommunicateSetup(&flag);
if(flag == -1){
printf("net__socket_accept failed.\n");
return 0;
}
int temp = sock_ok;
printf("####################################################### sock_ok=%d\n",temp);
sleep(3);
while(1){
if(sock_ok != temp){
printf("####################################################### sock_ok=%d\n",sock_ok);
}
ProfileData.iProfileSEQ += 1;
flag = highSpeedCommunicateTransimit(sock_ok,ProfileData);
if(flag == ERR_CONN_LOST){
printf("ERR_CONN_LOST ####################################################################!!!!!!!!!!!!!!\n");
}
if(flag >= 0){
printf("packet__write successful.\n");
}else{
printf("packet__write failed.\n");
}
sleep(1);
}
return 0;
}
int highSpeedCommunicateSetup(int *ipState)
{
int isock = 0;
isock = net__socket_listen();
if(-1 == isock)
{
return INVALID_SOCKET;
}
isock = net__socket_accept(isock);
if(-1 == isock)
{
return INVALID_SOCKET;
}
return isock;
}
int highSpeedCommunicateTransimit(int iSock, const T_StructProfileData t_StructDataPacket)
{
int flag = 0;
flag = packet__write(iSock, &(t_StructDataPacket.iHeader), 4); if(0 != flag) {return flag;}// &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iReserveS1), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iProfileSEQ), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iReserveS2), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iEncoderVal), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iReserveS3), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iReserveS3), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iReserveS3), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, t_StructDataPacket.piProfileData, t_StructDataPacket.piProfileDataLen * 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
flag = packet__write(iSock, &(t_StructDataPacket.iPacketEnd), 4); if(0 != flag) {return flag;} // &(t_StructDataPacket.iHeader)
return 0;
}
int net__socket_listen(void)
{
int sock = INVALID_SOCKET;
struct addrinfo hints;
struct addrinfo *ainfo, *rp;
char service[10];
int rc;
char ss_opt = 1;
unsigned int sock_count = 0;
snprintf(service, 10, "%d", port);
memset(&hints, 0, sizeof(struct addrinfo)); // 初始化模板 hints 變數
if(socket_domain){
hints.ai_family = AF_INET; //AF_INET6
}else{
hints.ai_family = AF_UNSPEC;
}
hints.ai_flags = AI_PASSIVE;
hints.ai_socktype = SOCK_STREAM;
rc = getaddrinfo(NULL, SERVICE, &hints, &ainfo);
if (rc){
printf("Error creating listener: %s.", gai_strerror(rc));
return INVALID_SOCKET;
}
for(rp = ainfo; rp; rp = rp->ai_next){
if(rp->ai_family == AF_INET){
printf("Opening ipv4 listen socket on port %d.\n", ntohs(((struct sockaddr_in *)rp->ai_addr)->sin_port));
}else if(rp->ai_family == AF_INET6){
printf("Opening ipv6 listen socket on port %d.\n", ntohs(((struct sockaddr_in6 *)rp->ai_addr)->sin6_port));
}else{
continue;
}
sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if(sock == INVALID_SOCKET){
printf("ERR:%s-%d\n",__FUNCTION__,__LINE__);
continue;
}
if(bind(sock, rp->ai_addr, rp->ai_addrlen) != 0){
printf("ERR:%s-%d\n",__FUNCTION__,__LINE__);
break;
}
close(sock);
}
if(listen(sock, 100) == -1){
printf("Error:");
close(sock);
return INVALID_SOCKET;
}
freeaddrinfo(ainfo);
return sock;
}
int net__socket_accept(int listensock)
{
int new_sock = INVALID_SOCKET, struct_len = 0;
struct sockaddr_in client_addr;
struct_len = sizeof(struct sockaddr_in);
new_sock = accept(listensock, (struct sockaddr *)&client_addr, &struct_len);
if(new_sock == INVALID_SOCKET){
return INVALID_SOCKET;
}
if(set_tcp_nodelay){
int flag = 1;
if(setsockopt(new_sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(int)) != 0){
printf("XiongGu Warning: Unable to set TCP_NODELAY.");
}
}
return new_sock;
}
/**********************************************************************
* 函式名稱: // static ssize_t net__write(int sock,const void *buf, size_t count)
* 功能描述: // 嘗試向Socket套接字緩衝區寫入count位元組資料
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int sock 可用的socket控制代碼
* 輸入引數: // void *buf 用於儲存讀準備寫入的位元組(一般為char)
* 輸入引數: // size_t count 期望寫入的位元組總數, 此值需要小於等於buf空間大小/sizeof(char) >= count
* 輸出引數: // 對輸出引數的說明
* 返 回 值: // ssize_t: 表示實際寫入的位元組數
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
ssize_t net__write(int sock,const void *buf, size_t count)
{
return send(sock, buf, count, 0);
}
/**********************************************************************
* 函式名稱: // static int packet__write(int sock,const void *buf, size_t count)
* 功能描述: // 嘗試從Socket套接字緩衝區寫入count位元組資料, 寫入到count位元組資料為止
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int sock 可用的socket控制代碼
* 輸入引數: // void *buf 用於儲存讀準備寫入的位元組(一般為char)
* 輸入引數: // size_t count 期望寫入的位元組總數, 此值需要小於等於buf空間大小/sizeof(char) >= count
* 輸出引數: // 對輸出引數的說明
* 返 回 值: // ssize_t: 表示實際寫入的位元組數
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
int packet__write(int sock,const void *buf, size_t count)
{
ssize_t write_length;
char *data = (char *)buf;
int pos=0;
int to_process = count;
while(to_process > 0)
{
write_length = net__write(sock,&data[pos],count);
if(write_length > 0){
to_process -= write_length;
pos += write_length;
}
else
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
{
return 0;
}
else
{
switch(errno)
{
case ECONNRESET:
return ERR_CONN_LOST;
default:
return ERR_ERRNO;
}
}
}
}
return 0;
}
/**********************************************************************
* 函式名稱: // static void handle_pipe(int Sig)
* 功能描述: // 嘗試從Socket套接字緩衝區讀取count位元組資料
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int Sig 當前系統產生的訊號編號
* 輸出引數: //
* 返 回 值: //
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
static void handle_pipe(int Sig)
{
printf("Handle the Disconnected Error Sig!\n"); // 捕捉訊號不做任何處理, 只打印
}
/**********************************************************************
* 函式名稱: // static void socket_sig_deal(void)
* 功能描述: // 嘗試從Socket套接字緩衝區讀取count位元組資料
* 訪問的表: //
* 修改的表: //
* 輸入引數: //
* 輸出引數: //
* 返 回 值: //
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
static void socket_sig_deal(void)
{
struct sigaction sa;
sigemptyset(&sa.sa_mask); // 設定訊號處理過程中被遮蔽的阻塞訊號
sa.sa_handler = handle_pipe; // 設定訊號處理函式地址, 一般情況下sa_handler只傳入訊號量值, 不攜帶其他訊號相關資訊
sa.sa_flags = 0; // 標誌位設定, 配合sa_handler/sa_sigaction不同的處理函式使用, 需要傳入其他引數進入處理函式時需要選擇sa.sa_sigaction + SA_SIGINFO
sigaction(SIGPIPE,&sa,NULL); // SIGPIPE 管道斷開訊號, 產生的 error = EPIPE, 不做捕獲處理會導致程式奔潰
}
/**********************************************************************
* 函式名稱: // static int net__socket_established(int iSock)
* 功能描述: // 嘗試從Socket套接字緩衝區讀取count位元組資料
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int iSock 待檢測的socket控制代碼
* 輸出引數: //
* 返 回 值: // int: 0表示傳輸建立穩定 -2表示傳輸建立斷開
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
static int net__socket_established(int iSock)
{
struct tcp_info info; // 建立tcp_info結構體用來儲存tcp狀態資訊
int len = sizeof(info);
getsockopt(iSock, IPPROTO_TCP, TCP_INFO, &info, (socklen_t *)&len); // 獲取TCP當前狀態資訊
if ((info.tcpi_state == TCP_ESTABLISHED)) // 判斷當前TCP是否處在建立連線狀態
{
return SUCCESS; // 返回連線狀態
}
else
{
return ERR_CONN_LOST; // 返回失聯狀態
}
}
2. 客戶端程式碼(Client.c)
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#define Debug 1
#define BUF_SIZE 12800
/**********************************************************************
* 函式名稱: // static ssize_t net__read(int sock,void *buf, size_t count)
* 功能描述: // 嘗試從Socket套接字緩衝區讀取count位元組資料
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int sock 可用的socket控制代碼
* 輸入引數: // void *buf 用於儲存讀取得到的位元組(一般為char)
* 輸入引數: // size_t count 期望讀取到的位元組總數, 此值需要小於等於buf空間大小/sizeof(char) >= count
* 輸出引數: //
* 返 回 值: // ssize_t: 表示實際讀取到的位元組數
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
ssize_t net__read(int sock,void *buf, size_t count);
/**********************************************************************
* 函式名稱: // static int packet__read(int sock,const void *buf, size_t count)
* 功能描述: // 嘗試從Socket套接字緩衝區讀取count位元組資料, 讀取到count位元組資料為止
* 訪問的表: //
* 修改的表: //
* 輸入引數: // int sock 可用的socket控制代碼
* 輸入引數: // void *buf 用於儲存讀取得到的位元組(一般為char)
* 輸入引數: // size_t count 期望讀取到的位元組總數, 此值需要小於等於buf空間大小/sizeof(char) >= count
* 輸出引數: //
* 返 回 值: // int: 表示函式執行的情況
* 其它說明: // 其它說明
* 修改日期 修改人 修改內容
* -----------------------------------------------
* 2021/11/02 XXXX XXXX
***********************************************************************/
int packet__read(int sock,const void *buf, size_t count);
int
main(int argc, char *argv[])
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int sfd, s, j;
size_t len;
ssize_t nread;
char buf[BUF_SIZE];
if (argc < 2) {
fprintf(stderr, "Usage: %s host port msg...\n", argv[0]);
exit(EXIT_FAILURE);
}
/* Obtain address(es) matching host/port. */
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_INET; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Datagram socket */
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0; /* Any protocol */
s = getaddrinfo(argv[1], argv[2], &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
exit(EXIT_FAILURE);
}
/* getaddrinfo() returns a list of address structures.
Try each address until we successfully connect(2).
If socket(2) (or connect(2)) fails, we (close the socket
and) try the next address. */
for (rp = result; rp != NULL; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype,
rp->ai_protocol);
if (sfd == -1)
continue;
if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1){
struct sockaddr_in * temp;
printf("temp=NULL\n");
if( rp->ai_addr == NULL )
{
printf("addr is null\n");
}
else
{
temp = (struct sockaddr_in *)(rp->ai_addr);
// char *ip = inet_ntoa(temp.sin_addr);
printf("%s\n",inet_ntoa(temp->sin_addr));
}
printf("temp=NULL********\n");
break; /* Success */
}
close(sfd);
}
freeaddrinfo(result); /* No longer needed */
if (rp == NULL) { /* No address succeeded */
fprintf(stderr, "Could not connect\n");
exit(EXIT_FAILURE);
}
/* Send remaining command-line arguments as separate
datagrams, and read responses from server. */
unsigned int i = 1;
int flag = 0;
for (;;i++) {
#if Debug
printf("iHeader:\n");
#endif
flag = packet__read(sfd,buf, 4); // iHeader
if(flag == 1){
#if Debug
for(j=0;j<4;j++){
printf("%d ", buf[j]);
}
printf("\n");
#endif
}else
{
continue;
}
#if Debug
printf("iReserveS1-iProfileSEQ-iReserveS2-iEncoderVal-iReserveS3-iReserveS3-iReserveS3:\n");
#endif
flag = packet__read(sfd,buf, 28); // iReserveS1-iProfileSEQ-iReserveS2-iEncoderVal-iReserveS3-iReserveS3-iReserveS3
if(flag == 1){
#if Debug
for(j=0;j<28;j++){
printf("%d ", buf[j]);
}
printf("\n");
#endif
}else
{
continue;
}
#if Debug
printf("piProfileData:\n");
#endif
flag = packet__read(sfd,buf, 12800); // piProfileData 3200*4Bytes = 12800 Bytes
if(flag == 1){
#if Debug
for(j=0;j<12800;j++){
printf("%d ", buf[j]);
}
printf("\n");
#endif
}else
{
continue;
}
#if Debug
printf("iPacketEnd:\n");
#endif
flag = packet__read(sfd,buf, 4); // iPacketEnd
if(flag == 1){
#if Debug
for(j=0;j<4;j++){
printf("%d ", buf[j]);
}
printf("\n");
#endif
}else
{
continue;
}
#if Debug
printf("######################################### Line[%d] #########################################\n",i);
#endif
}
exit(EXIT_SUCCESS);
}
ssize_t net__read(int sock,void *buf, size_t count)
{
return recv(sock, buf, count, 0);
}
int packet__read(int sock,const void *buf, size_t count)
{
ssize_t read_length;
char *data = (char *)buf;
int pos=0;
int to_process = count;
while(to_process > 0)
{
read_length = net__read(sock,&data[pos],to_process);
if(read_length > 0){
to_process -= read_length;
pos += read_length;
}
else
{
#if Debug
// printf("Current Status: To_Process(%d)-Write_Len(%d)\n",to_process,read_length);
// return -1; // 異常處理
#endif
}
}
return 1;
}
五、高速通訊程式碼測試
1. 編譯及執行
gcc -o MS MyServer.c
gcc -o MC MyClient.c
./MS
./MC 192.168.1.110 50000
2. 測試結果如下:
Server服務端:
Opening ipv4 listen socket on port 50000.
ERR:net__socket_listen-177
####################################################### sock_ok=4
packet__write successful.
Client客戶端:
21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21
iPacketEnd:
0 0 0 0
######################################### Line[11] #########################################
iHeader:
Reference
怎樣實時判斷socket連線狀態?:https://www.cnblogs.com/embedded-linux/p/7468442.html
SOCKET:SO_LINGER 選項:https://www.cnblogs.com/kuliuheng/p/3670353.html
Linux SIGPIPE訊號產生原因與解決方法:https://blog.csdn.net/u010821666/article/details/81841755