1. 程式人生 > 實用技巧 >springboot--ActiveMQ--訊息佇列

springboot--ActiveMQ--訊息佇列

ActiveMQ遠端訊息佇列

一、我們建立springboot專案工程

二、建立完畢我們簡單介紹 activeMQ

1.概述

訊息中介軟體可以理解成就是一個服務軟體,儲存資訊的容器,比如生活中的快遞雲櫃.
我們把資料放到訊息中介軟體當中, 然後通知對應的服務進行獲取
訊息中介軟體是在訊息的傳輸過程中儲存資訊的容器

2.訊息中介軟體應用場景

1. 使用訊息伺服器當做大的佇列使用, 先進先出, 來處理高併發寫入操作
2. 使用訊息伺服器可以將業務系統的序列執行改為並行執行, 處理效率高, 更合理的榨取伺服器的效能.

3.同步與非同步技術

同步技術
	dubbo是一中同步技術, 實時性高, controller呼叫service專案, 呼叫就執行, 
	如果service專案中的程式碼沒有執行完, controller裡面的程式碼一致等待結果.
非同步技術
	mq訊息中介軟體技術(jms) 是一種非同步技術, 訊息傳送方, 將訊息傳送給訊息伺服器, 
	訊息伺服器未必立即處理.什麼時候去處理, 主要看訊息伺服器是否繁忙, 
	訊息進入伺服器後會進入佇列中, 先進先出.實時性不高.

三、JMS 介紹

概述:

jms的全稱叫做Java message service (Java訊息服務) jms是jdk底層定義的規範
各大廠商都是實現這個規範的技術

jms訊息伺服器同類型技術

1.ActiveMQ
	是apache的一個比較老牌的訊息中介軟體, 它比較均衡, 既不是最安全的, 也不是最快的.
2.RabbitMQ
	是阿里巴巴的一個訊息中介軟體, 更適合金融類業務, 它對資料的安全性比較高.能夠保證資料不丟失.
3.Kafka
	Apache下的一個子專案。特點:高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;適合處理海量資料。

JMS中支援的訊息型別:

TextMessage
	一個字串物件
MapMessage
	key-value
ObjectMessage
	一個序列化的 Java 物件
BytesMessage
	一個位元組的資料流
StreamMessage
	Java 原始值的資料流

JMS中的兩種傳送模式

1.點對點模式
	一個傳送方, 一個接收方. 也可以多個傳送方, 一個接收方, 主要是接收方必須是第一個.
	示例圖
		
2.訂閱釋出模式
	一個傳送方, 多個接收方.	傳送方也可以是多個, 主要看接收方, 接收方必須是多個
	示例圖

ActiveMQ伺服器安裝

官方網站: http:activemq.apache.org/

四、專案建立完畢後引入依賴

 
      
   
            org.springframework.boot
            spring-boot-starter-activemq
        
        
        
            org.apache.activemq
activemq-pool 5.15.0
org.springframework.boot spring-boot-starter-web

寫入yml配置

server:
  port: 8080
spring:
  activemq:
    broker-url: tcp://www.yangbuyi.top:61616 # 可以使用我的伺服器mq
    user: admin
    password: admin
    close-timeout: 15s   # 在考慮結束之前等待的時間
    in-memory: true      # 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
    non-blocking-redelivery: false  # 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
    send-timeout: 0     # 等待訊息傳送響應的時間。設定為0等待永遠。
    queue-name: active.queue  # 點對點服務名稱  可修改
    topic-name: active.topic.name.model # 訂閱服務名稱 可修改
    packages:
      trust-all: true   # 配置信任 反序列化物件要用到的

#  packages:
#    trust-all: true #不配置此項,會報錯
pool:
  enabled: true
  max-connections: 10   #連線池最大連線數
  idle-timeout: 30000   #空閒的連線過期時間,預設為30秒

訊息配置類建立

package top.yangbuyi.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

/**
 * @description: 楊不易網站:www.yangbuyi.top
 * @program: yangbuyi_mq
 * @ClassName: BeanConfig
 * @create: 2020-07-30 09:18
 * @author: yangbuyi
 * @since: JDK1.8
 * @BeanConfig:
 **/

@Configuration
public class BeanConfig {

	  @Value("${spring.activemq.broker-url}")
	  private String brokerUrl;

	  @Value("${spring.activemq.user}")
	  private String username;

	  @Value("${spring.activemq.topic-name}")
	  private String password;

	  @Value("${spring.activemq.queue-name}")
	  private String queueName;

	  @Value("${spring.activemq.topic-name}")
	  private String topicName;

	  @Bean(name = "queue")
	  public Queue queue() {
			return new ActiveMQQueue(queueName);
	  }

	  @Bean(name = "topic")
	  public Topic topic() {
			return new ActiveMQTopic(topicName);
	  }

	  /**
	  * @Description: 楊不易個人網址:http://yangbuyi.top
	  * 功能描述:	 初始化工廠
	  * @Param:
	  * @return:
	  * @Author: TeouBle
	  * @Date: 2020/7/30 10:24
	  */
	  @Bean
	  public ConnectionFactory connectionFactory() {
			ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
			/* 配置信任 反序列化 */
			activeMQConnectionFactory.setTrustAllPackages(true);
			return activeMQConnectionFactory;
	  }

	  /**
	  * @Description: 楊不易個人網址:http://yangbuyi.top
	  * 功能描述: 	JMS 模板 Java訊息服務
	  * @Param:
	  * @return:
	  * @Author: TeouBle
	  * @Date: 2020/7/30 10:25
	  */
	  @Bean
	  public JmsMessagingTemplate jmsMessageTemplate() {
			return new JmsMessagingTemplate(connectionFactory());
	  }

	  /**
	  * @Description: 楊不易個人網址:http://yangbuyi.top
	  * 功能描述:	在Queue模式中,對訊息的監聽需要對containerFactory進行配置
	  * @Param: 
	  * @return: 
	  * @Author: TeouBle
	  * @Date: 2020/7/30 10:25
	  */
	  @Bean("queueListener")
	  public JmsListenerContainerFactory queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
			SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
			factory.setConnectionFactory(connectionFactory);
			factory.setPubSubDomain(false);
			return factory;
	  }

	  /**
	  * @Description: 楊不易個人網址:http://yangbuyi.top
	  * 功能描述:在Topic模式中,對訊息的監聽需要對containerFactory進行配置
	  * @Param: 
	  * @return: 
	  * @Author: TeouBle
	  * @Date: 2020/7/30 10:25
	  */
	  @Bean("topicListener")
	  public JmsListenerContainerFactory topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
			SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
			factory.setConnectionFactory(connectionFactory);
			factory.setPubSubDomain(true);
			return factory;
	  }
}

建立訊息監聽類

QueueConsumerListener 點對點模式 消費者 接收
package top.yangbuyi.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @description: 楊不易網站:www.yangbuyi.top
 * @program: yangbuyi_mq
 * @ClassName: QueueConsumerListener
 * @create: 2020-07-30 09:21
 * @author: yangbuyi
 * @since: JDK1.8
 * @QueueConsumerListener:
 **/
@Component
@Slf4j
public class QueueConsumerListener {
	  @Value("${spring.activemq.queue-name}")
	  private String queueName;

	  //queue模式的消費者
	  @JmsListener(destination = "${spring.activemq.queue-name}", containerFactory = "queueListener")
	  public void readActiveQueue(String message) {
			System.out.println("queue接受到:" + message);
			System.out.println("點對點服務名稱:"+ queueName);
			System.out.println("點對點監聽成功");
	  }

}

TopicConsumerListener 訂閱模式 消費者 接收
package top.yangbuyi.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.catalina.User;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.configurationprocessor.json.JSONObject;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.yangbuyi.controller.user;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
 * @description: 楊不易網站:www.yangbuyi.top
 * @program: yangbuyi_mq
 * @ClassName: TopicConsumerListener
 * @create: 2020-07-30 09:23
 * @author: yangbuyi
 * @since: JDK1.8
 * @TopicConsumerListener:
 **/

@Component
public class TopicConsumerListener {
	  @Value("${spring.activemq.topic-name}")
	  private String topicName;

	  //topic模式的消費者
	  @JmsListener(destination = "${spring.activemq.topic-name}", containerFactory = "topicListener")
	  public void readActiveQueue(Message message) {
			ObjectMessage u = (ObjectMessage) message;
			try {
				  user object = (user) u.getObject();
				  System.out.println(object.getName());
				  System.out.println(object.getAge());
			} catch (JMSException e) {
				  e.printStackTrace();
			}
			System.out.println();
			System.out.println("topic接受到:" + message);
			System.out.println("訂閱服務名稱:" + topicName);
	  }
}

建立一個小物件用來測試傳遞物件


package top.yangbuyi.controller;


import java.io.Serializable;

/**
 * @description: 楊不易網站:www.yangbuyi.top
 * @program: yangbuyi_mq
 * @ClassName: user
 * @create: 2020-07-30 10:03
 * @author: yangbuyi
 * @since: JDK1.8
 * @user:
 **/


public class user implements Serializable {
	private String name;
	private Integer age;

	  public user(String name, Integer age) {
			this.name = name;
			this.age = age;
	  }

	  public String getName() {
			return name;
	  }

	  public void setName(String name) {
			this.name = name;
	  }

	  public Integer getAge() {
			return age;
	  }

	  public void setAge(Integer age) {
			this.age = age;
	  }
}

建立controller

package top.yangbuyi.controller;

import javafx.geometry.Pos;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.*;

import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;
import java.util.ArrayList;

/**
 * @description: 楊不易網站:www.yangbuyi.top
 * @program: yangbuyi_mq
 * @ClassName: ProducerController
 * @create: 2020-07-30 09:20
 * @author: yangbuyi
 * @since: JDK1.8
 * @ProducerController:  生產者
 **/

@RestController
@Slf4j
public class ProducerController {
	  @Autowired
	  private JmsMessagingTemplate jmsMessagingTemplate;

	  @Autowired
	  private Queue queue;

	  @Autowired
	  private Topic topic;

	  @RequestMapping(value ="queue/test",method = RequestMethod.POST)
	  public String sendQueue(@RequestBody String str) {
			this.sendMessage(this.queue, str);
			return "success---點對點發送成功";
	  }

	  @RequestMapping(value = "topic/test",method = RequestMethod.POST)
	  public String sendTopic(@RequestBody String str) {
			this.sendMessageObject(this.topic, new user(str,10));
			return "success---訂閱傳送成功";
	  }


	  // 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
	  // 生產者
	  private void sendMessageObject(Destination destination, final Object message){
			jmsMessagingTemplate.convertAndSend(destination, message);

	  }

	  // 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
	  // 生產者
	  private void sendMessage(Destination destination, final String message){
			jmsMessagingTemplate.convertAndSend(destination, message);
	  }


}

五、進行postman傳送請求

訂閱服務

點對點服務

到此activemq就講到這裡了 還有蠻多可以自己去 學習學習!!!

你的壓力來源於無法自律,只是假裝努力,現狀跟不上內心慾望,所以你焦慮又恐慌。——楊不易