1. 程式人生 > >Sneaker工作方式簡析 # W19

Sneaker工作方式簡析 # W19

Sneaker是基於Ruby的高效能RabbitMQ消費者,可以便捷地嵌入Rails應用,以常駐程序的方式處理RabbitMQ訊息,執行模式類似Sidekiq。 本文簡單介紹下Sneaker背後的工作方式,內容大概包含以下幾個方面:

  • Sneaker的工作模型
  • Bunny如何接收、分發訊息

Sneaker的工作方式

Sneaker藉助ServerEngine實現程序模型搭建,可以有多程序、多執行緒等四種工作方式,都大同小異。本文只介紹預設的多程序方式,類似Unicorn的Master-Workers程序模型:

  • Master建立(fork)Worker程序,只接收外部命令,可控制Worker程序的工作狀態
  • Workers程序處理MQ訊息,各自獨立,工作內容完全一樣
  • Master和Workers通過pipe通訊
  • Worker程序可以有多個

這是一個sneaker的業務worker,通過它來進行業務邏輯處理,每個業務worker會去消費一個特定queue的訊息。

# 業務worker  
# app/workers/comsumer_worker.rb
class ComsumerWorker 
  include Sneakers::Worker
  from_queue 'downloads',
             :prefetch => 50,
             :exchange
=> 'dummy', :heartbeat => 5, :amqp_heartbeat => 10 def work(msg) # 處理業務邏輯 ack! end end # rake任務
WORKERS=ComsumerWorker rake sneakers:run

Sneaker可以通過上面的rake任務來啟動,如果不通過WORKERS環境變數指定要執行的業務worker,則會預設執行所有的業務worker任務。

啟動的流程為以下幾個階段:

執行結果是:

  • 有一個或多個子程序,彼此相互隔離,工作內容一樣,會同時執行所有業務worker
  • 子程序中,每個業務worker會預設有獨自的網路連線,也可以在子程序中共用用同一個連線
  • 子程序中,每個業務worker預設有一個Concurrent::FixedThreadPool執行緒池(大小可配置),被回撥的業務邏輯程式碼會被丟到執行緒池中併發執行, 子程序中也可以共用同一個執行緒池
  • 每個queue的消費者會將MQ過來的訊息通過回撥函式往上層傳遞,並觸發業務邏輯回撥,業務邏輯會被丟到執行緒池中
  • 執行緒池一直不停地執行池中的任務

Sneaker就是不斷地把消費者傳遞過來的訊息和業務邏輯打包在一起,丟到對應的執行緒池中,等待被執行。一圖勝千言:

總結一下要點:

  • 訊息是由消費者通過回撥函式傳遞迴應用,丟入執行緒池,達到高效能、解耦的作用
  • 實際的業務邏輯是線上程池中被執行的,執行緒池一直被動等待任務

應用端的執行邏輯入上圖,相對簡單。接下來,將注意力放到下面幾個問題:

  • MQ中的訊息是如何到達消費者的?
  • 回撥邏輯是如何觸發的?

Bunny如何工作

Bunny是協議庫,用來處理和MQ之間的網路連線、AMQP物件的抽象、解析併發送AMQP訊息, 有以下幾個核心物件:

  • session,網路連線,是一個TCP長連線
  • channel,基於session, 可建立多個,每個channel有一個id
  • 消費者,基於channel,可有多個,消費對應queue的訊息,有自己的tag
  • queue,對應MQ的queue
  • exchange,對應MQ的exchange

一個session中可以存在多個channel,由id來區分彼此;一個channel中可以存在多個消費者,以tag來區分彼此。

AMQP的訊息體比較複雜,大概分為三部分:

  • headers (類比HTTP協議的headers)
  • payload (類比HTTP協議的訊息體)
  • end (類比HTTP協議第二個換行符)

headers中有method(類似HTTP),content-type,length等等屬性。 同一個TCP通道,可以傳輸多個channel、多個消費者的訊息。因為method為basic.delivery的AMQP幀資料中包含了:

  • channel ID, 區分通道
  • cusumer TAG, 區分消費者
  • queue_name, 區分佇列

通過解析這些引數,就可以將幀資料轉發給對應的channel,channel再將資料轉發給消費者,消費者就會向前面說的那樣觸發回撥。

幀資料是通過session而來,也就是從TCP長連線的socket中讀出來的。session會建立一個執行緒,不停地通過非阻塞的方式讀取socket中的資料:

#https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/cruby/socket.rb#L49
def read_fully(count, timeout = nil)
      # 有刪減
      value = ''
      begin
        loop do
          value << read_nonblock(count - value.bytesize) # 非阻塞讀
          break if value.bytesize >= count
        end
      rescue EOFError
      end
      value
    end # read_fully

順帶介紹一下,Bunny如何讀取完整的一幀資料:

#https://github.com/ruby-amqp/bunny/blob/master/lib/bunny/transport.rb#L241
    def read_next_frame(opts = {}) # 有刪減
      header              = read_fully(7) # 先讀前七個位元組,header
      type, channel, size = AMQ::Protocol::Frame.decode_header(header) # 解析header,得到資料長度size
      payload             = if size > 0
                              read_fully(size) # 讀取完整的資料,payload
                            else
                              ''
                            end
      frame_end = read_fully(1) # 讀取結尾

      AMQ::Protocol::Frame.new(type, payload, channel)
    end

上面的執行緒把從socket中讀到的資料,打包成一個程式碼塊,寫入對應channel的一個佇列中,這個程式碼塊中包含了對應消費者的業務邏輯回撥。

每個channel除了有上面所說的佇列之外,還會有一個執行緒,這個執行緒不斷地從佇列中pop出程式碼塊來執行, 也就是這樣觸發了消費者的回撥。這裡也是一個解耦合的設計,通過一個佇列作為中轉站,將程式碼邏輯很巧妙地做了分層。

以一張圖結束本文:

國慶快樂!