1. 程式人生 > 其它 >服務中沒有listen_Odoo 中的 IM(即時通訊)實現分析

服務中沒有listen_Odoo 中的 IM(即時通訊)實現分析

技術標籤:服務中沒有listen

背景介紹

Odoo 是最好的開源企業應用系統,沒有之一。雖然有些技術已經落伍了,Odoo 的前端通過 JQuery 做的 MVC 跟當今的 React 的革命性前端程式設計正好差一個時代,不過Odoo 14 已經開始了 OWL 貌似要追上了。

丁貴金:為什麼 Odoo 選擇自行開發 OWL 貓頭鷹 Javascript 框架​zhuanlan.zhihu.com

經過近20年的持續發展,Odoo 積累了企業運營所需要的各種軟體;它本身還是一個完整的 WEB 應用開發框架(非常非常獨特的前端和後段統一的開發框架),可以自由開發各種 WEB 應用;開發中的遇到任何問題,可以通過閱讀已開源的Odoo 應用,看看類似的特性是如何做到的。

本文關注 Odoo 的即時通訊特性,Odoo 的即時通訊是 Odoo 框架的基礎部分,研究過後,發現其總體實現還是比較簡單的,基於 Odoo 是一個極端模組化的系統,Odoo 的即時通訊為 Odoo 模組化系統支援專門進行了設計,非常值得研究學習。

Odoo 即時通訊可以讓企業內部人員進行實時溝通,也可以讓企業內部人員和網站客戶進行實時溝通;同時 Odoo 將即時通訊的訊息與Bot 整合,為營銷自動化和服務智慧化乃至應用智慧化互動提供了基礎。

Odoo 即時通訊還在基礎架構上為 Odoo 應用提供了社交化支援,應用的資料可以很容易增加訂閱者,當資料更改的時候,訂閱者可以收到訊息,使用者也可以直接為資料新增評論,評論會推送給資料的訂閱者。所以我們經常能在 Odoo 中看到針對一個具體的資料表單下面可以有使用者的評論和這個資料本身更改的歷史資訊。

核心技術

資料庫訊息佇列

先說最重要的,Odoo 即時通訊使用了 PostgreSQL 資料庫的 listen 和 notify 的機制完成。這個機制是 PostgreSQL 資料庫私有的,其它資料庫未必支援。Odoo 依賴 PostgreSQL,這是原因之一。參考這裡可以瞭解更多關於 PostgreSQL listen notify 的資訊。

使用資料庫的 listen 和 notify 可以讓連線資料庫的各個客戶端之間進行實時通訊。值得注意的是,這個特性是 PostgreSQL 所特有的。

在 Odoo 中並沒有讓每個客戶都去使用資料庫的 listen 和 notify 這個功能,而是由 Odoo 統一使用的。客戶通過瀏覽器訪問 Odoo,只要說明自己關心的 Channels,並且通過 Event 非同步等待 (Event.wait())。Odoo 統一訪問 listen,然後根據 listen 返回的資料,解析出來這些資料是哪些 Channels,從而通知 Event (Event.set()),這樣客戶就知道自己有訊息要處理。

通過長連線

連線資料庫的客戶端不是 Odoo 的客戶端,資料庫的客戶端實際上是 Odoo 的服務端,是 Python 程式碼連線 資料庫;而 Odoo 客戶端是通過 Javascript 實現的 Web 應用,它通過長連線方式與 Odoo 後臺保持資訊的實時性。長連線的連結地址 URL 是 /longpolling/poll ,Odoo 客戶端會發起這個連線請求,如果有這個請求關注的 Channels 的訊息,那麼這個請求就會立即返回,如果沒有訊息,這個連線會嘗試保持 TIMEOUT 秒,目前 TIMEOUT 是50秒。Channels 就是會話標記,可以理解為一個聊天室、一個群等等,客戶 poll 資料的時候要寫上它關注的 Channels。

長連線的 URL 是 /longpolling/poll,在服務端為每次客戶請求建立一個 Event,然後把這個客戶想關注的 Channels(這個 Channels 是個陣列)關聯這個 Event,非同步等待這個Event;一個客戶連線只有一個 Event,但是多個 Channels,這些 Channels 關聯這個Event,一旦這些 Channels 中任何一個 Channel 有訊息,那麼這個 Event 就會被通知,客戶的長連線就會返回,客戶就知道有訊息了,可以進行下一步動作,比如獲取訊息的詳情。如果系統中沒有任何訊息跟客戶的這些 Channels 有關係,客戶就會一直在 Event 上非同步等待,直到超時。超時結束後長連線就會結束,客戶端會重新發起連線請求,再次進入等待的狀態。

非同步處理

如果很多使用者同時使用 Odoo,那麼 Odoo 為每個客戶保持一個連線,這是無疑問的,因為沒有連線就沒有辦法實時推送資料;但是每個連線是在一個執行緒裡面呢,還是多個呢?答案就是Odoo 只為 longpolling 維護了一個執行緒或者一個程序(gevent)。即一個程序或者執行緒,但是多個連線。如果你啟動 Odoo 的時候使用了 worker 引數,就意味這 Odoo 要以多程序方式運作,如果沒有指定 woker 就是多執行緒方式,如果你啟動的是執行緒模式,longpolling 將是一個執行緒,如果你啟動的是 worker (程序)模式那麼 Odoo 會通過 Popen 一個全新程序,這個全新程序的命令列 加上 gevent,很怪異吧,確實就是這麼幹的。

def long_polling_spawn(self):
        nargs = stripped_sys_argv()
        cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
        popen = subprocess.Popen(cmd)
        self.long_polling_pid = popen.pid

把原來的命令列插入一個 gevent,再啟動一遍。當然後續的程式碼會判斷如果是以 gevent 啟動命令的,這是要啟動 longpolling。

gevent 在 Python 3 asyncio 的大環境下是個過時的技術了,它使用了 Monkey Patch 的方式對 Python 庫進行了非同步化,感覺程式碼的書寫方式還是一樣,但是已經非同步化了。好處是程式碼在沒有 gevent 的時候可以同步跑,引入 gevent 後不用改變程式碼邏輯就可以非同步化。有人會問,非同步化有啥好處啊?非同步化可以讓 Odoo 同時處理多個連線,就這麼簡單,如果沒有非同步化,一個連線就佔用了 Odoo,如果這個連線的任務沒有完成,別的連線就進不來,解決這個問題的老方法是啟動更多程序,但是程序的方式太重了,隨著網際網路服務的普及,開發人員發現實際上只需要維護 I/O 並不需要啟動多個執行緒或者多個程序,只需要維護好檔案描述符,並且能夠正確發現這些描述符什麼時候該讀什麼時候該寫。selelct,poll,epoll 一步一步把非同步I/O的效能榨乾了最後一滴血。

在 Python 2 的時候,Python 沒有內建非同步 I/O 的功能,所以 gevent,Tornado 都是解決 Python 非同步 I/O 問題的。Odoo 使用了 gevent,當 longpolling 服務正在服務一個客戶端的時候,也沒有任何訊息給這個客戶端,那麼這個客戶端將保持連線 50 秒,這個等待過程並不會抓著 CPU 不放,因為它只是等待一個事件的發生,並不需要計算。

def loop(self):
        """ Dispatch postgres notifications to the relevant polling threads/greenlets """
        _logger.info("Bus.loop listen imbus on db postgres")
        with odoo.sql_db.db_connect('postgres').cursor() as cr:
            conn = cr._cnx
            cr.execute("listen imbus")
            cr.commit();
            while True:
                # 非同步監聽檔案描述符,三個陣列分別是指可讀監控,可寫監控,異常監控,
                # 如果陣列中的檔案描述符符合監控的條件就會返回
                if select.select([conn], [], [], TIMEOUT) == ([], [], []):
                    # 如果超時了
                    pass
                else:
                    # 發現 conn 這個檔案描述符可以讀
                    conn.poll()
                    channels = []
                    # 讀出所有的 notifies
                    while conn.notifies:
                        # 每個 notify 的資料 payload 就是 channel
                        channels.extend(json.loads(conn.notifies.pop().payload))
                    # dispatch to local threads/greenlets
                    events = set()
                    for channel in channels:
                        # 注意這裡面用到 self.channels 這裡面是 channel 和 event 的對映字典
                        # 通過 set 能夠排除重複的 event,每個channel 對應一個 event 的集合,
                        # 就是可能很多客戶在等候這個 channel,當然也可能沒有任何客戶在等候這個 channel
                        # 那麼就是空集合
                        events.update(self.channels.pop(hashable(channel), set()))
                    for event in events:
                        # event 能夠在多個協程之間通訊,客戶使用 event wait在非同步等待,這裡面通過
                        # event set 通知等待可以結束了。
                        event.set()

上邊的這段程式碼在 bus 模組裡面,Odoo 只有一個執行緒或者 gevent 程式去 listen 系統所有的 imbus 上的訊息,notify imbus 的訊息都會讓 select 返回準備好的檔案描述符(不是空的,所以就不會等於 ([],[],[])),收到資料庫的訊息通知後,通過分析訊息知道這些訊息是什麼 Channels (實際上,訊息的內容就是 Channel 的 Hash),通過 Channel 找到其對應的 Event,Event 一定對應一個客戶端的長連線。執行 Event set 來通知那些 wait 在 Event 上的客戶。具體過程可以閱讀我在程式碼裡面寫的註釋。

Channels 的 Overload

每次 longpolling 的 poll 請求都要帶上這個使用者想要關注的 Channels,而 使用者怎麼知道自己要 polling 什麼 Channels 呢?

Channels 一般來自兩種可能,一個是同一種應用導致的會話數量的增加,比如線上客服,每個新訪客都有可能跟 Odoo 的使用者建立一個 Channel 就是會話,這樣就會有很多會話。

還有一種可能就是,Odoo 有很多應用,每個應用都會有自己建立或者判斷 Channel 的方式,線上客服是 Odoo 的一個應用,CRM 也是一個應用,每個應用對 Channel 的標記和維護方法各不相同,一般是一個元組 (db,table,id) 再 hashable 或者文字化一下,就變成字串,作為 Channel 的唯一標記,具體有多少個這樣的 Channels 也是儲存在各自應用的表裡面。所以 bus 應用的 Controller 提供了一個可以 Overload 的機會來修改 Channels,就是 _poll。

# override to add channels
    def _poll(self, dbname, channels, last, options):
        # update the user presence
        if request.session.uid and 'bus_inactivity' in options:
            request.env['bus.presence'].update(options.get('bus_inactivity'))
        request.cr.close()
        request._cr = None        
        return dispatch.poll(dbname, channels, last, options)

‘ override to add channels‘ 輕描淡寫的註釋暴露了它存在的意義。 一個客戶通過瀏覽器與 Odoo 建立的長連線就是一個longpolling 的HTTP 請求,這個HTTP請求通過 _poll 這個函式呼叫 dispatch poll 去非同步等待資料庫的 notify,過載這個函式可以有機會在真正執行 dispatch poll 之前收集 Channels。

再看 mail 應用下的 controller 對這個函式的 overload。

# --------------------------
# Extends BUS Controller Poll
# --------------------------
def _poll(self, dbname, channels, last, options):
    if request.session.uid:
        partner_id = request.env.user.partner_id.id

    if partner_id:
        channels = list(channels)       # do not alter original list
        for mail_channel in request.env['mail.channel'].search([('channel_partner_ids', 'in', [partner_id])]):
            channels.append((request.db, 'mail.channel', mail_channel.id))
            # personal and needaction channel
        channels.append((request.db, 'res.partner', partner_id))
        channels.append((request.db, 'ir.needaction', partner_id))
    return super(MailChatController, self)._poll(dbname, channels, last, options)

把在 mail (就是討論應用)中需要的 channels 都圈出來提供給 bus 應用去處理。

看懂上段程式碼需要一點點背景知識,Odoo 中所有的 人/使用者/客戶/訪客/公司 都是用 res.partner 這個表來維護的。

上段程式碼潛藏了一個 Odoo 的知識,如何搜尋 many2many 的欄位 (channel partner ids),因為 many2many 是 Odoo 加了一箇中間表實現的,在搜尋過程中可以看出,這些中間資訊已經被隔離了。

channel partner ids 是在 mail channel 中對應的 partner id,在 res partner 表中也有 partner 對應的 mail channel。這是一個多對多的關係,一個 mail channel 可以含有多個 partner,一個partner 可以在多個 mail channel 中,這很自然,人可以在很多對話中,對話中含有很多人 。這段程式碼搜尋 mail channel 表中 channel partner ids 中包含使用者 id 的所有記錄,然後把這些記錄按照(db,table,id)的形式合成Channel。

最後加上 res partner 和 ir needaction 關於partner id 的 Channel,不負責任的猜測一下(尚未查證),res partner 這個 Channel 的含義可能是當這個使用者以使用者身份訂閱其他資料記錄的修改的 Channel,因為它指向這個訪問者,所以如果以訪問者身份去操作導致的訊息可能需要這個Channel;ir needaction 這個 Channel 的含義可能是需要這個使用者執行 Activity 的時候傳送的通知所使用的 Channel,顧名思義哈,Odoo 中有 Activity 模組,這也是一個基礎模組,各個應用模組都可以使用,它的意思是提醒 Odoo 使用者去進行一些計劃行為動作,比如讓使用者打電話,開會,處理工單等等。

其它

Odoo 的即時通訊幾乎都在 bus 這個 addon 下面,但是在odoo 全域性的程式碼中也有很多配合的 code,比如上文提到的 gevent 命令列;還有更加複雜的部分,就是 WSGI 和 資料連線的處理部分,由於 longpolling 同時重用了普通 httprequest 和資料庫執行環境 (registry,Enviroments,Enviroment,cusor),這段程式碼比較亂,不如 addon 裡面的結構清晰,當然可能也是為了讓 addon 結構清晰,不得不做出的妥協。值得說明的是,當 longpolling 的請求來的時候,WSGI 請求自帶的 Odoo 資料庫執行環境會被拋棄,而是每次請求重新再次建立:

event.wait(timeout=timeout)
with registry.cursor() as cr:
  env = api.Environment(cr, SUPERUSER_ID, {})
  notifications = env['bus.bus'].poll(channels, last, options)

讓我們知道了 Odoo 如何每次建立資料環境。如果不是每次建立環境那麼這裡的資料操作別的客戶不會同時發現的。

通過分析 Odoo 的 IM 實現過程可以看出 Odoo 的技術的確有點過時了,跟蹤的不夠猛。因為 Python 3 已經支援 asyncio 了,關於 asyncio 可以讀讀這個 blog 。

如果通過 asyncio 去實現,我的思路是在 asyncio 中加入 postgresql connection 的描述符,就是上邊用來select 的,watching 這個描述符。當有資料的時候 callback 就會執行,再去通過 asyncio 的 locks 中的 Event 去 set()。用 asyncio.wait_for(event.wait(), timeout) 來響應使用者的請求,使用者的 HTTP 請求就會被阻塞直到 Event 被 set 或者超時,而 CPU 會被讓出,完美。

`loop.``add_reader`(*fd*, *callback*, **args*)

Start monitoring the *fd* file descriptor for read availability and invoke *callback* with the specified arguments once *fd* is available for reading.

這樣就用原生的 Python 3 解決了,不需要引入 gevent,也不需要引入非同步的 PostgreSQL Python 庫,重用原來的 psycopg2 阻塞庫。