Golang網路:核心API實現剖析(一)
這一章節我們將詳細描述網路關鍵API的實現,主要包括Listen、Accept、Read、Write等。 另外,為了突出關鍵流程,我們選擇忽略所有的錯誤。這樣可以使得程式碼看起來更為簡單。 而且我們只關注tcp協議實現,udp和unix socket不是我們關心的。
Listen
func Listen(net, laddr string) (Listener, error) { la, err := resolveAddr("listen", net, laddr, noDeadline) ...... switch la := la.toAddr().(type) { case *TCPAddr: l, err = ListenTCP(net, la) case *UnixAddr: ...... } ...... } // 對於tcp協議,返回的的是TCPListener func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) { ...... fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen") ...... return &TCPListener{fd}, nil } func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) { ...... return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline) } func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) { // 建立底層socket,設定屬性為O_NONBLOCK s, err := sysSocket(family, sotype, proto) ...... setDefaultSockopts(s, family, sotype, ipv6only) // 建立新netFD結構 fd, err = newFD(s, family, sotype, net) ...... if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: // 呼叫底層listen監聽建立的套接字 fd.listenStream(laddr, listenerBacklog) return fd, nil case syscall.SOCK_DGRAM: ...... } } } // 最終呼叫該函式來建立一個socket // 並且將socket屬性設定為O_NONBLOCK func sysSocket(family, sotype, proto int) (int, error) { syscall.ForkLock.RLock() s, err := syscall.Socket(family, sotype, proto) if err == nil { syscall.CloseOnExec(s) } syscall.ForkLock.RUnlock() if err != nil { return -1, err } if err = syscall.SetNonblock(s, true); err != nil { syscall.Close(s) return -1, err } return s, nil } func (fd *netFD) listenStream(laddr sockaddr, backlog int) error { if err := setDefaultListenerSockopts(fd.sysfd) if lsa, err := laddr.sockaddr(fd.family); err != nil { return err } else if lsa != nil { // Bind繫結至該socket if err := syscall.Bind(fd.sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } } // 監聽該socket if err := syscall.Listen(fd.sysfd, backlog); // 這裡非常關鍵:初始化socket與非同步IO相關的內容 if err := fd.init(); err != nil { return err } lsa, _ := syscall.Getsockname(fd.sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil }
我們這裡看到了如何實現Listen。流程基本都很簡單,但是因為我們使用了非同步程式設計,因此,我們在Listen完該socket後,還必須將其新增到監聽佇列中,以後該socket有事件到來時能夠及時通知到。
對linux有所瞭解的應該都知道epoll,沒錯golang使用的就是epoll機制來實現socket事件通知。那我們看對一個監聽socket,是如何將其新增到epoll的監聽佇列中呢?
func (fd *netFD) init() error { if err := fd.pd.Init(fd); err != nil { return err } return nil } func (pd *pollDesc) Init(fd *netFD) error { // 利用了Once機制,保證一個程序只會執行一次 // runtime_pollServerInit: // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0 // JMP runtime·netpollServerInit(SB) serverInit.Do(runtime_pollServerInit) // runtime_pollOpen: // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0 // JMP runtime·netpollOpen(SB) ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) if errno != 0 { return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil }
這裡就是socket非同步程式設計的關鍵:
netpollServerInit()初始化非同步程式設計結構,對於epoll,該函式是netpollinit,且使用Once機制保證一個程序 只會初始化一次;
func netpollinit() { epfd = epollcreate1(_EPOLL_CLOEXEC) if epfd >= 0 { return } epfd = epollcreate(1024) if epfd >= 0 { closeonexec(epfd) return } ...... }
netpollOpen則在socket被創建出來後將其新增到epoll佇列中,對於epoll,該函式被例項化為netpollopen。
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
OK,看到這裡,我們也就明白了,監聽一個套接字的時候無非就是傳統的socket非同步程式設計,然後將該socket新增到 epoll的事件監聽佇列中。
Accept
既然我們描述的重點的tcp協議,因此,我們看看TCPListener的Accept方法是怎麼實現的:
func (l *TCPListener) Accept() (Conn, error) {
c, err := l.AcceptTCP()
......
}
func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
......
fd, err := l.fd.accept()
......
// 返回給呼叫者一個新的TCPConn
return newTCPConn(fd), nil
}
func (fd *netFD) accept() (netfd *netFD, err error) {
// 為什麼對該函式加讀鎖?
if err := fd.readLock(); err != nil {
return nil, err
}
defer fd.readUnlock()
......
for {
// 這個accept是golang包裝的系統呼叫
// 用來處理跨平臺
s, rsa, err = accept(fd.sysfd)
if err != nil {
if err == syscall.EAGAIN {
// 如果沒有可用連線,WaitRead()阻塞該協程
// 後面會詳細分析WaitRead.
if err = fd.pd.WaitRead(); err == nil {
continue
}
} else if err == syscall.ECONNABORTED {
// 如果連線在Listen queue時就已經被對端關閉
continue
}
}
break
}
netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
......
// 這個前面已經分析,將該fd新增到epoll佇列中
err = netfd.init()
......
lsa, _ := syscall.Getsockname(netfd.sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
OK,從前面的程式設計事例中我們知道,一般在主協程中會accept新的connection,使用非同步程式設計我們知道,如果沒有 新連線到來,該協程會一直被阻塞,直到新連線到來有人喚醒了該協程。
一般在主協程中呼叫accept,如果返回值為EAGAIN,則呼叫WaitRead來阻塞當前協程,後續在該socket有事件到來時被喚醒,WaitRead以及喚醒過程我們會在後面仔細分析。
Read
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
return c.fd.Read(b)
}
func (fd *netFD) Read(p []byte) (n int, err error) {
// 為什麼對函式呼叫加讀鎖
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
// 這個又是幹嘛?
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
for {
n, err = syscall.Read(int(fd.sysfd), p)
if err != nil {
n = 0
// 如果返回EAGIN,阻塞當前協程直到有資料可讀被喚醒
if err == syscall.EAGAIN {
if err = fd.pd.WaitRead(); err == nil {
continue
}
}
}
// 檢查錯誤,封裝io.EOF
err = chkReadErr(n, err, fd)
break
}
if err != nil && err != io.EOF {
err = &OpError{"read", fd.net, fd.raddr, err}
}
return
}
func chkReadErr(n int, err error, fd *netFD) error {
if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
return io.EOF
}
return err
}
Read的流程與Accept流程極其一致,閱讀起來也很簡單。相信不用作過多解釋,自己看吧。 需要注意的是每次Read不能保證可以讀到想讀的那麼多內容,比如緩衝區大小是10,而實際可能只讀到5,應用程式需要能夠處理這種情況。
Write
func (fd *netFD) Write(p []byte) (nn int, err error) {
// 為什麼這裡加寫鎖
if err := fd.writeLock(); err != nil {
return 0, err
}
defer fd.writeUnlock()
// 這個是幹什麼?
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
// nn記錄總共寫入的資料量,每次Write可能只能寫入部分資料
for {
var n int
n, err = syscall.Write(int(fd.sysfd), p[nn:])
if n > 0 {
nn += n
}
// 如果陣列資料已經全部寫完,函式返回
if nn == len(p) {
break
}
// 如果寫入資料時被block了,阻塞當前協程
if err == syscall.EAGAIN {
if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
if err != nil {
n = 0
break
}
// 如果返回值為0,代表了什麼?
if n == 0 {
err = io.ErrUnexpectedEOF
break
}
}
if err != nil {
err = &OpError{"write", fd.net, fd.raddr, err}
}
return nn, err
}
注意Write語義與Read不一樣的地方:
Write儘量將使用者緩衝區的內容全部寫入至底層socket,如果遇到socket暫時不可寫入,會阻塞當前協程; Read在某次讀取成功時立即返回,可能會導致讀取的資料量少於使用者緩衝區的大小; 為什麼會在實現上有此不同,我想可能read的優先順序比較高吧,應用程式可能一直在等著,我們不能等到資料一直讀完才返回,會阻塞使用者。 而寫不一樣,優先順序相對較低,而且使用者一般也不著急寫立即返回,所以可以將所有的資料全部寫入,而且這樣 也能簡化應用程式的寫法。
總結
上面我們基本說完了golang網路程式設計內的關鍵API流程,我們遺留了一個關鍵內容:當系統呼叫返回EAGAIN時,會 呼叫WaitRead/WaitWrite來阻塞當前協程,我會在接下來的章節中繼續分析。