One Million Concurrent Connections on a Single Machine: 50 Lines of Go

This article first describes the test setup and results for one million concurrent connections on a single machine, then analyzes the secret behind the 50-line Go network server that makes it possible.

Network setup

Six cloud VMs, each with 2 cores and 8 GB of RAM, serve as clients.

One cloud VM with 4 cores and 16 GB of RAM serves as the server.

Client-side settings

Raise the maximum number of open files to 200,000:

ulimit -n 200000

Widen the usable local port range to 1024-65535:

echo 1024 65535 > /proc/sys/net/ipv4/ip_local_port_range

Each client VM establishes 180,000 connections

Configure multiple IPs on a single NIC (three IPs per NIC) and start three client processes, each binding a different local IP and establishing 60,000 connections, for 180,000 in total. The reason for multiple IPs: one local IP can source at most about 64,500 connections to a single server ip:port (one per local port in the range above), so spreading the load across three IPs keeps each process comfortably under that ceiling. A sketch of the setup follows.
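
A minimal sketch of the client-side setup. The NIC name eth0, the 192.168.0.x addresses, and the 10.0.0.1 server address are placeholders, and ./client is the client program shown later in this article:

# add two secondary IPs to the NIC (the primary IP is assumed to be 192.168.0.10)
ip addr add 192.168.0.11/24 dev eth0
ip addr add 192.168.0.12/24 dev eth0

# start three client processes, one per local IP, 60,000 connections each
./client -remote-ip 10.0.0.1 -concurrent-num 60000 -local-ip 192.168.0.10 &
./client -remote-ip 10.0.0.1 -concurrent-num 60000 -local-ip 192.168.0.11 &
./client -remote-ip 10.0.0.1 -concurrent-num 60000 -local-ip 192.168.0.12 &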

Server-side settings

Raise the maximum number of open files to 1,000,000:

ulimit -n 1000000

Set the lengths of the SYN (half-open) queue and the accept (fully-established) queue

During testing we hit a curious symptom: the clients had established 30,000 connections, but the server had only established 28,570.

After some digging, the causes were:

1. The accept queue was full: the kernel's overflowed counter kept increasing (the original post showed a screenshot here; the commands below reproduce the check).

2. tcp_abort_on_overflow was 0, which means that when the accept queue is full at step three of the three-way handshake, the server silently drops the client's ACK (from the server's perspective the connection was never established).

With tcp_abort_on_overflow set to 1, the server instead sends a RST to the client when the accept queue is full at step three, scrapping the handshake and the connection (which, on the server side, never existed anyway).
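
To observe this yourself, the standard kernel counters are enough:

netstat -s | grep -i overflow        # cumulative accept-queue overflows
ss -lnt | grep 8888                  # Recv-Q/Send-Q of the listener: current vs. max accept-queue length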

The fix:

Set the SYN queue length to 10,000:

echo 10000 >/proc/sys/net/ipv4/tcp_max_syn_backlog

Set the accept queue length to 10,000:

echo 10000 >/proc/sys/net/core/somaxconn
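
Note that somaxconn only caps the backlog an application may request in listen(). Conveniently, in Go 1.9 the net package reads /proc/sys/net/core/somaxconn at startup and passes it as the listen backlog (the listenerBacklog that appears in the source later in this article), so the server code needs no change. To verify the settings took effect:

sysctl net.ipv4.tcp_max_syn_backlog net.core.somaxconn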

See 【轉】關於TCP 半連線佇列和全連線佇列 - sidesky - 部落格園

and linux核心調優tcp_max_syn_backlog和somaxconn的區別 - 10931853 - 51CTO部落格

Set the maximum number of conntrack entries

net.nf_conntrack_max defaults to 262144; raise it to 1,000,000:

sysctl -w net.nf_conntrack_max=1000000
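
If connections stall partway through a test run, it is worth watching the live conntrack count against the limit (assuming the nf_conntrack module is loaded):

cat /proc/sys/net/netfilter/nf_conntrack_count
cat /proc/sys/net/netfilter/nf_conntrack_max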

For more on tuning the maximum number of TCP connections, see

Linux 核心優化-調大TCP最大連線數 - 簡書

Final test results

The server established 960,000 connections.

Day to day, the most common invocation is ss -anp, but note that with this many connections the p flag makes the command so slow it is effectively unusable; specify only the an flags here.
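
For example, to get connection totals cheaply (port 8888 matches the demo server below):

ss -s                          # summary counters, the cheapest overview
ss -an | grep :8888 | wc -l    # connections touching the service port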

ss performs much better than netstat; see https://blog.csdn.net/hustsselbj/article/details/47438781

CPU and memory usage

CPU usage settled at roughly 2 cores and memory at about 3 GB, i.e. on the order of 3 KB per connection (3 GB / 960,000 ≈ 3.3 KB).

The CPU hardware info shows a clock frequency of 2.4 GHz.

For how to inspect CPU hardware info, see linux(centos)檢視cpu硬體資訊命令圖解教程 - 電腦維修技術網

Client and server implementation

Client


package main

import (
	"flag"
	"fmt"
	"net"
	"os"
	"time"
)

var RemoteAddr *string
var ConcurNum *int
var LocalAddr *string

func init() {
	RemoteAddr = flag.String("remote-ip", "127.0.0.1", "ip addr of remote server")
	ConcurNum = flag.Int("concurrent-num", 100, "concurrent number of client")
	LocalAddr = flag.String("local-ip", "0.0.0.0", "local ip addr to bind")
}

// consume opens one TCP connection, bound to the chosen local IP, and
// then blocks reading from it, mimicking a mostly idle long-lived client.
func consume() {

	laddr := &net.TCPAddr{IP: net.ParseIP(*LocalAddr)}

	// bind the dialer to a specific local IP so one process can be
	// pointed at each of the NIC's addresses
	var dialer net.Dialer
	dialer.LocalAddr = laddr

	conn, err := dialer.Dial("tcp", *RemoteAddr+":8888")
	if err != nil {
		fmt.Println("dial failed:", err)
		os.Exit(1)
	}
	defer conn.Close()

	buffer := make([]byte, 512)

	// block on Read; the server pushes a small payload every 10 seconds
	for {
		_, err2 := conn.Read(buffer)
		if err2 != nil {
			fmt.Println("Read failed:", err2)
			return
		}
	}
}

func main() {
	flag.Parse()
	for i := 0; i < *ConcurNum; i++ {
		go consume()
	}
	time.Sleep(3600 * time.Second)
}
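
Assuming the file is saved as client.go, a typical invocation (IPs are placeholders):

go build -o client client.go
./client -remote-ip 10.0.0.1 -concurrent-num 60000 -local-ip 192.168.0.10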

Server

package main

import (
	"fmt"
	"net"
	"os"
	"time"
)

var array []byte = make([]byte, 10)

func checkError(err error, info string) (res bool) {

	if err != nil {
		fmt.Println(info + "  " + err.Error())
		return false
	}
	return true
}

// Handler pushes the small payload to the connection every 10 seconds,
// so nearly all of the million connections are idle at any given instant.
func Handler(conn net.Conn) {
	for {
		_, err := conn.Write(array)
		if err != nil {
			return
		}
		time.Sleep(10 * time.Second)
	}
}

func main() {

	// fill the 10-byte payload that Handler pushes to every connection
	for i := 0; i < 10; i += 1 {
		array[i] = 'a'
	}

	service := ":8888"
	tcpAddr, err := net.ResolveTCPAddr("tcp4", service)
	if !checkError(err, "ResolveTCPAddr") {
		os.Exit(1)
	}
	l, err := net.ListenTCP("tcp", tcpAddr)
	if !checkError(err, "ListenTCP") {
		os.Exit(1)
	}

	for {
		conn, err := l.Accept()
		if err != nil {
			fmt.Printf("accept error, err=%s\n", err.Error())
			os.Exit(1)
		}
		// one goroutine per connection: the GPC model analyzed below
		go Handler(conn)
	}

}
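
Run it the same way; the 1,000,000-file ulimit and the sysctl settings above must already be in place:

go build -o server server.go
./server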

Threading models for high-performance network programming

TPC

TPC stands for Thread Per Connection: every new connection gets a brand-new thread dedicated to handling it.

Model characteristics:

  • Uses a blocking I/O model to read input
  • Each connection needs its own thread to carry out the full read-input, process, write-output cycle

Problems:

  • At high concurrency, a very large number of threads must be created, and the system resources they consume become prohibitive

Reactor

The reactor pattern has two core parts: the reactor itself and a thread pool. The reactor watches connections for readable/writable I/O events; the thread pool performs the actual business processing. Under high concurrency, a reactor built on epoll is extremely efficient (a bare-bones sketch follows the list below).

Model characteristics:

  • Non-blocking I/O with I/O multiplexing
  • A thread pool handles the business logic
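
To make the pattern concrete, here is a bare-bones, Linux-only reactor sketch using Go's raw syscall package. It is not from the original article: the port 8888 and buffer sizes are arbitrary, and the business-logic thread pool is replaced by inline handling to keep it short.

package main

import (
	"fmt"
	"syscall"
)

func main() {
	// non-blocking listening socket on :8888
	lfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
	if err != nil {
		panic(err)
	}
	syscall.SetNonblock(lfd, true)
	if err := syscall.Bind(lfd, &syscall.SockaddrInet4{Port: 8888}); err != nil {
		panic(err)
	}
	if err := syscall.Listen(lfd, 128); err != nil {
		panic(err)
	}

	// the reactor: one epoll instance watching every fd
	epfd, err := syscall.EpollCreate1(0)
	if err != nil {
		panic(err)
	}
	ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(lfd)}
	syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, lfd, &ev)

	events := make([]syscall.EpollEvent, 128)
	buf := make([]byte, 512)
	for {
		n, err := syscall.EpollWait(epfd, events, -1)
		if err != nil {
			continue // e.g. EINTR
		}
		for i := 0; i < n; i++ {
			fd := int(events[i].Fd)
			if fd == lfd {
				// listener is readable: accept and register the new connection
				cfd, _, err := syscall.Accept(lfd)
				if err != nil {
					continue
				}
				syscall.SetNonblock(cfd, true)
				cev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(cfd)}
				syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, cfd, &cev)
			} else {
				// connection is readable: a real reactor would hand this
				// buffer to a worker thread from the pool
				m, err := syscall.Read(fd, buf)
				if err != nil || m == 0 {
					syscall.Close(fd)
					continue
				}
				fmt.Printf("read %d bytes from fd %d\n", m, fd)
			}
		}
	}
}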

The golang GPC model

GPC stands for Goroutine Per Connection: every new connection gets a new goroutine dedicated to handling it.

Model characteristics:

  • Code can be written in a blocking-I/O style
  • Each connection gets its own goroutine to carry out the full read-input, process, write-output cycle

Why GPC can support a million concurrent connections on one machine

The GPC model looks very similar to TPC, so why can GPC sustain a million concurrent connections on a single machine?

Comparing the GPC and TPC models

  1. Stack size: in the GPC model a goroutine starts with a stack of roughly 2 KB that grows and shrinks on demand, while in the TPC model a thread's default stack is on the order of megabytes (typically 8 MB on Linux). A million goroutines therefore start at around 2 GB of stack, where a million threads would need terabytes; the sketch after this list makes the per-goroutine cost measurable.
  2. I/O model: both GPC and TPC are programmed in a blocking style, but GPC sits on non-blocking I/O underneath. Go wraps non-blocking I/O into a blocking API at the language level: when the underlying fd is not ready, the read returns EAGAIN, and the runtime puts the goroutine into the Wait state and switches to another goroutine.
  3. Goroutine vs. thread switching: switching between goroutines is far cheaper than switching between threads; see linux作業系統筆記(程序).
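
A small, illustrative measurement of point 1 (not from the original post; exact numbers depend on Go version and platform):

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	const n = 100000

	var before, after runtime.MemStats
	runtime.ReadMemStats(&before)

	// park n goroutines, the way idle connections park inside conn.Read
	stop := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			<-stop
		}()
	}

	runtime.ReadMemStats(&after)
	fmt.Printf("%d parked goroutines: ~%d KB total (~%d bytes each)\n",
		n, (after.Sys-before.Sys)/1024, (after.Sys-before.Sys)/uint64(n))

	close(stop)
	wg.Wait()
}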

The secret behind the GPC model

Underneath, the GPC model is implemented as a reactor: Go wraps that machinery at the language level, so you can write blocking-style code on top of it.

GPC model source walkthrough

The Go source quoted below is from version 1.9.4.

The I/O thread implementation

The runtime starts a thread running the sysmon function

runtime/proc.go

// The main goroutine.
func main() {          
        g := getg()
                       
        // Racectx of m0->g0 is used only as the parent of the main goroutine.
        // It must not be used for anything else.
        g.m.g0.racectx = 0
                       
        // Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
        // Using decimal instead of binary GB and MB because
        // they look nicer in the stack overflow failure message.
        if sys.PtrSize == 8 {
                maxstacksize = 1000000000
        } else {    
                maxstacksize = 250000000
        }              
                       
        // Allow newproc to start new Ms.
        mainStarted = true 
                       
        systemstack(func() {
                //start a thread that runs the sysmon function
                newm(sysmon, nil) 
        })             
      ...........

How sysmon works
sysmon runs netpoll to collect the fds that have become readable or writable, then sets the goroutines parked on those fds to ready

runtime/proc.go

func sysmon() {                                                                                                                                                                                
        // If a heap span goes unused for 5 minutes after a garbage collection,
        // we hand it back to the operating system.
        scavengelimit := int64(5 * 60 * 1e9)
                     
        if debug.scavenge > 0 {        
                // Scavenge-a-lot for testing.
                forcegcperiod = 10 * 1e6
                scavengelimit = 20 * 1e6
        }            
                     
        lastscavenge := nanotime()     
        nscavenge := 0                 
                     
        lasttrace := int64(0)          
        idle := 0 // how many cycles in succession we had not wokeup somebody
        delay := uint32(0)             
        for {        
                if idle == 0 { // start with 20us sleep...
                        delay = 20     
                } else if idle > 50 { // start doubling the sleep after 1ms...
                        delay *= 2     
                }    
                if delay > 10*1000 { // up to 10ms
                        delay = 10 * 1000
                }    
                usleep(delay)
                ....

                // poll network if not polled for more than 10ms
                lastpoll := int64(atomic.Load64(&sched.lastpoll))
                now := nanotime()   
                if lastpoll != 0 && lastpoll+10*1000*1000 < now {
                        atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
                        //netpoll calls epollwait, which returns the ready fds;
                        //netpoll returns the goroutines parked on those fds
                        gp := netpoll(false) // non-blocking - returns list of goroutines
                        if gp != nil { 
                                // Need to decrement number of idle locked M's
                                // (pretending that one more is running) before injectglist.
                                // Otherwise it can lead to the following situation:
                                // injectglist grabs all P's but before it starts M's to run the P's,
                                // another M returns from syscall, finishes running its G,
                                // observes that there is no work to do and no other running M's
                                // and reports deadlock.
                                incidlelocked(-1)
                                //mark the goroutines parked on the ready fds as runnable
                                injectglist(gp)
                                incidlelocked(1)
                        }           
                } 
                ......
}

How netpoll works
netpoll calls epollwait to fetch the ready fds and returns the goroutines parked on them

runtime/netpoll_epoll.go

// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
        if epfd == -1 {
                return nil
        }
        waitms := int32(-1)
        if !block {
                waitms = 0
        }
        var events [128]epollevent
retry:  
        n := epollwait(epfd, &events[0], int32(len(events)), waitms)
        //      print("epoll wait\n")
        if n < 0 {
                if n != -_EINTR {
                        println("runtime: epollwait on fd", epfd, "failed with", -n)
                        throw("runtime: netpoll failed")
                }
                goto retry
        }
        var gp guintptr
        for i := int32(0); i < n; i++ {
                ev := &events[i]
                if ev.events == 0 {
                        continue
                }
                var mode int32
                if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'r'
                }
                if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'w'
                }
                if mode != 0 {
                        pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
                       //append the goroutine parked on pd to the gp list
                        netpollready(&gp, pd, mode)
                }
        }
        if block && gp == 0 {
                goto retry
        }
        return gp.ptr()
} 

How injectglist works
injectglist flips each goroutine from waiting to runnable and puts it on the global run queue

runtime/proc.go

// Injects the list of runnable G's into the scheduler.
// Can run concurrently with GC.
func injectglist(glist *g) {
        if glist == nil {
                return  
        }               
        if trace.enabled {
                for gp := glist; gp != nil; gp = gp.schedlink.ptr() {
                        traceGoUnpark(gp, 0)
                }       
        }               
        lock(&sched.lock)
        var n int       
        for n = 0; glist != nil; n++ {
                gp := glist
                glist = gp.schedlink.ptr()
                //flip the goroutine from waiting to runnable
                casgstatus(gp, _Gwaiting, _Grunnable)
                globrunqput(gp)
        }               
        unlock(&sched.lock)
        for ; n != 0 && sched.npidle != 0; n-- {
                startm(nil, false)
        }               
} 

Server-side socket implementation

How net.ListenTCP works
ListenTCP calls the socket function, which creates the socket via system calls, sets it non-blocking, and performs bind and listen

net/sock_posix.go

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        //sysSocket creates the socket via a system call and, also via
        //system call, sets it non-blocking
        s, err := sysSocket(family, sotype, proto)
        if err != nil {
                return nil, err 
        }   
        if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
                poll.CloseFunc(s)
                return nil, err 
        }   
        //wrap the socket in a network file descriptor (netFD)
        if fd, err = newFD(s, family, sotype, net); err != nil {
                poll.CloseFunc(s)
                return nil, err 
        }   
 
        // This function makes a network file descriptor for the
        // following applications:
        //  
        // - An endpoint holder that opens a passive stream
        //   connection, known as a stream listener
        //  
        // - An endpoint holder that opens a destination-unspecific
        //   datagram connection, known as a datagram listener
        //  
        // - An endpoint holder that opens an active stream or a
        //   destination-specific datagram connection, known as a
        //   dialer
        // - An endpoint holder that opens the other connection, such
        //   as talking to the protocol stack inside the kernel
        //
        // For stream and datagram listeners, they will only require
        // named sockets, so we can assume that it's just a request
        // from stream or datagram listeners when laddr is not nil but
        // raddr is nil. Otherwise we assume it's just for dialers or
        // the other connection holders.
        
        if laddr != nil && raddr == nil {
                switch sotype {
                case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
                        //listenStream binds the socket address via the bind syscall,
                        //listens via the listen syscall, and registers the fd with epoll via fd.init()
                        if err := fd.listenStream(laddr, listenerBacklog); err != nil {
                                fd.Close()
                                return nil, err
                        }
                        return fd, nil
                case syscall.SOCK_DGRAM:
                        if err := fd.listenDatagram(laddr); err != nil {
                                fd.Close()
                                return nil, err
                        }
                        return fd, nil
                }
        }
        if err := fd.dial(ctx, laddr, raddr); err != nil {
                fd.Close()
                return nil, err
        }
        return fd, nil
}
How Accept works

net/fd_unix.go

func (fd *netFD) accept() (netfd *netFD, err error) {
        //pfd.Accept performs the accept system call, returns the new
        //connection, and sets the new connection's fd to non-blocking
        d, rsa, errcall, err := fd.pfd.Accept()
        if err != nil {
                if errcall != "" {
                        err = wrapSyscallError(errcall, err)
                }   
                return nil, err 
        }   
        //wrap the new connection in a netFD
        if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
                poll.CloseFunc(d)
                return nil, err 
        }   
        //netfd.init() adds the newly accepted socket fd to epoll
        if err = netfd.init(); err != nil {
                fd.Close()
                return nil, err 
        }                                                                                                                                                                                      
        lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
        netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
        return netfd, nil 
} 

internal/poll/fd_unix.go

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
        if err := fd.readLock(); err != nil {
                return -1, nil, "", err 
        }   
        defer fd.readUnlock()
 
        if err := fd.pd.prepareRead(fd.isFile); err != nil {
                return -1, nil, "", err 
        }   
        for {
                //accept performs the accept system call and
                //sets the returned socket fd to non-blocking
                s, rsa, errcall, err := accept(fd.Sysfd)
                if err == nil {
                        return s, rsa, "", err 
                }   
                switch err {
                //the accept queue is empty
                case syscall.EAGAIN:
                        if fd.pd.pollable() {
                                //park the goroutine in the wait state
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }   
                        }   
                case syscall.ECONNABORTED:
                        // This means that a socket on the listen
                        // queue was closed before we Accept()ed it;
                        // it's a silly error, so try again.
                        continue
                }   
                return -1, nil, errcall, err 
        }   
}

How Read works

internal/poll/fd_unix.go

// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
        if err := fd.readLock(); err != nil {
                return 0, err 
        }   
        defer fd.readUnlock()
        if len(p) == 0 { 
                // If the caller wanted a zero byte read, return immediately
                // without trying (but after acquiring the readLock).
                // Otherwise syscall.Read returns 0, nil which looks like
                // io.EOF.
                // TODO(bradfitz): make it wait for readability? (Issue 15735)
                return 0, nil 
        }   
        if err := fd.pd.prepareRead(fd.isFile); err != nil {
                return 0, err 
        }   
        if fd.IsStream && len(p) > maxRW {
                p = p[:maxRW]
        }   
        for {
                //perform the read system call
                n, err := syscall.Read(fd.Sysfd, p)
 
                if err != nil {
                        n = 0 
                        if err == syscall.EAGAIN && fd.pd.pollable() {
                                //no data to read on the socket fd; park the goroutine in the wait state
                                if err = fd.pd.waitRead(fd.isFile); err == nil {
                                        continue
                                }
                        }
                }
        
                err = fd.eofError(n, err)
                return n, err
        }
}

How Write works

internal/poll/fd_unix.go

// Write implements io.Writer.
func (fd *FD) Write(p []byte) (int, error) {
        if err := fd.writeLock(); err != nil {
                return 0, err
        }       
        defer fd.writeUnlock()
        if err := fd.pd.prepareWrite(fd.isFile); err != nil {
                return 0, err
        }       
        var nn int
        for {   
                max := len(p)
                if fd.IsStream && max-nn > maxRW {
                        max = nn + maxRW
                }
                //perform the write system call
                n, err := syscall.Write(fd.Sysfd, p[nn:max])
                if n > 0 {
                        nn += n
                }
                if nn == len(p) {
                        return nn, err
                }
                if err == syscall.EAGAIN && fd.pd.pollable() {
                        //the socket fd is not writable; park the goroutine in the wait state
                        if err = fd.pd.waitWrite(fd.isFile); err == nil {
                                continue
                        }
                }
                if err != nil {
                        return nn, err
                }
                if n == 0 {
                        return nn, io.ErrUnexpectedEOF
                }
        }    
}         

GPC model summary

  1. Newly created sockets and accepted sockets are all set to non-blocking.
  2. The fds of both are added to epoll.
  3. Read and Write loop over the syscall; on EAGAIN the goroutine is parked in the wait state.
  4. The I/O thread periodically runs sysmon, which uses epollwait to find the ready fds and marks the goroutines attached to them runnable.