Sneaker工作方式簡析 # W19
Sneaker是基於Ruby的高效能RabbitMQ消費者,可以便捷地嵌入Rails應用,以常駐程序的方式處理RabbitMQ訊息,執行模式類似Sidekiq。 本文簡單介紹下Sneaker背後的工作方式,內容大概包含以下幾個方面:
- Sneaker的工作模型
- Bunny如何接收、分發訊息
Sneaker的工作方式
Sneaker藉助ServerEngine
實現程序模型搭建,可以有多程序、多執行緒等四種工作方式,都大同小異。本文只介紹預設的多程序方式,類似Unicorn的Master-Workers
程序模型:
- Master建立(fork)Worker程序,只接收外部命令,可控制Worker程序的工作狀態
- Workers程序處理MQ訊息,各自獨立,工作內容完全一樣
- Master和Workers通過pipe通訊
- Worker程序可以有多個
這是一個sneaker的業務worker
,通過它來進行業務邏輯處理,每個業務worker會去消費一個特定queue
的訊息。
# 業務worker
# app/workers/comsumer_worker.rb
class ComsumerWorker
include Sneakers::Worker
from_queue 'downloads',
:prefetch => 50,
:exchange => 'dummy',
:heartbeat => 5,
:amqp_heartbeat => 10
def work(msg) # 處理業務邏輯
ack!
end
end
# rake任務
WORKERS=ComsumerWorker rake sneakers:run
Sneaker可以通過上面的rake任務來啟動,如果不通過WORKERS環境變數指定要執行的業務worker,則會預設執行所有的業務worker
任務。
啟動的流程為以下幾個階段:
執行結果是:
- 有一個或多個子程序,彼此相互隔離,工作內容一樣,會同時執行所有
業務worker
- 子程序中,每個業務worker會預設有獨自的網路連線,也可以在子程序中共用用同一個連線
- 子程序中,每個業務worker預設有一個
Concurrent::FixedThreadPool
執行緒池(大小可配置),被回撥的業務邏輯程式碼會被丟到執行緒池中併發執行, 子程序中也可以共用同一個執行緒池 - 每個queue的消費者會將MQ過來的訊息通過回撥函式往上層傳遞,並觸發業務邏輯回撥,業務邏輯會被丟到執行緒池中
- 執行緒池一直不停地執行池中的任務
Sneaker就是不斷地把消費者
傳遞過來的訊息和業務邏輯打包在一起,丟到對應的執行緒池中,等待被執行。一圖勝千言:
總結一下要點:
- 訊息是由消費者通過回撥函式傳遞迴應用,丟入執行緒池,達到高效能、解耦的作用
- 實際的業務邏輯是線上程池中被執行的,執行緒池一直被動等待任務
應用端的執行邏輯入上圖,相對簡單。接下來,將注意力放到下面幾個問題:
- MQ中的訊息是如何到達消費者的?
- 回撥邏輯是如何觸發的?
Bunny如何工作
Bunny是協議庫,用來處理和MQ之間的網路連線、AMQP物件的抽象、解析併發送AMQP訊息, 有以下幾個核心物件:
- session,網路連線,是一個TCP長連線
- channel,基於session, 可建立多個,每個channel有一個id
- 消費者,基於channel,可有多個,消費對應queue的訊息,有自己的tag
- queue,對應MQ的queue
- exchange,對應MQ的exchange
一個session中可以存在多個channel,由id來區分彼此;一個channel中可以存在多個消費者,以tag來區分彼此。
AMQP的訊息體比較複雜,大概分為三部分:
- headers (類比HTTP協議的headers)
- payload (類比HTTP協議的訊息體)
- end (類比HTTP協議第二個換行符)
headers中有method(類似HTTP),content-type,length等等屬性。 同一個TCP通道,可以傳輸多個channel、多個消費者的訊息。因為method為basic.delivery
的AMQP幀資料中包含了:
- channel ID, 區分通道
- cusumer TAG, 區分消費者
- queue_name, 區分佇列
通過解析這些引數,就可以將幀資料轉發給對應的channel,channel再將資料轉發給消費者,消費者就會向前面說的那樣觸發回撥。
幀資料是通過session而來,也就是從TCP長連線的socket中讀出來的。session會建立一個執行緒,不停地通過非阻塞的方式讀取socket中的資料:
#https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/cruby/socket.rb#L49
def read_fully(count, timeout = nil)
# 有刪減
value = ''
begin
loop do
value << read_nonblock(count - value.bytesize) # 非阻塞讀
break if value.bytesize >= count
end
rescue EOFError
end
value
end # read_fully
順帶介紹一下,Bunny如何讀取完整的一幀資料:
#https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/transport.rb#L241
def read_next_frame(opts = {}) # 有刪減
header = read_fully(7) # 先讀前七個位元組,header
type, channel, size = AMQ::Protocol::Frame.decode_header(header) # 解析header,得到資料長度size
payload = if size > 0
read_fully(size) # 讀取完整的資料,payload
else
''
end
frame_end = read_fully(1) # 讀取結尾
AMQ::Protocol::Frame.new(type, payload, channel)
end
上面的執行緒把從socket中讀到的資料,打包成一個程式碼塊,寫入對應channel的一個佇列中,這個程式碼塊中包含了對應消費者
的業務邏輯回撥。
每個channel除了有上面所說的佇列之外,還會有一個執行緒,這個執行緒不斷地從佇列中pop
出程式碼塊來執行, 也就是這樣觸發了消費者的回撥。這裡也是一個解耦合的設計,通過一個佇列作為中轉站,將程式碼邏輯很巧妙地做了分層。
以一張圖結束本文:
國慶快樂!