activemq 阻塞不消費_ActiveMQ嘔心瀝血之作啊,超級詳細
阿新 • • 發佈:2021-01-04
技術標籤:activemq 阻塞不消費
ActiveMQ 訊息中介軟體簡介
為什麼要用 MQ 訊息中介軟體
接下來舉個例子,我們以前上學時候有不會的問題都會去問老師,然後假如這時候有一百個學生問問題,那我們就得排隊等著,那一方面會浪費時間,另外一方面老師也不一定能處理完問題,那這時候老師看到這種情況就說:同學們你們把不會的問題總結出來由班長統一接收,然後再由班長交給我,這樣一來我們自己是不是不會浪費時間,另外一方面老師也可以有自己的時間處理自己的事情
MQ 訊息中介軟體能幹嘛?
削峰,非同步,解耦
通過訊息排隊和訊息傳遞模型在分散式下提供應用解耦,彈性伸縮,流量削峰,非同步通訊,資料同步等功能。
傳送者把訊息傳送給伺服器,訊息服務將訊息存放在如干個佇列和主題中,在合適的時候,訊息伺服器會將訊息轉發給接受者。在這個過程中傳送和接收是非同步的,無需等待,而且傳送者和接收者的生命週期也沒有必然關係。
ActiveMQ訊息中介軟體的簡單使用
新增相應的依賴架包
org.apache.activemqactivemq-all5.11.2org.springframeworkspring-core5.0.2.RELEASEorg.springframeworkspring-web5.0.2.RELEASEorg.springframeworkspring-oxm5.0.2.RELEASEorg.springframeworkspring-tx5.0.2.RELEASEorg.springframeworkspring-jdbc5.0.2.RELEASEorg.springframeworkspring-webmvc5.0.2.RELEASEorg.springframeworkspring-aop5.0.2.RELEASEorg.springframeworkspring-context-support5.0.2.RELEASEorg.springframeworkspring-test5.0.2.RELEASEorg.springframeworkspring-jms5.0.2.RELEASEjavax.jmsjavax.jms-api2.0.1org.apache.xbeanxbean-spring3.7junitjunit4.12
Spring整合ActiveMQ
JMS的編碼架構兩大模型的特性訊息生產者
packagecom.yang.activemq.queue;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassJmsProduce{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringQUEUE_NAME="quenue01";
publicstaticvoidmain(String[]args)throwsJMSException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Queuequeue=session.createQueue(QUEUE_NAME);
//建立消費者
MessageProducerproducer=session.createProducer(queue);
for(inti=1;i<=3;i++){
//建立訊息傳送到佇列裡面
TextMessagetextMessage=session.createTextMessage("msg---->"+i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("訊息傳送完成");
}
}
訊息消費者
packagecom.yang.activemq.queue;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
importjava.io.IOException;
publicclassJmsConsumer{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringQUEUE_NAME="quenue01";
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Queuequeue=session.createQueue(QUEUE_NAME);
//建立消費者
MessageConsumerconsumer=session.createConsumer(queue);
//同步阻塞
//while(true){
//TextMessagemessage=(TextMessage)consumer.receive();
//if(message!=null){
//System.out.println("接收到的訊息是"+message.getText());
//}else{
//break;
//}
//}
//consumer.close();
//session.close();
//connection.close();
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
if(null!=message&&messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("訊息內容---->"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
TOPIC
主題topic特點 生產者將訊息釋出到topic中每個訊息有多個消費者,生產者和消費者之間有時間上的相關性。
生產者生產時,topic不儲存訊息,它是無狀態的不落地,假如沒有人訂閱就是廢訊息
packagecom.yang.activemq.queue;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
publicclassJmsProduce{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringTOPIC_NAME="topic01";
publicstaticvoidmain(String[]args)throwsJMSException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Topictopic=(Topic)session.createTopic(TOPIC_NAME);
//建立消費者
MessageProducerproducer=session.createProducer(topic);
for(inti=1;i<=3;i++){
//建立訊息傳送到佇列裡面
TextMessagetextMessage=session.createTextMessage("msg---->"+i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("訊息傳送完成");
}
}
消費者
packagecom.yang.activemq.queue;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
importjava.io.IOException;
publicclassJmsConsumer{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringTOPIC_NAME="topic01";
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Topicqueue=session.createTopic(TOPIC_NAME);
//建立消費者
MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
if(null!=message&&messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("訊息內容---->"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}
TOPIC和QUEUE的區別
比較專案 | TOPIC模式佇列 | QUEUE模式佇列 |
---|---|---|
工作模式 | 訂閱-釋出模式如果當前沒有訂閱者會丟棄,如果有訂閱者,那麼這些訂閱者會受到訊息 | 負載均衡模式如果當前沒有消費者模式,訊息不會丟棄,如果有多個消費者,那麼一條訊息也會發送給其中一個消費者,並且要求消費者ACK訊息 |
有無狀態 | 無狀態 | QUEUE資料預設會在mq伺服器上以檔案形式儲存 |
傳遞完整性 | 如果沒有訂閱者訊息會丟棄 | 訊息不會丟棄 |
處理效率 | 由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨著訂閱者的增加而明顯 | 由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會明顯降低 |
Spring整合ActtiveMQ
applicationContext-consumer.xml
配置檔案
<?xml version="1.0"encoding="UTF-8"?>
"http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amp="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms.xsd">id="connectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>"cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">"targetConnectionFactory"ref="connectionFactory"/>"sessionCacheSize"value="5"/>package="com.yang.springcloud"/>"cachingConnectionFactory"destination-type="queue">"spring_queue"ref="queueListener"/>"cachingConnectionFactory"destination-type="topic">"spring_topic"ref="topicListener"/>
applicationContext-producer.xml
配置檔案
<?xml version="1.0"encoding="UTF-8"?>
"http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amp="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core.xsd">id="connetionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>"cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">"targetConnectionFactory"ref="connetionFactory"/>"sessionCacheSize"value="5"/>"jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">"connectionFactory"ref="cachingConnectionFactory"/>"pubSubDomain"value="false"/>"jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">"connectionFactory"ref="cachingConnectionFactory"/>"pubSubDomain"value="true"/>
訊息佇列模型的程式碼:
packagecom.yang.springcloud;
importorg.springframework.stereotype.Component;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;
/**
*點對點
*/
@Component//放入SpringIIOC容器,名稱queueListener
publicclassQueueListenerimplementsMessageListener{
//用於接收訊息
@Override
publicvoidonMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("queue介面訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
}
訂閱-釋出模型的程式碼:
packagecom.yang.springcloud;
importorg.springframework.stereotype.Component;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;
/**
*釋出訂閱
*/
@Component//放入SpringIIOC容器,名稱queueListener
publicclassTopicListenerimplementsMessageListener{
@Override
publicvoidonMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("topic介面訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
}
演示Spring整合ActiveMQ具體程式碼:
packagecom.yang.springcloud.producter;
importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.jms.core.JmsTemplate;
importorg.springframework.jms.core.MessageCreator;
importorg.springframework.test.context.ContextConfiguration;
importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.Session;
importjavax.jms.TextMessage;
/**
*演示Spring與ActiveMQ整合
*/
@RunWith(SpringJUnit4ClassRunner.class)//junit與spring整合
@ContextConfiguration("classpath:applicationContext-producer.xml")//載入spring配置檔案publicclassSpringProducer{
//點對點模式
@Autowired
@Qualifier("jmsQueueTemplate")
privateJmsTemplatejmsQueueTemplate;
//釋出訂閱模式
@Autowired
@Qualifier("jmsTopicTemplate")
privateJmsTemplatejmsTopicTemplate;
/**
*點對點發送
*/
@Test
publicvoidptpSender(){
/**
*引數一:指定佇列的名稱
*引數二:MessageCreator介面,我們需要提供該介面的匿名內部實現
*/
jmsQueueTemplate.send("spring_queue",newMessageCreator(){
//我們只需要返回傳送的訊息內容即可
@Override
publicMessagecreateMessage(Sessionsession)throwsJMSException{
//建立文字訊息
TextMessagetextMessage=session.createTextMessage("springtestmessage");
returntextMessage;
}
});
System.out.println("訊息傳送已完成");
}
/**
*釋出訂閱傳送
*/
@Test
publicvoidpsSender(){
jmsTopicTemplate.send("spring_topic",newMessageCreator(){
@Override
publicMessagecreateMessage(Sessionsession)throwsJMSException{
//建立文字訊息
TextMessagetextMessage=session.createTextMessage("springtestmessage--topic");
returntextMessage;
}
});
System.out.println("訊息傳送已完成");
}
}
用於消費方啟動監聽類
packagecom.yang.springcloud.consumer;
importorg.springframework.context.support.ClassPathXmlApplicationContext;
importjava.io.IOException;
/**
*用於啟動消費方監聽
*/
publicclassSpringConsuer{
publicstaticvoidmain(String[]args)throwsIOException{
//1.載入spring配置
ClassPathXmlApplicationContext
cxt=newClassPathXmlApplicationContext("classpath:applicationContext-consumer.xml");
//2.啟動
cxt.start();
//3.阻塞方法,讓程式一直處於等待狀態
System.in.read();
}
}
SpringBoot整合ActiveMQ
匯入所需依賴
org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-activemqorg.springframework.bootspring-boot-starter-testtest
SpringBoot整合ActiveMQ生產者
程式碼示例:
application.yml
配置檔案
server:
port:9001#埠
spring:
application:
name:activemq-producer#服務名稱
#springboot與activemq整合配置
activemq:
broker-url:tcp://127.0.0.1:61616#連線地址
user:admin#activemq使用者名稱
password:admin#activemq密碼
#指定傳送模式(點對點false,topic釋出訂閱true)
jms:
pub-sub-domain:true
#自己定義目標名稱(佇列或主題)
activemq:
name:springboot_topic
生產者的程式碼示例:
packagecom.yang.springcloud;
importcom.yang.producter.ProducerApplication;
importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.boot.test.context.SpringBootTest;
importorg.springframework.jms.core.JmsMessagingTemplate;
importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes=ProducerApplication.class)publicclassSpringBootProducer{
//JmsMessagingTemplate:用於工具類傳送訊息
@Autowired
privateJmsMessagingTemplatejmsMessagingTemplate;
@Value("${activemq.name}")
privateStringname;
@Test
publicvoidptpSender(){
/**
*引數一:佇列的名稱或主題名稱
*引數二:訊息內容
*/
jmsMessagingTemplate.convertAndSend(name,"springbootmessage--topic,我收到了");
System.out.println("完成了");
}
}
啟動類
packagecom.yang.producter;
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
/***生產者啟動類*/
@SpringBootApplication
publicclassProducerApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(ProducerApplication.class,args);
}
}
SpringBoot整合ActiveMQ消費者
具體程式碼示例:
application.yml配置檔案
server:
port:9002#埠
spring:
application:
name:activemq-consumer#服務名稱
#springboot與activemq整合配置
activemq:
broker-url:tcp://127.0.0.1:61616#連線地址
user:admin#activemq使用者名稱
password:admin#activemq密碼
#指定傳送模式(點對點false,釋出訂閱true)
jms:
pub-sub-domain:true
activemq:
name:springboot_topic
消費者類
packagecom.yang.consumer.listener;
importorg.springframework.jms.annotation.JmsListener;
importorg.springframework.stereotype.Component;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.TextMessage;
/**
*用於監聽訊息類(既可以用於佇列的監聽,也可以用於主題監聽)
*/
@Component//放入IOC容器
publicclassMsgListener{
/**
*用於接收訊息的方法
*destination:佇列的名稱或主題的名稱
*/
@JmsListener(destination="${activemq.name}")
publicvoidreceiveMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("接收訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
}
消費者啟動類
packagecom.yang.consumer;
importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;
/**
*訊息消費者啟動類
*/
@SpringBootApplication
publicclassConsumerApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(ConsumerApplication.class,args);
}
}
END結束
●前端必須要掌握的優秀框架React
●
百分之八十都懂得RabbitMQ,三萬字一篇詳細給你搞定
●
Dubbo專欄:SpringBoot整合Dubbo服務降級與豪豬斷路器二
●Dubbo專欄:SpringBoot整合Dubbo服務降級與豪豬斷路器一
●Dubbo專欄:Dubbo分散式系統入門
喜歡的可以收藏下和點點關注,分享出去