1. 程式人生 > >springboot 第十章 (springboot與訊息)

springboot 第十章 (springboot與訊息)

概述

1.大多應用中,可通過訊息服務中介軟體來提升系統非同步通訊、擴充套件解耦能力

1.訊息服務中兩個重要概念:

訊息代理(message broker目的地(destination

當訊息傳送者傳送訊息以後,將由訊息代理接管,訊息代理保證訊息傳遞到指定目的地。

3.訊息佇列主要有兩種形式的目的地

1.佇列queue:點對點訊息通訊(point-to-point

2.主題(topic:釋出(publish/訂閱(subscribe)訊息通訊

訊息佇列能幹什麼? 

非同步處理

非同步處理,使用者註冊,同時傳送簡訊和郵箱

傳統方式單執行緒, 呼叫完傳送註冊郵箱的方法呼叫傳送註冊簡訊, 在使用者多的情況下效果會卡頓,延遲高使用者體驗不好

我們可以通過多執行緒, 假設此案例開啟兩個執行緒一個用來發簡訊,一個用來註冊郵箱 ,這樣是減少時間,但是隨著使用者變多不是辦法,

我們可以把註冊資訊放入一個佇列, 然後兩個服務非同步的不斷處理列隊中的訊息, 這樣給使用者的反應會很快

佇列 : 先進先出, 棧:先進後出 , 

應用解耦

應用之間的呼叫,解耦使用過 webservice 效率低,  dubbo 這個效能高,但是隨著微服務的發展感覺快被淘汰了 ,

流量削峰

比如秒殺, 如果系統對每條資料都進行處理, 不崩潰是不可能的,除非你的網站沒人訪問,沒人來,

可以制100 個訊息位,可以的訊息進了前一百, 就給他儲存到列隊中,如果沒有進, 直接回應秒殺失敗,這個就很快樂, 如果進了,我們慢慢處理, 之前用redis 快取技術也是可以實現的

相關概念  

點對點

這個的點對點不是 A->B 的這種,一個點對應另一個點, a傳送一個b接受一個, 這裡的點對點是說, a傳送了會儲存在訊息列隊中,然後b可以來拿這個訊息處理,操作, 如果有一個c 也可以,但是拿走訊息之後, 訊息就被移除列隊

釋出訂閱式

這個好理解, 就是a釋出一個主題, 然後同時很多物件都可以接收到, 

JMS 

java訊息服務,基於jvm訊息佇列的規範,ActiveMQ,HornetHQ  都是HornetHQ的實現

AMQP

高階訊息佇列協議,也是一個訊息代理的規範,相容JMS

RabbitMQAMQP的實現

JMS

AMQP

定義

Java api

網路線級協議

跨語言

跨平臺

Model

提供兩種訊息模型:

1)、Peer-2-Peer

(2)、Pub/sub

提供了五種訊息模型:

1)、direct exchange

(2)、fanout exchange

(3)、topic change

(4)、headers exchange

(5)、system exchange

本質來講,後四種和JMSpub/sub模型沒有太大差別,僅是在路由機制上做了更詳細的劃分;

支援訊息型別

多種訊息型別:

TextMessage

MapMessage

BytesMessage

StreamMessage

ObjectMessage

Message (只有訊息頭和屬性)

byte[]

當實際應用時,有複雜的訊息,可以將訊息序列化後傳送。

綜合評價

JMS 定義了JAVA API層面的標準;在java體系中,多個client均可以通過JMS進行互動,不需要應用修改程式碼,但是其對跨平臺的支援較差;

AMQP定義了wire-level層的協議標準;天然具有跨平臺、跨語言特性。

springboot 支援

spring-jms提供了對JMS的支援

spring-rabbit提供了對AMQP的支援

需要ConnectionFactory的實現來連線訊息代理

提供JmsTemplateRabbitTemplate來發送訊息

@JmsListenerJMS)、@RabbitListenerAMQP)註解在方法上監聽訊息代理髮布的訊息

@EnableJms@EnableRabbit開啟支援

springboot 自動內建了自動配置類 

JmsAutoConfiguration

RabbitAutoConfiguration

RabbitMQ

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。

核心概念

Message

訊息,訊息是不具名的,它由訊息頭和訊息體組成。訊息體是不透明的,而訊息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其他訊息的優先權)、delivery-mode(指出該訊息可能需要永續性儲存)等。

Publisher

訊息的生產者,也是一個向交換器釋出訊息的客戶端應用程式

Exchange

交換器,用來接收生產者傳送的訊息並將這些訊息路由給伺服器中的佇列。(交換器通過傳送者的路由鍵,來區分訊息所在的位置)

Exchange4種類型:direct(預設)fanout, topic, headers,不同型別的Exchange轉發訊息的策略有所區別

direct  : 可以實現點對點

fanout topic  headers  : 實現釋出功能

Queue

訊息佇列,用來儲存訊息直到傳送給消費者。它是訊息的容器,也是訊息的終點。一個訊息可投入一個或多個佇列。訊息一直在佇列裡面,等待消費者連線到這個佇列將其取走。

Binding

繫結,用於訊息佇列和交換器之間的關聯。一個繫結就是基於路由鍵將交換器和訊息佇列連線起來的路由規則,所以可以將交換器理解成一個由繫結構成的路由表。

Exchange Queue的繫結可以是多對多的關係。

Connection

網路連線,比如一個TCP連線。

Channel

通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬 TCP 都是非常昂貴的開銷,所以引入了通道的概念,以複用一條 TCP 連線

Consumer

訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。

Virtual Host

虛擬主機,表示一批交換器、訊息佇列和相關物件。虛擬主機是共享相同的身份認證和加密環境的獨立伺服器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 伺服器,擁有自己的佇列、交換器、繫結和許可權機制。vhost AMQP 概念的基礎,必須在連線時指定,RabbitMQ 預設的 vhost /

Broker

表示訊息佇列伺服器實體

Springboot整合RabbitMq

傳送訊息


	@Test
	public void contextLoads() {
		Map<String,String> data = new HashMap<>();

		data.put("你好呀","e你");
		rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",data);
	}

點對點

	//廣播的形式
	@Test
	public void fanout(){
		rabbitTemplate.convertAndSend("exchange.fanout",new Book("三國演義","大叔"));
	}

廣播

接受訊息

	@Test
	public void receive(){
		Object atguigu = rabbitTemplate.receiveAndConvert("gulixueyuan.news");
		System.out.println(atguigu.toString());
	}

監聽一個或者多個列隊獲取訊息

    @RabbitListener(queues = "atguigu.news")
    public void receive(Book book){
        System.out.print(book.getName());
    }

    @RabbitListener(queues = "atguigu.news")
    public void receive (Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());
    }

模式使用java 的序列化,我們可以自定義json

package com.zzq.springboot02rabbitmq.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class myrabbitConfig {
    @Bean
    public MessageConverter messageConverter(){
        return  new Jackson2JsonMessageConverter();
    }
}

AmqpAdmin

使用springboot 提供的amqpAdmin我們可以操作 rabbitmq 中的元件, 

@Autowired
	private AmqpAdmin amqpAdmin;

	@Test
	public void createQueue(){
		amqpAdmin.declareQueue(new org.springframework.amqp.core.Queue("myQueue"));
	}

	//建立點對點的轉換器
	@Test
	public void createExchanges(){
		amqpAdmin.declareExchange(new DirectExchange("myDirectExchange"));
	}

	//繫結自己建立的queue 和exchanges
	@Test
	public void bindingQueueExchanges(){
		/**
		 * 繫結的元件 ,被繫結的目標型別, 繫結的轉換器 , 繫結的路由鍵值,繫結引數
		 */
		amqpAdmin.declareBinding(new Binding("myQueue",Binding.DestinationType.QUEUE,"myDirectExchange","myRoutingKey",null));

	}

	/**
	 * 解除
	 */
	@Test
	public void unbinding(){
		amqpAdmin.removeBinding(new Binding("myQueue",Binding.DestinationType.QUEUE,"myDirectExchange","myRoutingKey",null));
	}