1. 程式人生 > >使用RedisMQ 做分散式改造,從此放心安全迭代

使用RedisMQ 做分散式改造,從此放心安全迭代

引言

  熟悉TPL Dataflow博文的朋友可能記得這是個單體程式,使用TPL Dataflow 處理工作流任務, 在使用Docker部署的過程中, 有一個問題一直無法迴避:

       在單體程式部署的瞬間會有少量 流量無法處理;更糟糕的情況下,迭代部署的這個版本有問題,上線後無法運作, 更多的流量沒有得到處理。

      揹負神聖使命(巨大壓力)的程式猿心生一計, 為何不將單體程式改成分散式:服務A只接受資料,服務B只處理資料。

 

 

知識儲備:

    訊息佇列和訂閱釋出作為老生常談的兩個知識點被反覆提及,按照JMS的規範, 官方稱為點對點(point to point, queue) 和 訂閱釋出(publish/subscribe,topic ),

點對點:

  訊息生產者生產訊息傳送到queue中,然後消費者從queue中取出並且消費訊息。

注意:

訊息被消費以後,queue中不再有儲存,所以消費者不可能消費到已經被消費的訊息。

Queue支援存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費。

當沒有消費者可用時,這個訊息會被儲存直到有 一個可用的消費者。

釋出/訂閱

  訊息生產者(釋出)將訊息釋出到topic中,同時有多個訊息消費者(訂閱)消費該訊息。和點對點方式不同,釋出到topic的訊息會被所有訂閱者消費。

注意:

釋出者將訊息釋出到通道中,而不用知道訂閱者是誰(不關注是否存在訂閱者);訂閱者可收聽自己感興趣的多個通道, 也不需要知道釋出者是誰(不關注是哪個釋出者)。

故如果沒有消費者,釋出的訊息將得不到處理;

頭腦風暴 

本次採用的訊息佇列模型:

  •    解耦業務:  新建Receiver程式作為生產者,專注於接收併發送到佇列;原有的webapp作為消費者專注資料處理。
  •    起到削峰填谷的作用, 若建立多個消費者webapp容器,還能形成負載均衡的效果。 

Redis 原生支援釋出/訂閱 模型,內建的List資料結構亦能形成輕量級MQ的效果。

    需要關注Redis 兩個命令( 左進右出,右進左出同理):

    LPUSH  &  RPOP/BRPOP

Brpop 中的B 表示 “Block”, 是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端連線,直到超時返回nil

程式設計實踐

本次使用 AspNetCore 完成RedisMQ的實踐。引入Redis國產第三方開源庫 CSRedisCore.

不使用著名的StackExchange.Redis 元件庫的原因:

  • 之前一直使用StackExchange.Redis, 參考了很多資料,做了很多優化,並未完全解決RedisTimeoutException問題 

  • StackExchange.Redis基於其多路複用機制,不支援阻塞式命令, 故採用了 CSRedisCore,該庫強調了API 與Redis官方命令一致,很容易上手

生產者Receiver:

------------------擷取自Startup.cs------------------------------ 
      public void ConfigureServices(IServiceCollection services)
      {
            var csredis = new CSRedisClient(Configuration.GetConnectionString("redis"));
            RedisHelper.Initialization(csredis);
            services.AddMvc();
      }

---------------------擷取自資料接收Controller-------------------
     [Route("batch")]
     [HttpPost]
     public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)
     {
            if (!ModelState.IsValid)
                throw new ArgumentException("Http Body Payload Error.");
            var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; 
            eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);
            if (eqidPairs != null && eqidPairs.Any())
                RedisHelper.LPush(redisKey, eqidPairs.ToArray());
            await Task.CompletedTask;
     }

 消費者webapp:

     根據RedisMQ的事件推送方式,需要輪詢Redis  List 資料結構,這裡使用AspNetCore內建的BackgroundService 實現了 後臺輪詢任務。

    public class BackgroundJob : BackgroundService
    {
        private readonly IEqidPairHandler _eqidPairHandler;
        private readonly ILogger _logger;
        public BackgroundJob(IEqidPairHandler eqidPairHandler, ILoggerFactory loggerFactory)
        {
            _eqidPairHandler = eqidPairHandler;
            _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Service starting");

            while (!stoppingToken.IsCancellationRequested)
            {
                var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";
                var eqidpair = RedisHelper.BRPop(5, key);
                if (eqidpair != null)
                    await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));
                else
                    await Task.Delay(1000, stoppingToken);
                
            }
            _logger.LogInformation("Service stopping");
        }
    }
var redis = new CSRedisClient[16]; //定義成單例
            for (var a = 0; a < redis.Length; a++)
                redis[a] = new CSRedisClient(Configuration.GetConnectionString("redis") + ",defualtDatabase=" + a);
            services.AddSingleton<CSRedisClient[]>(redis);
            RedisHelper.Initialization(redis[0]);
註冊CSRedisCore服務

 最後依照引言中的部署原理圖,將Nginx,Receiver, WebApp dockerize, 並且讓 webapp 依賴於Nginx,Receiver

-------------------擷取自docker-compose.yml檔案---------------------- 

 app:
    build:
      context: ./app
      dockerfile: Dockerfile
    expose:
      - "80"
    extra_hosts:
      - "dockerhost:172.18.0.1"
    environment:
      TZ: Asia/Shanghai
    volumes:
      - type: bind
        source: /mnt/eqidmanager/eqidlogs
        target: /app/eqidlogs
      - type: bind
        source: /mnt/eqidmanager/applogs
        target: /app/logs
      - type: bind
        source: /home/huangjun/eqidmanager/EqidManager.db
        target: /app/EqidManager.db
    healthcheck:
      test: ['CMD','curl','-f','http://localhost/healthcheck']
      interval: 1m30s
      timeout: 10s
      retries: 3
    depends_on:
      - receiver
      - proxy
    logging:
      options:
        max-size: "200k"
        max-file: "10"
    privileged: true 

       ① 根據docker-compsoe up命令的用法,若Receiver容器正在執行且服務配置並未改變,該容器不會被停止。

  ② 根據官方文件對於depends_on 指令的說明,該指定決定了容器啟動和停止的順序,因此引言中需要 【暫存流量】剛性需求可以得到滿足
  

  改造上線之後,效果很明顯,現在可以放心安全的迭代 TPL DataFlow資料處理程式。

作者:JulianHuang

碼甲拙見,如有問題請下方留言大膽斧正;碼字+Visio製圖,均為原創,看官請不吝好評+關注,  ~。。~

本文歡迎轉載,請轉載頁面明顯位置註明原作者及原文連結。