cloud stream 官方文件閱讀筆記1
1、建立專案
在 spring initialzar 中選擇 Rabbitmq和cloud Stream兩個模組,最好的方法是搜尋就可以出來了
2、一個簡單的例子
修改生成的主程式為下面的形式
@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args) ;
}
@StreamListener(Sink.INPUT)
public void handle(Person person) {
System.out.println("Received: " + person);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString () {
return this.name;
}
}
}
這裡使用了@EnableBinding(Sink.class),使得Sink能夠繫結訊息的 input和output。初始化了框架繫結到訊息中介軟體
而且它自動生成了配置(包含佇列,主題和其他的元件)通過 Sink.INPUT 訊息通道。
這裡加入了一個handler方法去接收收到的Person型別的訊息,該方法將直接將接收到的訊息轉化成Person物件。
現在,我們已經有一個完整的springcloud Stream專案來監聽訊息隊列了。不過在此之前,我們預設你選擇了RabbitMQ作為
訊息中介軟體。預設你RabbitMQ服務已經安裝且正在執行(注:預設在本機的RabbitMQ預設埠開啟服務,而沒有修改埠),現在你可以通過執行
主程式的 main 方法來開始這個程式了。
你在控制檯中可能會看見這些輸出:
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/[email protected] . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
現在區你的 RabbitMQ web管理介面或者其他 訊息佇列客戶端向當前佇列傳送一個訊息,傳送的佇列就是以上控制檯輸出中 input
繫結的佇列(注:rabbitmq支援動態繫結,根據程式的實現進行繫結)。例如,根據以上資訊,我應該向
input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg 佇列傳送檢測資訊。其中,anonymous.CbMIwdkJSBO1ZoPDOtHtCg 代表的是一個訊息
組而且被註冊了。所以它在你的環境中可能是不一樣的,更優秀的做法是,你可以給它指派一個確切的名字,只要你喜歡,你可以這樣做:
spring.cloud.stream.bindings.input.group=hello
指派你的組名為 hello
在訊息佇列中我們發一個JSON格式的訊息來代表一個Person物件,就像下面這樣:
{"name":"Sam Spade"}
然後在你的控制檯下面就應該會輸出
Received: Sam Spade