1. 程式人生 > >(轉)開源專案t-io

(轉)開源專案t-io

石墨文件:https://shimo.im/docs/tHwJJcvKl2AIiCZD/

(二期)18、開源t-io專案解讀

【課程18】BIO、...AIO.xmind0.4MB

【課程18】t-io簡介.xmind0.2MB

【課程18】兩個官方例子.xmind0.3MB

【課程18】同步異...阻塞.xmind0.3MB

【課程18預習】百萬...t-io.xmind0.3MB

 

t-io是什麼

官網相關

官網:https://t-io.org/

宣傳:不僅僅是百萬級網路通訊框架,讓天下沒有難開發的網路通訊

git地址:https://gitee.com/tywo45/t-io

t-io手冊:https://t-io.org/doc/index.html

 

t-io.pdf3MB

 

t-io與websocket

t-io:是一個網路框架,從這一點來說是有點像 netty 的,但 t-io 為常見和網路相關的業務(如 IM、訊息推送、RPC、監控)提供了近乎於現成的解決方案,即豐富的程式設計 API,極大減少業務層的程式設計難度。

 

websocket: WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。

 

預備知識AIO、NIO、Socket 同步、非同步、阻塞、非阻塞

先來個例子理解一下概念,以銀行取款為例: 

  • 同步 : 自己親自出馬持銀行卡到銀行取錢(使用同步IO時,Java自己處理IO讀寫);
  • 非同步 : 委託一小弟拿銀行卡到銀行取錢,然後給你(使用非同步IO時,Java將IO讀寫委託給OS處理,需要將資料緩衝區地址和大小傳給OS(銀行卡和密碼),OS需要支援非同步IO操作API);

  • 阻塞 : ATM排隊取款,你只能等待(使用阻塞IO時,Java呼叫會一直阻塞到讀寫完成才返回);
  • 非阻塞 : 櫃檯取款,取個號,然後坐在椅子上做其它事,等號廣播會通知你辦理,沒到號你就不能去,你可以不斷問大堂經理排到了沒有,大堂經理如果說還沒到你就不能去(使用非阻塞IO時,如果不能讀寫Java呼叫會馬上返回,當IO事件分發器會通知可讀寫時再繼續進行讀寫,不斷迴圈直到讀寫完成)

 

IO的方式通常分為幾種,同步阻塞的BIO、同步非阻塞的NIO、非同步非阻塞的AIO。

同步阻塞BIO
  • 一個連線一個執行緒

在JDK1.4出來之前,我們建立網路連線的時候採用BIO模式,需要先在服務端啟動一個ServerSocket,然後在客戶端啟動Socket來對服務端進行通訊,預設情況下服務端需要對每個請求建立一堆執行緒等待請求,而客戶端傳送請求後,先諮詢服務端是否有執行緒相應,如果沒有則會一直等待或者遭到拒絕請求,如果有的話,客戶端會執行緒會等待請求結束後才繼續執行。

 

優化:弄一個執行緒池來管理執行緒。即偽非同步阻塞IO

同步非阻塞NIO
  • 一個請求一個執行緒

NIO本身是基於事件驅動思想來完成的,其主要想解決的是BIO的大併發問題:每個客戶端請求必須使用一個執行緒單獨來處理。問題在於系統本身對執行緒總數有一定限制,容易癱瘓。

 

NIO,當socket有流可讀或可寫入socket時,作業系統會相應的通知引用程式進行處理,應用再將流讀取到緩衝區或寫入作業系統。  也就是說,這個時候,已經不是一個連線就要對應一個處理執行緒了,而是有效的請求,對應一個執行緒,當連線沒有資料時,是沒有工作執行緒來處理的。

 

BIO與NIO一個比較重要的不同,是我們使用BIO的時候往往會引入多執行緒,每個連線一個單獨的執行緒;而NIO則是使用單執行緒或者只使用少量的多執行緒,每個連線共用一個執行緒。

 

** NIO的最重要的地方是當一個連線建立後,不需要對應一個執行緒,這個連線會被註冊到多路複用器上面,所以所有的連線只需要一個執行緒就可以搞定,當這個執行緒中的多路複用器進行輪詢的時候,發現連線上有請求的話,才開啟一個執行緒進行處理,也就是一個請求一個執行緒模式。

非同步非阻塞AIO

對於讀操作而言,當有流可讀取時,作業系統會將可讀的流傳入read方法的緩衝區,並通知應用程式;對於寫操作而言,當作業系統將write方法傳遞的流寫入完畢時,作業系統主動通知應用程式。  即可以理解為,read/write方法都是非同步的,完成後會主動呼叫回撥函式。

 

總結

Java對BIO、NIO、AIO的支援:

  • Java BIO : 同步並阻塞,伺服器實現模式為一個連線一個執行緒,即客戶端有連線請求時伺服器端就需要啟動一個執行緒進行處理,如果這個連線不做任何事情會造成不必要的執行緒開銷,當然可以通過執行緒池機制改善。
  • Java NIO : 同步非阻塞,伺服器實現模式為一個請求一個執行緒,即客戶端傳送的連線請求都會註冊到多路複用器上,多路複用器輪詢到連線有I/O請求時才啟動一個執行緒進行處理。
  • Java AIO(NIO.2) : 非同步非阻塞,伺服器實現模式為一個有效請求一個執行緒,客戶端的I/O請求都是由OS先完成了再通知伺服器應用去啟動執行緒進行處理

BIO、NIO、AIO適用場景分析:

  • BIO方式適用於連線數目比較小且固定的架構,這種方式對伺服器資源要求比較高,併發侷限於應用中,JDK1.4以前的唯一選擇,但程式直觀簡單易理解。
  • NIO方式適用於連線數目多且連線比較短(輕操作)的架構,比如聊天伺服器,併發侷限於應用中,程式設計比較複雜,JDK1.4開始支援。
  • AIO方式使用於連線數目多且連線比較長(重操作)的架構,比如相簿伺服器,充分呼叫OS參與併發操作,程式設計比較複雜,JDK7開始支援。
t-io框架 使用場景

常用關鍵類
  • ChannelContext(通道上下文)
  • 每一個 tcp 連線的建立都會產生一個 ChannelContext 物件
  • (1)ServerChannelContext
  • ChannelContext 的子類,當用 tio 作 tcp 伺服器時,業務層接觸的是這個類的例項。
  • (2)ClientChannelContext
  • ChannelContext 的子類,當用 tio 作 tcp 客戶端時,業務層接觸的是這個類的例項

  • GroupContext(服務配置與維護)
  • GroupContext 就是用來配置執行緒池、確定監聽埠,維護客戶端各種資料等的
  • ClientGroupContext
  • ServerGroupContext

我們在寫 TCP Server 時,都會先選好一個埠以監聽客戶端連線,再建立 N 組執行緒池來執行相關的任 務,譬如傳送訊息、解碼資料包、處理資料包等任務,還要維護客戶端連線的各種資料,為了和業務互動, 還要把這些客戶端連線和各種業務資料繫結起來,譬如把某個客戶端繫結到一個群組,繫結到一個 userid, 繫結到一個 token 等。GroupContext 就是用來配置執行緒池、確定監聽埠,維護客戶端各種資料等的。

 

GroupContext 是個抽象類,如果你是用 tio 作 tcp 客戶端,那麼你需要建立 ClientGroupContext,如 果你是用 tio 作 tcp 伺服器,那麼你需要建立 ServerGroupContext

  • AioHandler(訊息處理介面)
  • 處理訊息的核心介面,它有兩個子介面
  • ClientAioHandler
  • ServerAioHandler
  • AioListener(通道監聽者)
  • 處理事件監聽的核心介面,它有兩個子介面,
  • ClientAioListener
  • ServerAioListener
  • Packet(應用層資料包)
  • TCP 層過來的資料,都會被 tio 要求解碼成 Packet 物件,應用都需要繼承這個類,從而實現自己的業務 資料包。

 

用於應用層與傳輸層的資料傳遞

傳輸層在往應用層傳遞資料時,並不保證每次傳遞的資料是一個完整的應用層資料包(以 http 協議為 例,就是並不保證應用層收到的資料剛好可以組成一個 http 包),這就是我們經常提到的半包粘包。傳輸層只負責傳遞 byte[]資料,應用層需要自己對 byte[]資料進行解碼,以 http 協議為例,就是把 byte[] 解碼成 http 協議格式的字串。

 

  • AioServer(tio 服務端入口類)

 

  • AioClient(tio 客戶端入口類)

 

  • ObjWithLock(自帶讀寫鎖的物件)
  • 是一個自帶了一把(讀寫)鎖的普通物件(一般是集合物件),每當要對 這個物件進行同步安全操作(併發下對集合進行遍歷或對集合物件進行元素修改刪除增加)時,就得用這個 鎖。

t-io是基於tcp層協議的一個網路框架,所以在應用層與tcp傳輸層之間設計到一個數據的編碼與解碼問題,t-io讓我們能自定義資料協議,所以需要我們自己手動去編碼解碼過程。

入門例子HelloWord
  • git專案地址

https://gitee.com/tywo45/tio-showcase

 

git裡面有個幾個例子,首先我們看helloword的例子:

 

業務邏輯

本例子演示的是一個典型的TCP長連線應用,大體業務簡介如下。

  • 分為server和client工程,server和client共用common工程
  • 服務端和客戶端的訊息協議比較簡單,訊息頭為4個位元組,用以表示訊息體的長度,訊息體為一個字串的byte[]
  • 服務端先啟動,監聽6789埠
  • 客戶端連線到服務端後,會主動向伺服器傳送一條訊息
  • 伺服器收到訊息後會迴應一條訊息
  • 之後,框架層會自動從客戶端發心跳到伺服器,伺服器也會檢測心跳有沒有超時(這些事都是框架做的,業務層只需要配一個心跳超時引數即可)
  • 框架層會在斷鏈後自動重連(這些事都是框架做的,業務層只需要配一個重連配置物件即可)

具體類說明:

server端
  • 匯入核心包
<dependency>
   <groupId>org.t-io</groupId>
   <artifactId>tio-core</artifactId>
</dependency>
  • HelloServerStarter

構造ServerGroupContext,main方法啟動服務

  • HelloServerAioHandler

實現ServerAioHandler介面,重寫decodeencode,handler方法。

common
  • HelloPacket

繼承Packet類。自定義資料包的內容

  • Const

常量類,配置ip,埠等資訊

client端
  • HelloClientStarter

構造clientGroupContext,連線節點,使用資訊傳送訊息

  • HelloClientAioHandler

實現ClientAioHandler介面,重寫decode,encode,handler方法。

流程圖
  • idea請安裝PlantUML intergration外掛

客戶端與服務端溝通時序圖.puml0.8KB

Server端初始化時序圖.puml0.6KB

 

(初始化伺服器)

 

 

(客戶端與服務端通訊流程)

 

入門例子showcase
  • git專案地址

https://gitee.com/tywo45/tio-showcase

 

上面講的是helloword的例子,比較簡單,接下來的是showcase的例子,結合實際場景的一個例子。

 

業務邏輯

tio的框架初始化使用過程是一樣的。不過因為showcase中涉及到的互動越來越多,因此不能像helloword例子中的HelloPacket只有body這麼簡單了,我們至少要加上一個type引數(訊息型別),這樣的話伺服器獲取到資料包之後再更加type來選擇訊息處理類,從而拓展系統可用性。

 

(客戶端與伺服器溝通流程)

客戶端與服務端溝通時序圖.puml1.4KB

 

 

客戶端發起登入操作

  • org.tio.examples.showcase.client.ShowcaseClientStarter#processCommand
LoginReqBody loginReqBody = new LoginReqBody();
loginReqBody.setLoginname(loginname);
loginReqBody.setPassword(password);
 
  
ShowcasePacket reqPacket = new ShowcasePacket();
#這裡指定訊息型別
reqPacket.setType(Type.LOGIN_REQ);
reqPacket.setBody(Json.toJson(loginReqBody).getBytes(ShowcasePacket.CHARSET));
 
  
Tio.send(clientChannelContext, reqPacket);
  • LoginReqBody:登入請求引數封裝類,繼承BaseBody
  • ShowcasePacket:資料包,繼承Packet,和ByteBuffer相互轉換
  • clientChannelContext:連線上下文通道

服務端接受請求操作

  • org.tio.examples.showcase.server.ShowcaseServerAioHandler#handler
ShowcasePacket showcasePacket = (ShowcasePacket) packet;
#獲取訊息型別
Byte type = showcasePacket.getType();
#根據訊息型別找到對應的訊息處理類
AbsShowcaseBsHandler<?> showcaseBsHandler = handlerMap.get(type);
if (showcaseBsHandler == null) {
   log.error("{}, 找不到處理類,type:{}", channelContext, type);
   return;
}
#執行訊息處理。訊息處理類必須繼承AbsShowcaseBsHandler
showcaseBsHandler.handler(showcasePacket, channelContext);
  • handlerMap:存有當前所有訊息處理類的map。資料包中包含訊息型別,會根據訊息型別獲取對應的訊息處理類,而這個訊息處理類會呼叫handler()方法處理資料。
  • AbsShowcaseBsHandler:訊息處理抽象類,繼承這個類的處理類會對一種訊息型別進行處理,並且一般專門處理一種訊息封裝類(繼承BaseBody的封裝類)。

訊息型別對應訊息處理類的初始化

private static Map<Byte, AbsShowcaseBsHandler<?>> handlerMap = new HashMap<>();
static {
   #把訊息型別與訊息處理類對映起來 
   handlerMap.put(Type.GROUP_MSG_REQ, new GroupMsgReqHandler());
   handlerMap.put(Type.HEART_BEAT_REQ, new HeartbeatReqHandler());
   handlerMap.put(Type.JOIN_GROUP_REQ, new JoinGroupReqHandler());
   handlerMap.put(Type.LOGIN_REQ, new LoginReqHandler());
   handlerMap.put(Type.P2P_REQ, new P2PReqHandler());
}

 

如果接收到的訊息型別是P2P_REQ,那麼處理類就是P2PReqHandler:

  • org.tio.examples.showcase.server.handler.P2PReqHandler#handler
log.info("收到點對點請求訊息:{}", Json.toJson(bsBody));
 
  
ShowcaseSessionContext showcaseSessionContext = (ShowcaseSessionContext) channelContext.getAttribute();
 
  
P2PRespBody p2pRespBody = new P2PRespBody();
p2pRespBody.setFromUserid(showcaseSessionContext.getUserid());
p2pRespBody.setText(bsBody.getText());
 
  
ShowcasePacket respPacket = new ShowcasePacket();
respPacket.setType(Type.P2P_RESP);
respPacket.setBody(Json.toJson(p2pRespBody).getBytes(ShowcasePacket.CHARSET));
Tio.sendToUser(channelContext.groupContext, bsBody.getToUserid(), respPacket);
springboot整合t-io

專案執行:https://github.com/fanpan26/SpringBootLayIM.git

由於layim是付費產品,所以在網路上找了一個。(僅供學習哈)

需要放到專案目錄下面

static/js/layui/lay/modules/layim.js

 

layim.js33.3KB

 

專案結構

專案整合:

因為這裡需要和瀏覽器之間進行通訊,所以需要用到websocket機制。tio有整合websocket的框架,所以直接匯入即可。

<dependency>
    <groupId>org.t-io</groupId>
    <artifactId>tio-websocket-server</artifactId>
    <version>0.0.5-tio-websocket</version>
</dependency>

程式碼結構

Guava - EventBus(事件匯流排)

專案使用Guava的EventBus替代了Spring的ApplicationEvent事件機制。

Guava的EventBus使用介紹如下:

 

事件定義

EventBus為我們提供了register方法來訂閱事件,不需要實現任何的額外介面或者base類,只需要在訂閱方法上標註上@Subscribe和保證只有一個輸入引數的方法就可以搞定。

new Object() {
 
  
    @Subscribe
    public void lister(Integer integer) {
        System.out.printf("%d from int%n", integer);
    }
}

 

事件釋出

對於事件源,則可以通過post方法釋出事件。 正在這裡對於Guava對於事件的釋出,是依據上例中訂閱方法的方法引數型別決定的,換而言之就是post傳入的型別和其基類型別可以收到此事件。

//定義事件
final EventBus eventBus = new EventBus();
//註冊事件
eventBus.register(new Object() {
 
  
    //使用@Subscribe說明訂閱事件處理方法
    @Subscribe
    public void lister(Integer integer) {
        System.out.printf("%s from int%n", integer);
    }
 
  
    @Subscribe
    public void lister(Number integer) {
        System.out.printf("%s from Number%n", integer);
    }
 
  
    @Subscribe
    public void lister(Long integer) {
        System.out.printf("%s from long%n", integer);
    }
});
 
  
//釋出事件
eventBus.post(1);
eventBus.post(1L);

 

專案的而運用

 

主要處理事件:包含了申請通知,新增好友成功通知

關鍵類:

  • com.fyp.layim.common.event.bus.EventUtil:封裝了事件的監聽註冊,以及釋出動作
  • com.fyp.layim.common.event.bus.body.EventBody:釋出的內容封裝類,包含訊息型別和訊息內容欄位
  • com.fyp.layim.common.event.bus.handler.AbsEventHandler:事件處理抽象類,具體處理器需要繼承這個重寫handler()方法
  • com.fyp.layim.common.event.bus.handler.AddFriendEventHandler:新增好友成功通知處理類。
  • com.fyp.layim.common.event.bus.LayimEventType:訊息型別常量

呼叫:

  • com.fyp.layim.web.biz.UserController#handleFriendApply:好友同意好友請求之後釋出事件

邏輯:

事件的處理其實是給申請人發起好友同意通知

  • com.fyp.layim.im.common.util.PushUtil#pushAddFriendMessage
/**
 * 新增好友成功之後向對方推送訊息
 * */
public static void pushAddFriendMessage(long applyid){
    if(applyid==0){
        return;
    }
    Apply apply = applyService.getApply(applyid);
    ChannelContext channelContext = getChannelContext(""+apply.getUid());
    //先判斷是否線上,再去查詢資料庫,減少查詢次數
    if (channelContext != null && !channelContext.isClosed()) {
        LayimToClientAddFriendMsgBody body = new LayimToClientAddFriendMsgBody();
        User user = getUserService().getUser(apply.getToid());
        if (user==null){return;}