springboot 第十章 (springboot與訊息)
概述
1.大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力
1.訊息服務中兩個重要概念:
訊息代理(message broker)和目的地(destination)
當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。
3.訊息佇列主要有兩種形式的目的地
1.佇列(queue):點對點訊息通訊(point-to-point)
2.主題(topic):釋出(publish)/訂閱(subscribe)訊息通訊
訊息佇列能幹什麼?
非同步處理
非同步處理,使用者註冊,同時傳送簡訊和郵箱
傳統方式單執行緒, 呼叫完傳送註冊郵箱的方法呼叫傳送註冊簡訊, 在使用者多的情況下效果會卡頓,延遲高使用者體驗不好
我們可以通過多執行緒, 假設此案例開啟兩個執行緒一個用來發簡訊,一個用來註冊郵箱 ,這樣是減少時間,但是隨著使用者變多不是辦法,
我們可以把註冊資訊放入一個佇列, 然後兩個服務非同步的不斷處理列隊中的訊息, 這樣給使用者的反應會很快
佇列 : 先進先出, 棧:先進後出 ,
應用解耦
應用之間的呼叫,解耦使用過 webservice 效率低, dubbo 這個效能高,但是隨著微服務的發展感覺快被淘汰了 ,
流量削峰
比如秒殺, 如果系統對每條資料都進行處理, 不崩潰是不可能的,除非你的網站沒人訪問,沒人來,
可以制100 個訊息位,可以的訊息進了前一百, 就給他儲存到列隊中,如果沒有進, 直接回應秒殺失敗,這個就很快樂, 如果進了,我們慢慢處理, 之前用redis 快取技術也是可以實現的
相關概念
點對點
這個的點對點不是 A->B 的這種,一個點對應另一個點, a傳送一個b接受一個, 這裡的點對點是說, a傳送了會儲存在訊息列隊中,然後b可以來拿這個訊息處理,操作, 如果有一個c 也可以,但是拿走訊息之後, 訊息就被移除列隊
釋出訂閱式
這個好理解, 就是a釋出一個主題, 然後同時很多物件都可以接收到,
JMS
java訊息服務,基於jvm訊息佇列的規範,ActiveMQ,HornetHQ 都是HornetHQ的實現
AMQP
–高階訊息佇列協議,也是一個訊息代理的規範,相容JMS
–RabbitMQ是AMQP的實現
JMS |
AMQP |
|
定義 |
Java api |
網路線級協議 |
跨語言 |
否 |
是 |
跨平臺 |
否 |
是 |
Model |
提供兩種訊息模型: (1)、Peer-2-Peer (2)、Pub/sub |
提供了五種訊息模型: (1)、direct exchange (2)、fanout exchange (3)、topic change (4)、headers exchange (5)、system exchange 本質來講,後四種和JMS的pub/sub模型沒有太大差別,僅是在路由機制上做了更詳細的劃分; |
支援訊息型別 |
多種訊息型別: TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message (只有訊息頭和屬性) |
byte[] 當實際應用時,有複雜的訊息,可以將訊息序列化後傳送。 |
綜合評價 |
JMS 定義了JAVA API層面的標準;在java體系中,多個client均可以通過JMS進行互動,不需要應用修改程式碼,但是其對跨平臺的支援較差; |
AMQP定義了wire-level層的協議標準;天然具有跨平臺、跨語言特性。 |
springboot 支援
–spring-jms提供了對JMS的支援
–spring-rabbit提供了對AMQP的支援
–需要ConnectionFactory的實現來連線訊息代理
–提供JmsTemplate、RabbitTemplate來發送訊息
–@JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽訊息代理髮布的訊息
–@EnableJms、@EnableRabbit開啟支援
–
springboot 自動內建了自動配置類
–JmsAutoConfiguration
–RabbitAutoConfiguration
RabbitMQ
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
核心概念
Message
訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。
Publisher
訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式。
Exchange
交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。(交換器通過傳送者的路由鍵,來區分訊息所在的位置)
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別
direct : 可以實現點對點
fanout topic headers : 實現釋出功能
Queue
訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。
Binding
繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。
Exchange 和Queue的繫結可以是多對多的關係。
Connection
網路連線,比如一個TCP連線。
Channel
通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線。
Consumer
訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
Virtual Host
虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost 是 AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost 是 / 。
Broker
表示訊息佇列伺服器實體
Springboot整合RabbitMq
傳送訊息
@Test
public void contextLoads() {
Map<String,String> data = new HashMap<>();
data.put("你好呀","e你");
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",data);
}
點對點
//廣播的形式
@Test
public void fanout(){
rabbitTemplate.convertAndSend("exchange.fanout",new Book("三國演義","大叔"));
}
廣播
接受訊息
@Test
public void receive(){
Object atguigu = rabbitTemplate.receiveAndConvert("gulixueyuan.news");
System.out.println(atguigu.toString());
}
監聽一個或者多個列隊獲取訊息
@RabbitListener(queues = "atguigu.news")
public void receive(Book book){
System.out.print(book.getName());
}
@RabbitListener(queues = "atguigu.news")
public void receive (Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
模式使用java 的序列化,我們可以自定義json
package com.zzq.springboot02rabbitmq.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class myrabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
AmqpAdmin
使用springboot 提供的amqpAdmin我們可以操作 rabbitmq 中的元件,
@Autowired
private AmqpAdmin amqpAdmin;
@Test
public void createQueue(){
amqpAdmin.declareQueue(new org.springframework.amqp.core.Queue("myQueue"));
}
//建立點對點的轉換器
@Test
public void createExchanges(){
amqpAdmin.declareExchange(new DirectExchange("myDirectExchange"));
}
//繫結自己建立的queue 和exchanges
@Test
public void bindingQueueExchanges(){
/**
* 繫結的元件 ,被繫結的目標型別, 繫結的轉換器 , 繫結的路由鍵值,繫結引數
*/
amqpAdmin.declareBinding(new Binding("myQueue",Binding.DestinationType.QUEUE,"myDirectExchange","myRoutingKey",null));
}
/**
* 解除
*/
@Test
public void unbinding(){
amqpAdmin.removeBinding(new Binding("myQueue",Binding.DestinationType.QUEUE,"myDirectExchange","myRoutingKey",null));
}