1. 程式人生 > >微服務實戰(三):落地微服務架構到直銷系統(構建基於RabbitMq的訊息匯流排)

微服務實戰(三):落地微服務架構到直銷系統(構建基於RabbitMq的訊息匯流排)

從前面文章可以看出,訊息匯流排是EDA(事件驅動架構)與微服務架構的核心部件,沒有訊息匯流排,就無法很好的實現微服務之間的解耦與通訊。通常我們可以利用現有成熟的訊息代理產品或雲平臺提供的訊息服務來構建自己的訊息匯流排;也可以自己完全寫一個訊息代理產品,然後基於它構建自己的訊息匯流排。通常我們不用重複造輪子(除非公司有特殊的要求,比如一些大型網際網路公司考慮到自主可控的白盒子),可以利用比如像RabbitMq這樣成熟的訊息代理產品作為訊息匯流排的底層支援。

RabbitMq核心元件解釋:

Connection:訊息的傳送方或訂閱方通過它連線到RabbitMq伺服器。

Channel:訊息的傳送方或訂閱方通過Connection連線到RabbitMq伺服器後,通過Channel建立會話通道。

Exchange:訊息的傳送方向Exchange傳送訊息,通過RabbitMq伺服器中Exchange與Queue的繫結關係,Exchange會將訊息路由到匹配的Queue中。

Queue:訊息的承載者,訊息的傳送者的訊息最終通過Exchange路由到匹配的Queue,訊息的接收者從Queue接收訊息並進行處理。

Exchange模式:在訊息傳送到Exchange時,需要路由到匹配的Queue中,至於如何路由,則是由Exchange模式決定的。

1.Direct模式:特定的路由鍵(訊息型別)轉發到該Exchange的指定Queue中。

2.Fanout模式:傳送到該Exchange的訊息,被同時傳送到Exchange下繫結的所有Queue中。

3.Topic模式:具有某種特徵的訊息轉發到該Exchange的指定Queue中。

我們最常見的使用是Direct模式,如果訊息要被多個消費者消費,則可以使用Fanout模式。

實現基於RabbitMq的訊息匯流排:
我們首先需要安裝Erlang與RabbitMq到伺服器上,然後就可以進行基於RabbitMq的訊息匯流排的開發了,開發的總體思路與步驟如下:

1.首先建立一個專案作為訊息匯流排,然後引入Rabbitmq.Client 這個nuget包,這樣就有了RabbitMq開發的支援。

2.前面實現了基本的訊息匯流排,所有基於RabbitMq的訊息匯流排是從它繼承下來的,並需要傳入特定的引數到訊息匯流排的建構函式中:

 public RabbitMqEB(IConnectionFactory connectionFactory,IEventHandlerExecutionContext context,
            string exchangeName,string exchangeType,string queueName,int publisherorconsumer,
            bool autoAck = true) : base(context)
        {
            this.connectionFactory = connectionFactory;
            this.connection = this.connectionFactory.CreateConnection();
            this.exchangeName = exchangeName;
            this.exchangeType = exchangeType;
            this.autoAck = autoAck;
            this.queueName = queueName;
            if (publisherorconsumer == 2)
            {
                this.channel = CreateComsumerChannel();
            }
        }

connectionFactory:RabbitMq.Client中的型別,用於與RabbitMq伺服器建立連線時需要使用的物件。

context:訊息與訊息處理器之間的關聯關係的物件。

exchangeName:生產者或消費者需要連線到的Exchange的名字。

exchangeType:前面所描述的Exchange模式。

queueName:生產者或消費者傳送或接收訊息時的Queue的名字。

publisherorconsumer:指定連線到訊息匯流排的元件是訊息匯流排的生產者還是消費者,消費者和生產者會有不同,消費者(publisherorconsumer==2)會構建一個消費通道,用於從Queue接收訊息並呼叫父類的ieventHandlerExecutionContext的HandleAsync方法來處理訊息。

3.建立到RabbitMq的連線: 

//判斷是否已經建立了連線
public bool IsConnected
        {
            get { return this.connection != null && this.connection.IsOpen; }
        }
public bool TryConnect() { //出現連線異常時的重試策略,通常通過第三方nuget包實現重試功能,這裡出現連線異常時,每個1秒重試一次,共重試5次 var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>() .WaitAndRetry(5, p => TimeSpan.FromSeconds(1),(ex,time)=> { //記錄錯誤日誌 }); policy.Execute(() => { //建立RabbitMq Server的連線 this.connection = this.connectionFactory.CreateConnection(); }); if (IsConnected) { return true; } return false; }

 4.建立消費者通道:

private IModel CreateComsumerChannel()
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            var channel = this.connection.CreateModel();            
            channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
            channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false,
                arguments: null);
            var consumer = new EventingBasicConsumer(channel);
          //消費者接收到訊息的處理
            consumer.Received += async (model, ea) =>
                {
                    var eventbody = ea.Body;
                    var json = Encoding.UTF8.GetString(eventbody);
                    var @event = (IEvent)JsonConvert.DeserializeObject(json);
                   //呼叫關聯物件中訊息對應的處理器的處理方法
                    await this.eventHandlerExecutionContext.HandleAsync(@event);
                   //向會話通道確認此訊息已被處理
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
            channel.BasicConsume(queue: this.queueName, autoAck: false, consumer: consumer);
            
            channel.CallbackException += (sender, ea) =>
            {
                this.channel.Dispose();
                this.channel = CreateComsumerChannel();
             };
            return channel;
        }

5.對生產者釋出訊息到交換機佇列的支援:

 public override void Publish<TEvent>(TEvent @event)
        {
            if (!IsConnected)
            {
                TryConnect();
            }
            using(var channel = this.connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: exchangeName, type: exchangeType, durable: true);
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
              //釋出到交換機,根據交換機與佇列的繫結以及交換機模式,最終釋出到指定的佇列中
                channel.BasicPublish(this.exchangeName, @event.GetType().FullName,null, body);
            }
        }

6.對訂閱者從交換機佇列中訂閱訊息的支援:

 public override void Subscribe<TEvent, TEventHandler>()
        {
           //註冊接收到的訊息型別到訂閱方的處理器之間的關係
            if (!this.eventHandlerExecutionContext.IsRegisterEventHandler < TEvent,TEventHandler>()){
                this.eventHandlerExecutionContext.RegisterEventHandler<TEvent, TEventHandler>();
              //消費者進行佇列繫結
                this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName);
            }
        }

從上面的6個步驟,我們基本上就完成了基於RabbitMq訊息匯流排的基本功能,這裡需要說明的是,上述程式碼只是演示,在實際生產環境中,不能直接使用以上程式碼,還需要小心的重構此程式碼以保證可靠性與效能。

QQ討論群:309287205 

微服務實戰視訊請關注微信公眾號:

相關推薦

微服務實()落地服務架構直銷系統(構建基於RabbitMq訊息匯流排)

從前面文章可以看出,訊息匯流排是EDA(事件驅動架構)與微服務架構的核心部件,沒有訊息匯流排,就無法很好的實現微服務之間的解耦與通訊。通常我們可以利用現有成熟的訊息代理產品或雲平臺提供的訊息服務來構建自己的訊息匯流排;也可以自己完全寫一個訊息代理產品,然後基於它構建自己的訊息匯流排。通常我們不用重複造輪子(除

微服務實(九)落地服務架構直銷系統(回顧總結)

這個系列我們大概寫了八篇文章,將微服務的最重要的內容過了一遍。當然其中有些內容還沒有涉及到,比如Docker(不是微服務架構風格中必須的)等,關於Docker我們自己可以在網上找找其他文章。 這篇文章就來回顧下微服務架構風格是如何落地的,如果你對以下回顧的內容都很清楚並已經有一些實踐的經驗,那麼恭喜你,你已

微服務實(七)落地服務架構直銷系統(實現命令與命令處理器)

我們先來看看CQRS架構,你對下圖的架構還有印象嗎?每個元件的功能都還清楚嗎?如果有疑問,請查考文章《微服務實戰(五):落地微服務架構到直銷系統(構建高效能大併發系統)》。   前一篇文章已經實現了Event Store的基礎功能部分,本篇文章我們通過C端的標準方式,實現一個下單的高併發命令端,來看看需要實現

微服務實(八)落地服務架構直銷系統(服務高可用性)

在微服務架構風格的系統中,如果單個微服務垮掉或地址不可訪問,雖然對系統的影響是有限的,但我們也必須採取一定的手段來保證每個微服務儘量可用;並且在大併發的情況下,雖然可以通過EDA訊息佇列處理的方式提高吞吐量,但仍然需要WebApi能夠更加高效的偵聽使用者請求,處理訊息,即使在某個服務短暫不可用的情況下。本篇文

微服務實(二)落地服務架構直銷系統(構建訊息匯流排框架介面)

從上一篇文章大家可以看出,實現一個自己的訊息匯流排框架是非常重要的內容,訊息匯流排可以將界限上下文之間進行解耦,也可以為大併發訪問提供必要的支援。 訊息匯流排的作用: 1.界限上下文解耦:在DDD第一波文章中,當更新了訂單資訊後,我們通過呼叫經銷商界限上下文的領域模型和倉儲,進行了經銷商資訊的更新,這造成了

微服務實(一)落地服務架構直銷系統(什麼是服務)

網上有很多關於微服務的文章,從不同的維度對微服務進行了相關的講述;有些高屋建瓴,有些涉及細節,有些側重理論,有些側重程式碼,都是非常不錯的瞭解微服務的文章。 我們這個系列的文章的維度主要是實戰落地,也就是我們在平常工作以及產品開發過程中,考慮為什麼選擇微服務架構風格,以及如何將微服務的架構風格落地到我們實際的

微服務實(六)落地服務架構直銷系統(事件儲存)

在CQRS架構中,一個比較重要的內容就是當命令處理器從命令佇列中接收到相關的命令資料後,通過呼叫領域物件邏輯,然後將當前事件的物件資料持久化到事件儲存中。主要的用途是能夠快速持久化物件此次的狀態,另外也可以通過未來最終一致性的需求,通過事件資料將物件還原到一個特定的狀態,這個狀態通常是通過物件事件的版本來進行

微服務實(五)落地服務架構直銷系統(構建高效能大併發系統)

在現代系統中,特別是網際網路軟體,通常會涉及到大量使用者的併發訪問,我們的系統一定要在架構上支援高效能、大併發的訪問。一個高效能的系統通常由很多的方面組成,包括資料庫高效能、Web伺服器高效能、負載均衡、快取、軟體架構等。我們這篇文章先從軟體開發架構的角度作為切入點來介紹如何構建高效能的系統。 傳統架構效能

微服務實(四)落地服務架構直銷系統(將生產者與消費者接入訊息匯流排)

前一篇文章我們已經完成了基於RabbitMq實現的的訊息匯流排,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入訊息匯流排實現訊息的傳送與訊息的接收處理。 定義需要傳送的訊息: 下單訊息要被髮送到訊息匯流排,並被經銷商微服務的處理器處理。經銷商微服務處理時,需要知道要對哪個經銷商處理多少

《Spring Cloud微服務實》讀書筆記之服務治理Spring Cloud Eureka

摘要 服務治理是微服務架構最為核心和基礎的模組,用於實現各個微服務例項的自動化註冊與發現。Spring Cloud Eureka 是對Netflix Eureka的二次封裝,負責服務的治理。 關鍵詞:服務治理 一、服務治理介紹 服務治理是微服務架構最為核心和基礎

)springcloud - 服務架構代碼結構

article 搭建 ring 分享 組件 particle ima api 微服務雲架構 我們根據微服務化設計思想,結合spring cloud本身的服務發現、治理、配置化管理、分布式等項目優秀解決方案,我們使用Maven技術將框架進行模塊化、服務化、原子化封裝,也為後期

多研究些架構,少談些框架(1)服務架構的核心概念

定位 dubbo spring 提供服務 電信 cor res gate 虛擬 微服務架構和SOA區別 微服務現在辣麽火,業界流行的對比的卻都是所謂的Monolithic單體應用,而大量的系統在十幾年前都是已經是分布式系統了,那麽微服務作為新的理念和原來的分布式系統,或者說

圖靈學院服務架構】SpringCloud之Ribbon(四)

SpringCloud Ribbon​    一:Ribbon是什麼? Ribbon是Netflix釋出的開源專案,主要功能是提供客戶端的軟體負載均衡演算法,將Netflix的中間層服務連線在一起。Ribbon客戶端元件提供一系列完善的配置項如連線超時,重試等。簡單的說,就是

圖靈學院服務架構】SpringCloud之Eureka(服務註冊和服務發現基礎篇)(二)

一:Eureka簡介   Eureka是Spring Cloud Netflix的一個子模組,也是核心模組之一。用於雲端服務發現,一個基於REST的服務,用於定位服務,以實現雲端中間層服務發現和故障轉移。服務註冊與發現對於微服務系統來說非常重要。有了服務發現與註冊,你就不需要

.NET Core 服務架構 Steeltoe 使用(基於 Spring Cloud)

閱讀目錄: 1. Spring Cloud Eureka 註冊服務及呼叫 2. Spring Cloud Hystrix 斷路器 3. Spring Cloud Hystrix 指標監控 4. Spring Cloud Config 配置中心 現在主流的開發平臺是微服務架構,在眾多的微服務開源專案中,Sp

基於SpringCloud服務架構廣告系統設計與實現》筆記

img span 設計與實現 微服務 課程 png 1-1 分享圖片 bubuko 1-1 課程導學 什麽是廣告系統? 2-1 廣告系統概覽 2-2 廣告系統架構 2-3 準備工作與系統目錄結構 《基於SpringCl

微服務實(六)選擇服務部署策略

因此 區別 嚴重 http 虛擬化 one rose 精確 命名空間 微服務實戰(一):微服務架構的優勢與不足 微服務實戰(二):使用API Gateway 微服務實戰(三):深入微服務架構的進程間通信 微服務實戰(四):服務發現的可行方案以及實踐案例 微服務實踐(五)

Spring cloud微服務實)——基於OAUTH2.0統一認證授權的服務基礎架構升級

前言 從2018年年初寫的一篇主題為 Spring cloud微服務實戰——基於OAUTH2.0統一認證授權的微服務基礎架構的文章後就很少更新了。自從小寶貝誕生和公司業務的繁忙,年初計劃每週更新一篇博文的計劃已經落空了。年底了,終於清閒了些。 升級 Spring cloud微

微服務實(六)選擇服務部署策略 - DockOne.io

利用 -- 的區別 box imp 通信 標準 應用打包 email 原文:微服務實戰(六):選擇微服務部署策略 - DockOne.io 【編者的話】這篇博客是用微服務建應用的第六篇,第一篇介紹了微服務架構

微服務實深入微服務架構的進程間通信 - DockOne.io

ram 可靠的 eat repr 都在 kit 瀏覽器 兼容 信息 原文:微服務實戰(三):深入微服務架構的進程間通信 - DockOne.io 【