1. 程式人生 > 其它 >Dapr牽手.NET學習筆記:釋出-訂閱

Dapr牽手.NET學習筆記:釋出-訂閱

  queue,是很好的削峰填谷工具,在業內也是主流;釋出訂閱,可以有效的解耦兩個應用,所以dapr把他們進行了有效的封裝,我們使用起來更簡單高效。

  本篇的案例是下完訂單後,會把訊息釋出到redis(當然也可以是其他)中,通知系統和支付系統會訂單這個訊息,同時,通知系統和支付系統的兩個例項中,只會有一個例項接收到這個訊息,進行處理,呼叫示意圖如下:

 

 專案結構如下:

 

 

一、配置

用docker-compose部署,docker-compose.yml內容

version: '3.4'

services:
  #┌────────────────────────────────┐
  #│ ordersystem app 
+ Dapr sidecar │ #└────────────────────────────────┘ ordersystem: image: ${DOCKER_REGISTRY-}ordersystem depends_on: - redis - placement build: context: ../ dockerfile: OrderSystem/Dockerfile ports: - "3500:3500" volumes: - ../OrderSystem:/OrderSystem networks:
- b2c-dapr ordersystem-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "order", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components"] depends_on: - ordersystem network_mode: "service:ordersystem" volumes:
- ../components:/components #┌───────────────────────────────────┐ #│ paymentsystem1 app + Dapr sidecar │ #└───────────────────────────────────┘ paymentsystem1: image: ${DOCKER_REGISTRY-}paymentsystem build: context: ../ dockerfile: PaymentSystem/Dockerfile ports: - "3601:3500" volumes: - ../PaymentSystem:/PaymentSystem networks: - b2c-dapr paymentsystem1-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "pay", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - paymentsystem1 network_mode: "service:paymentsystem1" volumes: - ../components:/components #┌───────────────────────────────────┐ #│ paymentsystem2 app + Dapr sidecar │ #└───────────────────────────────────┘ paymentsystem2: image: ${DOCKER_REGISTRY-}paymentsystem build: context: ../ dockerfile: PaymentSystem/Dockerfile volumes: - ../PaymentSystem:/PaymentSystem ports: - "3602:3500" networks: - b2c-dapr paymentsystem2-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "pay", "-app-port", "80" ,"-placement-host-address", "placement:50006","-components-path","/components"] depends_on: - paymentsystem2 network_mode: "service:paymentsystem2" volumes: - ../components:/components #┌───────────────────────────────────┐ #│ noticesystem1 app + Dapr sidecar │ #└───────────────────────────────────┘ noticesystem1: image: ${DOCKER_REGISTRY-}noticesystem build: context: ../ dockerfile: NoticeSystem/Dockerfile ports: - "3701:3500" volumes: - ../NoticeSystem:/NoticeSystem networks: - b2c-dapr noticesystem1-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - noticesystem1 network_mode: "service:noticesystem1" volumes: - ../components:/components #┌───────────────────────────────────┐ #│ noticesystem2 app + Dapr sidecar │ #└───────────────────────────────────┘ noticesystem2: image: ${DOCKER_REGISTRY-}noticesystem build: context: ../ dockerfile: NoticeSystem/Dockerfile ports: - "3702:3500" volumes: - ../NoticeSystem:/NoticeSystem networks: - b2c-dapr noticesystem2-dapr: image: "daprio/daprd:latest" command: [ "./daprd", "-app-id", "notice", "-app-port", "80","-placement-host-address", "placement:50006","-components-path","/components" ] depends_on: - noticesystem2 network_mode: "service:noticesystem2" volumes: - ../components:/components #┌────────────────────────┐ #│ Dapr placement service │ #└────────────────────────┘ placement: image: "daprio/dapr" command: ["./placement", "-port", "50006"] ports: - "50006:50006" networks: - b2c-dapr #┌───────────────────┐ #│ Redis state store │ #└───────────────────┘ redis: image: "redis:latest" ports: - "6380:6379" networks: - b2c-dapr networks: b2c-dapr:

pubsub.yaml(在components資料夾下 )內容是預設,如下

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis:6379
  - name: redisPassword
    value: ""

訂閱配置檔案如下subscription.yaml(在components資料夾下 )

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: orderComplete
  route: /ordercomplete
  pubsubname: pubsub
scopes:
- pay
- notice

二、程式碼

OrderSystem專案的appsettings.json

"PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"

OrderSystem專案的釋出方法

 [HttpGet("/orderpub/{orderno}")]
    public async Task<IActionResult> OrderPub(string orderno)
    {
        try
        {
            _logger.LogInformation($"Order,publish");
            await Task.Delay(400);
            var client = _clientFactory.CreateClient();

            var stringContent = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow}), System.Text.Encoding.UTF8, "application/json");
            _logger.LogInformation(stringContent.ToString());
            var content = await client.PostAsync(_publishUrl, stringContent);
            return new JsonResult(new { order_result = "Order success,and publish", pay_result = content });
        }
        catch (Exception exc)
        {
            _logger.LogCritical(exc, exc.Message);
            return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message });
        }
    }

PaymentSystem和NoticeSystem專案中的訂閱實現

兩個實體類

public class PubBody
{
    public string id { get; set; }
    public string source { get; set; }
    public string pubsubname { get; set; }
    public string traceid { get; set; }
    public PubOrder data { get; set; }
    public string specversion { get; set; }
    public string datacontenttype { get; set; }
    public string type { get; set; }
    public string topic { get; set; }
}

public class PubOrder
{
    public string OrderNo { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}

NoticeSystem和PaymentSystem兩個專案中的訂閱方法如下

 [HttpPost("/ordercomplete")]
    public async Task<IActionResult> OrderComplete()
    {
        try
        {
            _logger.LogInformation("PaymentSystem OrderComplete runing……");
            using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);
            var content = await reader.ReadToEndAsync();
            var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);
            _logger.LogInformation($"---------  HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data.OrderNo},OrderAmount:{pubBody?.data.Amount},OrderTime:{pubBody?.data.OrderTime} -----------");
            await Task.Delay(200);
            _logger.LogInformation($"subscription pay complete");
            _logger.LogInformation($"return  SUCCESS");
            return new JsonResult(new
            {
                Status = "SUCCESS"
            });
        }
        catch (Exception exc)
        {
            _logger.LogCritical(exc, exc.Message);
            _logger.LogInformation($"return  RETRY");
            return new JsonResult(new
            {
                Status = "RETRY"
            });
        }
    }

三、釋出測試

進入在B2C目發,用命令列啟動docker compose

docker-compose up -d

可以測試了,呼叫OrderSystem的對外地址,下訂單NO0001,和NO0002

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001和

localhost:3500/v1.0/invoke/order/method/orderpub/NO0001

檢視容器noticesystem1

 

 檢視容器noticesystem2

 

 檢視容器paymentsystem1

 

 檢視容器paymentsystem2

 

   NoticeSystem和PaymentSystem同時訂閱OrderSystem專案的釋出orderComplete,兩個例項會輪詢處理訂閱結果。Dapr就這樣,把複雜的釋出訂閱,封裝成一個api一樣的簡單呼叫和接收,專案中沒有一點的痕跡。

  想要更快更方便的瞭解相關知識,可以關注微信公眾號