1. 程式人生 > >Kafka兩級排程實現分散式協調任務分配Golang版

Kafka兩級排程實現分散式協調任務分配Golang版

背景

基於Kafka訊息佇列的兩級協調排程架構

Kafka內部為了協調內部的consumer和kafka connector的工作實現了一個複製協議, 主要工作分為兩個步驟:

  1. 通過worker(consumer或connect)獲取自身的topic offset等元資料資訊,交給kafka的broker完成Leader/Follower選舉
  2. worker Leader節點獲取到kafka儲存的partation和member資訊,來進行二級分配,實現結合具體業務的負載均衡分配

從功能實現上兩級排程,一級排程負責將Leader選舉,二級排程則是worker節點完成每個成員的任務的分配

主要是學習這種架構設計思想,雖然這種方案場景非常有限

基於訊息佇列實現分散式協調設計

一級協調器設計:一級協調器主要是指的Coordinator部分,通過記錄成員的元資料資訊,來進行Leader選舉,比如根據offset的大小來決定誰是Leader 二級協調器設計:二級協調器主要是指的Leader任務分配部分, worker節點獲取到所有的任務和節點資訊,就可以根據合適的演算法來進行任務的分配,最終廣播到訊息佇列

值得我們學習的地方, 通常在kafka這種場景下,如果要針對不同的業務實現統一排程,還是蠻麻煩的, 所以比如將具體任務的分配工作從架構中遷移出去, 在broker端只負責通用層的Leader選舉即可, 將具體業務的分配工作,從主業務架構分離出去,由具體業務去實現

程式碼實現

核心設計

根據設計,我們抽象出: MemoryQueue、Worker、 Coordinator、GroupRequest、GroupResponse、Task、Assignment集合核心元件

MemoryQueue: 模擬訊息佇列實現訊息的分發,充當kafka broker角色 Worker: 任務執行和具體業務二級協調演算法 Coordinator: 位於訊息佇列內部的一個協調器,用於Leader/Follower選舉 Task: 任務 Assignment: Coordnator根據任務資訊和節點資訊構建的任務分配結果 GroupRequest: 加入叢集請求 GroupResponse: 響應資訊

MemoryQueue

核心資料結構

// MemoryQueue 記憶體訊息佇列
type MemoryQueue struct {
	done             chan struct{}
	queue            chan interface{}
	wg               sync.WaitGroup
	coordinator      map[string]*Coordinator
	worker           map[string]*Worker
}

其中coordinator用於標識每個Group組的協調器,為每個組都建立一個分配器

節點加入叢集請求處理

MemoryQueue 接收事件型別,然後根據事件型別進行分發,如果是GroupRequest事件,則分發給handleGroupRequest進行處理 handleGroupRequest內部先獲取對應group的coordinator,然後根據當前資訊buildGroupResponse發回訊息佇列

事件分發處理

func (mq *MemoryQueue) handleEvent(event interface{}) {
	switch event.(type) {
	case GroupRequest:
		request := event.(GroupRequest)
		mq.handleGroupRequest(&request)
	case Task:
		task := event.(Task)
		mq.handleTask(&task)
	default:
		mq.Notify(event)
	}
	mq.wg.Done()
}

加入Group組請求處理

其中Coordnator會呼叫自己的getLeaderID方法,來根據當前組內的各成員的資訊來選舉一個Leader節點

// getGroupCoordinator 獲取指定組的協調器
func (mq *MemoryQueue) getGroupCoordinator(group string) *Coordinator {
	coordinator, ok := mq.coordinator[group]
	if ok {
		return coordinator
	}
	coordinator = NewCoordinator(group)
	mq.coordinator[group] = coordinator
	return coordinator
}

func (mq *MemoryQueue) handleGroupRequest(request *GroupRequest) {
	coordinator := mq.getGroupCoordinator(request.Group)
	exist := coordinator.addMember(request.ID, &request.Metadata)
	// 如果worker之前已經加入該組, 就不做任何操作
	if exist {
		return
	}
	// 重新構建請求資訊
	groupResponse := mq.buildGroupResponse(coordinator)
	mq.send(groupResponse)
}

func (mq *MemoryQueue) buildGroupResponse(coordinator *Coordinator) GroupResponse {
	return GroupResponse{
		Tasks:       coordinator.Tasks,
		Group:       coordinator.Group,
		Members:     coordinator.AllMembers(),
		LeaderID:    coordinator.getLeaderID(),
		Generation:  coordinator.Generation,
		Coordinator: coordinator,
	}
}

Coordinator

核心資料結構

// Coordinator 協調器
type Coordinator struct {
	Group      string
	Generation int
	Members    map[string]*Metadata
	Tasks      []string
	Heartbeats map[string]int64
}

Coordinator內部通過Members資訊,來儲存各個worker節點的元資料資訊, 然後Tasks儲存當前group的所有任務, Heartbeats儲存workerd額心跳資訊, Generation是一個分代計數器,每次節點變化都會遞增

通過offset選舉Leader

通過儲存的worker的metadata資訊,來進行主節點的選舉

// getLeaderID 根據當前資訊獲取leader節點
func (c *Coordinator) getLeaderID() string {
	leaderID, maxOffset := "", 0
	// 這裡是通過offset大小來判定,offset大的就是leader, 實際上可能會更加複雜一些
	for wid, metadata := range c.Members {
		if leaderID == "" || metadata.offset() > maxOffset {
			leaderID = wid
			maxOffset = metadata.offset()
		}
	}
	return leaderID
}

Worker

核心資料結構

// Worker 工作者
type Worker struct {
	ID          string
	Group       string
	Tasks       string
	done        chan struct{}
	queue       *MemoryQueue
	Coordinator *Coordinator
}

worker節點會包含一個coordinator資訊,用於後續向該節點進行心跳資訊的傳送

分發請求訊息

worker接收到不同的事件型別,根據型別來進行處理, 其中handleGroupResponse負責接收到服務端Coordinator響應的資訊,裡面會包含leader節點和任務資訊,由worker 來進行二級分配, handleAssign則是處理分配完後的任務資訊

// Execute 接收到分配的任務進行請求執行
func (w *Worker) Execute(event interface{}) {
	switch event.(type) {
	case GroupResponse:
		response := event.(GroupResponse)
		w.handleGroupResponse(&response)
	case Assignment:
		assign := event.(Assignment)
		w.handleAssign(&assign)
	}
}

GroupResponse根據角色型別進行後續業務邏輯

GroupResponse會將節點分割為兩種:Leader和Follower, Leader節點接收到GroupResponse後需要繼續進行分配任務,而Follower則只需要監聽事件和傳送心跳

func (w *Worker) handleGroupResponse(response *GroupResponse) {
	if w.isLeader(response.LeaderID) {
		w.onLeaderJoin(response)
	} else {
		w.onFollowerJoin(response)
	}
}

Follower節點

Follower節點進行心跳傳送

// onFollowerJoin 當前角色是follower
func (w *Worker) onFollowerJoin(response *GroupResponse) {
	w.Coordinator = response.Coordinator
	go w.heartbeat()
}
// heartbeat 傳送心跳
func (w *Worker) heartbeat() {
	// timer := time.NewTimer(time.Second)
	// for {
	// 	select {
	// 	case <-timer.C:
	// 		w.Coordinator.heartbeat(w.ID, time.Now().Unix())
	// 		timer.Reset(time.Second)
	// 	case <-w.done:
	// 		return
	// 	}
	// }
}

Leader節點

Leader節點這個地方我將排程分配分為兩個步驟: 1)通過節點數和任務數將任務進行分片 2)將分片後的任務分配給各個節點,最終傳送回佇列

// onLeaderJoin 當前角色是leader, 執行任務分配併發送mq
func (w *Worker) onLeaderJoin(response *GroupResponse) {
	fmt.Printf("Generation [%d] leaderID [%s]\n", response.Generation, w.ID)
	w.Coordinator = response.Coordinator
	go w.heartbeat()
	// 進行任務分片
	taskSlice := w.performAssign(response)

	// 將任務分配給各個worker
	memerTasks, index := make(map[string][]string), 0
	for _, name := range response.Members {
		memerTasks[name] = taskSlice[index]
		index++
	}

	// 分發請求
	assign := Assignment{LeaderID: w.ID, Generation: response.Generation, result: memerTasks}
	w.queue.send(assign)
}

// performAssign 根據當前成員和任務數
func (w *Worker) performAssign(response *GroupResponse) [][]string {

	perWorker := len(response.Tasks) / len(response.Members)
	leftOver := len(response.Tasks) - len(response.Members)*perWorker

	result := make([][]string, len(response.Members))

	taskIndex, memberTaskCount := 0, 0
	for index := range result {
		if index < leftOver {
			memberTaskCount = perWorker + 1
		} else {
			memberTaskCount = perWorker
		}
		for i := 0; i < memberTaskCount; i++ {
			result[index] = append(result[index], response.Tasks[taskIndex])
			taskIndex++
		}
	}
	

測試資料

啟動一個佇列,然後加入任務和worker,觀察分配結果

	// 構建佇列
	queue := NewMemoryQueue(10)
	queue.Start()

	// 傳送任務
	queue.send(Task{Name: "test1", Group: "test"})
	queue.send(Task{Name: "test2", Group: "test"})
	queue.send(Task{Name: "test3", Group: "test"})
	queue.send(Task{Name: "test4", Group: "test"})
	queue.send(Task{Name: "test5", Group: "test"})

	// 啟動worker, 為每個worker分配不同的offset觀察是否能將leader正常分配
	workerOne := NewWorker("test-1", "test", queue)
	workerOne.start(1)
	queue.addWorker(workerOne.ID, workerOne)

	workerTwo := NewWorker("test-2", "test", queue)
	workerTwo.start(2)
	queue.addWorker(workerTwo.ID, workerTwo)

	workerThree := NewWorker("test-3", "test", queue)
	workerThree.start(3)
	queue.addWorker(workerThree.ID, workerThree)

	time.Sleep(time.Second)
	workerThree.stop()
	time.Sleep(time.Second)
	workerTwo.stop()
	time.Sleep(time.Second)
	workerOne.stop()

	queue.Stop()

執行結果: 首先根據offset, 最終test-3位Leader, 然後檢視任務分配結果, 有兩個節點2個任務,一個節點一個任務, 然後隨著worker的退出,又會進行任務的重新分配

Generation [1] leaderID [test-1]
Generation [2] leaderID [test-2]
Generation [3] leaderID [test-3]
Generation [1] worker [test-1]  run tasks: [test1||test2||test3||test4||test5]
Generation [1] worker [test-2]  run tasks: []
Generation [1] worker [test-3]  run tasks: []
Generation [2] worker [test-1]  run tasks: [test1||test2||test3]
Generation [2] worker [test-2]  run tasks: [test4||test5]
Generation [2] worker [test-3]  run tasks: []
Generation [3] worker [test-1]  run tasks: [test1||test2]
Generation [3] worker [test-2]  run tasks: [test3||test4]
Generation [3] worker [test-3]  run tasks: [test5]
Generation [4] leaderID [test-2]
Generation [4] worker [test-1]  run tasks: [test1||test2||test3]
Generation [4] worker [test-2]  run tasks: [test4||test5]
Generation [5] leaderID [test-1]
Generation [5] worker [test-1]  run tasks: [test1||test2||test3||test4||test5]

總結

其實在分散式場景中,這種Leader/Follower選舉,其實更多的是會選擇基於AP模型的consul、etcd、zk等, 本文的這種設計,與kafka自身的業務場景由很大的關係, 後續有時間,還是繼續看看別的設計, 從kafka connet借鑑的設計,就到這了

未完待續 關注公共號: 布衣碼農

更多精彩內容可以檢視