spring cloud stream 基礎使用
阿新 • • 發佈:2018-12-05
spring cloud stream
依賴
compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
本章記錄下調研 spring cloud stream 基礎用法的結果;
基礎
yml 配置
spring:
rabbitmq:
host: 192.168.7.106
port : 5672
username: xxx
password: 123456
消費端
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
* <pre>
* 使用預設的佇列名,消費端
* </pre>
* @author zhuqiang
* @date 2018/12/4 13:40
*/
@EnableBinding(value = {Sink.class})
@Slf4j
public class SinkReceiver {
@StreamListener(Sink.INPUT)
public void receive(Object payload) {
log.info("Received from default channel : {}", payload.toString());
}
}
生產端,使用測試來發送
@RunWith(SpringRunner.class)
@EnableBinding(value = { ReceiverAppTest.SinkSender.class})
public class ReceiverAppTest {
@Autowired
private SinkSender sinkSender;
@Test
public void sinkSenderTester() {
sinkSender.output().send(MessageBuilder.withPayload("produce a message to " + Sink.INPUT + " channel").build());
}
public interface SinkSender {
@Output(Sink.INPUT)
MessageChannel output();
}
}
分組、自定義、重試、處理錯誤、死信佇列
- 分組 : 分組的作用是,多個相同的分散式微服務啟動的話,讓他們在一個佇列組裡面,這樣每次就只能有一個例項被消費了
- 自定義:基礎裡面是使用框架預定義的佇列名稱,這裡可以自定義
- 重試:在處理訊息的時候,如果丟擲了異常,則會再次委派給你處理
- 處理錯誤:在處理訊息的時候,如果訊息丟擲了異常,你希望處理這個錯誤的話
- 死信佇列:當這個訊息處理失敗多次的時候,那麼很有可能是一個不可處理的訊息,放入死信佇列
自定義、預設重試機制與錯誤處理
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* <pre>
* 定義消費端,資訊接收處理
* </pre>
* @author zhuqiang
* @date 2018/12/3 15:31
*/
public interface CmdInput {
String INPUT = “xxxxx”;
@Input(INPUT)
SubscribableChannel input();
}
@Component
@EnableBinding(CmdInput .class) // 開啟繫結介面
@Slf4j
public class NoticeCmdReceiver {
// 預設丟擲異常的話,預設重試 3 次
// 如果有自定義的錯誤處理則不會重試
@StreamListener(target = NoticeCmdInput.INPUT)
public void process(NoticeCmdRequest cmd) {
log.info("StreamListener process message {}", cmd);
throw new RuntimeException("xxx");
}
// 處理錯誤通道名稱格式是:通道名.組名.errors
// 不會通過 佇列傳遞,應該是內部呼叫
@ServiceActivator(inputChannel = CmdInput.INPUT + ".group.errors")
public void processErrorerror(Message<?> message) {
System.out.println("Handling ERROR: " + message);
}
}
注意:yml 中沒有額外的配置;
死信佇列與分組
yml 配置
spring:
cloud:
stream:
bindings:
# 多個例項消費的時候可以唯一消費
xxxxx:
destination: xxxxx
group: group # 使用死信佇列必須分組
consumer:
max_attempt: 3 # 最大重試次數,預設是 3 次
dlq-ttl: 2000 # 重試間隔時間
republishToDlq: true
# deadLetterQueueName: 手動寫死死信佇列名稱,也可以像下面這樣開啟自動名稱,如這裡自動生成 xxxxx.group.dlq
rabbit:
bindings:
xxxxx:
# 死信佇列
consumer:
auto-bind-dlq: true
增加該配置之後,就會回 xxxxx 的佇列再自動生成一個死信佇列 xxxxx.group.dlq;
這裡的配置加上最上面的程式碼效果就是:
- 失敗會重試 3 次
- 3 次失敗就會進入錯誤處理方法。而不會進入死信佇列;死信佇列與錯誤處理是互斥的;
總結
- 分組:單例項消費
- 重試:沒有配置 consumer.max_attempt 的話,預設是 3 次,且有錯誤處理的話,不會重試
- 處理錯誤:處理錯誤與死信佇列是互斥的,要麼被處理了,要麼發往死信佇列