1. 程式人生 > 其它 >Docker安裝Kafka-單個部署 叢集部署

Docker安裝Kafka-單個部署 叢集部署

Linux系統呼叫

原文:https://www.cnblogs.com/ycw0923/p/12913925.html

一.為何要有系統呼叫

unix核心分為使用者態和核心態,在使用者態下程式不內直接訪問核心資料結構或者核心程式,只有在核心態下才可訪問。請求核心服務的程序使用系統呼叫的特殊機制,每個系統呼叫都設定了一組識別程序請求的引數,通過執行CPU指令完成使用者態向核心態的轉換。

二.系統呼叫過程

32位系統中,通過int $0x80指令觸發系統呼叫。其中EAX暫存器用於傳遞系統呼叫號,引數按順序賦值給EBX、ECX、EDX、ESI、EDI、EBP這6個暫存器。

64位系統則是使用syscall指令來觸發系統呼叫,同樣使用EAX暫存器傳遞系統呼叫號,RDI、RSI、RDX、RCX、R8、R9這6個暫存器則用來傳遞引數。

下面以64位系統中的42號,connect系統呼叫作為例子

connect是socket網路通訊中的函式,是客戶端與服務端連線時所用到的函式,connect接受三個引數,分別是客戶端的檔案描述符,sockaddr結構體,以及地址長度(ipv4為4)。若成功連線,返回0,否則返回-1

下面是客戶端的原始碼

#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <stdlib.h>
#include <stdio.h>
#include <memory.h>
#include <unistd.h>
#include "rio.h"
#define MAXLINE 100

int open_clientfd(char*,char*);

int main(int argc,char** argv){
    int clientfd;
    char* host,*port,buf[MAXLINE];
    rio_t rio;
    if(argc != 3){
        fprintf(stderr,"usage: %s <host> <port>\n",argv[0]);
        exit(0);
    }
    host = argv[1];
    port = argv[2];

    clientfd = open_clientfd(host,port);
    rio_readinitb(&rio,clientfd);
    while(fgets(buf,MAXLINE,stdin)!=NULL){
        rio_writen(clientfd,buf,strlen(buf));
        rio_readlineb(&rio,buf,MAXLINE);
        fputs(buf,stdout);
    }
    close(clientfd);
    exit(0);
}

int open_clientfd(char* hostname,char* port){
    int clientfd;
    struct addrinfo hints,*listp,*p;
    memset(&hints,0,sizeof(struct addrinfo));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_NUMERICSERV;
    hints.ai_flags |= AI_ADDRCONFIG;
    getaddrinfo(hostname,port,&hints,&listp);
    //getaddrinfo會返回所有可用的套接字
    for(p=listp;p;p=p->ai_next){
        if((clientfd = socket(p->ai_family,p->ai_socktype,p->ai_protocol))<0)
            continue;
        if(connect(clientfd,p->ai_addr,p->ai_addrlen)!=-1)//引數分別為客戶端的檔案描述符,addr地址結構,已經地址長度
            break;//成功建立連線
        close(clientfd);//建立失敗,嘗試另一個套接字
    }
    freeaddrinfo(listp);
    if(!p) return -1;
    return clientfd;
}

服務端是採用基於I/O多路複用的併發事件驅動伺服器,基於select函式

#include <sys/socket.h>
#include <sys/types.h>
#include <sys/select.h>
#include <netdb.h>
#include <stdlib.h>
#include <stdio.h>
#include <memory.h>
#include <unistd.h>
#include <errno.h>
#include "rio.h"

#define LISTENQ 1024
#define MAXLINE 100

typedef struct{
    int maxfd;
    fd_set read_set;
    fd_set ready_set;
    int nready;
    int maxi;
    int clientfd[FD_SETSIZE];
    rio_t clientrio[FD_SETSIZE];
}pool;

int bytes_cnt = 0;

int open_listenfd(char*);
void echo(int);
void command();
void init_pool(int,pool*);
void add_client(int,pool*);
void check_clients(pool*);

int main(int argc,char** argv){
    int listenfd,connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    char client_hostname[MAXLINE]; char client_port[MAXLINE];
    static pool pool;

    if(argc != 2){
        fprintf(stderr,"usage: %s <port>\n",argv[0]);
        exit(0);
    }

    listenfd = open_listenfd(argv[1]);
    init_pool(listenfd,&pool);

    while(1){
        pool.ready_set = pool.read_set;
        pool.nready = select(pool.maxfd+1,&pool.ready_set,NULL,NULL,NULL);

        if(FD_ISSET(listenfd,&pool.ready_set)){
            clientlen = sizeof(struct sockaddr_storage);
            connfd = accept(listenfd,(struct sockaddr *)&clientaddr,&clientlen);
            add_client(connfd,&pool);
            getnameinfo((struct sockaddr *)&clientaddr,clientlen,client_hostname,MAXLINE,client_port,MAXLINE,0);
            printf("連線到:(%s,%s)\n",client_hostname,client_port);
        }
        check_clients(&pool);
    }
}

int open_listenfd(char* port){
    int listenfd; int optval = 1;
    struct addrinfo hints,*listp,*p;
    memset(&hints,0,sizeof(struct addrinfo));
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
    hints.ai_flags |= AI_NUMERICSERV;
    getaddrinfo(NULL,port,&hints,&listp);
    for(p=listp;p;p=p->ai_next){
        if((listenfd = socket(p->ai_family,p->ai_socktype,p->ai_protocol))<0)
            continue;
        setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,(const void*)&optval,sizeof(int));
        if(bind(listenfd,p->ai_addr,p->ai_addrlen)==0)
            break;
        close(listenfd);
    }
    freeaddrinfo(listp);
    if(!p) return -1;
    //建立成功,開始監聽
    //LISTENQ是等待的連線請求佇列
    if(listen(listenfd,LISTENQ)<0){
        close(listenfd);
        return -1;
    }
    return listenfd;
}

void echo(int connfd){
    size_t n;
    char buf[MAXLINE];
    rio_t rio;
    rio_readinitb(&rio,connfd);
    while((n = rio_readlineb(&rio,buf,MAXLINE)) != 0){
        printf("伺服器接受到: %d 位元組\n",(int)n);
        printf("%s\n",buf);
        rio_writen(connfd,buf,n);
    }
}

void command(){
    char buf[MAXLINE];
    if(!fgets(buf,MAXLINE,stdin))
        exit(0);
    printf("%s",buf);
}

void init_pool(int listenfd,pool* p){
    int i;
    p->maxi = -1;
    for(i=0;i<FD_SETSIZE;i++)
        p->clientfd[i]=-1;
    p->maxfd = listenfd;
    FD_ZERO(&p->read_set);
    FD_SET(listenfd,&p->read_set);
}

void add_client(int connfd,pool* p){
    int i;
    p->nready--;
    for(i=0;i<FD_SETSIZE;i++){
        if(p->clientfd[i]<0){
            p->clientfd[i] = connfd;
            rio_readinitb(&p->clientrio[i],connfd);

            FD_SET(connfd,&p->read_set);
            if(connfd > p->maxfd)
                p->maxfd = connfd;
            if(i > p->maxi)
                p->maxi = i;
            break;
        }
    }
    if(i == FD_SETSIZE)
        printf("add_client error: 客戶端過多");
}

void check_clients(pool* p){
    int i,connfd,n;
    char buf[MAXLINE];
    rio_t rio;
    for(i=0;i<=p->maxi && p->nready>0;i++){
        connfd = p->clientfd[i];
        rio = p->clientrio[i];

        if((connfd>0) && (FD_ISSET(connfd,&p->ready_set))){
            p->nready--;
            if((n = rio_readlineb(&rio,buf,MAXLINE))!=0){
                bytes_cnt += n;
                printf("伺服器收到 %d (總共%d) 位元組 在 檔案描述符%d ",n,bytes_cnt,connfd);
                rio_writen(connfd,buf,n);
                printf("內容:%s\n",buf);
            }
            else{
                close(connfd);
                FD_CLR(connfd,&p->read_set);
                p->clientfd[i] = -1;
            }
        }
    }
}

修改connect函式,以彙編指令的形式進入系統呼叫

通過gdb檢視connect函式傳參用到的暫存器

其中connect的系統呼叫號為0x2a

        asm volatile(
                     "movl %1,%%edi\n\t"
                     "movq %2,%%rsi\n\t"
                     "movl %3,%%edx\n\t"
                     "movl $0x2a,%%eax\n\t"
                     "syscall\n\t"
                     "movq %%rax,%0\n\t"
                     :"=m"(ret)
                     :"a"(clientfd),"b"(p->ai_addr),"c"(p->ai_addrlen)
        );

測試是否通過彙編正常呼叫connect函式,服務端監聽45678埠

客戶端試圖連線到45678埠

看來是可以正常觸發的,其中50962是客戶端程序的埠號

接下來重新靜態編譯客戶端程式 gcc clis.c -o ciis -static,如果不是靜態編譯,在qemu下是不能正常執行的,提示. /not found(缺少lib動態連結庫)

然後重新打包系統根目錄rootfs

開啟qemu,通過gdb在entry_SYSCALL_64處打斷點

進入home目錄後,執行./ciis localhost 1256

由於每次按下鍵盤都會觸發一箇中斷,每個中斷都會進入斷點,所以除錯的過程非常慢

進入entry_syscall後,會儲存暫存器的值到pt_regs結構體中

ENTRY(entry_SYSCALL_64)
    UNWIND_HINT_EMPTY
    /*
     * Interrupts are off on entry.
     * We do not frame this tiny irq-off block with TRACE_IRQS_OFF/ON,
     * it is too small to ever cause noticeable irq latency.
     */

    swapgs
    /* tss.sp2 is scratch space. */
    movq    %rsp, PER_CPU_VAR(cpu_tss_rw + TSS_sp2)
    SWITCH_TO_KERNEL_CR3 scratch_reg=%rsp
    movq    PER_CPU_VAR(cpu_current_top_of_stack), %rsp

    /* Construct struct pt_regs on stack */
    pushq    $__USER_DS                /* pt_regs->ss */
    pushq    PER_CPU_VAR(cpu_tss_rw + TSS_sp2)    /* pt_regs->sp */
    pushq    %r11                    /* pt_regs->flags */
    pushq    $__USER_CS                /* pt_regs->cs */
    pushq    %rcx                    /* pt_regs->ip */
GLOBAL(entry_SYSCALL_64_after_hwframe)
    pushq    %rax                    /* pt_regs->orig_ax */

進入entry_syscalll_64後,會儲存現在暫存器的值放入pt_regs結構體中

繼續單步執行,執行call dosyscallc_64函式

do_syscall64定義在common.c中

regs->ax = sys_call_table[nr](regs);

這句會查詢對應的系統呼叫號,然後傳入regs結構體,regs中儲存著各個暫存器的值,之後會把呼叫返回值傳給ax暫存器

最後會執行sysret指令恢復堆疊

USERGS_SYSERT64是個巨集展開,其擴充套件呼叫 swapgs 指令交換使用者 GS 和核心GS, sysret 指令執行從系統呼叫處理退出

至此,一段系統呼叫結束

總結

作業系統對於中斷處理流程一般為:

  1. 關中斷:CPU關閉中段響應,即不再接受其它外部中斷請求
  2. 儲存斷點:將發生中斷處的指令地址壓入堆疊,以使中斷處理完後能正確地返回。
  3. 識別中斷源:CPU識別中斷的來源,確定中斷型別號,從而找到相應的中斷服務程式的入口地址。
  4. 保護現場所:將發生中斷處理有關暫存器(中斷服務程式中要使用的暫存器)以及標誌暫存器的記憶體壓入堆疊。
  5. 執行中斷服務程式:轉到中斷服務程式入口開始執行,可在適當時刻重新開放中斷,以便允許響應較高優先順序的外部中斷。
  6. 恢復現場並返回:把“保護現場”時壓入堆疊的資訊彈回原暫存器,然後執行中斷返回指令(IRET),從而返回主程式繼續執行。

在核心初始化時,會執行trap_init函式,把中斷向量表拷貝到指定位置,syscall_64.c中定義著系統呼叫表sys_call_table,在cpu_init時完成初始化。執行int 0x80時,硬體找到在中斷描述符表中的表項,在自動切換到核心棧 (tss.ss0 : tss.esp0) 後根據中斷描述符的 segment selector 在 GDT / LDT 中找到對應的段描述符,從段描述符拿到段的基址,載入到 cs ,將 offset 載入到 eip。最後硬體將 ss / sp / eflags / cs / ip / error code 依次壓到核心棧。返回時,iret 將先前壓棧的 ss / sp / eflags / cs / ip 彈出,恢復使用者態呼叫時的暫存器上下文。

syscall則是64位系統中,為了加速系統呼叫通過引入新的 MSR 來存放核心態的程式碼和棧的段號和偏移量,從而實現快速跳轉。