[置頂] springboot--ActiveMQ--訊息佇列
阿新 • • 發佈:2020-08-18
ActiveMQ遠端訊息佇列
一、我們建立springboot專案工程
二、建立完畢我們簡單介紹 activeMQ
1.概述
訊息中介軟體可以理解成就是一個服務軟體,儲存資訊的容器,比如生活中的快遞雲櫃.我們把資料放到訊息中介軟體當中, 然後通知對應的服務進行獲取訊息中介軟體是在訊息的傳輸過程中儲存資訊的容器
2.訊息中介軟體應用場景
1. 使用訊息伺服器當做大的佇列使用, 先進先出, 來處理高併發寫入操作2. 使用訊息伺服器可以將業務系統的序列執行改為並行執行, 處理效率高, 更合理的榨取伺服器的效能.
3.同步與非同步技術
同步技術dubbo是一中同步技術, 實時性高, controller呼叫service專案, 呼叫就執行, 如果service專案中的程式碼沒有執行完, controller裡面的程式碼一致等待結果.非同步技術mq訊息中介軟體技術(jms) 是一種非同步技術, 訊息傳送方, 將訊息傳送給訊息伺服器, 訊息伺服器未必立即處理.什麼時候去處理, 主要看訊息伺服器是否繁忙, 訊息進入伺服器後會進入佇列中, 先進先出.實時性不高.
三、JMS 介紹
概述:
jms的全稱叫做Java message service (Java訊息服務) jms是jdk底層定義的規範各大廠商都是實現這個規範的技術
jms訊息伺服器同類型技術
1.ActiveMQ是apache的一個比較老牌的訊息中介軟體, 它比較均衡, 既不是最安全的, 也不是最快的. 2.RabbitMQ是阿里巴巴的一個訊息中介軟體, 更適合金融類業務, 它對資料的安全性比較高.能夠保證資料不丟失.3.KafkaApache下的一個子專案。特點:高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;適合處理海量資料。
JMS中支援的訊息型別:
TextMessage一個字串物件MapMessagekey-valueObjectMessage一個序列化的 Java 物件BytesMessage一個位元組的資料流StreamMessageJava 原始值的資料流
JMS中的兩種傳送模式
1.點對點模式一個傳送方, 一個接收方. 也可以多個傳送方, 一個接收方, 主要是接收方必須是第一個. 示例圖2.訂閱釋出模式一個傳送方, 多個接收方.傳送方也可以是多個, 主要看接收方, 接收方必須是多個示例圖
ActiveMQ伺服器安裝
官方網站: http:activemq.apache.org/
四、專案建立完畢後引入依賴
org.springframework.boot spring-boot-starter-activemq org.apache.activemq activemq-pool 5.15.0 org.springframework.boot spring-boot-starter-web
寫入yml配置
server: port: 8080spring: activemq: broker-url: tcp://www.yangbuyi.top:61616 # 可以使用我的伺服器mq user: admin password: admin close-timeout: 15s # 在考慮結束之前等待的時間 in-memory: true # 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。 non-blocking-redelivery: false # 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。 send-timeout: 0 # 等待訊息傳送響應的時間。設定為0等待永遠。 queue-name: active.queue # 點對點服務名稱 可修改 topic-name: active.topic.name.model # 訂閱服務名稱 可修改 packages: trust-all: true # 配置信任 反序列化物件要用到的# packages:# trust-all: true #不配置此項,會報錯pool: enabled: true max-connections: 10 #連線池最大連線數 idle-timeout: 30000 #空閒的連線過期時間,預設為30秒
訊息配置類建立
package top.yangbuyi.config;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQQueue;import org.apache.activemq.command.ActiveMQTopic;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jms.config.JmsListenerContainerFactory;import org.springframework.jms.config.SimpleJmsListenerContainerFactory;import org.springframework.jms.core.JmsMessagingTemplate;import javax.jms.ConnectionFactory;import javax.jms.Queue;import javax.jms.Topic;/** * @description: 楊不易網站:www.yangbuyi.top * @program: yangbuyi_mq * @ClassName: BeanConfig * @create: 2020-07-30 09:18 * @author: yangbuyi * @since: JDK1.8 * @BeanConfig: **/@Configurationpublic class BeanConfig { @Value('${spring.activemq.broker-url}') private String brokerUrl; @Value('${spring.activemq.user}') private String username; @Value('${spring.activemq.topic-name}') private String password; @Value('${spring.activemq.queue-name}') private String queueName; @Value('${spring.activemq.topic-name}') private String topicName; @Bean(name = 'queue') public Queue queue() {return new ActiveMQQueue(queueName); } @Bean(name = 'topic') public Topic topic() {return new ActiveMQTopic(topicName); } /** * @Description: 楊不易個人網址:http://yangbuyi.top * 功能描述: 初始化工廠 * @Param: * @return: * @Author: TeouBle * @Date: 2020/7/30 10:24 */ @Bean public ConnectionFactory connectionFactory() {ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);/* 配置信任 反序列化 */activeMQConnectionFactory.setTrustAllPackages(true);return activeMQConnectionFactory; } /** * @Description: 楊不易個人網址:http://yangbuyi.top * 功能描述: JMS 模板 Java訊息服務 * @Param: * @return: * @Author: TeouBle * @Date: 2020/7/30 10:25 */ @Bean public JmsMessagingTemplate jmsMessageTemplate() {return new JmsMessagingTemplate(connectionFactory()); } /** * @Description: 楊不易個人網址:http://yangbuyi.top * 功能描述:在Queue模式中,對訊息的監聽需要對containerFactory進行配置 * @Param: * @return: * @Author: TeouBle * @Date: 2020/7/30 10:25 */ @Bean('queueListener') public JmsListenerContainerFactory queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);return factory; } /** * @Description: 楊不易個人網址:http://yangbuyi.top * 功能描述:在Topic模式中,對訊息的監聽需要對containerFactory進行配置 * @Param: * @return: * @Author: TeouBle * @Date: 2020/7/30 10:25 */ @Bean('topicListener') public JmsListenerContainerFactory topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(true);return factory; }}
建立訊息監聽類
QueueConsumerListener 點對點模式 消費者 接收
package top.yangbuyi.listener;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Value;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/** * @description: 楊不易網站:www.yangbuyi.top * @program: yangbuyi_mq * @ClassName: QueueConsumerListener * @create: 2020-07-30 09:21 * @author: yangbuyi * @since: JDK1.8 * @QueueConsumerListener: **/@Component@Slf4jpublic class QueueConsumerListener { @Value('${spring.activemq.queue-name}') private String queueName; //queue模式的消費者 @JmsListener(destination = '${spring.activemq.queue-name}', containerFactory = 'queueListener') public void readActiveQueue(String message) {System.out.println('queue接受到:' + message);System.out.println('點對點服務名稱:'+ queueName);System.out.println('點對點監聽成功'); }}
TopicConsumerListener 訂閱模式 消費者 接收
package top.yangbuyi.listener;import lombok.extern.slf4j.Slf4j;import org.apache.catalina.User;import org.springframework.beans.factory.annotation.Value;import org.springframework.boot.configurationprocessor.json.JSONObject;import org.springframework.jms.annotation.JmsListener;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import top.yangbuyi.controller.user;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.ObjectMessage;import java.io.Serializable;import java.util.ArrayList;import java.util.List;/** * @description: 楊不易網站:www.yangbuyi.top * @program: yangbuyi_mq * @ClassName: TopicConsumerListener * @create: 2020-07-30 09:23 * @author: yangbuyi * @since: JDK1.8 * @TopicConsumerListener: **/@Componentpublic class TopicConsumerListener { @Value('${spring.activemq.topic-name}') private String topicName; //topic模式的消費者 @JmsListener(destination = '${spring.activemq.topic-name}', containerFactory = 'topicListener') public void readActiveQueue(Message message) {ObjectMessage u = (ObjectMessage) message;try { user object = (user) u.getObject(); System.out.println(object.getName()); System.out.println(object.getAge());} catch (JMSException e) { e.printStackTrace();}System.out.println();System.out.println('topic接受到:' + message);System.out.println('訂閱服務名稱:' + topicName); }}
建立一個小物件用來測試傳遞物件
package top.yangbuyi.controller;import java.io.Serializable;/** * @description: 楊不易網站:www.yangbuyi.top * @program: yangbuyi_mq * @ClassName: user * @create: 2020-07-30 10:03 * @author: yangbuyi * @since: JDK1.8 * @user: **/public class user implements Serializable {private String name;private Integer age; public user(String name, Integer age) {this.name = name;this.age = age; } public String getName() {return name; } public void setName(String name) {this.name = name; } public Integer getAge() {return age; } public void setAge(Integer age) {this.age = age; }}
建立controller
package top.yangbuyi.controller;import javafx.geometry.Pos;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.http.MediaType;import org.springframework.jms.core.JmsMessagingTemplate;import org.springframework.web.bind.annotation.*;import javax.jms.Destination;import javax.jms.Queue;import javax.jms.Topic;import java.util.ArrayList;/** * @description: 楊不易網站:www.yangbuyi.top * @program: yangbuyi_mq * @ClassName: ProducerController * @create: 2020-07-30 09:20 * @author: yangbuyi * @since: JDK1.8 * @ProducerController: 生產者 **/@RestController@Slf4jpublic class ProducerController { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue queue; @Autowired private Topic topic; @RequestMapping(value ='queue/test',method = RequestMethod.POST) public String sendQueue(@RequestBody String str) {this.sendMessage(this.queue, str);return 'success---點對點發送成功'; } @RequestMapping(value = 'topic/test',method = RequestMethod.POST) public String sendTopic(@RequestBody String str) {this.sendMessageObject(this.topic, new user(str,10));return 'success---訂閱傳送成功'; } // 傳送訊息,destination是傳送到的佇列,message是待發送的訊息 // 生產者 private void sendMessageObject(Destination destination, final Object message){jmsMessagingTemplate.convertAndSend(destination, message); } // 傳送訊息,destination是傳送到的佇列,message是待發送的訊息 // 生產者 private void sendMessage(Destination destination, final String message){jmsMessagingTemplate.convertAndSend(destination, message); }}
五、進行postman傳送請求
訂閱服務
點對點服務
到此activemq就講到這裡了 還有蠻多可以自己去 學習學習!!!
你的壓力來源於無法自律,只是假裝努力,現狀跟不上內心慾望,所以你焦慮又恐慌。——楊不易