golang高併發限流操作 ping / telnet
需求
當需要同時ping/telnet多個ip時,可以通過引入ping包/telnet包實現,也可以通過go呼叫cmd命令實現,不過後者呼叫效率較差,所以這裡選擇ping包和telnet包
還有就是高併發的問題,可以通過shell指令碼或者go實現高併發,所以我選擇的用go自帶的協程實現,但是如果要同時處理1000+個ip,考慮到機器的效能,需要ratelimit控制開闢的go協程數量,這裡主要寫一下我的建議和淌過的坑
ping
參考連結: https://github.com/sparrc/go-ping
import "github.com/sparrc/go-ping" import "time" func (p *Ping) doPing(timeout time.Duration,count int,ip string) (err error) { pinger,cmdErr := ping.NewPinger(ip) if cmdErr != nil { glog.Error("Failed to ping " + p.ipAddr) err = cmdErr return } pinger.Count = count pinger.Interval = time.Second pinger.Timeout = timeout // true的話,代表是標準的icmp包,false代表可以丟包類似udp pinger.SetPrivileged(false) // 執行 pinger.Run() // 獲取ping後的返回資訊 stats := pinger.Statistics() //延遲 latency = float64(stats.AvgRtt) // 標準的往返總時間 jitter = float64(stats.StdDevRtt) //丟包率 packetLoss = stats.PacketLoss return }
注意: pinger.Run() 這裡執行的時候是阻塞的,如果併發量大的時候,程式會卡死在這裡,所以當有高併發的需求時建議如下處理:
go pinger.Run()
time.Sleep(timeout)
telnet
package main import ( "github.com/reiver/go-telnet" ) func doTelnet(ip string,port int) { var caller telnet.Caller = telnet.StandardCaller address := ip + ":"+ strconv.Itoa(port) // DialToAndCall 檢查連通性並且呼叫 telnet.DialToAndCall(address,caller) } }
bug出現報錯:
lookup tcp/: nodename nor servname provided,or not known
解決:
修改string(port)為strconv.Itoa(port)
DialToAndCall這種方式telnet無法設定超時時間,預設的超時時間有1分鐘,所以使用DialTimeout這個方式實現telnet
import "net" func doTelnet(ip string,ports []string) map[string]string { // 檢查 emqx 1883,8083,8080,18083 埠 results := make(map[string]string) for _,port := range ports { address := net.JoinHostPort(ip,port) // 3 秒超時 conn,err := net.DialTimeout("tcp",address,3*time.Second) if err != nil { results[port] = "failed" } else { if conn != nil { results[port] = "success" _ = conn.Close() } else { results[port] = "failed" } } } return results }
shell高併發
本質就是讀取ip.txt檔案裡的ip,然後呼叫ping方法,實現高併發也是藉助&遍歷所有的ip然後同一交給作業系統去處理高併發
while read ip do { doPing(ip) } & done < ip.txt
go高併發限速
import ( "context" "fmt" "log" "time" "sync" "golang.org/x/time/rate" ) func Limit(ips []string)([]string,[]string,error) { //第一個引數是每秒鐘最大的併發數,第二個引數是桶的容量,第一次的時候每秒可執行的數量就是桶的容量,建議這兩個值都寫成一樣的 r := rate.NewLimiter(10,10) ctx := context.Background() wg := sync.WaitGroup{} wg.Add(len(ips)) lock := sync.Mutex{} var success []string var fail []string defer wg.Done() for _,ip:=range ips{ //每次消耗2個,放入一個,消耗完了還會放進去,如果初始是5個,所以這段程式碼再執行到第4次的時候筒裡面就空了,如果當前不夠取兩個了,本次就不取,再放一個進去,然後返回false err := r.WaitN(ctx,2) if err != nil { log.Fatal(err) } go func(ip string) { defer func() { wg.Done() }() err := doPing(time.Second,2,ip) lock.Lock() defer lock.Unlock() if err != nil { fail = append(fail,ip) return } else { success = append(success,ip) } }(ip) } // wait等待所有go協程結束 wg.wait() return success,fail,nil } func main() { ips := [2]string{"192.168.1.1","192.168.1.2"} success,err := Limit(ips) if err != nil { fmt.Printf("ping error") } }
這裡注意一個併發實現的坑,在for迴圈裡使用goroutine時要把遍歷的引數傳進去才能保證每個遍歷的引數都被執行,否則只能執行一次
(拓展)管道、死鎖
先看個例子:
func main() { go print() // 啟動一個goroutine print() } func print() { fmt.Println("*******************") }
輸出結果:
*******************
沒錯,只有一行,因為當go開闢一個協程想去執行print方法時,主函式已經執行完print並打印出來,所以goroutine還沒有機會執行程式就已經結束了,解決這個問題可是在主函式里加time.sleep讓主函式等待goroutine執行完,也可以使用WaitGroup.wait等待goroutine執行完,還有一種就是通道
通道分無緩衝通道和緩衝通道
無緩衝通道
無緩衝通道也就是定義長度為0的通道,存入一個數據,從無緩衝通道取資料,若通道中無資料,就會阻塞,還可能引發死鎖,同樣資料進入無緩衝通道,如果沒有其他goroutine來拿走這個資料,也會阻塞,記住無緩衝資料並不儲存資料
func main() { var channel chan string = make(chan string) go func(message string) { channel<- message // 存訊息 }("Ping!") fmt.Println(<-messages) // 取訊息 }
快取通道
顧名思義,快取通道可以儲存資料,goroutine之間不會發生阻塞,for迴圈讀取通道中的資料時,一定要判斷當管道中不存在資料時的情況,否則會發生死鎖,看個例子
channel := make(chan int,3) channel <- 1 channel <- 2 channel <- 3 // 顯式關閉通道 close(channel) for v := range channel { fmt.Println(v) // 如果現有資料量為0,跳出迴圈,與顯式關閉隧道效果一樣,選一個即可 if len(ch) <= 0 { break } }
但是這裡有個問題,通道中資料是可以隨時存入的,所以我們遍歷的時候無法確定目前的個數就是通道的總個數,所以推薦使用select監聽通道
// 建立一個計時通道 timeout := time.After(1 * time.Second) // 監聽3個通道的資料 select { case v1 := <- c1: fmt.Printf("received %d from c1",v1) case v2 := <- c2: fmt.Printf("received %d from c2",v2) case v3 := <- c3: fmt.Printf("received %d from c3",v3) case <- timeout: is_timeout = true // 超時 break } }
以上為個人經驗,希望能給大家一個參考,也希望大家多多支援我們。如有錯誤或未考慮完全的地方,望不吝賜教。