1. 程式人生 > >spring cloud stream 基礎使用

spring cloud stream 基礎使用

spring cloud stream


依賴

 compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'

本章記錄下調研 spring cloud stream 基礎用法的結果;

基礎

yml 配置

spring:
  rabbitmq:
    host: 192.168.7.106
    port
: 5672 username: xxx password: 123456

消費端

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

/**
 * <pre>
 * 使用預設的佇列名,消費端
 * </pre>
 * @author zhuqiang
 * @date 2018/12/4 13:40
 */
@EnableBinding(value = {Sink.class}) @Slf4j public class SinkReceiver { @StreamListener(Sink.INPUT) public void receive(Object payload) { log.info("Received from default channel : {}", payload.toString()); } }

生產端,使用測試來發送

@RunWith(SpringRunner.class)
@EnableBinding(value = {
ReceiverAppTest.SinkSender.class}) public class ReceiverAppTest { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message to " + Sink.INPUT + " channel").build()); } public interface SinkSender { @Output(Sink.INPUT) MessageChannel output(); } }

分組、自定義、重試、處理錯誤、死信佇列

  • 分組 : 分組的作用是,多個相同的分散式微服務啟動的話,讓他們在一個佇列組裡面,這樣每次就只能有一個例項被消費了
  • 自定義:基礎裡面是使用框架預定義的佇列名稱,這裡可以自定義
  • 重試:在處理訊息的時候,如果丟擲了異常,則會再次委派給你處理
  • 處理錯誤:在處理訊息的時候,如果訊息丟擲了異常,你希望處理這個錯誤的話
  • 死信佇列:當這個訊息處理失敗多次的時候,那麼很有可能是一個不可處理的訊息,放入死信佇列

自定義、預設重試機制與錯誤處理

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * <pre>
 *  定義消費端,資訊接收處理
 * </pre>
 * @author zhuqiang
 * @date 2018/12/3 15:31
 */
public interface CmdInput {
    String INPUT = “xxxxx”;

    @Input(INPUT)
    SubscribableChannel input();
}
@Component
@EnableBinding(CmdInput .class) // 開啟繫結介面
@Slf4j
public class NoticeCmdReceiver {

    // 預設丟擲異常的話,預設重試 3 次
    // 如果有自定義的錯誤處理則不會重試
    @StreamListener(target = NoticeCmdInput.INPUT)
    public void process(NoticeCmdRequest cmd) {
        log.info("StreamListener process message {}", cmd);
        throw new RuntimeException("xxx");
    }
    // 處理錯誤通道名稱格式是:通道名.組名.errors
    // 不會通過 佇列傳遞,應該是內部呼叫
    @ServiceActivator(inputChannel = CmdInput.INPUT + ".group.errors")
    public void processErrorerror(Message<?> message) {
        System.out.println("Handling ERROR: " + message);
    }
}    

注意:yml 中沒有額外的配置;

死信佇列與分組

yml 配置

spring:
  cloud:
    stream:
      bindings:
        # 多個例項消費的時候可以唯一消費
        xxxxx:
          destination: xxxxx
          group: group # 使用死信佇列必須分組
          consumer:
            max_attempt: 3  # 最大重試次數,預設是 3 次
            dlq-ttl: 2000  # 重試間隔時間
            republishToDlq: true
            #  deadLetterQueueName: 手動寫死死信佇列名稱,也可以像下面這樣開啟自動名稱,如這裡自動生成 xxxxx.group.dlq
      rabbit:
        bindings:
          xxxxx:
            # 死信佇列
            consumer:
              auto-bind-dlq: true

增加該配置之後,就會回 xxxxx 的佇列再自動生成一個死信佇列 xxxxx.group.dlq;
這裡的配置加上最上面的程式碼效果就是:

  1. 失敗會重試 3 次
  2. 3 次失敗就會進入錯誤處理方法。而不會進入死信佇列;死信佇列與錯誤處理是互斥的;

總結

  1. 分組:單例項消費
  2. 重試:沒有配置 consumer.max_attempt 的話,預設是 3 次,且有錯誤處理的話,不會重試
  3. 處理錯誤:處理錯誤與死信佇列是互斥的,要麼被處理了,要麼發往死信佇列