Asp.net Core CAP Kafka 分散式訊息佇列
阿新 • • 發佈:2020-10-29
CAP 是一個基於 .NET Standard 的 C# 庫,它是一種處理分散式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高效能等特點。
參考中文官網
從這張圖可以看到CAP主要是保證訊息的一致性,沒有事務回滾的操作,需要自己實現訊息雙向推送
我一開始以為實現分散式事務鎖的機制,後來訊息微服務用鎖實現事務也不太合適
下面是我寫的一個Demo
CAP 支援的多種訊息佇列,我這裡用的是kafka
1.kafka 安裝,localtime是時區檔案,可以進入bash設定中國的時區然後把檔案拷貝出來,如果有中國時區的linux的系統直接copy出來對映就行了
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka docker run -d --name zookeeper -p 2181:2181 ` -v $pwd/localtime:/etc/localtime wurstmeister/zookeeper docker run -d --name kafka ` -p 9092:9092 -e KAFKA_BROKER_ID=0 ` -e KAFKA_ZOOKEEPER_CONNECT=56.1.136.150:2181/kafka ` -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 ` -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 ` -v $pwd/localtime:/etc/localtime wurstmeister/kafka
2.新建專案 Services_1和Services_2
導包
Install-Package DotNetCore.CAP
Install-Package DotNetCore.CAP.Kafka
Install-Package DotNetCore.CAP.SqlServer
3.註冊服務 Services_1和Services_2
services.AddCapExtension(Configuration); public static class ServiceCollectionExtensions { public static void AddCapExtension(this IServiceCollection services, IConfiguration configuration) { services.AddCap(t => { t.UseSqlServer(configuration.GetConnectionString("Cap")); t.UseKafka(configuration.GetConnectionString("Kafka")); }); } }
- 訊息訂閱 Services_2
public interface ISubscriberEvent { public void Hello(string msg); } public class SubscriberEventHandler : ISubscriberEvent, ICapSubscribe { [CapSubscribe("Service2.hello")] public void Hello(string msg) { Console.WriteLine(msg); } } services.AddTransient<EventSubs.ISubscriberEvent, EventSubs.SubscriberEventHandler>();
5.訊息釋出
public class Service1Controller : ControllerBase
{
[HttpGet]
public IActionResult Get([FromServices] ICapPublisher capPublish, [FromServices] IConfiguration configuration)
{
using SqlConnection connection = new SqlConnection(configuration.GetConnectionString("Cap"));
connection.Open();
using (IDbTransaction transaction = connection.BeginTransaction(capPublish)) {
try
{
capPublish.Publish("Service2.hello", "分散式事務");
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
}
}
return Ok();
}
}