1. 程式人生 > 程式設計 >從零開始開發IM(即時通訊)服務端(二)

從零開始開發IM(即時通訊)服務端(二)

好訊息:IM1.0.0版本已經上線啦,支援特性

  • 私聊傳送文字/檔案
  • 已傳送/已送達/已讀回執
  • 支援使用ldap登入
  • 支援接入外部的登入認證系統
  • 提供客戶端jar包,方便客戶端開發

github連結: github.com/yuanrw/IM

本篇將帶大家從零開始搭建一個輕量級的IM服務端,IM的整體設計思路和架構在我的上篇部落格中已經講過了,沒看過的同學請點選從零開始開發IM(即時通訊)服務端

這篇將給大家帶來更多的細節實現。我將從三個方面來闡述如何構建一個完整可靠的IM系統。

  1. 可靠性
  2. 安全性
  3. 儲存設計

可靠性

什麼是可靠性?對於一個IM系統來說,可靠的定義至少是不丟訊息訊息不重複不亂序,滿足這三點,才能說有一個好的聊天體驗。

不丟訊息

我們先從不丟訊息開始講起。

首先複習一下上一篇設計的服務端架構

im-structure.png

我們先從一個簡單例子開始思考:當Alice給Bob傳送一條訊息時,可能要經過這樣一條鏈路:

route

  1. client-->connecter
  2. connector-->transfer
  3. transfer-->connector
  4. connector-->client

在這整個鏈路中的每個環節都有可能出問題,雖然tcp協議是可靠的,但是它只能保證鏈路層的可靠,無法保證應用層的可靠。

例如在第一步中,connector收到了從client發出的訊息,但是轉發給transfer失敗,那麼這條訊息Bob就無法收到,而Alice也不會意識到訊息傳送失敗了。

如果Bob狀態是離線,那麼訊息鏈路就是:

  1. client-->connector
  2. connector-->transfer
  3. transfer-->mq

如果在第三步中,transfer收到了來自connector的訊息,但是離線訊息入庫失敗, 那麼這個訊息也是傳遞失敗了。
為了保證應用層的可靠,我們必須要有一個ack機制,使傳送方能夠確認對方收到了這條訊息。

具體的實現,我們模仿tcp協議做一個應用層的ack機制。

tcp的報文是以位元組(byte)為單位的,而我們以message單位。

ack
傳送方每次傳送一個訊息,就要等待對方的ack迴應,在ack確認訊息中應該帶有收到的id以便傳送方識別。

其次,傳送方需要維護一個等待ack的佇列。 每次傳送一個訊息之後,就將訊息和一個計時器入隊。

另外存在一個執行緒一直輪詢佇列,如果有超時未收到ack的,就取出訊息重發。

超時未收到ack的訊息有兩種處理方式:

  1. 和tcp一樣不斷髮送直到收到ack為止。
  2. 設定一個最大重試次數,超過這個次數還沒收到ack,就使用失敗機制處理,節約資源。例如如果是connector長時間未收到client的ack,那麼可以主動斷開和客戶端的連線,剩下未傳送的訊息就作為離線訊息入庫,客戶端斷連後嘗試重連伺服器即可。

不重複、不亂序

有的時候因為網路原因可能導致ack收到較慢,傳送方就會重複傳送,那麼接收方必須有一個去重機制。
去重的方式是給每個訊息增加一個唯一id。這個唯一id並不一定是全域性的,只需要在一個會話中唯一即可。例如某兩個人的會話,或者某一個群。如果網路斷連了,重新連線後,就是新的會話了,id會重新從0開始。

接收方需要在當前會話中維護收到的最後一個訊息的id,叫做lastId
每次收到一個新訊息, 就將id與lastId作比較看是否連續,如果不連續,就放入一個暫存佇列 queue中稍後處理。

例如:

  • 當前會話的lastId=1,接著伺服器收到了訊息msg(id=2),可以判斷收到的訊息是連續的,就處理訊息,將lastId修改為2。

  • 但是如果伺服器收到訊息msg(id=3),就說明訊息亂序到達了,那麼就將這個訊息入隊,等待lastId變為2後,(即伺服器收到訊息msg(id=2)並處理完了),再取出這個訊息處理。

因此,判斷訊息是否重複只需要判斷msgId>lastId && !queue.contains(msgId)即可。如果收到重複的訊息,可以判斷是ack未送達,就再傳送一次ack。

接收方收到訊息後完整的處理流程如下:

offer.png

虛擬碼如下:

class ProcessMsgNode{
    /**
     * 接收到的訊息
     */
    private Message message;
    /**
     * 處理訊息的方法
     */
    private Consumer<Message> consumer;
}

public CompletableFuture<Void> offer(Long id,Message     message,Consumer<Message> consumer) {
    if (isRepeat(id)) {
    //訊息重複
        sendAck(id);
        return null;
    }
    if (!isConsist(id)) {
    //訊息不連續
        notConsistMsgMap.put(id,new ProcessMsgNode(message,consumer));
        return null;
    }
    //處理訊息
    return process(id,message,consumer);
}

private CompletableFuture<Void> process(Long id,Message message,Consumer<Message> consumer) {
    return CompletableFuture
        .runAsync(() -> consumer.accept(message))
        .thenAccept(v -> sendAck(id))
        .thenAccept(v -> lastId.set(id))
        .thenComposeAsync(v -> {
            Long nextId = nextId(id);
            if (notConsistMsgMap.containsKey(nextId)) {
                //佇列中有下個訊息
                ProcessMsgNode node = notConsistMsgMap.get(nextId);
                return process(nextId,node.getMessage(),consumer);
            } else {
                //佇列中沒有下個訊息
                CompletableFuture<Void> future = new CompletableFuture<>();
                future.complete(null);
                return future;
            }
        })
        .exceptionally(e -> {
            logger.error("[process received msg] has error",e);
            return null;
        });
}
複製程式碼

安全性

無論是聊天記錄還是離線訊息,肯定都會在服務端儲存備份,那麼訊息的安全性,保護客戶的隱私也至關重要。
因此所有的訊息都必須要加密處理。
在儲存模組裡,維護使用者資訊和關係鏈有兩張基礎表,分別是im_user使用者表和im_relation關係連結串列。

  • im_user表用於存放使用者常規資訊,例如使用者名稱密碼等,結構比較簡單。
  • im_relation表用於記錄好友關係,結構如下:
CREATE TABLE `im_relation` (
  `id` bigint(20) COMMENT '關係id',`user_id1` varchar(100) COMMENT '使用者1id',`user_id2` varchar(100) COMMENT '使用者2id',`encrypt_key` char(33) COMMENT 'aes金鑰',`gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,`gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`),UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
);
複製程式碼
  • user_id1user_id2是互為好友的使用者id,為了避免重複,儲存時按照user_id1<user_id2的順序存,並且加上聯合索引。
  • encrypt_key是隨機生成的金鑰。當客戶端登入時,就會從資料庫中獲取該使用者的所有的relation,存在記憶體中,以便後續加密解密。
  • 當客戶端給某個好友傳送訊息時,取出記憶體中該關係的金鑰,加密後傳送。同樣,當收到一條訊息時,取出相應的金鑰解密。

客戶端完整登入流程如下:

login process

  1. client呼叫rest介面登入。
  2. client呼叫rest介面獲取該使用者所有relation
  3. client向connector傳送greet訊息,通知上線。
  4. connector拉取離線訊息推送給client。
  5. connector更新使用者session。

那為什麼connector要先推送離線訊息再更新session呢?我們思考一下如果順序倒過來會發生什麼:

  1. 使用者Alice登入伺服器
  2. connector更新session
  3. 推送離線訊息
  4. 此時Bob傳送了一條訊息給Alice

如果離線訊息還在推送的過程中,Bob傳送了新訊息給Alice,伺服器獲取到Alice的session,就會立刻推送。這時新訊息就有可能夾在一堆離線訊息當中推過去了,那這時,Alice收到的訊息就亂序了。

而我們必須保證離線訊息的順序在新訊息之前。

那麼如果先推送離線訊息,之後才更新session。在離線訊息推送的過程中,Alice的狀態就是“未上線”,這時Bob新傳送的訊息只會入庫im_offlineim_offline表中的資料被讀完之後才會“上線”開始接受新訊息。這也就避免了亂序。

儲存設計

儲存離線訊息

當使用者不線上時,離線訊息必然要儲存在服務端,等待使用者上線再推送。理解了上一個小節後,離線訊息的儲存就非常容易了。增加一張離線訊息表im_offline,表結構如下:

CREATE TABLE `im_offline` (
  `id` int(11) COMMENT '主鍵',`msg_id` bigint(20) COMMENT '訊息id',`msg_type` int(2) COMMENT '訊息型別',`content` varbinary(5000) COMMENT '訊息內容',`to_user_id` varchar(100) COMMENT '收件人id',`has_read` tinyint(1) COMMENT '是否閱讀',`gmt_create` timestamp COMMENT '建立時間',PRIMARY KEY (`id`)
);
複製程式碼

msg_type用於區分訊息型別(chat,ack),content加密後的訊息內容以byte陣列的形式儲存。
使用者上線時按照條件to_user_id=使用者id拉取記錄即可。

防止離線訊息重複推送

我們思考一下多端登入的情況,Alice有兩臺裝置同時登陸,在這種併發的情況下,我們就需要某種機制來保證離線訊息只被讀取一次。

這裡利用CAS機制來實現:

  1. 首先取出所有has_read=false的欄位。
  2. 檢查每條訊息的has_read值是否為false,如果是,則改為true。這是原子操作。
update im_offline set has_read = true where id = ${msg_id} and has_read = false
複製程式碼
  1. 修改成功則推送,失敗則不推送。

相信到這裡,同學們已經可以自己動手搭建一個完整可用的IM服務端了。更多問題歡迎評論區留言~~

IM1.0.0版本已上線,github連結: github.com/yuanrw/IM
覺得對你有幫助請點個star吧~!