使用Masstransit開發基於訊息傳遞的分散式應用
Masstransit作為.Net平臺下的一款優秀的開源產品卻沒有得到應有的關注,這段時間有機會閱讀了Masstransit的原始碼,我覺得我有必要普及一下這個框架的使用。
值得一提的是Masstransit的原始碼寫的非常優秀,值得每個想提高自己程式設計能力的.Net選手閱讀,整個程式碼看起來賞心悅目。反之,每次開啟自己公司專案的時候心情都異常沉重。所以不是.Net不行,還是咱們水平不行。
學會了Masstransit你再也不用羨慕別人有Dubbo、Mule、Akka什麼的了,當然在某些方面他們的使用場景還是有一些區別。另外插播一條廣告:本人目前在西安求職中,如果那位同學有好的工作機會希望能夠幫忙推薦。
閱讀本篇文章的前提是你需要對訊息佇列有一些瞭解,特別是RabbitMq,Masstransit作為一款輕量級的ESB預設支援RabbitMq和MSMQ。本文的例子都使用RabbitMq來介紹,所以你最好能讀一下我之前寫的《如何優雅的使用RabbitMq》。
簡單來說,Masstransit提供了使用訊息佇列場景的一種抽象,也就是說,如果你有使用訊息佇列的需求,都可以通過Masstransit來完成,當然如果僅僅是拿訊息佇列來發個簡訊、郵件之類的並不能體現出Masstransit的優越性。當整個業務系統都通過Masstransit過來構建和互動的時候,才能真正體現ESB的價值所在。
我寫了5不同場景個Demo,方便大家學習和參考。我會重點講解Real World的案例,也就是如何在真實場景使用Masstransit。如果僅僅是把一些元件融入到了專案中並且能夠執行,並不能算是一個合格的架構師,一個合格的架構師一定是可以將某個元件以最佳實踐的方式融入到了自己的專案中,並且能夠為開發者提供清晰且合理的抽象,然後針對這一方案制定一些約定和規則,隨著專案的推進,整個專案的程式碼都能夠有章可循,始終在架構師的掌控之中。
一、傳送命令模型(Send Command Pattern)
這種模型最常見的就是CQRS中C,用來向DomainHandler傳送一個Command。另外系統的傳送郵件服務、傳送簡訊服務也可以通過這種模式來實現。這種模型跟郵遞員向郵箱投遞郵件有點相似。這一模型的特點是你需要知道對方終結點的地址,意味著你要明確要向哪個地址傳送訊息。從Masstransit提供的api就可以看出來:
var endPoint =await bus.GetSendEndpoint(sendToUri); var command = new GreetingCommandA() { Id = Guid.NewGuid(), DateTime = DateTime.Now }; await endPoint.Send(command);
這個Demo主要由2個工程組成,Client傳送訊息到Server,Server來響應這一訊息。
二、釋出/訂閱模型(publish/subscribe pattern)
之所以有基於訊息傳遞的分散式應用這種架構模式,很大程度上就是依靠這種模式來完成。一個典型的例子是子系統A釋出了一條訊息,子系統B和子系統C都可以訂閱這一訊息並非同步處理該訊息。而這一過程對子系統A來說是不關心的。從而減少不同的子系統之間的耦合,提高系統的可擴充套件性。
三、訊息的繼承層次
用過RabbitMQ的同學應該知道,RabbitMQ提供了3中型別的Exchange,分別為direct、fanout和topic。所有這一切都是為了提供一種路由訊息的機制。而這一切是通過匹配一種字串型別的routingKey來實現的,當然有了Masstransit你就不用這麼費勁了。C#作為一種強型別的語言,我們可以通過設計訊息的繼承層次來實現訊息的路由機制。比如我們可以設計下面的訊息繼承體系:
public interface IMessage { Guid Id { get; set; } } public class Message : IMessage { public Guid Id { get; set; } public string Type { get; set; } } public class UserUpdatedMessage : Message { public Guid Id { get; set; } }
有了這樣的繼承體系,我們可以定義下面的Consumer型別:
public class BaseInterfaceMessageConsumer:IConsumer<IMessage> { public async Task Consume(ConsumeContext<IMessage> context) { await Console.Out.WriteLineAsync($"consumer is BaseInterfaceMessageConsumer,message type is {context.Message.GetType()}"); } }
還可以定義下面的Consumer型別:
public class UserUpdatedMessageConsumer: IConsumer<UserUpdatedMessage> { public async Task Consume(ConsumeContext<UserUpdatedMessage> context) { await Console.Out.WriteLineAsync($"consumer is UserUpdatedMessageConsumer,message type is {context.Message.GetType()}"); } }
這樣就可以路由不同的訊息到相應的Consumer中了。
四、使用Topshelf來構建windows服務
我們最終要將consumer程式集打成windows服務來安裝在產品環境下,Topshelf為我們提供了一組DSL描述的api來建立window服務:
HostFactory.Run(x => { x.Service<GreetingServer>(s => { s.ConstructUsing(name => new GreetingServer()); s.WhenStarted(tc => tc.Start()); s.WhenStopped(tc => tc.Stop()); }); x.StartAutomatically(); x.RunAsLocalSystem(); x.SetDescription("A greeting service"); x.SetDisplayName("Greeting Service"); x.SetServiceName("GreetingService"); });
五、RPC呼叫(request/response pattern)
我們還可以通過Masstransit實現RPC呼叫:
var response = await client.Request(new SimpleRequest() {CustomerId = customerId}); Console.WriteLine("Customer Name: {0}", response.CusomerName);
這有點像是一個webservice呼叫,不過在ESB的設計中我們應該儘量避免這種設計,特別是在異構系統之間,應該儘量採用send command pattern和publish/subscriber pattern。
六、正式場景該如何使用Masstransit
在使用Masstranit的正式場景中,我們主要考慮以下幾個方面:
1、配置方式
定義一個抽象類,用來統一配置方式:
public abstract class BusConfiguration { public abstract string RabbitMqAddress { get; } public abstract string QueueName { get; } public abstract string RabbitMqUserName { get; } public abstract string RabbitMqPassword { get; } public abstract Action<IRabbitMqBusFactoryConfigurator,IRabbitMqHost> Configuration { get; } public virtual IBus CreateBus() { var bus = Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri(RabbitMqAddress), hst => { hst.Username(RabbitMqUserName); hst.Password(RabbitMqPassword); }); Configuration?.Invoke(cfg, host); }); return bus; } }
具體的專案會繼承該配置類做對應的配置:如UserManagementBusConfiguration、UserManagementServiceBusConfiguration等
2、能夠跟DI容器結合,本例以Castle Windsor Container為例:
在web專案中新增ServiceBusInstaller:
public class ServiceBusInstaller:IWindsorInstaller { public void Install(IWindsorContainer container, IConfigurationStore store) { container.Register( Component.For<IBus, IBusControl>() .Instance(UserManagementBusConfiguration.BusInstance) .LifestyleSingleton()); } }
然後我們就可以在controller中注入IBus了:
private readonly IUserProvider _userProvider; private readonly IBus _bus; public ValuesController(IUserProvider userProvider,IBus bus) { _userProvider = userProvider; _bus = bus; } [HttpGet] [Route("api/values/createuser")] public string CreateUser() { //save user in local db _bus.Publish(new UserCreatedEvent() {UserName = "Tom", Email = "[email protected]"}); return "create user named Tom"; }
同樣的道理,在consumer專案中也可以做同樣的配置,新增ConsumersInstaller:
public class ConsumersInstaller:IWindsorInstaller { public void Install(IWindsorContainer container, IConfigurationStore store) { container.Register( Classes.FromThisAssembly().BasedOn(typeof (IConsumer)).WithServiceBase().WithServiceSelf().LifestyleTransient()); } }
在Consumer中注入一個元件試試:
public class UserCreatedEventConsumer : IConsumer<UserCreatedEvent> { private readonly GreetingWriter _greetingWriter; public UserCreatedEventConsumer(GreetingWriter greetingWriter) { _greetingWriter = greetingWriter; } public async Task Consume(ConsumeContext<UserCreatedEvent> context) { _greetingWriter.SayHello(); await Console.Out.WriteLineAsync($"user name is {context.Message.UserName}"); await Console.Out.WriteLineAsync($"user email is {context.Message.Email}"); } }
把web專案和consumer服務都跑起來看看:
3、重試配置
cfg.UseRetry(Retry.Interval(3, TimeSpan.FromMinutes(1)));
訊息消費失敗後重試3次,每次間隔1分鐘
4、限速器
cfg.UseRateLimit(1000, TimeSpan.FromSeconds(1));
每分鐘訊息消費數限定在1000之內
5、熔斷器
cfg.UseCircuitBreaker(cb => { cb.TrackingPeriod = TimeSpan.FromMinutes(1); cb.TripThreshold = 15; cb.ActiveThreshold = 10; });
參照Martin Folwer對熔斷器模式的描述:CircuitBreaker
6、異常處理
public class UserUpdatedEventComsumer :IConsumer<UserUpdatedEvent> ,IConsumer<Fault<UserUpdatedEvent>> { public Task Consume(ConsumeContext<UserUpdatedEvent> context) { throw new System.NotImplementedException(); } public async Task Consume(ConsumeContext<Fault<UserUpdatedEvent>> context) { await Console.Out.WriteLineAsync($"catch exception: {context.Message.Message}"); } }
只要繼承於對應的Fault<TMessage>即可為對應的訊息編寫異常處理。
7、單元測試(待續)
8、訊息定時傳送(待續)
9、自定義中介軟體(待續)
10、自定義觀察者(待續)
11、長生命週期的消費者:Turnout(待續)
12、長生命週期的狀態機:saga(待續)
13、Routing slip pattern的實現:Courier(待續)
相關推薦
使用Masstransit開發基於訊息傳遞的分散式應用
Masstransit作為.Net平臺下的一款優秀的開源產品卻沒有得到應有的關注,這段時間有機會閱讀了Masstransit的原始碼,我覺得我有必要普及一下這個框架的使用。 值得一提的是Masstransit的原始碼寫的非常優秀,值得每個想提高自己程式設計能力的.Net選手閱讀,整個程式碼看起來賞心悅目。反之
《Flask Web開發——基於Python的Web應用開發實踐》一字一句上機實踐(下)
屬性 一個用戶 臨時 target 說明 實戰 分享圖片 ace 庫文件 目錄 前言 第8章 用戶認證 第9章 用戶角色 第10章 用戶資料 第11章 博客文章 第12章 關註者 第13章 用戶評論 第14章 應用編程接口 前言
(Flask Web開發:基於Python的Web應用開發實戰)------學習筆記(第2章)
第2章 程式的基本結構 本章將帶你瞭解 Flask 程式各部分的作用,編寫並執行第一個 Flask Web 程式。 2.1 初始化 所有 Flask 程式都必須建立一個程式例項,程式例項是 Flask 類的物件。 Web 伺服器使用一種名為 Web 伺服器閘
《FlaskWeb開發基於Python的Web應用開發實戰第2版》中英PDF+源代碼等4本書學習
開發實戰 leo 能夠 ffffff 第二版 http 資源 web開發 vpd 資源鏈接:https://pan.baidu.com/s/1p7CyLEodCy3e1u93jTVQLg《Flask Web開發 基於Python的Web應用開發實戰第2版》中英PDF+源代碼
推薦《FlaskWeb開發:基於Python的Web應用開發實戰》附下載連結
本書不僅適合初級Web開發人員學習閱讀,更是Python程式設計師用來學習高階Web開發技術的優秀參考書。 • 學習Flask應用的基本結構,編寫示例應用; • 使用必備的元件,包括模板、資料庫、Web表單和電子郵件支援; • 使用包和模組構建可伸縮的大型應用; •
用PHP開發基於MongoDB的php應用
一、連線資料庫主機 連線本地主機,埠為27017: $connection = new Mongo(); 連線遠端主機,埠為預設埠: $connection= new Mongo( "192.168.2.1" ); 連線遠端主機,埠為指定埠: $connection
《Flask Web開發 基於Python的Web應用開發實戰(第2版)》中文PDF+原始碼
下載: https://pan.baidu.com/s/1qz3Jpi4XuKQsSZJK0oMXBA 《Flask Web開發:基於Python的Web應用開發實戰》第2版 下載:https://pan.baidu.com/s/19APvGHguDOhognthTSw9JQ《Pyt
基於訊息的分散式事務簡單方案
System-A為主系統 流程描述 1. system-A執行本地事務, 傳送msg到redis,此時msg狀態為unknown(這裡全是unknown狀態的msg,訊息需要持久化,只有msg存在,即使system-b處理失敗也可以有其他方式處理,一般是人工) 2.1
基於訊息的分散式架構設計
分散式設計與開發中有些疑難問題必須藉助一些演算法才能解決,比如分散式環境一致性問題,感覺以下分散式演算法是必須瞭解的(隨著學習深入有待新增): Paxos演算法一致性Hash演算法Paxos演算法 1)問題描述 分散式中有這麼一個疑難問題,客戶端向一個分散式叢集的服務端發出一系列更新資料的訊息,由於分散式叢
Paxos演算法-基於訊息傳遞的一致性演算法
1.來源 Paxos演算法是萊斯利·蘭伯特(Leslie Lamport)於1990年提出的一種基於訊息傳遞的一致性演算法。 1.1.故事 在古希臘,有一個叫做Paxos的小島,島上通過議會的形式來通過法令,議會中議員通過信使來傳遞訊息。議員和信使都是兼職的,他們隨時有可能離開會議廳,並
基於 Redis 實現分散式應用限流
限流的目的是通過對併發訪問/請求進行限速或者一個時間視窗內的的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務 實際場景中常用的限流策略: Nginx接入層限流 按照一定的規則如帳號、
快速開發基於 HTML5 網路拓撲圖應用--入門篇(一)
計算機網路的拓撲結構是引用拓撲學中研究與大小,形狀無關的點、線關係的方法。把網路中的計算機和通訊裝置抽象為一個點,把傳輸介質抽象為一條線,由點和線組成的幾何圖形就是計算機網路的拓撲結構。網路的拓撲結構反映出網中各實體的結構關係,是建設計算機網路的第一步,是實現各種網路協議的基礎,它對網路的效能,系統的可靠性
案例分析:基於訊息的分散式架構
美國電腦科學家,LaTex的作者Leslie Lamport說:“分散式系統就是這樣一個系統,系統中一個你甚至都不知道的計算機出了故障,卻可能導致你自己的計算機不可用。”一語道破了開發分散式系統的玄機,那就是它的複雜與不可控。所以Martin Fowler強調:分散式呼
基於.Net Framework的N層分散式應用開發(轉載)
.Net Framework推出的許多新技術為上述任務的實現提供了相對簡單的解決方案。其中,基於SOAP的Web Service在處理分散式應用時具有比傳統的DCOM/CORBA明顯的優點,結合基於Web的ASP.NET頁面開發技術和SQL Server資料儲存技術(或Xml
電子書 flaskweb開發:基於Python的Web應用開發實戰.pdf
商業 機器 免費 影評 而且 視頻軟件 python程序 規範 初級 作為PythonWeb開發的微框架,Flask獨樹一幟。它不會強迫開發者遵循預置的開發規範,為開發者提供了自由度和創意空間。 《圖靈程序設計叢書·Flask Web開發:基於Python的Web應用開
基於python的web應用開發-添加關註者
templates maps classes else 解決方法 必須 簡化 head html 社交web允許用戶之間相互聯系。 例如: 關註者、好友、聯系人、聯絡人或夥伴。 記錄兩個用戶之間的定向聯系,在數據庫查詢中也要使用這種聯系。 一、論數據庫關系 一對多關系
前端入門:快速開發基於 HTML5 網絡拓撲圖應用
簡寫 我們 圖片路徑 urn setattr bsp left return 管理 計算機網絡的拓撲結構是引用拓撲學中研究與大小,形狀無關的點、線關系的方法。把網絡中的計算機和通信設備抽象為一個點,把傳輸介質抽象為一條線,由點和線組成的幾何圖形就是計算機網絡的拓撲結構。網絡
FlaskWeb開發:基於Python的Web應用開發實戰pdf
數據庫查詢 各類 啟動服務 管理 jin 軟件 請求 服務 inter 下載地址:網盤下載 內容簡介 · · · · · ·本書不僅適合初級Web開發人員學習閱讀,更是Python程序員用來學習高級Web開發技術的優秀參考書。? 學習Flask應用的基本結構,編寫示例應
基於Spring Boot構建應用開發規範
SpringBoot 項目規範 1.規範的意義和作用 編碼規範可以最大限度的提高團隊開發的合作效率 編碼規範可以盡可能的減少一個軟件的維護成本 , 並且幾乎沒有任何一個軟件,在其整個生命周期中,均由最初的開發人員來維護 編碼規範可以改善軟件的可讀性,可以讓開發人員盡快而徹底地理解新的代碼 規範性編碼
《Flask Web開發:基於Python的Web應用開發實戰》pdf 免費下載
需求 png 入行 14. 導入 框架 錯誤 pla 引用 《Flask Web開發:基於Python的Web應用開發實戰》pdf 免費下載鏈接: https://u253469.ctfile.com/fs/253469-292665036 第一部分 Flask