併發程式設計 & 網路程式設計
go的併發程式設計
併發程式設計開發將一個過程按照並行演算法拆分為多個可以獨立執行的程式碼塊,從而充分利用多核和多處理器提高系統吞吐率
順序,並行與併發
- 順序是指發起執行的程式只能有一個
- 併發是指同時發起(同時處理)的程式可以有多個(單車道並排只能有一輛,可同時駛入路段多輛車)
- 並行是指同時執行(同時做)的程式可以有多個(多車道並排可以有多個車)
例程:
Go語言中每個併發執行的單元叫做Goroutine,使用go關鍵字後接函式呼叫來建立一個Goroutine
例程
併發的執行單元就是例程
go + 函式:啟動一個例程
主例程:main函式
工作例程:go+函式
說明
當主程序結束,工作例程也會隨著結束,不管例程有沒有執行完,所以得想辦法讓例程執行完,主程序再結束。
操作
- 寫一個time.sleep主程序執行完,等待多長時間,這樣給例程時間結束,不過也不能確定例程是否完成
- 定義計數訊號量:定義一共有多少例程,在工作例程執行完,讓訊號量產生,在主例程去等待,當主程序沒有收到訊號量產生,就一直等待例程的結束。
func PrintChars(name string) { for ch := 'A'; ch <= 'Z'; ch++ { fmt.Printf("%s: %c\n", name, ch) runtime.Gosched() // 結束cpu的佔用,所以當前迴圈會退出,等下個程式執行 } } func main() { go PrintChars("1") go PrintChars("2") // 作為例程,當主程序結束,例程也就跟著結束 PrintChars("3") // 做為主程序 time.Sleep(time.Second * 3) // 主程序結束,子程序也結束,為了防止子程序沒有結束,主程序等待幾秒 }
計數訊號量
用來等待例程結束
&sync.WaitGroup{}
func PrintChars(name string, group *sync.WaitGroup) { for ch := 'A'; ch <= 'Z'; ch++ { fmt.Printf("%s: %c\n", name, ch) runtime.Gosched() // 結束cpu的佔用,所以當前迴圈會退出,等下個程式執行 } group.Done() // 產生一個訊號量 } func main() { group := &sync.WaitGroup{} // 宣告一個函式 group.Add(2) // 定義有2個訊號量 go PrintChars("1", group) go PrintChars("2", group) PrintChars("3", group) group.Wait() // 當定義的2個訊號量沒有出現就一直等待,出現就結束 }
匿名例程
因為閉包使用函式外變數,當例程執行時外部變數已經發生變化,導致答應內容不正確,可在建立例程時通過函式傳遞引數(值拷貝)方式避免
func main() {
group := &sync.WaitGroup{} // 宣告一個變數
n := 2
group.Add(n) // 定義有2個訊號量
for i := 1; i <= n; i++ {
go func(id int) {
for ch := 'A'; ch <= 'Z'; ch++ {
fmt.Printf("%d:%d:%c\n", id, i, ch)
runtime.Gosched()
}
group.Done()
}(i) // 匿名一定要傳參,否則例程執行比for迴圈晚,for為2了,go才開始執行,那閉包中的i獲取的值巨不正確
}
group.Wait()
}
- 小案例
之所以這裡結果不固定,是因為並不是順序執行,使用例程隨機執行,所以影響結果。
func main() {
var counter int
group := &sync.WaitGroup{} // 建立變數
incr := func() { // 建立匿名函式
defer group.Done()
for i := 0; i <= 100; i++ {
counter++
runtime.Gosched()
}
}
desc := func() {
defer group.Done()
for i := 0; i <= 100; i++ {
counter--
runtime.Gosched()
}
}
for i := 0; i < 10; i++ { // 迴圈1-10
group.Add(2) // 定義2個例程
go incr() // 例程1執行函式
go desc() // 例程2執行函式
}
group.Wait() // 等待2個例程執行完成
fmt.Println(counter) // -8 結果不固定
}
管道
chan型別為管道型別
var 變數名 chan string: string定義管道型別
初始化:var 變數名 chan string = make(chan string)
func main() {
var notice chan string = make(chan string) // 定義管道,並設定管道型別為string
fmt.Printf("%T %v\n", notice, notice) // chan string 0xc0000160c0
fmt.Println(len(channel)) // 獲取管道中元素數量,當全部讀取後,管道為0
channel := make(chan string, 2)
channel <- "2"
fmt.Println(<-channel)
close(channel) // 關閉管道後能讀取管道,但是不能寫入
channel <- "2" // 報錯
// 遍歷管道中的元素,必須close,不然讀管道會卡死
channel := make(chan string, 2)
channel <- "2"
channel <- "3"
close(channel)
for ch := range channel { // 這裡會一直讀管道,會導致死鎖,需要某個例程能結束管道
fmt.Println(len(channel))
}
}
案例①
func main() {
var notice chan string = make(chan string) // 定義管道,並設定管道型別為string
fmt.Printf("%T %v\n", notice, notice) // chan string 0xc0000160c0
go func() {
fmt.Println("go start")
notice <- "xxx" // 這裡發訊息到管道,主例程收到訊息,才會執行最後的end
fmt.Println("go end")
}()
fmt.Println("start") // 主例程執行
fmt.Println(<-notice) // 這裡如果沒有收到訊息會阻塞,所以會執行匿名函式
fmt.Println("end") // 保證了end最後執行,不會在例程前面執行
}
案例②
- 一次的結果完成,傳送訊息一次
- 遍歷10次管道,當收到管道1次訊息,說明遍歷1次成功
func PrintChars(name int, channel chan int) {
for ch := 'A'; ch <= 'Z'; ch++ {
fmt.Printf("%d: %c\n", name, ch)
runtime.Gosched()
}
channel <- 0
}
func main() {
var channel chan int = make(chan int)
for i := 0; i < 10; i++ {
go PrintChars(i, channel)
}
for i := 0; i < 10; i++ {
<-channel
}
fmt.Println("over")
}
只讀寫管道
雖然讀管道和寫管道不是同一個名稱,不過底層都是用的一個管道,所以寫管道最終將資料寫到channel中,讀管道通過channel去讀取。
var channel chan int = make(chan int, 5)
// 只讀管道
var rchannel <-chan int = channel // 只讀管道和只寫管道底層還是用的同一個管道
// 只寫管道
var wchannel chan<- int = channel
wchannel <- 1
fmt.Println(<-rchannel) // 1
- 例程只讀寫管道
var channel chan int = make(chan int, 5)
go func(channel <-chan int) { // 定義管道為只讀管道
fmt.Println(<-channel) // 0
}(channel) // 將全域性的管道傳入
go func(channel chan<- int) { // 定義管道為只寫管道
channel <- 0 // 寫入0到管道中,底層到了channel中,能被上面的例程讀取
}(channel) // 將全域性管道傳入
time.Sleep(time.Second * 3)
Select Case
類似於switch case,從任意一個通道讀取資料都結束
func main() {
channel01 := make(chan int, 2)
channel02 := make(chan int, 2)
go func() {
channel01 <- 1
}()
go func() {
channel02 <- 2
}()
select { // select,case實現隨意哪個通道能讀取資料都可以
case <-channel01:
fmt.Println("channel01")
case <-channel02:
fmt.Println("channel02")
}
}
- select case 遍歷元素儲存到切片
func main() {
channel := make(chan int, 1)
slice := make([]int, 10)
// 隨機寫入數字到通道,通過切片儲存
for i := 0; i < 10; i++ {
select {
case channel <- 0:
case channel <- 1:
case channel <- 2:
case channel <- 3:
case channel <- 4:
case channel <- 5:
case channel <- 6:
case channel <- 7:
}
slice[i] = <-channel // 不能append儲存元素,切片已經被初始化長度為10,前10都是0
}
fmt.Println(slice) // [3 5 6 5 5 6 5 2 0 3]
}
- 練習
設定超時時間為3秒,一旦3秒沒有執行完畢,就認為超時,不再執行,否則,執行改程式碼
// 方法一:
func main() {
rand.Seed(time.Now().Unix())
// 生成兩個管道
result := make(chan int)
timeout := make(chan int)
// 建立例程,建立1-10的隨機數,休眠隨機數秒數再寫入資訊到管道
go func() {
interval := time.Duration(rand.Int()%10) * time.Second
fmt.Println(interval)
time.Sleep(interval)
result <- 0
}()
// 建立例程,等待3秒再寫入訊息到管道,如果上面的例程大於三秒,此例程就優先執行
go func() {
time.Sleep(3 * time.Second)
timeout <- 0
}()
// 優先接受到管道資訊的輸出
select {
case <-result:
fmt.Println("執行完成")
case <-timeout:
fmt.Println("執行超時")
}
}
// 方法二:
// 使用time.After方法返回只讀型別的通道,指定時間為3秒
func main() {
rand.Seed(time.Now().Unix())
result := make(chan int)
// 建立例程,建立1-10的隨機數,休眠隨機數秒數再寫入資訊到管道
go func() {
interval := time.Duration(rand.Int()%10) * time.Second
fmt.Println(interval)
time.Sleep(interval)
result <- 0
}()
// 優先接受到管道資訊的輸出
select {
case <-result:
fmt.Println("執行完成")
case <-time.After(3 * time.Second): // time.After返回只讀管道,但只讀一次
fmt.Println("執行超時")
}
}
after & tick
time.After:返回通道型別,只能讀一次
time.Tick: 返回通道型別,能多次讀取,但有指定時間
func main() {
fmt.Println(time.Now())
fmt.Println("after")
channel := time.After(3 * time.Second)
fmt.Println(<-channel) // time.After只能讀一次,不能再次讀取
fmt.Println("tick")
ticker := time.Tick(3 * time.Second)
fmt.Println(<-ticker) // time.Tick可以讀取多次,指定秒數後可以再次讀取
fmt.Println(<-ticker)
fmt.Println(<-ticker)
for now := range ticker { // 實現迴圈,每隔3秒執行一次
fmt.Println(now)
}
}
鎖
- sync.Metex 互斥鎖
func main() {
var counter int
group := &sync.WaitGroup{}
// 使用鎖機制,保證搶佔到鎖和釋放鎖裡面的操作必須執行完
lock := &sync.Mutex{} // 使用sync包裡面的互斥鎖方法
incr := func() {
defer group.Done()
for i := 0; i <= 100; i++ {
lock.Lock() // 加鎖
counter++
lock.Unlock() // 釋放鎖
runtime.Gosched()
}
}
desc := func() {
defer group.Done()
for i := 0; i <= 100; i++ {
lock.Lock() // 加鎖
counter--
runtime.Gosched()
lock.Unlock() // 釋放鎖
}
}
for i := 0; i < 10; i++ {
group.Add(2)
go incr()
go desc()
}
group.Wait()
fmt.Println(counter) // 0
}
- 原子性
- automic.AddInt32(&Int32,int)
- 作用有限,針對int32和int64
func main() {
var counter int32 = 0
group := &sync.WaitGroup{}
incr := func() {
defer group.Done()
for i := 0; i <= 100; i++ {
atomic.AddInt32(&counter, 1) // 使用automic保證原子性,傳入指標,和加1
runtime.Gosched()
}
}
decr := func() {
defer group.Done()
for i := 0; i <= 100; i++ {
atomic.AddInt32(&counter, -1)
}
}
for i := 0; i <= 10; i++ {
group.Add(2)
go incr()
go decr()
}
group.Wait()
fmt.Println(counter) // 0
}
- 讀寫鎖
RWMutex
Lock: 鎖
RLock:讀鎖
ULock:釋放鎖
RULock:讀釋放鎖
場景:
當資料有事務正在寫操作,那麼讀的資料會存在髒資料,所以需要針對不同場景加鎖
RLock --> Rlock: 兩個讀鎖,不阻塞,使用URLock釋放鎖
- 死鎖
產生死鎖原因:
- 超過管道中指定數量
- 有寫管道,但沒讀管道
- 有讀管道,沒有寫管道
func main() {
channel := make(chan string, 2) // 定義管道中的數量
channel <- "x" // 寫管道
channel <- "y"
fmt.Println(<-channel) // 讀管道
fmt.Println(<-channel)
}
sync
sync.Once
就算for迴圈10次,但定義了sync.Once也只執行1次
func main() {
// 使用sync.Once函式只執行一次
var once sync.Once
for i := 0; i < 10; i++ {
once.Do(func() {
fmt.Println(i) // 0
})
}
}
sync.Map
實現儲存為map
store儲存
load讀取
delete刪除
func main() {
var users sync.Map
users.Store(10, "tcy")
users.Store(20, "twg")
if value, ok := users.Load(10); ok { // 因為是map型別,所以需要將key傳入
fmt.Println(value, ok) // tcy true 只有這個結果可以輸出
}
if value, ok := users.Load(30); ok { // 沒有30的key,判斷false不執行
fmt.Println(value)
}
users.Delete(10) // 刪除key為10的map
if value, ok := users.Load(10); ok {
fmt.Println(value)
}
}
sync.pool
建立物件池,避免重複建立物件銷燬物件
func main() {
// 物件池
pool := sync.Pool{ // 為結構體型別
New: func() interface{} { // 結構體名為new,值為函式,返回介面
fmt.Println("new")
return 1
},
}
x := pool.Get() // 因為首次池子沒有物件,這裡建立物件
fmt.Println(x) // 1
pool.Put(x) // 使用完成放回池子
x = pool.Get() // 再次使用,並沒有建立
x = pool.Get() // 但再次使用就需要建立,因為put放回去的已經被上個get使用,結果為new
}
runtime
執行時
func main() {
fmt.Println(runtime.GOROOT()) // 檢視goroot目錄
fmt.Println(runtime.NumCPU()) // 檢視cpu的數量,作用於限制一個程序能使用多少個cpu
fmt.Println(runtime.GOMAXPROCS(1)) // 設定系統只能使用1核
fmt.Println(runtime.NumGoroutine()) // 例程數量
}
點對點聊天
Socket: 套接字,是作業系統提供TCP和UDP的操作API,應用程式使用套接字時,可以設定對端的IP地址,埠號,實現資料的傳送和接受。
net模組:
net包提供了對socket程式設計的支援,socket程式設計分服務端和客戶端程式設計,對服務端可以使用函式Listen建立監聽,對於客戶端使用函式Dial連線伺服器
TCP服務端開發:
- 建立監聽服務
- 迴圈接受客戶端連線
- 資料處理(向客戶端傳送資料/讀客戶端傳送的資料)
- 關閉監聽服務
TCP客戶端開發:
- 連線伺服器
- 資料處理(向服務端傳送資料/讀服務端傳送的資料)
- 關閉連線
- 建立簡單服務端,客戶端一次性連線(簡易版)
// server
func main() {
addr := "0.0.0.0:9999"
listen, err := net.Listen("tcp", addr) // 服務端建立監聽
if err != nil {
fmt.Println(err)
os.Exit(-1) // 出現錯誤退出
}
fmt.Println("Listen:", addr)
client, err := listen.Accept() // 獲取客戶端連線
client.Write([]byte(time.Now().Format("2006-01-02 15:04:04"))) // 給客戶端傳送資料
client.Close() // 關閉客戶端
listen.Close() // 關閉服務端
}
// client
func main() {
// 客戶端讀取資料
addr := "127.0.0.1:9999"
conn, err := net.Dial("tcp", addr) // 客戶端建立連線伺服器
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
// 連線成功後,讀取服務端傳送的資料
bytes := make([]byte, 1024)
if n, err := conn.Read(bytes); err == nil {
fmt.Println(string(bytes[:n])) // 使用string轉換位元組切片為字串
} else {
fmt.Println(err)
}
conn.Close() // 關閉客戶端連線
}
- 客戶端與服務端聊天,持續互動,完整版
服務端:
先寫資料到客戶端,再讀取客戶端返回的資料
客戶端:
先讀取服務端傳送的資訊,再發送資料到服務端
// 服務端
func main() {
addr := "0.0.0.0:9999"
listen, err := net.Listen("tcp", addr) // 服務端建立Listen監聽
if err != nil {
fmt.Println(err)
os.Exit(-1) // 出現錯誤退出
}
fmt.Println("Listen:", addr)
defer listen.Close() // 關閉服務端,延遲關閉服務端
input := bufio.NewScanner(os.Stdin) // 將當前控制檯輸入為bufio
for {
client, err := listen.Accept() // 獲取客戶端連線
if err == nil {
reader := bufio.NewReader(client) // 將客戶端請求放入到bufio中
writer := bufio.NewWriter(client)
fmt.Printf("客戶端%s連線成功\n", client.RemoteAddr())
for {
// 先寫資料再讀客戶端傳送過來的資料
fmt.Print("請輸入需要給客戶端傳送的資料:q為退出 ")
input.Scan() // 掃描上面輸入的結果
test := input.Text() // 獲取結果賦值給變數test
if test == "q" { // 如果服務端寫入q表示退出
break
}
_, err := writer.WriteString(test + "\n") // 將控制檯寫入資料通過bufio傳送至客戶端
writer.Flush() // flush表示傳送操作
if err != nil {
break
}
line, err := reader.ReadString('\n') // 讀取客戶端傳送過來的資料
if err != nil {
break
}
fmt.Println("客戶端:", strings.TrimSpace(line))
}
fmt.Printf("客戶端%s關閉", client.RemoteAddr()) // 獲取客戶端地址
client.Close() // 關閉獲取的客戶端
}
}
}
// 客戶端
func main() {
addr := "127.0.0.1:9999"
conn, err := net.Dial("tcp", addr) // 與服務端建立連線
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
defer conn.Close() // 延遲關閉與服務端的連線
reader := bufio.NewReader(conn) // 將服務端連線資訊存放在bufio中
writer := bufio.NewWriter(conn)
input := bufio.NewScanner(os.Stdin) // 將控制檯輸入資訊存放在bufio中
for {
line, err := reader.ReadString('\n') // 讀取服務端傳送過來的資料
if err != nil {
break
}
fmt.Println("伺服器:", line)
fmt.Print("請輸入 q為退出:")
input.Scan() // 與服務端交互發送至服務端
if input.Text() == "q" {
break
}
_, err = writer.WriteString(input.Text() + "\n") // 將控制檯資訊傳送至服務端
writer.Flush()
if err != nil {
break
}
}
}