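Dapr Meets .NET Study Notes: Publish-Subscribe
Queues are a well-established tool for smoothing load peaks and troughs, and they are mainstream in the industry; publish-subscribe effectively decouples two applications. Dapr wraps both behind one abstraction, which makes them simpler and more efficient to use.
In this post's example, once an order is placed, a message is published to Redis (other brokers would work too). The notification system and the payment system both subscribe to this message, and of the two instances of each system, only one receives and processes any given message. The call flow is shown in the following diagram:
The project structure is as follows: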
1. Configuration
The services are deployed with docker-compose. The contents of docker-compose.yml:
version: '3.4'
services:
  #┌────────────────────────────────┐
  #│ ordersystem app + Dapr sidecar │
  #└────────────────────────────────┘
  ordersystem:
    image: ${DOCKER_REGISTRY-}ordersystem
    depends_on:
      - redis
      - placement
    build:
      context: ../
      dockerfile: OrderSystem/Dockerfile
    ports:
      - "3500:3500"
    volumes:
      - ../OrderSystem:/OrderSystem
    networks:
      - b2c-dapr
  ordersystem-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "order", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - ordersystem
    network_mode: "service:ordersystem"
    volumes:
      - ../components:/components
  #┌───────────────────────────────────┐
  #│ paymentsystem1 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem1:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    ports:
      - "3601:3500"
    volumes:
      - ../PaymentSystem:/PaymentSystem
    networks:
      - b2c-dapr
  paymentsystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem1
    network_mode: "service:paymentsystem1"
    volumes:
      - ../components:/components
  #┌───────────────────────────────────┐
  #│ paymentsystem2 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem2:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    volumes:
      - ../PaymentSystem:/PaymentSystem
    ports:
      - "3602:3500"
    networks:
      - b2c-dapr
  paymentsystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem2
    network_mode: "service:paymentsystem2"
    volumes:
      - ../components:/components
  #┌───────────────────────────────────┐
  #│ noticesystem1 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem1:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3701:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr
  noticesystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem1
    network_mode: "service:noticesystem1"
    volumes:
      - ../components:/components
  #┌───────────────────────────────────┐
  #│ noticesystem2 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem2:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3702:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr
  noticesystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem2
    network_mode: "service:noticesystem2"
    volumes:
      - ../components:/components
  #┌────────────────────────┐
  #│ Dapr placement service │
  #└────────────────────────┘
  placement:
    image: "daprio/dapr"
    command: ["./placement", "-port", "50006"]
    ports:
      - "50006:50006"
    networks:
      - b2c-dapr
  #┌───────────────────┐
  #│ Redis state store │
  #└───────────────────┘
  redis:
    image: "redis:latest"
    ports:
      - "6380:6379"
    networks:
      - b2c-dapr
networks:
  b2c-dapr:
pubsub.yaml (in the components folder) keeps the default content, as follows:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis:6379
    - name: redisPassword
      value: ""
The subscription configuration file, subscription.yaml (also in the components folder), is as follows:
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: orderComplete
  route: /ordercomplete
  pubsubname: pubsub
scopes:
  - pay
  - notice
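Besides the declarative file above, Dapr also supports programmatic subscriptions: at startup the sidecar sends GET /dapr/subscribe to the application and expects a JSON array describing its subscriptions. The sketch below only builds that response body with the same values as subscription.yaml; the class and method names are hypothetical, and wiring it to an actual endpoint is left out. With this approach each application declares its own subscriptions, so the scopes list is not needed.

```csharp
using System.Text.Json;

// Hypothetical helper: builds the body an app could return from GET /dapr/subscribe.
public static class ProgrammaticSubscription
{
    public static string BuildResponseBody()
    {
        // Same pubsub component, topic, and route as in subscription.yaml.
        var subscriptions = new[]
        {
            new { pubsubname = "pubsub", topic = "orderComplete", route = "/ordercomplete" }
        };

        // Anonymous-type property names are serialized as written (lower case here).
        return JsonSerializer.Serialize(subscriptions);
    }
}
```

In an ASP.NET Core controller, an action mapped to GET /dapr/subscribe could return this string with the application/json content type.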
2. Code
appsettings.json of the OrderSystem project:
"PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"
The publish method in the OrderSystem project:
[HttpGet("/orderpub/{orderno}")]
public async Task<IActionResult> OrderPub(string orderno)
{
    try
    {
        _logger.LogInformation("Order,publish");
        await Task.Delay(400); // simulated work
        var client = _clientFactory.CreateClient();
        // Serialize the order once so the same JSON can be both logged and sent.
        var json = Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow });
        var stringContent = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
        // StringContent.ToString() only yields the type name, so log the JSON itself.
        _logger.LogInformation("Publishing order: {Payload}", json);
        // POST to the Dapr sidecar's publish endpoint (PublishUrl from appsettings.json).
        var response = await client.PostAsync(_publishUrl, stringContent);
        // Return the status code instead of the whole HttpResponseMessage object.
        return new JsonResult(new { order_result = "Order success,and publish", pay_result = (int)response.StatusCode });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message });
    }
}
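The PublishUrl in appsettings.json follows the pattern of Dapr's HTTP publish endpoint: sidecar HTTP port, then /v1.0/publish/, then the pub/sub component name and the topic. As a minimal sketch (the class and method names are hypothetical, and localhost is assumed because the sidecar shares the application's network namespace), the URL could be assembled like this:

```csharp
using System;

// Hypothetical helper that assembles the Dapr publish URL from its parts.
public static class DaprPublishUrl
{
    public static string Build(int daprHttpPort, string pubsubName, string topic)
    {
        // Escape each path segment so unusual names cannot break the URL.
        return $"http://localhost:{daprHttpPort}/v1.0/publish/" +
               $"{Uri.EscapeDataString(pubsubName)}/{Uri.EscapeDataString(topic)}";
    }
}
```

Calling DaprPublishUrl.Build(3500, "pubsub", "orderComplete") yields the same value as the PublishUrl setting, where "pubsub" is the component name from pubsub.yaml and "orderComplete" is the topic from subscription.yaml.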
The subscription implementation in the PaymentSystem and NoticeSystem projects
Two entity classes:
public class PubBody
{
    public string id { get; set; }
    public string source { get; set; }
    public string pubsubname { get; set; }
    public string traceid { get; set; }
    public PubOrder data { get; set; }
    public string specversion { get; set; }
    public string datacontenttype { get; set; }
    public string type { get; set; }
    public string topic { get; set; }
}
public class PubOrder
{
    public string OrderNo { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}
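PubBody mirrors the CloudEvents envelope in which Dapr wraps a published message, with the order itself in the data field. The sketch below shows how such an envelope deserializes. It uses System.Text.Json rather than Newtonsoft.Json so that it needs no extra package; the types are trimmed, renamed copies (hypothetical names) of the classes above, and the sample id and timestamp are made up for illustration.

```csharp
using System;
using System.Text.Json;

// Trimmed stand-ins for PubBody / PubOrder, renamed to avoid clashing with them.
public class SampleEnvelope
{
    public string id { get; set; } = "";
    public string pubsubname { get; set; } = "";
    public string topic { get; set; } = "";
    public SampleOrder data { get; set; } = new SampleOrder();
}

public class SampleOrder
{
    public string OrderNo { get; set; } = "";
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}

public static class EnvelopeDemo
{
    // Illustrative payload only; a real envelope carries more fields.
    public const string SampleJson = @"{
        ""id"": ""00000000-0000-0000-0000-000000000001"",
        ""specversion"": ""1.0"",
        ""datacontenttype"": ""application/json"",
        ""pubsubname"": ""pubsub"",
        ""topic"": ""orderComplete"",
        ""data"": { ""OrderNo"": ""NO0001"", ""Amount"": 30000, ""OrderTime"": ""2021-01-01T00:00:00Z"" }
    }";

    // Property names match the JSON exactly; fields without a matching property are ignored.
    public static SampleEnvelope Parse(string json) =>
        JsonSerializer.Deserialize<SampleEnvelope>(json);
}
```

EnvelopeDemo.Parse(EnvelopeDemo.SampleJson).data.OrderNo then gives the order number that the publisher put into the message body.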
The subscription method in both the NoticeSystem and PaymentSystem projects is as follows:
[HttpPost("/ordercomplete")]
public async Task<IActionResult> OrderComplete()
{
    try
    {
        _logger.LogInformation("PaymentSystem OrderComplete running...");
        // Dapr delivers the message wrapped in an envelope, so read the raw request body.
        using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);
        var content = await reader.ReadToEndAsync();
        var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);
        // data may be missing, so guard it with ?. as well.
        _logger.LogInformation($"--------- HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data?.OrderNo},OrderAmount:{pubBody?.data?.Amount},OrderTime:{pubBody?.data?.OrderTime} -----------");
        await Task.Delay(200); // simulated work
        _logger.LogInformation("subscription pay complete");
        _logger.LogInformation("return SUCCESS");
        // SUCCESS tells Dapr the message was processed.
        return new JsonResult(new
        {
            Status = "SUCCESS"
        });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        _logger.LogInformation("return RETRY");
        // RETRY asks Dapr to deliver the message again.
        return new JsonResult(new
        {
            Status = "RETRY"
        });
    }
}
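The handler above answers RETRY for every exception, including a body that can never be parsed. Dapr also accepts a DROP status, which discards the message instead of redelivering it. The following is a minimal sketch of one way to tell the cases apart, assuming the order number sits under data.OrderNo as in PubBody; the class, method, and callback names are hypothetical, and System.Text.Json is used so that the example needs no extra package.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: picks the status string to return to Dapr.
public static class DaprStatus
{
    public const string Success = "SUCCESS";
    public const string Retry = "RETRY";
    public const string Drop = "DROP";

    public static string Decide(string body, Action<string> handleOrder)
    {
        string orderNo;
        try
        {
            using var doc = JsonDocument.Parse(body);
            var root = doc.RootElement;
            // A message without data.OrderNo will never become valid, so drop it.
            if (root.ValueKind != JsonValueKind.Object
                || !root.TryGetProperty("data", out var data)
                || data.ValueKind != JsonValueKind.Object
                || !data.TryGetProperty("OrderNo", out var no)
                || no.ValueKind != JsonValueKind.String)
            {
                return Drop;
            }
            orderNo = no.GetString() ?? string.Empty;
        }
        catch (JsonException)
        {
            // Not JSON at all: redelivery would fail the same way.
            return Drop;
        }

        try
        {
            handleOrder(orderNo);
            return Success;
        }
        catch (Exception)
        {
            // Processing failed; it may succeed on a later delivery.
            return Retry;
        }
    }
}
```

A controller action could pass the request body and its processing logic to Decide and return the result as the Status value. Whether dropping malformed messages is acceptable depends on the application; some systems would rather route them to a dead-letter topic.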
3. Deployment and Testing
Change into the B2C directory and start docker compose from the command line:
docker-compose up -d
Now you can test. Call the OrderSystem's external address to place orders NO0001 and NO0002:
localhost:3500/v1.0/invoke/order/method/orderpub/NO0001 and
localhost:3500/v1.0/invoke/order/method/orderpub/NO0002
View container noticesystem1
View container noticesystem2
View container paymentsystem1
View container paymentsystem2
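NoticeSystem and PaymentSystem both subscribe to the orderComplete topic published by OrderSystem, and within each application the two instances take turns handling the messages. This is how Dapr wraps the complexity of publish-subscribe into something as simple as calling and receiving an API request; the project code shows no trace of the underlying message broker.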