1. 程式人生 > 實用技巧 >Asp.net Core CAP Kafka 分散式訊息佇列

Asp.net Core CAP Kafka 分散式訊息佇列

CAP 是一個基於 .NET Standard 的 C# 庫,它是一種處理分散式事務的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高效能等特點。

參考中文官網
從這張圖可以看到CAP主要是保證訊息的一致性,沒有事務回滾的操作,需要自己實現訊息雙向推送
我一開始以為實現分散式事務鎖的機制,後來訊息微服務用鎖實現事務也不太合適
下面是我寫的一個Demo
CAP 支援的多種訊息佇列,我這裡用的是kafka
1.kafka 安裝,localtime是時區檔案,可以進入bash設定中國的時區然後把檔案拷貝出來,如果有中國時區的linux的系統直接copy出來對映就行了

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

docker run -d --name zookeeper -p 2181:2181 `
-v $pwd/localtime:/etc/localtime wurstmeister/zookeeper

docker run -d --name kafka `
-p 9092:9092 -e KAFKA_BROKER_ID=0 `
-e KAFKA_ZOOKEEPER_CONNECT=56.1.136.150:2181/kafka `
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 `
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 `
-v $pwd/localtime:/etc/localtime wurstmeister/kafka

2.新建專案 Services_1和Services_2
導包

Install-Package DotNetCore.CAP
Install-Package DotNetCore.CAP.Kafka
Install-Package DotNetCore.CAP.SqlServer

3.註冊服務 Services_1和Services_2

  services.AddCapExtension(Configuration);

  public static class ServiceCollectionExtensions
    {
        public static void AddCapExtension(this IServiceCollection services, IConfiguration configuration)
        {
            services.AddCap(t =>
            {
                t.UseSqlServer(configuration.GetConnectionString("Cap"));
                t.UseKafka(configuration.GetConnectionString("Kafka"));

            });
        }
    }
  1. 訊息訂閱 Services_2
    public interface ISubscriberEvent
    {
        public void Hello(string msg);
    }

    public class SubscriberEventHandler : ISubscriberEvent, ICapSubscribe
    {
        [CapSubscribe("Service2.hello")]
        public void Hello(string msg)
        {
            Console.WriteLine(msg);
        }
    }

    services.AddTransient<EventSubs.ISubscriberEvent, EventSubs.SubscriberEventHandler>();

5.訊息釋出

    public class Service1Controller : ControllerBase
    {
        [HttpGet]
        public IActionResult Get([FromServices] ICapPublisher capPublish, [FromServices] IConfiguration configuration)
        {
            using SqlConnection connection = new SqlConnection(configuration.GetConnectionString("Cap"));
            connection.Open();
            using (IDbTransaction transaction = connection.BeginTransaction(capPublish)) {
                try
                {
                    capPublish.Publish("Service2.hello", "分散式事務");        
                    transaction.Commit();
                }
                catch (Exception ex)
                {
                    transaction.Rollback();
                }
            }            
            return Ok();
        }

    }

Demo下載