1. 程式人生 > 其它 >MassTransit中Request&Response基本使用

MassTransit中Request&Response基本使用

MassTransit是一個自由、開源、輕量級的訊息匯流排基於.Net框架,用於建立分散式應用程式。方便搭建基於訊息的鬆耦合非同步通訊的應用程式和服務。MassTransit在現有訊息傳輸上提供了一組廣泛的功能,從而使開發人員能夠友好地使用基於訊息的會話模式非同步連線服務。基於訊息的通訊是實現面向服務的體系結構的可靠和可擴充套件的方式。

官網地址:http://masstransit-project.com/

釋出訂閱模式

這種場景十分常見,傳送一個訊息(或事件)到訊息佇列中,有一個或是多個訂閱方對預期的訊息接收處理。

基於需要搭建了兩個WebApi程式,用於模擬傳送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程式中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

釋出端配置

在Startup中增加MassTransit需要的服務及初始化配置。

  • 對RabbitMQ的連線地址埠、虛擬主機、訪問賬號密碼等系列配置。
  • 對傳送方需要傳送的訊息初始化一個請求客戶端,配置請求資訊及推送到MQ的地址。
services.AddMassTransit(x =>
{
  x.UsingRabbitMq((context, cfg) =>
  {
    cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
    {
      h.Username(Configuration["RabbitmqConfig:Username"]);
      h.Password(Configuration["RabbitmqConfig:Password"]);
    });
  });
  x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();

為了快速瞭解,使用Controller在Action中發起對MQ的訊息推送

[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{
  readonly IPublishEndpoint _publishEndpoint;
  public ValueController(IPublishEndpoint publishEndpoint)
  {
    _publishEndpoint = publishEndpoint;
  }
  [HttpPost]
  public async Task<ActionResult> Post(string value)
  {
    await _publishEndpoint.Publish<ValueEntered>(new
    {
      Value = value
    });
    return Ok();
  }
}

訂閱端配置

訂閱端也建立一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和釋出端一樣。

  • 對RabbitMQ的連線地址埠、虛擬主機、訪問賬號密碼等系列配置。
  • 為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer
  • 增加一個接受點,指定佇列名稱,即傳送端傳送的佇列名稱,設定該佇列消費處理的Consumer,即ValueEnteredEventConsumer
services.AddMassTransit(x =>
{
  x.AddConsumer<ValueEnteredEventConsumer>();
  x.UsingRabbitMq((context, cfg) =>
  {
    cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
    {
      h.Username(Configuration["RabbitmqConfig:Username"]);
      h.Password(Configuration["RabbitmqConfig:Password"]);
    });
    cfg.ReceiveEndpoint("events-valueentered", e =>
    {
      e.ConfigureConsumer<ValueEnteredEventConsumer>(context);
    });
  });
});
services.AddMassTransitHostedService();

如此一來,通過Postman傳送一個請求,經發布端釋出一個訊息到RabbitMQ,訂閱端偵聽訊息,處理訊息,一切都很熟悉。

請求響應模式

在釋出訂閱的基礎上,改變以往的習慣,當釋出一個訊息後,等待訂閱方的處理,並將訊息推送回RabbitMQ,傳送方接受到處理後的訊息繼續執行。

請求端

在Startup中新加上一個用於傳送訊息(CheckOrderStatus)的請求客戶端及指定訊息佇列名稱(為每一個訊息建立一個單獨的佇列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
  private readonly IRequestClient<CheckOrderStatus> _client;
  public OrderController(IRequestClient<CheckOrderStatus> client)
  {
    _client = client;
  }
  public async Task<OrderStatusResult> Get(string id)
  {
    var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });
    return response.Message;
  }
}

響應端

同樣在響應端Startup中對新的訊息設定下訊息偵聽佇列以及相應的Handler如下的ValueEnteredEventConsumer去消費訊息並返回處理結果。

x.AddConsumer<CheckOrderStatusConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
  // ...
  cfg.ReceiveEndpoint("events-checkorderstatus", e =>
  {
  e.ConfigureConsumer<CheckOrderStatusConsumer>(context);
  });
});

Consumer中獲取請求引數,執行請求,返回執行結果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
  public async Task Consume(ConsumeContext<CheckOrderStatus> context)
  {
    if (context.Message.OrderId == "9527")
    {
      throw new InvalidOperationException("Order not found");
    }
    Console.WriteLine($"OrderId:{context.Message.OrderId}");
    await context.RespondAsync<OrderStatusResult>(new
    {
      OrderId = context.Message.OrderId,
      Timestamp = Guid.NewGuid().ToString(),
      StatusCode = "1",
      StatusText = "Close"
    });
  }
}

這樣一來,當請求端發起一個訊息(事件)到RabbitMQ,響應端偵聽並處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續執行後續請求。

HTTP方式差異

與以往的Http請求方式有所不同,通過httpClient.PostAsync傳送請求,接收端處理並返回結果,而走requestClient傳送請求到RabbitMQ,再由RabbitMQ推送到偵聽節點消費並返回結果,如下第一二部分結構。

2021-06-29,望技術有成後能回來看見自己的腳步