1. 程式人生 > >NSQ原始碼分析(四)——inFlightPqueue和PriorityQueue優先順序佇列

NSQ原始碼分析(四)——inFlightPqueue和PriorityQueue優先順序佇列

   在Channel結構體中用到了兩種優先順序佇列pqueue.PriorityQueue和inFlightPqueue。

deferredMessages map[MessageID]*pqueue.Item
deferredPQ       pqueue.PriorityQueue
deferredMutex    sync.Mutex

inFlightMessages map[MessageID]*Message
inFlightPQ       inFlightPqueue
inFlightMutex    sync.Mutex

       其中deferredMessages和inFlightMessages 使用map儲存了MessageID和Message的對應關係,用於根據MessageID獲取對應的Message。而deferredPQ和inFlightPQ是兩種優先順序佇列。deferredPQ佇列中儲存了延時訊息和訊息投遞失敗需要等待指定時間後重新投遞的訊息,inFlightPQ佇列中儲存了正在投遞但還沒確認投遞成功的訊息。

 

一、PriorityQueue優先順序佇列

       原始碼位置在 nsq/internal/pqueue/pqueue.go檔案中

      

PriorityQueue實現了Golang原始碼包heap中的介面,是最小堆。在瞭解PriorityQueue之前需要先對堆的概念有了解,可以參考:

https://blog.csdn.net/skh2015java/article/details/83183681

PriorityQueue佇列中儲存的是Item指標,Item結構體及欄位說明如下:

type Item struct {
	Value    interface{}  //儲存的訊息內容
	Priority int64   //優先順序的時間點
	Index    int  //在切片中的索引值
}

  

   這三個方法用於排序,按Priority欄位排序

func (pq PriorityQueue) Len() int {
	return len(pq)
}

func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].Priority < pq[j].Priority
}

func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}

  常用方法:

  

    New函式用於初始化佇列並指定cap

func New(capacity int) PriorityQueue {
	return make(PriorityQueue, 0, capacity)
}

   

  Push函式用於向佇列中新增元素

  Pop函式用於取出佇列中優先順序最高的元素(即Priority值最小,也就是根部元素)

  PeekAndShift(max int64) 用於判斷根部的元素是否超過max,如果超過則返回nil,如果沒有則返回並移除根部元素(根部元素是最小值)

    在實際的專案中Priority欄位存的是時間戳,比如說5分鐘之後投遞本條訊息,則Priority欄位存的就是5分鐘之後的時間戳。而PeekAndShift(max int64)中max值就是當前時間,如果佇列中的根部元素大於當前時間戳max的值,則說明佇列中沒有可以投遞的訊息,故返回nil。如果小於等於,則根部元素存放的訊息可以投遞,就是就返回並移除該根部元素。

func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
	if pq.Len() == 0 {
		return nil, 0
	}

	item := (*pq)[0]  //獲取根部元素
	if item.Priority > max {
		return nil, item.Priority - max
	}
	heap.Remove(pq, 0)  //移除根部元素,並重新排列堆,使根部元素為最小值

	return item, 0
}

 

  二、inFlightPqueue優先順序佇列

     

原始碼位置:nsq/nsqd/in_flight_pqueue.go檔案中

  inFlightPqueue佇列中存放的元素是Message的指標,也是最小堆,和PriorityQueue佇列類似。inFlightPqueue佇列按Message中的pri欄位進行排序的(pri也是時間戳,是投遞訊息的超時時間)

  常用方法:

     Swap函式用於交換兩個元素的位置,索 引值也隨之改變

     Push函式向佇列中新增元素

     Pop函式移除佇列中的根部元素

     Remove(i int)函式移除佇列中指點位置的元素

     PeekAndShit函式用於判斷根部的元素是否超過max,如果超過則返回nil,如果沒有則返回並移除根部元素(根部元素是最小值)

本節主要對兩種佇列進行了解和學習,下篇的Channel會進一步探討這兩種佇列中訊息的傳遞。