基於RabbitMQ的Rpc框架
參考文件:https://www.cnblogs.com/ericli-ericli/p/5917018.html
MQ的使用場景大概包括解耦,提高峰值處理能力,送達和排序保證,緩衝等。
MQ概述
訊息佇列技術是分散式應用間交換資訊的一種技術。
訊息佇列可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。
通過訊息佇列,應用程式可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。
MQ主要作用是接收和轉發訊息。你可以想想在生活中的一種場景:當你把信件的投進郵筒,郵遞員肯定最終會將信件送給收件人。我們可以把MQ比作 郵局和郵遞員。
MQ和郵局的主要區別是,它不處理訊息,但是,它會接受資料、儲存訊息資料、轉發訊息。
RabbitMQ術語
生產者:
訊息傳送者,在MQ中被稱為生產者(producer),一個傳送訊息的應用也被叫做生產者,用P表示
消費者:
生產者“生產”出訊息後,最終由誰消費呢?等待接受訊息的應用程式,我們稱之為消費者(Consuming ),用C表示
佇列:
訊息只能儲存在佇列(queue )中。儘管訊息在rabbitMQ和應用程式間流通,但是佇列卻是存在於RabbitMQ內部。
一個佇列不受任何限制,它可以儲存你想要儲存的訊息量,它本質上是一個無限的緩衝區。
多個生產者可以向同一個佇列傳送訊息,多個消費者可以嘗試從同一個訊息佇列中接收資料。
一個佇列像下面這樣(上面是它的佇列名稱)
注意:
生產者、消費者、中介軟體不必在一臺機器上,實際應用中也是絕大多數不在一起的。我們可以用一張圖表示RabbitMQ的構造:
注:此圖片摘自於百度百科RabbitMQ。
RabbitMQ 實現RPC
(RPC) Remote Procedure Call Protocol 遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。
在一個大型的公司,系統由大大小小的服務構成,不同的團隊維護不同的程式碼,部署在不同的機器。但是在做開發時候往往要用到其它團隊的方法,因為已經有了實現。但是這些服務部署不同的機器上,想要呼叫就需要網路通訊,這些程式碼繁瑣且複雜,一不小心就會寫的很低效。RPC協議定義了規劃,其它的公司都給出了不同的實現。比如微軟的wcf,以及現在火熱的WebApi。
在RabbitMQ中RPC的實現也是很簡單高效的,現在我們的客戶端、服務端都是訊息釋出者與訊息接收者。
首先客戶端通過RPC向服務端發出請求
我這裡有一堆東西需要你給我處理一下,correlation_id:這是我的請求標識,erply_to:你處理完過後把結果返回到這個佇列中。
服務端拿到了請求,開始處理並返回
correlation_id:這是你的請求標識 ,原封不動的給你。 這時候客戶端用自己的correlation_id與服務端返回的id進行對比。是我的,就接收。
在我們釋出訊息的時候,會呼叫channel物件的BasicPublish
方法,這個方法中有一個IBasicProperties
的引數basicProperties
。
在這物件中,有一個ReplyTo
屬性,我們可以將生產者監聽的訊息佇列名稱存放在裡面。當消費者程式接收到這條訊息的時候,就可以在Receive事件的ea物件中獲取ReplyTo屬性的值
var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes);
那麼當訊息生產者接收到訊息消費者任務完成的訊息之後,該如何確定完的是哪一個任務呢?
在現實情況,訊息生產者通常會發出多個任務,多個訊息消費者分別進行不同的任務,這時候我們就需要知道是哪個訊息消費者完成了任務。
當訊息生產者呼叫channel物件的BasicPublish
方法傳送訊息時,IBasicProperties
物件除了可以幫助我們傳遞訊息生產者監聽的訊息佇列名,還可以幫我們傳遞一個CorrelationId
(相關Id),當傳送任務訊息的時候,我們給每個任務訊息定義一個唯一的相關Id, 並存儲在IBasicProperties
物件的CorrelationId
屬性中。
var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueueName; properties.CorrelationId = Guid.NewGuid().ToString();
這樣訊息消費者在接收到任務訊息時,可以從Receive的ea引數中獲取CorrelationId
。當任務完成時,再將儲存有這個CorrelationId
的任務完成訊息傳送到訊息生產者關注的訊息佇列中, 訊息生產者就可以知道是哪個任務完成了
一些繁瑣的細節rabbitmq已經為我們封裝了,簡單的SimpleRpcServer與SimpleRpcClient讓Rpc實現的更為方便。
開發指南:RabbitMQ .NET/C# Client API Guide
API文件:https://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.2.2/rabbitmq-dotnet-client-3.2.2-client-htmldoc/html/index.html
Client
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { SimpleRpcClient client = new SimpleRpcClient(channel, new PublicationAddress(exchangeType: ExchangeType.Direct, exchangeName: string.Empty, routingKey: "RpcQueue")); var prop = channel.CreateBasicProperties(); prop.CorrelationId = Guid.NewGuid().ToString(); IBasicProperties outProp; var msg = client.Call(prop, Encoding.UTF8.GetBytes(args[0]), out outProp); if (prop.CorrelationId == outProp.CorrelationId) { Console.WriteLine($"Task {prop.CorrelationId} completed."); Console.WriteLine(Encoding.UTF8.GetString(msg)); } } } }
Server
- 建立MySimpleRpcServer類,繼承自SimpleRpcServer類
- HandleSimpleCall方法裡添加回調返回值
- ProcessRequest方法為任務處理方法
- 使用server.MainLoop() 啟動服務
public class MySimpleRpcServer : SimpleRpcServer { public MySimpleRpcServer(Subscription subscription) : base(subscription) { } public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} Task {requestProperties.CorrelationId} Completed."); } /// <summary> /// 進行處理 /// </summary> /// <param name="evt"></param> public override void ProcessRequest(BasicDeliverEventArgs evt) { Console.WriteLine("[x] Received {0}", Encoding.UTF8.GetString(evt.Body)); Thread.Sleep(4000); base.ProcessRequest(evt); } }
Program.cs
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("RpcQueue", true, false, false, null); SimpleRpcServer rpc = new MySimpleRpcServer(new Subscription(channel, "RpcQueue")); rpc.MainLoop(); Console.ReadKey(); } } }
參考文件:RabbitMQ學習筆記(六) RPC 含手工實現RabbitMQ的RPC、使用SimpleRpcClient類和SimpleRpcServer類實現RPC的簡單示例。
簡易RPC框架-心跳與重連機制
參考文件:簡易RPC框架-心跳與重連機制
心跳
就是告訴其它人自己還活著。在簡易RPC框架中,採用的是TCP長連線,為了確保長連線有效,就需要客戶端與服務端之間有一種通知機制告知對方的存活狀態。
如何實現
客戶端傳送心跳訊息
在狀態空閒的時候定時給服務端傳送訊息型別為PING訊息。
服務端接收心跳訊息
捕獲通道空閒狀態事件,如果接收客戶端PING訊息,則傳送PONG訊息給服務端。如果在一定時間內沒有收到客戶端的PING訊息,則說明客戶端已經不線上,此時關閉通道。
客戶端管理可用連線
由於服務端會因為長時間接收不到服務端的PING訊息而關閉通道,這就導致快取在客戶端的連線的可用性發生變化。需要將不可用的從可用列表中轉移出去,並對不可用連線進行處理,比如直接丟棄或者是重新連線。
預備知識
ChannelPipeline與handle的關係。netty中的這些handle和spring mvc中的filter作用是類似的,ChannelPipeline可以理解成handle的容器,裡面可以被註冊眾多處理不同業務功能的事件處理器,比如:
- 編碼
- 解碼
- 心跳
- 許可權
- 加密
- 解密
- 業務程式碼執行
- ......
具體實現
空閒狀態處理器
可以利用netty提供的IdleStateHandler來發送PING-PONG訊息。這個處理器主要是捕獲通道超時事件,主要有三類
- 讀超時,一定時間內沒有從通道內讀取到任何資料
- 寫超時,一定時間內沒有從通道內寫入任何資料
- 讀寫超時,一定時間內沒有從通道內讀取或者是寫入任何資料
客戶端加入空閒狀態處理器
客戶端捕獲讀寫超時,如果事件觸發就給服務端傳送PING訊息。
服務端加入空閒狀態處理器
服務端只需要捕獲讀超時即可,當讀超時觸發後就關閉通道。
為什麼在空閒狀態才傳送心跳訊息
在正常客戶端與服務端有互動的情況下,說明雙方都在正常工作不需要額外的心跳來告知對方的存活。只有雙方在一定時間內沒有接收到對方的訊息時才開始採用心跳訊息來探測對方的存活,這也是一種提升效率的做法。
抽象心跳處理器
建立AbstractHeartbeatHandler,並繼承ChannelInboundHandlerAdapter,服務於客戶端與服務端的心跳處理器。在讀取方法中判斷訊息型別:
- 如果是PING訊息就傳送PONG訊息給客戶端
- 如果收到的是PONG訊息,則直接列印訊息說明客戶端已經成功接收到服務端返回的PONG訊息
- 如果是其它型別的訊息,則通知下一個處理器處理訊息
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { if(!(msg instanceof RpcMessage)){ channelHandlerContext.fireChannelRead(msg); return; } RpcMessage message=(RpcMessage)msg; if(null==message||null==message.getMessageHeader()){ channelHandlerContext.fireChannelRead(msg); return; } if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){ logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody()); } else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){ this.sendPong(channelHandlerContext); } else { channelHandlerContext.fireChannelRead(msg); } }
空閒狀態事件,可以根據不同的狀態做不同的行為處理,定義三個可重寫事件供客戶端與服務端處理器具體確認處理事件。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: this.handleReaderIdle(ctx); break; case WRITER_IDLE: this.handleWriterIdle(ctx); break; case ALL_IDLE: this.handleAllIdle(ctx); break; default: break; } } }
客戶端心跳處理器
繼承抽象心跳處理器,並重寫事件傳送PING訊息。
public class ClientHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleAllIdle(ChannelHandlerContext ctx) { this.sendPing(ctx); } }
服務端心跳處理器
繼承抽象心跳處理器,並重寫事件關閉通道。
public class ServerHeartbeatHandler extends AbstractHeartbeatHandler { @Override protected void handleReaderIdle(ChannelHandlerContext ctx) { logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel"); ctx.close(); } }
客戶端ChannelPipeline中加入心跳處理器
比如5秒內未寫入或者讀取通道資料就觸發超時事件。
.addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));
服務端ChannelPipeline中加入心跳處理器
比如10秒未接收到通道訊息就觸發讀超時事件。
.addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))
客戶端訊息示例
正常情況下心跳訊息顯示如下圖所示,訊息的內容可以根據自己的情況自行定義。
客戶端下線訊息示例
停止客戶端程式,然後服務端讀超時事件觸發,並關閉通道。
客戶端可用連線管理
由於上述的服務端心跳處理器,在觸發讀超時後會關閉通訊管道,這導致客戶端快取的連線狀態會出現不可用的情況,為了讓客戶端一直只能取到可用連線就需要對從快取中獲取到的連線做狀態判斷,如果可用直接返回,如果不可用則將連線從可用列表中刪除然後取下一個可用連線。
修改獲取連線方法
通過channel的isActive屬性可以判斷連線是否可用,如果不可以做刪除並重新獲取的操作。
public RpcClientInvoker getInvoker() { // ... int index = loadbalanceService.index(size); RpcClientInvoker invoker= RpcClientInvokerCache.get(index); if(invoker.getChannel().isActive()) { return invoker; } else { RpcClientInvokerCache.removeHandler(invoker); logger.info("invoker is not active,so remove it and get next one"); return this.getInvoker(); } }
後臺啟動任務處理不可用連線
啟動一個每隔5秒執行一次任務的執行緒,定時取出不可用連線,然後重連,並將不可用連線刪除。
這裡我處理的重連是直接丟棄原有不可用連線,然後重新建立新連線。
private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class); static { executorService.schedule(new Runnable() { @Override public void run() { while (true) { List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers(); if (!CollectionUtils.isEmpty(notConnectedHandlers)) { for (RpcClientInvoker invoker : notConnectedHandlers) { RpcClientInvokerManager.getInstance(referenceConfig).connect(); } RpcClientInvokerCache.clearNotConnectedHandler(); } } } }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS); }
本文原始碼
https://github.com/jiangmin168168/jim-framework
其他資料:
Docker安裝RabbitMQ,RabbitMQ Management使用