1. 程式人生 > >定時器實現超時傳送/接收和定期檢測非活動連線

定時器實現超時傳送/接收和定期檢測非活動連線

《Linux高效能伺服器程式設計》閱讀筆記:

1. socket的傳送/接收超時

 在Linux網路程式設計基礎–socket常用選項中講道,socket選項SO_RCVTIMEOSO_SNDTIMEO分別用來設定socket接收資料超時和傳送資料超時時間,這兩個選項僅對與資料接收/傳送相關的socket系統呼叫都有效,具體如下:
這裡寫圖片描述
 在程式中上面的系統呼叫可以根據其返回值和errno來判斷是否超時,進而決定是否開始處理定時任務。
 以客戶端的connect()為例,該函式是向客戶端發起連線請求,通過三次握手和服務端建立通訊連線。需要注意這個操作不一定100%成功,可能某次握手失敗了,這時候TCP協議要求重新從第一次開始握手。我們可以通過SO_SNDTIMEO可以設定建立連線過程的超時時間,在超時時間範圍內沒有成功建立連線則失敗返回。

#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>

int main(void)
{
    const char* ip = "192.168.239.136";
    int port = 9660;
    int ret;

    //建立套接字
    int sockfd = socket(PF_INET, SOCK_STREAM, 0
); //設定套接字的傳送超時時間為10s struct timeval timeout; timeout.tv_sec = 10; timeout.tv_usec = 0; socklen_t len = sizeof(timeout); ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len); //連線目標伺服器 struct sockaddr_in addr; bzero(&addr, sizeof(addr)); addr.sin_family = AF_INET; inet_pton(AF_INET, ip, &addr.sin_addr); addr.sin_port = htons(port); ret = connect(sockfd, (struct
sockaddr*)&addr, sizeof(addr)); if (ret == -1) { if (errno == EINPROGRESS) { printf("connect timeout\n"); return -1; } printf("errno occur when connect to server\n"); perror("connect"); return -1; } return 0; }

2. 服務端處理非活動的連線

 超時事件也是網路程式需要處理的事件,比如下面講到的服務端定期檢驗一個客戶端連線的活動狀態。通常伺服器程式需要管理眾多定時事件,所以需要程式有效組織這些事件,使之能在預期的時間點觸發且不影響程式邏輯。
 在這裡我們將所有定時器及對應超時時間存放在升序雙向連結串列中,在超時處理函式中依次處理所有到期的定時器事件,以實現對定時事件的統一管理。升序定時器連結串列定義如下:

#ifndef LST_TIMER
#define LST_TIMER

#include <time.h>
#define BUFFER_SZ   64

struct cli_data;


//定時器節點和使用者資料相互包含,這樣可以通過整個程式使用的定時器找到使用者資料,也可以通過使用者資料找到整個程式使用的定時器連結串列
//定時器節點
class util_timer
{
public:    
    time_t expire;               /* 任務的超時時間 */
    void (*cb_func)(cli_data* ); /* 任務回撥函式 */
    util_timer* pre;
    util_timer* next;
    cli_data *user_data;

    util_timer() : pre(NULL), next(NULL) {}
};

//使用者資料結構
struct cli_data
{
    sockaddr_in addr;
    int sockfd;
    char buf[BUFFER_SZ];
    util_timer* timer;
};

//連結串列操作函式類
class sort_timer_lst
{
public:
    void add_timer(util_timer* timer)   //升序插入
    {
        if (!timer) return;

        /* 增加到頭節點 */
        if (!head)
        {
            head = tail = timer;
            return;
        }

        //插入位置是頭節點之前的位置
        if (timer->expire < head->expire)
        {
            timer->next = head;
            head->pre = timer;
            head = timer;
            return;
        }
        //插入位置是其它位置
        add_timer(timer, head);
    } 

    //定時器時間被延長時間後往連結串列尾方向移動
    void adjust_timer(util_timer* timer)
    {
        if (!timer) return;

        util_timer* tmp = timer->next;
        if (!tmp || (timer->expire < tmp->expire))
            return;

        if (timer == head)  //若是頭節點則往第2個節點後的位置移動
        {
            head = head->next;
            head->pre = NULL;
            timer->next = NULL;
            add_timer(timer, head);
        }
        else //非頭節點,從連結串列中刪除該節點並在頭節點後合適的位置插入
        {
            timer->pre->next = timer->next;
            timer->next->pre = timer->pre;
            add_timer(timer, timer->next);
        }       
    }

    void del_timer(util_timer* timer)
    {
        if (!timer) return;

        //連結串列中只有一個節點且等於該節點
        if ((timer == head) && (timer == tail))
        {
            delete timer;
            head = NULL;
            tail = NULL;
            return;
        }

        if (timer == head)
        {
            head = head->next;
            head->pre = NULL;
            delete timer;
            return;
        }

        if (timer == tail)
        {
            tail = tail->pre;
            tail->next = NULL;
            delete timer;
            return;
        }

        timer->pre->next = timer->next;
        timer->next->pre = timer->pre;
        delete timer;
    }

    //SIGALARM每次被觸發執行一次tick()函式(訊號處理函式->tick()->回撥函式)
    void tick()
    {
        if (!head) return;

        printf("time tick\n");
        time_t cur = time(NULL);
        util_timer* tmp = head;

        while (tmp)
        {
            if (cur < tmp->expire) break;
            tmp->cb_func(tmp->user_data);

            //處理完定時任務後將該定時器刪除並重置連結串列頭節點
            head = tmp->next;
            if (head)
                head->pre = NULL;

            delete tmp;
            tmp = head;
        }

    }

private:
    util_timer* head;   //指向頭節點
    util_timer* tail;   //指向尾節點

    //在非頭節點後的合適位置插入timer
    void add_timer(util_timer* timer, util_timer* lst_head) //升序插入
    {
        util_timer* pre = lst_head;
        util_timer* tmp = pre->next;

        while (tmp)
        {
            if (timer->expire < tmp->expire)
            {
                pre->next = timer;
                timer->next = tmp;
                tmp->pre = timer;
                timer->pre = pre;
                break;
            }
            pre = tmp;
            tmp = tmp->next;
        }

        //最後一個位置
        if (!tmp)
        {
            pre->next = timer;
            timer->pre = pre;
            timer->next = NULL;
            tail = timer;   
        }
    }
};

 sort_timer_lst::tick()函式是超時訊號SIGALARM的超時函式中被呼叫的,sort_timer_lst::tick()又會呼叫對應已經超時的定時器的回撥函式。在這裡我們要實現關閉非活動的連線,即在回撥函式中關閉該連線。
 程式的主體脈絡如下:
 (1)為每個連線的客戶端都分配一個定時器、設定超時時間和超時回撥函式(回撥函式關閉連線),並將定時器加入升序雙向連結串列中。
 (2)發生超時後執行超時訊號處理函式。為了統一事件源,在處理函式中(用管道技術)通知程式主迴圈。
 (3)在主迴圈中判斷是超時事件對應連結串列節點中的tick()函式,在tick()中呼叫每個已經超時的對應客戶端的超時回撥函式(此時升序連結串列發揮重大作用),即關閉連線。
 (4)主迴圈中若判斷是接收到資料,則接收完資料後重置該客戶端連線的定時器超時時間。

#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <errno.h>
#include "lst_timer.h"

#define ERRP(con, ret, ...) do                              \
{                                                           \
    if (con)                                                \
    {                                                       \
        perror(__VA_ARGS__);                                \
        ret;                                                \
    }                                                       \
}while(0)

#define FD_LIMIT 65535
#define TIMESLOT 5
#define MAX_EVENT_NUMBER 1024

static const char* ip = "192.168.239.136";
static int port = 9660;

static int pipe_fd[2];
static int epfd;
static sort_timer_lst timer_lst;

//訊號處理函式,只是將該訊號值寫入管道寫端
void sig_handler(int sig)
{
    int errno_bak = errno;
    int msg = sig;
    send(pipe_fd[1], (char*)&msg, 1, 0);
    errno = errno_bak;
}

void add_signal(int sig)
{
    struct sigaction sa;
    bzero(&sa, sizeof(sa));
    sa.sa_handler = sig_handler;
    sa.sa_flags |= SA_RESTART;
    sigfillset(&sa.sa_mask);
    sigaction(sig, &sa, NULL);
}

int set_fd_nonblock(int fd)
{
    int old_opt = fcntl(fd, F_GETFL);
    int new_opt = old_opt | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_opt);

    return old_opt;
}

void add_fd_to_epoll(int epfd, int fd)
{
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;   //可讀事件 | 邊沿觸發
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);    
    set_fd_nonblock(fd);
}

void cb_func(cli_data* user_data)
{
    //關閉非活動連線前先將該連線從epoll監聽表中移除
    epoll_ctl(epfd, EPOLL_CTL_DEL, user_data->sockfd, 0);
    close(user_data->sockfd);
    printf("close fd %d\n", user_data->sockfd);
}

void timer_handler()
{
    timer_lst.tick();
    alarm(TIMESLOT);
}

int main(void)
{
    int ret;
    bool is_stop = false;
    bool timeout = false;
    cli_data* users = new cli_data[FD_LIMIT];
    epoll_event events[MAX_EVENT_NUMBER] = {0};

    //建立監聽套接字
    int listen_fd = socket(PF_INET, SOCK_STREAM, 0);
    ERRP(listen_fd < 0, return -1, "socket");

    //命名套接字
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    inet_pton(AF_INET, ip, &addr.sin_addr);
    addr.sin_port = htons(port);    
    ret = bind(listen_fd, (struct sockaddr*)&addr, sizeof(struct sockaddr));
    ERRP(ret < 0, goto ERR1, "bind");

    //建立監聽佇列
    ret = listen(listen_fd, 5);
    ERRP(ret < 0, goto ERR1, "listen");

    //建立epoll核心事件列表
    epfd = epoll_create(5);
    ERRP(epfd < 0, goto ERR1, "epoll_create");

    //將描述符的可讀事件加入epoll核心時間表
    add_fd_to_epoll(epfd, listen_fd);

    //建立管道,在訊號處理函式中通過該管道和程式主迴圈通訊,以快速完畢訊號處理事件
    socketpair(PF_UNIX, SOCK_STREAM, 0, pipe_fd);
    ERRP(epfd < 0, goto ERR2, "socketpair");
    set_fd_nonblock(pipe_fd[1]);        //設定管道寫端為非阻塞
    add_fd_to_epoll(epfd, pipe_fd[0]);  //將管道讀端加入epoll核心事件表

    //註冊相關訊號的處理函式
    add_signal(SIGALRM);
    add_signal(SIGTERM);

    alarm(TIMESLOT);    
    while (!is_stop)
    {
        int num = epoll_wait(epfd, events, MAX_EVENT_NUMBER, -1);

        //系統產生SIGALRM超時訊號,會中斷epoll的監聽,即errno=EINTR,num=-1,此時不break
        //隨後sig_handler(int sig)得到執行,該函式向管道寫端pipe_fd[1]寫入訊號值,epoll再次返回,此時num=1
        if ((num < 0) && (errno != EINTR))
        {
            printf("epoll failer\n");
            break;
        }

        for (int i = 0; i < num; i++)
        {
            int fd = events[i].data.fd;

            //有新的客戶端連線
            if (fd == listen_fd)
            {
                struct sockaddr_in cli_addr;
                socklen_t len = sizeof(struct sockaddr_in);
                int connfd = accept(listen_fd, (struct sockaddr* )&cli_addr, &len);

                //將客戶端連線描述符加入epoll監聽表中
                add_fd_to_epoll(epfd, connfd);
                users[connfd].addr = cli_addr;
                users[connfd].sockfd = connfd;

                //為客戶端分配客戶端定時器,並加入客戶端數使用者資料結構中
                util_timer* timer = new util_timer;
                timer->user_data = &users[connfd];
                timer->cb_func = cb_func;
                time_t cur = time(NULL);
                timer->expire = cur + 3 * TIMESLOT; //一開始定義超時事件為15s,後面將是5s
                users[connfd].timer = timer;
                timer_lst.add_timer(timer);
            }
            //訊號處理函式往管道寫端寫入訊號值
            else if ((fd == pipe_fd[0]) && (events[i].events & EPOLLIN))
            {
                char signals[1024] = {0};
                ret = recv(pipe_fd[0], signals, sizeof(signals), 0);
                if (ret == -1)
                    continue;
                 else if(ret == 0)
                    continue;
                else
                {
                    for (int i = 0; i < ret; ++i)
                    {
                        switch (signals[i])
                        {
                            case SIGALRM:
                                timeout = true;
                                break;
                            case SIGTERM:
                                is_stop = true;

                        }
                    }
                }
            }
            //客戶端發來資料
            else if (events[i].events & EPOLLIN)
            {
                bzero(users[fd].buf, BUFFER_SZ);
                ret = recv(fd, users[fd].buf, BUFFER_SZ - 1, 0);
                printf("get %d bytes of client data %s from %d\n", ret, users[fd].buf, fd);

                util_timer* timer = users[fd].timer;
                if (ret < 0)
                {
                    if (errno != EAGAIN)
                    {
                        cb_func(&users[fd]);
                        if (timer)
                        {
                            timer_lst.del_timer(timer);
                        }
                    }
                }
                else if (ret == 0)
                {
                    cb_func(&users[fd]);
                    if (timer)
                    {
                        timer_lst.del_timer(timer);
                    }
                }
                else
                {
                    if (timer)
                    {
                        time_t cur = time(NULL);
                        //重置超時時間
                        timer->expire = cur + 3 * TIMESLOT;
                        printf("adjust timer once\n");
                        timer_lst.adjust_timer(timer);           
                    }
                }
            }
            else {
                printf("noting\n");
            }
        }

        //通過timer_handler()執行定時器的tick(),進而呼叫回撥函式,回撥函式中關閉連線
        if (timeout)
        {
            timer_handler();
            timeout = false;
        }
    }

    close(pipe_fd[1]);
    close(pipe_fd[0]);
    delete[] users;

ERR2:
    close(epfd);
ERR1:
    close(listen_fd);

    return 0;
}