1. 程式人生 > 其它 >go語言ZeroMQ程式設計(帶聊天室的實現)

go語言ZeroMQ程式設計(帶聊天室的實現)

關於ZeroMQ(又稱ØMQ)有很多傳神的解說,我這裡給各位看官上一段比較凡爾賽的描述。

Underneath the brown paper wrapping of ZeroMQ’s socket API lies the world of messaging patterns.

"在ZeroMQ發黃的封皮下面,流淌的是全世界的訊息通訊模式",一翻譯就沒有文藝範兒了,總之口氣很大(敢自稱Zero就已經體現了作者的雄心),聲稱ZeroMQ涵蓋了所有的通訊模式,包括程序內/程序間通訊、TCP、廣播等等模式。

閒言少敘,書歸正傳。網上講go語言ZeroMQ程式設計的例子比較少,今天列一些常見的通訊模式,給初學者一個指引,希望大家少走彎路。

REQ-REP

即request-response模式,典型的client-server架構,client傳送request給server,server返回response給client,一個server可以同時跟多個client建立連線。

在ZeroMQ中凡是涉及到“1對多”的模式,都是“1”的那一方呼叫Bind()函式在某個埠上開始監聽,“多”的那一方呼叫Connect()函式請求跟“1”方建立連線。在普通的socket程式設計中,必須先啟動Server,再啟動Client,否則Client的Connect()函式會失敗從而退出程式。然而ZeroMQ的Connect()函式實際上是非同步的,如果如果Server端還沒有啟起來它不會以失敗告終,而是立即返回,你可以緊接著呼叫Send()函式。Send()又支援阻塞模式和非阻塞模式,阻塞模式會一直等待對方就緒(比如至少要等連線建立好吧)再發送資料,非阻塞模式會先把訊息存到本的緩衝佇列裡然後立即返回。Server端由於維護了多條連線,它會為每個連線都單獨建立一個緩衝佇列。

在req-rep模式中,每臺機器send(傳送資料)和revc(接收資料)必須交替呼叫,否則會報錯。

import (
	"fmt"
	"strconv"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func startServer(port int) {
	//REP 表示server端
	socket, _ := zmq.NewSocket(zmq.REP)
	//Bind 繫結埠,並指定傳輸層協議
	socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
	fmt.Printf("bind to port %d\n", port)
	defer socket.Close()

	for {
		//Recv和Send必須交替進行
		resp, _ := socket.Recv(0)     //0表示阻塞模式
		socket.Send("Hello "+resp, 0) //同步傳送
	}
}

func startClient(port int, msg string) {
	//REQ 表示client端
	socket, _ := zmq.NewSocket(zmq.REQ)
	//Connect 請求建立連線,並指定傳輸層協議
	socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
	fmt.Println("connect to server")
	defer socket.Close()

	for i := 0; i < 10; i++ {
		//Send和Recv必須交替進行
		socket.Send(msg, zmq.DONTWAIT) //非阻塞模式,非同步傳送(只是將資料寫入本地buffer,並沒有真正傳送到網路上)
		resp, _ := socket.Recv(0)
		fmt.Printf("receive [%s]\n", resp)
		time.Sleep(5 * time.Second)
	}
}

DEALER-ROUTER

DEALER-ROUTER跟REQ-REP模式比較接近,都是client-server構架,但是DEALER-ROUTER有它的2個特別之處:

  1. send()和revc()不需要交替呼叫
  2. ROUTER端的訊息需要分成多幀傳送,至少2幀,因為第一幀需要傳送對端的地址。訊息正文如果需要分成多幀傳送,那麼除後一幀外其他幀在send時都需要設定SNDMORE標識。對於DEALER發過來的一條訊息,ROUTER需要呼叫2次Recv(),第一次Recv()讀出來的是對端的地址
import (
	"fmt"
	"strconv"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func router(port int) {
	//ROUTER 表示server端
	socket, _ := zmq.NewSocket(zmq.ROUTER)
	//Bind 繫結埠,並指定傳輸層協議
	socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))
	fmt.Printf("bind to port %d\n", port)
	defer socket.Close()

	for {
		//Send和Recv沒必要交替進行
		addr, _ := socket.RecvBytes(0) //接收到的第一幀表示對方的地址UUID
		resp, _ := socket.Recv(0)
		socket.SendBytes(addr, zmq.SNDMORE) //第一幀需要指明對方的地址,SNDMORE表示訊息還沒發完
		socket.Send("Hello", zmq.SNDMORE)   //如果不用SNDMORE表示這已經是最後一幀了,下一次Send就是下一段訊息的第一幀了,需要指明對方的地址
		socket.Send(resp, 0)
	}
}

func dealer(port int, msg string) {
	//DEALER 表示client端
	socket, _ := zmq.NewSocket(zmq.DEALER)
	//Connect 請求建立連線,並指定傳輸層協議
	socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
	fmt.Println("connect to server")
	defer socket.Close()

	for i := 0; i < 10; i++ {
		//Send和Recv沒必要交替進行
		socket.Send(msg, 0) //非阻塞模式,非同步傳送(只是將資料寫入本地buffer,並沒有真正傳送到網路上)
		resp1, _ := socket.Recv(0)
		resp2, _ := socket.Recv(0)
		fmt.Printf("receive [%s %s]\n", resp1, resp2)
		time.Sleep(5 * time.Second)
	}
}

SUB-PUB

PUB代表釋出者publisher,SUB代表訂閱者subscriber,釋出者傳送一條訊息,所有的訂閱者都會收到,是典型的廣播模式。

訂閱者只接收特定主題的訊息,釋出者在每條訊息前面加一個特定的字首表示主題。訂閱者通過呼叫SetSubscribe(prefix string)函式過濾出特定字首的訊息,如果給SetSubscribe()傳的是空字串則訂閱者不過濾任何訊息,全部接收。

顯然釋出者只能呼叫Send,訂閱者只能呼叫Recv。

import (
	"fmt"
	"strconv"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func publish(port int, prefix string) {
	ctx, _ := zmq.NewContext()
	defer ctx.Term()

	//PUB 表示publisher角色
	publisher, _ := ctx.NewSocket(zmq.PUB)
	defer publisher.Close()
	//Bind 繫結埠,並指定傳輸層協議
	publisher.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))

	//publisher會把訊息傳送給所有subscriber,subscriber可以動態加入
	for i := 0; i < 5; i++ {
		//publisher只能呼叫send方法
		publisher.Send(prefix+"Hello my followers", 0)
		publisher.Send(prefix+"How are you", 0)
		fmt.Printf("loop %d send over\n", i+1)
		time.Sleep(10 * time.Second)
	}
	publisher.Send(prefix+"END", 0)
}

func subscribe(port int, prefix string) {
	//SUB 表示subscriber角色
	subscriber, _ := zmq.NewSocket(zmq.SUB)
	defer subscriber.Close()

	//Bind 繫結埠,並指定傳輸層協議
	subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
	subscriber.SetSubscribe(prefix) //只接收字首為prefix的訊息
	fmt.Printf("listen to port %d\n", port)

	for {
		//接收廣播
		if resp, err := subscriber.Recv(0); err == nil {
			resp = resp[len(prefix):] //去掉字首
			fmt.Printf("receive [%s]\n", resp)
			if resp == "END" {
				break
			}
		} else {
			fmt.Println(err)
			break
		}
	}
}

PUSH-PULL

跟PUB-SUB模式一樣,PUSH-PULL也只能實現訊息的單向傳輸,但是pusher會採用輪詢法選擇一個worker把訊息傳送給它,其他worker都收不到這條訊息,所以PUSH-PULL模式常用來做任務的分發。

import (
	"fmt"
	"strconv"
	"time"

	zmq "github.com/pebbe/zmq4"
)

func push(port int) {
	ctx, _ := zmq.NewContext()
	defer ctx.Term()

	//PUSH 表示pusher角色
	pusher, _ := ctx.NewSocket(zmq.PUSH)
	defer pusher.Close()
	//Bind 繫結埠,並指定傳輸層協議
	pusher.SetSndhwm(110)
	pusher.Bind("tcp://127.0.0.1:" + strconv.Itoa(port))

	//pusher把訊息送給一個puller(採用公平輪轉的方式選擇一個puller),puller可以動態加入
	for i := 0; i < 5; i++ {
		pusher.Send("Hello my followers", 0)
		pusher.Send("How are you", 0)
		fmt.Printf("loop %d send over\n", i+1)
		time.Sleep(5 * time.Second)
	}
	pusher.Send("END", 0)
}

func pull(port int) {
	//PULL 表示puller角色
	puller, _ := zmq.NewSocket(zmq.PULL)
	defer puller.Close()

	//Bind 繫結埠,並指定傳輸層協議
	puller.Connect("tcp://127.0.0.1:" + strconv.Itoa(port))
	fmt.Printf("listen to port %d\n", port)

	for {
		//接收廣播
		if resp, err := puller.Recv(0); err == nil {
			fmt.Printf("receive [%s]\n", resp)
			if resp == "END" {
				break
			}
		} else {
			fmt.Println(err)
			break
		}
	}
}

聊天室後端架構

最後來一個綜合練習,實現一個聊天室的後端。採用hub-client架構,client代表聊天室中發言的使用者,client通過DEALER-ROUTER模式把發言內容發給hub(hub不需要專門響應該client,所以不能採用REQ-REP模式,REQ-REP要求send和recv必須交替進行),hub通過PUB-SUB模式把訊息廣播給所有client。

import (
	"bufio"
	"encoding/base64"
	"fmt"
	"os"
	"strconv"
	"strings"

	zmq "github.com/pebbe/zmq4"
)

func hub(subPort, pubPort int) {
	//接收所有client的訊息
	socket, _ := zmq.NewSocket(zmq.ROUTER)
	socket.Bind("tcp://127.0.0.1:" + strconv.Itoa(subPort))
	fmt.Printf("bind to port %d\n", subPort)
	defer socket.Close()

	//把訊息廣播給所有client
	ctx, _ := zmq.NewContext()
	defer ctx.Term()
	publisher, _ := ctx.NewSocket(zmq.PUB)
	defer publisher.Close()
	publisher.Bind("tcp://127.0.0.1:" + strconv.Itoa(pubPort))

	for {
		//把接收到的client的訊息再廣播給所有client
		if addr, err := socket.RecvBytes(0); err == nil { //第一幀讀出對端的地址
			client := base64.StdEncoding.EncodeToString(addr) //用對端地址來標識訊息是誰發出來的
			if resp, err := socket.Recv(0); err == nil {
				if _, err := publisher.Send(client+"say: "+resp, 0); err != nil { //在訊息前加上傳送者的標識
					fmt.Println(err)
					break
				}
			} else {
				fmt.Println(err)
				break
			}
		} else {
			fmt.Println(err)
			break
		}
	}
}

func client(pubPort, subPort int) {
	//把訊息廣播給hub
	socket, _ := zmq.NewSocket(zmq.DEALER)
	socket.Connect("tcp://127.0.0.1:" + strconv.Itoa(pubPort))
	fmt.Println("connect to server")
	defer socket.Close()

	//訂閱hub的訊息
	subscriber, _ := zmq.NewSocket(zmq.SUB)
	defer subscriber.Close()
	subscriber.Connect("tcp://127.0.0.1:" + strconv.Itoa(subPort))
	subscriber.SetSubscribe("")

	go func() {
		for {
			//把接收到的client的訊息再廣播給所有client
			if resp, err := subscriber.Recv(0); err == nil {
				fmt.Println(resp)
			} else {
				fmt.Println(err)
				break
			}
		}
	}()

	fmt.Println("please type message")
	reader := bufio.NewReader(os.Stdin)
	for {
		text, _ := reader.ReadString('\n')
		text = strings.Replace(text, "\n", "", -1)
		socket.Send(text, 0)
	}
}
我的Go語言實戰課 Go語言實現工業級搜尋引擎