1. 程式人生 > >amqp協議與pika庫淺析

amqp協議與pika庫淺析

AMQP協議

簡介

高階訊息佇列協議使得遵從該規範的客戶端應用和訊息中介軟體伺服器的全功能互操作成為可能。
為了完全實現訊息中介軟體的互操作性,需要充分定義網路協議和訊息代理服務的功能語義。
一套確定的訊息交換功能,也就是“高階訊息交換協議模型”。AMQP模型包括一套用於路由和儲存訊息的功能模組,以及一套在這些模組之間交換訊息的規則。
一個網路線級協議(資料傳輸格式),客戶端應用可以通過這個協議與訊息代理和它實現的AMQP模型進行互動通訊。
有關AMQP協議的文件可登陸官網,或者參考中文版博文amqp中文翻譯文件
文件已經詳細解釋了

一.AMQP整體概述

下面的圖顯示了整體AMQ模型:
這裡寫圖片描述


在AMQP之前的伺服器中,它們會通過實現了特定型別路由和快取的龐大引擎來完成. AMQ模組使用較小的模組結合更多樣和穩健的方案來實現. 它把這些任務分成了兩個不同角色:
 交換器, 它接受來自生產者的訊息並將它們路由到訊息佇列.
 訊息佇列, 它儲存訊息訊息並把它們轉發給消費者應用程式.
AMQP提供了執行時程式語義,主要有兩方面:
1. 執行時通過該協議可建立任意的交換器和訊息佇列型別的能力(有些是在標準中定義的,但可以新增其他作為伺服器擴充套件)。
2. 執行時通過協議包裝交換器和訊息佇列來建立任何需要的訊息處理系統的能力.
1.訊息佇列(Message Queue)
訊息佇列用於在記憶體或磁碟上儲存訊息, 並將它們依次投遞給一個或多個消費者應用程式.訊息佇列是訊息儲存和分發的實體. 每個訊息佇列是完全獨立的,且是一個相當聰明的物件。
訊息佇列有多個屬性:私有的或共享的, 持久的或臨時的,客戶端命名的或伺服器端命名的等等.
通過選擇希望的屬性,我們可以使用訊息佇列來實現傳統的中介軟體實體,如:
 共享儲存轉發佇列:它可以持有訊息,並以round-robin方式在消費者之間分發訊息.儲存轉發佇列通常是在多個消費者之間是持久化的.
 私有回覆佇列:它可以持有訊息,並把訊息轉發給單個消費者. 回覆佇列通常是臨時的,服務端命名的,且對於某個消費者來說是私有的.
 私有訂閱佇列:它可持有來自不同訂閱源的訊息,並將它們轉發給單個消費者.
訂閱佇列通常是臨時的,伺服器端命名的,並對於某個消費者來說是私有的.
2.交換器接收來自生產者應用程式的訊息,並將它們按照事先約定的規則路由到訊息佇列中.
這些預先約定的規則或條件稱為繫結. 交換器會與路由引擎匹配.
也就是說,他們會檢查訊息,並使用他們的繫結表來決定如何將這些訊息轉發到訊息佇列或其他交換器中。交換器永遠不會儲存資訊。“交換器”一詞是指一類演算法或演算法例項。
更確切的說,我們談到了交換器型別和交換器例項.
AMQP定義了許多交換器型別,它們覆蓋了常見訊息路由分發的基礎型別. AMQP伺服器提供了這些交換器的預設例項.使用AMQP的應用程式可額外建立它們自己的交換器例項.交換器型別是命名的,這樣建立交換器的應用程式就可以告知伺服器他們使用的交換器型別.
交換器實現也可以是命名的,這樣應用程式可指定如何繫結佇列來發布訊息.
交換器還可以做更多的訊息路由.它們可作為伺服器內的智慧路由代理,按需接受訊息和生產訊息. 交換器概念的目的是定義一套模型或標準,使得可以合理地擴充套件AMQP伺服器,因為可擴充套件性會對互操作產生影響.

下面的圖展示了通過AMQ模組伺服器的訊息流:
這裡寫圖片描述
客戶端與服務端的訊息流如圖所示。

1.訊息佇列屬性
當客戶端程式建立了訊息佇列時,它可以選擇一些重要的屬性:
 name - 如果沒有指定,伺服器會選擇一個名稱,並將其提供給客戶端.一般來說,當應用程式共享訊息佇列時,它們已經對訊息佇列名稱作了事先的約定,當一個應用程式需要出於其自身目的來要求佇列時,它可讓伺服器提供一個名稱.
 exclusive - 如果設定了,佇列將只屬於當前連線,且在連線關閉時刪除.
 durable - 如果設定了, 訊息會進行儲存,並在伺服器重啟時啟用. 當伺服器重啟時,它可能會丟失瞬時訊息.
2. 佇列生命週期
這裡主要有兩種訊息佇列生命週期:
 持久化訊息佇列:它們可被多個消費者共享,並可獨立地存在- 即.不管是否有消費者接收它們,它都可以繼續存在收集訊息.
 臨時訊息佇列:對某個消費者是私有的,只能繫結到此消費者.當消費者斷開連線時,訊息佇列將被刪除.
也存在一些變化,如共享訊息佇列會在最後一個消費才斷開連線時刪除訊息佇列.下面的圖展示了臨時訊息佇列建立和刪除的過程:
這裡寫圖片描述

1.協議命令 (類&方法)
中介軟體是複雜的,我們在設計協議結構的挑戰是要馴服其複雜性。
我們的方法是基於類來建立傳統API模型,這個類中包含方法,並定義了方法明確應該做什麼.
這會導致大量的命令集合,但一個命令應該相對容易理解.
AMQP命令組合在類中.每個類都覆蓋了一個特定功能領域.有此類是可選的 -每個節點都實現了需要支援的類.
有兩種不同方法對話:
 同步請求-響應,在其中一個節點發送請求,另一個節點發送回復.
同步請求和響應適用於效能不是關鍵的地方.
 非同步通知, 在其中,一個節點發送訊息但不希望得到回覆.非同步方法適用於效能是至關重要的地方.
為使處理方法簡單,我們為每個非同步請求定義了不同的回覆. 也就是說,沒有哪個方法可作為兩個不同請求的回覆.這意味著一個節點,傳送一個同步請求後,可以接受和處理傳入的方法,直到得到一個有效的同步答覆. 這使得AMQP與更加傳統的RPC協議是有區別的.

二.AMQP協議主要類及流程

1).Connection類

AMQP是一個連線協議. 連線設計為長期的,且可運載多個通道. 連線生命週期是這樣的:
 client開啟與伺服器的TCP/IP連線併發送一個協議頭(protocol header).這只是client傳送的資料,而不是作為方法格式的資料.
 server使用其協議版本和其它屬性,包括它支援安全機制列表(Start方法)進行響應.
 client選擇一種安全機制(Start-Ok).
 server開始認證過程, 它使用SASL的質詢-響應模型(challenge-response model). 它向客戶端傳送一個質詢(Secure).
 client向server傳送一個認證響應(Secure-Ok). 例如,對於使用”plain”機制,響應會包含登入使用者名稱和密碼.
server 重複質詢(Secure) 或轉到協商,傳送一系列引數,如最大幀大小(Tune).
 client接受或降低這些引數(Tune-Ok).
 client 正式開啟連線並選擇一個虛擬主機(Open).
 伺服器確認虛擬主機是一個有效的選擇 (Open-Ok).
 客戶端現在使用希望的連線.
 一個節點(client 或 server) 結束連線(Close).
 另一個節點對連線結束握手(Close-Ok).
 server 和 client關閉它們的套接字連線.
沒有為不完全開啟的連線上的錯誤進行握手. 根據成功協議頭協商(後面有詳細定義),在傳送或收到Open 或Open-Ok之前,如果一個節點檢測到錯誤,這個節點必須關閉socket,而不需要傳送任何進一步的資料。

2).Channel 類

AMQP是一個多通道協議. 通道提供了一種方式來將一個重量級TCP/IP連線分成多個輕量級連線.
這使得協議對於防火牆更加友好,因為埠使用是可預測的. 這也意味著傳輸調整和網路服務質量可以得到更好的利用.
通道是獨立的,它們可以同時執行不同的功能,可用頻寬會在當前活動之間共享.
這是令人期待的,我們鼓勵多執行緒客戶端應用程式經常使用”每個通道一個執行緒”程式設計模型.
然而,從單個client開啟一個或多個AMQP servers連線也是完全可以接受的.
通道生命週期如下:
1. client開啟一個新通道(Open).
2. server確認新通道準備就緒(Open-Ok).
3. client和server按預期來使用通道.
4. 一個節點(client或server) 關閉了通道(Close).
5. 另一個節點對通道關閉進行握手(Close-Ok).

3).Exchange 類

交換器類讓應用程式來管理伺服器上的交換器。這個類可以讓應用程式指令碼自己佈線(而不是依賴於一些配置介面)。注:大多數應用程式不需要這個級別的複雜度,傳統的中介軟體是不太可能能夠支援這種語義。
交換器生命週期如下:
1. client 請求server確保交換器是否存在(Declare). client可細化到,”如果交換器不存在則進行建立”,或 "如果交換器不存在,警告我,不需要建立”.
2. client釋出訊息到交換器.
3. client可選擇刪除交換器(Delete).

4).Queue 類

queue類可讓應用程式來管理伺服器上的訊息佇列. 在幾乎所有消費訊息的應用程式中,這是基本步驟,至少要驗證期望的訊息佇列是否實際存在.
持久化訊息佇列的生命週期相當簡單:
1. client斷言訊息佇列存在(Declare, 使用”passive”引數).
2. server確認訊息佇列存在(Declare-Ok).
3. client從訊息佇列中讀取訊息。
臨時訊息佇列的生命週期更加有趣:
1. client建立訊息佇列(Declare,不提供佇列名稱,伺服器會分配一個名稱). server 確認(Declare-Ok).
2. client 在訊息佇列上啟動一個消費者. 消費者的精確功能是由Basic類定義的。
3. client 取消消費者, 要麼是顯示取消,要麼是通過關閉通道/連線隱式取消的
4. 當最後一個消費者從訊息佇列中消失的時候,在過了禮貌性超時後,server會刪除訊息佇列.
AMQP 像訊息佇列一樣為主題訂閱實現了分發機制. 這使結構更多有趣,訂閱可以在合作訂閱應用程式池中進行負載均衡.
訂閱生命週期涉及到額外的繫結階段:
1. client 建立訊息佇列(Declare),server進行確認(Declare-Ok).
2. client 繫結訊息佇列到一個topic交換器 (Bind),server進行確認(Bind-Ok).
3. client像前面的例子來使用訊息佇列.

5).Basic 類

Basic 類實現本規範中描述的訊息功能.它支援如下主要語義:
 從client傳送訊息給server, 非同步發生(Publish)
 啟動和停止消費者(Consume, Cancel)
 從server傳送訊息給client, 非同步發生(Deliver, Return)
 應答訊息(Ack, Reject)
 同步從訊息佇列中取訊息 (Get).

6).Transaction 類

AMQP 支援兩種型別的事務:
1. 自動事務: 每個釋出的訊息和應答都處理為獨立事務.
2. Server 本地事務, 伺服器會快取釋出的訊息和應答,並會根據需要由client來提交它們.
Transaction 類(“tx”) 使應用程式可訪問第二種型別,即伺服器事務。這個類的語義是:
1. 應用程式要求在每個通道中都有事務(Select).
2. 應用程式做一些工作(Publish, Ack).
3. 應用程式提交或回滾工作(Commit, Roll-back).
4. 應用程式做一些工作,迴圈往復。

三.AMQP 傳輸架構

這個章節解釋了命令是如何對映到線路協議的.

1). 一般描述

AMQP是二進位制協議. 資訊被組織成各種型別的幀(frames). Frames可以攜帶協議方法和其它資訊.所有 幀(frames)都有同樣的格式: 幀頭(frame header),幀負載(frame payload)和幀尾(frame end).幀負載( frame payload)的格式依賴於幀型別(frame type).
我們假設有一個可靠的面向流的網路傳輸層(TCP/IP或相當的).
在單個套接字連線中,可以存在多個獨立控制執行緒,它們被稱為通道.
每個幀都使用通道編號來編號.通過交織它們的幀,不同的通道共享連線。對於任何給定的通道,幀執行在一個嚴格的序列,這樣可以用來驅動一個協議解析器(通常是一個狀態機).
我們使用一組小的資料型別,如位,整數,字串和欄位表來構造幀。幀欄位是緊密包裝的,不會使得它們緩慢或解析複雜。從協議規範中生成框架層是相對簡單的。
線路級格式的設計是可擴充套件性,一般可以用於任意的高層協議(不只是AMQP)。我們假設AMQP將來會擴充套件、改進,隨時間的推移線路級格式仍然會得到支援。

2).資料型別

AMQP資料型別用於方法幀中,它們是:
 Integers ( 1到8個位元組),用來表示大小,數量,範圍等等. Integers通常是無符號的,在幀中可能是未對齊的.
 Bits,用來表示開/關值.位被包裝成位元組。
 短字串(short string),用來儲存短的文字屬性.短字串限制為255個位元組,可以在無緩衝區溢位的情況下進行解析.
 長字串(long string),用來儲存二進位制資料塊.
 欄位表(Field tables),用來儲存名稱-值對(name-value pairs). 欄位值型別可以是字串,整數等等.

3).協議協商

AMQP client 和server 可對協議進行協商.這就是說當client連線時,server可處理client能接受或修改的操作.當兩個點對結果達成一致時, 連線會繼續前行.協商是一種有用的技術,因為它讓我們可以斷言假設和前提條件。在AMQP,我們協商協議的下面方面:
 實現協議和版本. server 可在同一個埠上儲存多個協議.
 加密引數和兩者之間的認證.這是功能層的一部分,以前解釋過。
 最大幀大小,通道數量,以及其它操作限制.
達成一致的限制可能會使兩者重新分配關鍵快取區以避免死鎖.每個傳入的幀要麼服從達成的限制(這是安全的),或者超過它們(在這種情況下,另一方必須斷開連線).這非常符合"它要麼工作,要麼就完全不工作”的AMQP哲學.
兩個節點達成一致的最低限度為:
 伺服器必須告訴客戶端它提出了什麼限制。
 客戶端進行響應,並可能減少其連線的限制。

4). 限制幀

TCP/IP是一個流協議,即沒有限制幀的內建機制. 現有協議可以幾種不同的方式解決這個問題:
 每個連線中只發送單個幀.這很簡單,但很慢.
 在流中新增幀定界符.這很簡單,但解析較慢.
 計算幀的大小, 並在每個幀的前面傳送大小。這是簡單和快速,和我們的選擇.

5). 幀細節

所有的幀都由一個頭(header,7個位元組),任意大小的負載(payload),和一個檢測錯誤的幀結束(frame-end)位元組組成:
這裡寫圖片描述
要讀取一個幀,我們必須:
1. 讀取header,檢查幀型別(frame type)和通道(channel).
2. 根據幀型別,我們讀取負載並進行處理.
3. 讀取幀結束位元組.
在實際實現中,如果效能很關鍵的話,我們應該使用讀前緩衝(read-ahead buffering)”或“收集讀取(gathering reads)”,以避免為了讀一個幀而做三次獨立的系統呼叫。

5.1 方法幀

方法幀可以攜帶高階協議命令(我們稱之為方法(methods)).一個方法幀攜帶一個命令. 方法幀負載有下面的格式:
這裡寫圖片描述
要處理一個方法幀,我們必須:
1. 讀取方法幀負載.
2. 將其拆包成結構. 方法通常有相同的結構,因此我們可以快速對方法進行拆包.
3. 檢查在當前上下文中是否允許出現方法.
4. 檢查方法引數是否有效.
5.執行方法.
方法主體(bodies) 由AMQP資料欄位(位,整數, 字串和字串表組成)構成. 編組程式碼直接從協議規範中生成,因此是非常快速地.

5.2 內容幀

內容是我們通常AMQP伺服器在客戶端與客戶端之間傳送和應用資料. 粗略地說,內容是由一組屬性加上一個二進位制資料部分組成的。它所允許的屬性集合由Basic類定義,而這些屬性的形式為內容頭幀(content header frame)。其資料可以是任何大小,也有可能被分解成幾個(或多個)塊,每一個都有內容體幀(content body frame)。
看一個特定通道的幀,當它們線上路上傳輸時,我們可能會看到下面這樣的東西:
這裡寫圖片描述
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情況下定義為傳輸內容.
當一個節點發送像這樣的方法幀時,它總是會遵循一個內容頭幀(conent header frame)和零個或多個內容體幀(content body frame)的形式.
一個內容頭幀有下面的格式:
某些方法(如Basic.Publish, Basic.Deliver等等.)通常情況下定義為傳輸內容.
當一個節點發送像這樣的方法幀時,它總是會遵循一個內容頭幀(conent header frame)和零個或多個內容體幀(content body frame)的形式.
一個內容頭幀有下面的格式:
這裡寫圖片描述
我們將內容體放置在不同的幀中(並不包含在方法中),因此AMQP可支援零拷貝技術,這樣其內容就不需要編組或編碼. 我們將內容屬性安放在它們自己的幀中,以便收件人可以有選擇地丟棄他們不想處理的內容。

5.3 心跳幀

心跳是一種設計用來撤銷(undo)TCP/IP功能的技術,也就是說在長時間超時後,它有能力通過關閉broker物理連線來進行恢復.在某些情景下,我們需要快速知道節點連線是否斷開了,或者是由於什麼原因不能響應了.因為心跳可以在較低水平上進行,我們在傳輸層次上按節點交換的特定幀型別來處理,而不是按類方法.

6). 錯誤處理

AMQP使用異常來處理錯誤.任何操作錯誤(未找到訊息佇列,訪問許可權不足)都會導致一個通道異常. 任何結構化的錯誤(無效引數,壞序列的方法.)都會導致一個連線異常.異常會關閉通道或連線,同時也會向客戶端應用返回響應碼和響應文字.我們使用了類似於HTTP等協議和其它大多數協議中的三位回覆程式碼和文字回覆文字方案.

7). 關閉通道和連線

連線或通道,對於客戶端來說,當其傳送Open時則被認為是“開啟”的,對於伺服器端來說,當其傳送Open-Ok時則被認為是開啟的。基於這一點,一個希望關閉通道或連線的對等體也必須使用握手協議來這樣做。
可出於任何原因,可能會正常地或異常地關閉一個通道或連線-因此必須仔細小心。
對於突然或意外關閉,並不能得到快速探測,因此當發生異常時,我們可能會丟失錯誤回覆程式碼。
正確的設計是對於所有關閉必須進行握手,使我們關閉後對方知道相應的情況。
當一個節點決定關閉一個通道或連線時,它傳送一個Close方法。接收節點必須使用Close-Ok來響應Close,然後雙方可以關閉他們的通道或連線。請注意,如果節點忽略了關閉,當兩個節點同時傳送Close時,可能會發生死鎖。

此上只作為概述,詳細文件說明請查閱官方文件。

pika淺析

Pika Python AMQP Client Library,pika是一個實現了AMQP協議的Python客戶端。
本次分析的是pika 0.5.0版本,本次啟動RabbitMQ作為服務端。
在上文的介紹中,客戶端的架構和實現思路。
Connection類介紹

1.參考demo_get.py

import sys
import pika
import asyncore
import time

conn = pika.AsyncoreConnection(pika.ConnectionParameters(
        (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',
        credentials = pika.PlainCredentials('guest', 'guest')))                 # 選用非同步io處理,並輸入連線地址和認證使用者名稱與密碼

print 'Connected to %r' % (conn.server_properties,)

qname = (len(sys.argv) > 2) and sys.argv[2] or 'test'                           # 設定佇列名稱

ch = conn.channel()                                                             # 建立頻道
ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False) # 申請建立一個佇列名稱

while conn.is_alive():                                                          # 如果連線沒有斷開就一直查詢服務端佇列中是否有資料
    result = ch.basic_get(queue = qname)                                                        
    print result                                                                
    if isinstance(result, pika.spec.Basic.GetEmpty):                            # 如果返回的結果為空
        pass
    elif isinstance(result, pika.spec.Basic.GetOk):                             # 如果申請佇列成功
        ch.basic_ack(delivery_tag = result.delivery_tag)                        # 則回覆服務端該通道的值
    else:
        raise Exception("Hmm, that's unexpected. basic_get should have returned either "
                        "Basic.GetOk or Basic.GetEmpty",
                        result)
    time.sleep(1)

主要是想服務端申請一個新的訊息佇列,然後一直獲取該佇列裡是否有資料

2.參考demo_send.py

import sys
import pika
import asyncore

conn = pika.AsyncoreConnection(pika.ConnectionParameters(
        (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',
        credentials=pika.PlainCredentials('guest', 'guest')))                       # 選用非同步io處理,並輸入連線地址和認證使用者名稱與密碼

ch = conn.channel()                                                                 # 建立頻道
ch.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)    # 申請建立一個佇列名稱,如果該佇列已經存在就返回該佇列值

ch.basic_publish(exchange='',
                 routing_key="test",
                 body="Hello World!",
                 properties=pika.BasicProperties(
                        content_type = "text/plain",
                        delivery_mode = 2, # persistent
                        ),
                 block_on_flow_control = True)                                      # 在頻道上使用預設的exchange,路由為test,內容為body

conn.close()                                                                        # 關閉連線,先將資料送出
pika.asyncore_loop()                                                                # 將釋出的資料發出                          

3.參考demo_receive.py

import sys
import pika
import asyncore

conn = pika.AsyncoreConnection(pika.ConnectionParameters(
        (len(sys.argv) > 1) and sys.argv[1] or '127.0.0.1',
        credentials = pika.PlainCredentials('guest', 'guest'),
        heartbeat = 10))                                                # 選用非同步io處理,並輸入連線地址和認證使用者名稱與密碼,並設定心跳時間

print 'Connected to %r' % (conn.server_properties,)

qname = (len(sys.argv) > 2) and sys.argv[2] or 'test'

ch = conn.channel()                                                     # 建立頻道 
ch.queue_declare(queue=qname, durable=True, exclusive=False, auto_delete=False)  # 申請建立一個佇列名稱,如果該佇列已經存在就返回該佇列值

def handle_delivery(ch, method, header, body):
    print "method=%r" % (method,)
    print "header=%r" % (header,)
    print "  body=%r" % (body,)
    ch.basic_ack(delivery_tag = method.delivery_tag)                    # 消費完成後向伺服器傳送消費完成標誌

ch.basic_consume(handle_delivery, queue = qname)                        # 消費指定佇列上的資料,
pika.asyncore_loop()                                                    # 開啟迴圈
print 'Close reason:', conn.connection_close

通過這幾個例子我們先分析connection類,在pika中提供了兩個連結型別非同步連結和阻塞連結,現在分析非同步連結

class AsyncoreConnection(pika.connection.Connection):
    def delayed_call(self, delay_sec, callback):
        add_oneshot_timer_rel(delay_sec, callback)          # 新增時間和回撥函式到時間堆timer_heap中

    def connect(self, host, port):
        self.dispatcher = RabbitDispatcher(self)            # 初始化一個例項
        self.dispatcher.create_socket(socket.AF_INET, socket.SOCK_STREAM)  # 新建一個連線例項
        self.dispatcher.connect((host, port or spec.PORT))  # 連線server端

    def disconnect_transport(self):
        if self.dispatcher:                                 # 如果該例項有則關閉該連線例項
            self.dispatcher.close()

    def drain_events(self):
        loop(count = 1)                                     # 處理網路io事件

基礎類 connection

class Connection:
    def __init__(self, parameters, wait_for_open = True, reconnection_strategy = None):
        self.parameters = parameters                                  # 引數
        self.reconnection_strategy = reconnection_strategy or NullReconnectionStrategy() # 重連引數處理方法

        self.connection_state_change_event = event.Event()                      # handler新增刪除呼叫等處理類

        self._reset_per_connection_state()                            # 重置連線狀態
        self.reconnect()                                              # 重新連線

        if wait_for_open:
            self.wait_for_open()                                      # 等待server返回open

    def _reset_per_connection_state(self):
        self.state = codec.ConnectionState()                          # 連線狀態
        self.server_properties = None                                 # 伺服器連線屬性
        self.outbound_buffer = simplebuffer.SimpleBuffer()            # 讀寫緩衝區
        self.frame_handler = self._login1                             # 幀處理方法
        self.channels = {}                                            # 通道
        self.next_channel = 0                                         # 下一個頻道
        self.connection_open = False                                  # 連線是否開啟
        self.connection_close = None                                  # 連線是否關閉
        self.bytes_sent = 0                                           # 已經發送的資料長度
        self.bytes_received = 0                                       # 已經接受的資料長度
        self.heartbeat_checker = None                                 # 心跳包監測

    def delayed_call(self, delay_sec, callback):                      # 延遲呼叫函式子類必須實現
        """Subclasses should override to call the callback after the
        specified number of seconds have elapsed, using a timer, or a
        thread, or similar."""
        raise NotImplementedError('Subclass Responsibility')

    def reconnect(self):                                              # 重新連線
        self.ensure_closed()                                          # 確保該連線已經關閉
        self.reconnection_strategy.on_connect_attempt(self)           # 呼叫重連策略例項的嘗試連線函式
        self._reset_per_connection_state()                            # 重置連線狀態
        try:
            self.connect(self.parameters.host, self.parameters.port or spec.PORT)   # 重新連線
            self.send_frame(self._local_protocol_header())                 # 傳送協議頭資訊
        except:
            self.reconnection_strategy.on_connect_attempt_failure(self)  # 如果連線失敗呼叫重連策略例項的連線失敗函式
            raise

    def connect(self, host, port):                                    # 連線
        """Subclasses should override to set up the outbound
        socket."""
        raise NotImplementedError('Subclass Responsibility')

    def _local_protocol_header(self):
        return codec.FrameProtocolHeader(1,
                                         1,
                                         spec.PROTOCOL_VERSION[0],
                                         spec.PROTOCOL_VERSION[1])   # 將協議訊息封裝好

    def on_connected(self):
        self.reconnection_strategy.on_transport_connected(self)

    def handle_connection_open(self):
        self.reconnection_strategy.on_connection_open(self)
        self.connection_state_change_event.fire(self, True)             # 

    def handle_connection_close(self):
        self.reconnection_strategy.on_connection_closed(self)
        self.connection_state_change_event.fire(self, False)

    def addStateChangeHandler(self, handler, key = None):
        self.connection_state_change_event.addHandler(handler, key)     # 新增處理handler和Key
        if self.connection_open:
            handler(self, True)
        elif self.connection_close:
            handler(self, False)

    def delStateChangeHandler(self, key):
        self.connection_state_change_event.delHandler(key)              # 刪除handler

    def _set_connection_close(self, c):
        if not self.connection_close:                                   # 如果連線沒有關閉
            self.connection_close = c                                   
            for chan in self.channels.values():                         # 遍歷頻道列表
                chan._set_channel_close(c)                              # 關閉通道連線
            self.connection_open = False                                # 連線開啟標誌位設定false
            self.handle_connection_close()                              # 關閉連線

    def close(self, code = 200, text = 'Normal shutdown'):
        if self.connection_open:                                        # 如果開啟連線標誌位開啟
            self.connection_open = False                                # 設定標誌位為false
            c = spec.Connection.Close(reply_code = code,                # 
                                      reply_text = text,
                                      class_id = 0,
                                      method_id = 0)
            self._rpc(0, c, [spec.Connection.CloseOk])                  # 呼叫遠端rpc方法
            self._set_connection_close(c)                               # 設定連線關閉
        self._disconnect_transport()                                    # 關閉傳輸通道                        

    def ensure_closed(self):
        if self.is_alive():
            self.close()

    def _disconnect_transport(self, reason = None):
        self.disconnect_transport()                                     # 關閉連線例項
        self.on_disconnected(reason)                                    # 關閉連線

    def disconnect_transport(self):
        """Subclasses should override this to cause the underlying
        transport (socket) to close."""
        raise NotImplementedError('Subclass Responsibility')

    def on_disconnected(self, reason = 'Socket closed'):
        self._set_connection_close(spec.Connection.Close(reply_code = 0,
                                                         reply_text = reason,
                                                         class_id = 0,
                                                         method_id = 0))          # 傳送連線關閉的方法
        self.reconnection_strategy.on_transport_disconnected(self)

    def suggested_buffer_size(self):
        b = self.state.frame_max                                        # 如果幀資料沒有設定 則設定131072
        if not b: b = 131072
        return b

    def on_data_available(self, buf):                                   # 處理接受到的資料
        while buf:
            (consumed_count, frame) = self.state.handle_input(buf)      # 將二進位制資料解析成amqp幀資料
            self.bytes_received = self.bytes_received + consumed_count
            buf = buf[consumed_count:]
            if frame:
                self.frame_handler(frame)                               # 當進行驗證登入連線後,最後改方法呼叫_generic_frame_handler,如果為心跳
                                                                        # 則不處理,如果是頻道的方法則呼叫channel的方法處理
    def _next_channel_number(self):
        tries = 0
        limit = self.state.channel_max or 32767
        while self.next_channel in self.channels:
            self.next_channel = (self.next_channel + 1) % limit
            tries = tries + 1
            if self.next_channel == 0:
                self.next_channel = 1
            if tries > limit:
                raise NoFreeChannels()
        return self.next_channel

    def _set_channel(self, channel_number, channel):
        self.channels[channel_number] = channel

    def _ensure_channel(self, channel_number):
        if self.connection_close:                                   # 檢查連線沒有關閉
            raise ConnectionClosed(self.connection_close)
        return self.channels[channel_number]._ensure()              # 確保頻道沒有關閉

    def reset_channel(self, channel_number):                        # 刪除頻道號對應的handler和頻道號
        if channel_number in self.channels:
            del self.channels[channel_number]

    def send_frame(self, frame):
        marshalled_frame = frame.marshal()                          # 將通道和傳送的方法序列化
        self.bytes_sent = self.bytes_sent + len(marshalled_frame)   # 記錄總的傳送資料量
        self.outbound_buffer.write(marshalled_frame)                # 將資料寫入到寫緩衝區中
        #print 'Wrote %r' % (frame, )

    def send_method(self, channel_number, method, content = None): 
        self.send_frame(codec.FrameMethod(channel_number, method))      # 傳送幀,
        props = None
        body = None
        if isinstance(content, tuple):
            props = content[0]                                      # 如果是元組,則第一個資料為頭部屬性幀
            body = content[1]                                       # 第二個資料為內容幀
        else:
            body = content                                          # 只有內容幀
        if props: 
            length = 0
            if body: length = len(body)
            self.send_frame(codec.FrameHeader(channel_number, length, props))       # 傳送頭部幀資料
        if body:
            maxpiece = (self.state.frame_max - \
                        codec.ConnectionState.HEADER_SIZE - \
                        codec.ConnectionState.FOOTER_SIZE)                          # 獲取一個幀的最大長度
            body_buf = simplebuffer.SimpleBuffer( body )                            # 寫入到緩衝區
            while body_buf:
                piecelen = min(len(body_buf), maxpiece)                             # 比較如果要發出的資料大小與最大傳輸幀資料大小比較
                piece = body_buf.read_and_consume( piecelen )                       # 從緩衝區中讀出資料
                self.send_frame(codec.FrameBody(channel_number, piece))             # 傳送內容幀直到緩衝區為空

    def _rpc(self, channel_number, method, acceptable_replies):
        channel = self._ensure_channel(channel_number)                  # 檢查連線沒有關閉
        self.send_method(channel_number, method)                        # 傳送相應頻道的方法
        return channel.wait_for_reply(acceptable_replies)               # 註冊回撥函式

    def _login1(self, frame):
        if isinstance(frame, codec.FrameProtocolHeader):                                     # 第一次連結時,返回的幀頭一定是協議頭
            raise ProtocolVersionMismatch(self._local_protocol_header(),
                                          frame)

        if (frame.method.version_major, frame.method.version_minor) != spec.PROTOCOL_VERSION:   # 檢查客戶端與服務端支援的協議是否是一個版本
            raise ProtocolVersionMismatch(self._local_protocol_header(),
                                          frame)

        self.server_properties = frame.method.server_properties                             # 儲存服務端的屬性

        credentials = self.parameters.credentials or default_credentials                    # 認證資訊
        response = credentials.response_for(frame.method)                                   # 封裝使用者名稱與密碼資訊
        if not response:
            raise LoginError("No acceptable SASL mechanism for the given credentials",
                             credentials)
        self.send_method(0, spec.Connection.StartOk(client_properties = \
                                                      {"product": "Pika Python AMQP Client Library"},
                                                    mechanism = response[0],
                                                    response = response[1]))                 # 通過0號頻道返回服務端開啟ok函式併發送使用者名稱與密碼驗證
        self.erase_credentials()
        self.frame_handler = self._login2                                                    # 更換幀處理函式

    def erase_credentials(self):
        """Override if in some context you need the object to forget
        its login credentials after successfully opening a connection."""
        pass

    def _login2(self, frame):
        channel_max = combine_tuning(self.parameters.channel_max, frame.method.channel_max)
        frame_max = combine_tuning(self.parameters.frame_max, frame.method.frame_max)
        heartbeat = combine_tuning(self.parameters.heartbeat, frame.method.heartbeat)
        if heartbeat:
            self.heartbeat_checker = HeartbeatChecker(self, heartbeat)                       # 設定心跳包的引數
        self.state.tune(channel_max, frame_max)                                              # 設定好協議中的頻道號與頻道最大長度
        self.send_method(0, spec.Connection.TuneOk(
            channel_max = channel_max,
            frame_max = frame_max,
            heartbeat = heartbeat))                                                          # 給服務端返回TuneOk方法
        self.frame_handler = self._generic_frame_handler                                     # 更新幀處理函式
        self._install_channel0()                                                             # 註冊0頻道
        self.known_hosts = \
                         self._rpc(0, spec.Connection.Open(virtual_host = \
                                                               self.parameters.virtual_host,
                                                           insist = True),
                                   [spec.Connection.OpenOk]).known_hosts
        self.connection_open = True
        self.handle_connection_open()

    def is_alive(self):
        return self.connection_open and not self.connection_close

    def _install_channel0(self):
        c = channel.ChannelHandler(self, 0)
        c.async_map[spec.Connection.Close] = self._async_connection_close

    def channel(self):
        return channel.Channel(channel.ChannelHandler(self))                # 新建一個頻道

    def wait_for_open(self):
        while (not self.connection_open) and \
                (self.reconnection_strategy.can_reconnect() or (not self.connection_close)):
            self.drain_events()                                             # 確定連結已經開啟

    def drain_events(self):
        """Subclasses should override as required to wait for a few
        events -- perhaps running the dispatch loop once, or a small
        number of times -- and dispatch them, and then to return
        control to this method's caller, which will be waiting for
        something to have been set by one of the event handlers."""
        raise NotImplementedError('Subclass Responsibility')

    def _async_connection_close(self, method_frame, header_frame, body):
        self.send_method(0, spec.Connection.CloseOk())
        self._set_connection_close(method_frame.method)

    def _generic_frame_handler(self, frame):
        #print "GENERIC_FRAME_HANDLER", frame
        if isinstance(frame, codec.FrameHeartbeat):                                 # 如果是心跳幀則不處理
            pass # we already counted the received bytes for our heartbeat checker
        else: 
            self.channels[frame.channel_number].frame_handler(frame)                # 如果是其他型別的幀呼叫對應頻道的幀處理函式

connection主要就是接受服務端發來的資料,並根據協議解析成對應的幀,將要傳送出去的資料進行解析成amqp協議的二進位制資料。
其中connection對接受的資料解析主要是由ConnectionState類來實現,並將解析出來的資料返回給handler繼續處理。

class ConnectionState:
    HEADER_SIZE = 7                                     # 頭部資料大小
    FOOTER_SIZE = 1                                     # 尾部資料大小

    def __init__(self):
        self.channel_max = None
        self.frame_max = None
        self._return_to_idle()                          # 將資料重置

    def tune(self, channel_max, frame_max):
        self.channel_max = channel_max                  # 設定服務端返回的最大頻道數
        self.frame_max = frame_max                      # 服務端的最大幀長度

    def _return_to_idle(self):
        self.inbound_buffer = []
        self.inbound_available = 0
        self.target_size = ConnectionState.HEADER_SIZE                          # 先解析頭部資料大小
        self.state = self._waiting_for_header                                   # 等待解析頭部資料

    def _inbound(self):
        return ''.join(self.inbound_buffer)                                     # 將解析出的資料拼接

    def handle_input(self, received_data):
        total_bytes_consumed = 0                                                # 處理資料的統計

        while True:
            if not received_data:                                               # 如果處理完畢或者傳入資料為空,則返回處理的資料大小
                return (total_bytes_consumed, None)

            bytes_consumed = self.target_size - self.inbound_available          # target_size會根據接受的幀型別不同而變化,第一次接受頭幀,長度就為七
            if len(received_data) < bytes_consumed:                             
                bytes_consumed = len(received_data)

            self.inbound_buffer.append(received_data[:bytes_consumed])          # 處理頭部的資料
            self.inbound_available = self.inbound_av