1. 程式人生 > 其它 >activemq 阻塞不消費_ActiveMQ嘔心瀝血之作啊,超級詳細

activemq 阻塞不消費_ActiveMQ嘔心瀝血之作啊,超級詳細

技術標籤:activemq 阻塞不消費

ActiveMQ 訊息中介軟體簡介

為什麼要用 MQ 訊息中介軟體

接下來舉個例子,我們以前上學時候有不會的問題都會去問老師,然後假如這時候有一百個學生問問題,那我們就得排隊等著,那一方面會浪費時間,另外一方面老師也不一定能處理完問題,那這時候老師看到這種情況就說:同學們你們把不會的問題總結出來由班長統一接收,然後再由班長交給我,這樣一來我們自己是不是不會浪費時間,另外一方面老師也可以有自己的時間處理自己的事情

MQ 訊息中介軟體能幹嘛?

削峰,非同步,解耦

通過訊息排隊和訊息傳遞模型在分散式下提供應用解耦,彈性伸縮,流量削峰,非同步通訊,資料同步等功能。

傳送者把訊息傳送給伺服器,訊息服務將訊息存放在如干個佇列和主題中,在合適的時候,訊息伺服器會將訊息轉發給接受者。在這個過程中傳送和接收是非同步的,無需等待,而且傳送者和接收者的生命週期也沒有必然關係。

ActiveMQ訊息中介軟體的簡單使用

新增相應的依賴架包

org.apache.activemqactivemq-all5.11.2org.springframeworkspring-core5.0.2.RELEASEorg.springframeworkspring-web5.0.2.RELEASEorg.springframeworkspring-oxm5.0.2.RELEASEorg.springframeworkspring-tx5.0.2.RELEASEorg.springframeworkspring-jdbc5.0.2.RELEASEorg.springframeworkspring-webmvc5.0.2.RELEASEorg.springframeworkspring-aop5.0.2.RELEASEorg.springframeworkspring-context-support5.0.2.RELEASEorg.springframeworkspring-test5.0.2.RELEASEorg.springframeworkspring-jms5.0.2.RELEASEjavax.jmsjavax.jms-api2.0.1org.apache.xbeanxbean-spring3.7junitjunit4.12

Spring整合ActiveMQ

JMS的編碼架構兩大模型的特性訊息生產者

packagecom.yang.activemq.queue;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

publicclassJmsProduce{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringQUEUE_NAME="quenue01";
publicstaticvoidmain(String[]args)throwsJMSException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Queuequeue=session.createQueue(QUEUE_NAME);
//建立消費者
MessageProducerproducer=session.createProducer(queue);
for(inti=1;i<=3;i++){
//建立訊息傳送到佇列裡面
TextMessagetextMessage=session.createTextMessage("msg---->"+i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("訊息傳送完成");

}
}

訊息消費者

packagecom.yang.activemq.queue;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;
importjava.io.IOException;

publicclassJmsConsumer{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringQUEUE_NAME="quenue01";
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Queuequeue=session.createQueue(QUEUE_NAME);
//建立消費者
MessageConsumerconsumer=session.createConsumer(queue);
//同步阻塞
//while(true){
//TextMessagemessage=(TextMessage)consumer.receive();
//if(message!=null){
//System.out.println("接收到的訊息是"+message.getText());
//}else{
//break;
//}
//}
//consumer.close();
//session.close();
//connection.close();
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
if(null!=message&&messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("訊息內容---->"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}

TOPIC

主題topic特點 生產者將訊息釋出到topic中每個訊息有多個消費者,生產者和消費者之間有時間上的相關性。

生產者生產時,topic不儲存訊息,它是無狀態的不落地,假如沒有人訂閱就是廢訊息

packagecom.yang.activemq.queue;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

publicclassJmsProduce{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringTOPIC_NAME="topic01";
publicstaticvoidmain(String[]args)throwsJMSException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Topictopic=(Topic)session.createTopic(TOPIC_NAME);
//建立消費者
MessageProducerproducer=session.createProducer(topic);
for(inti=1;i<=3;i++){
//建立訊息傳送到佇列裡面
TextMessagetextMessage=session.createTextMessage("msg---->"+i);
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("訊息傳送完成");

}
}

消費者

packagecom.yang.activemq.queue;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;
importjava.io.IOException;

publicclassJmsConsumer{
publicstaticfinalStringACTIVEMQ_URL="tcp://127.0.0.1:61616";
publicstaticfinalStringTOPIC_NAME="topic01";
publicstaticvoidmain(String[]args)throwsJMSException,IOException{
//建立連線工廠
ActiveMQConnectionFactoryconnectionFactory=newActiveMQConnectionFactory();
//連線工廠
Connectionconnection=connectionFactory.createConnection();
connection.start();
//建立回話
//第一個事務,第二個簽收
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//Destinationdestination=session.createQueue(QUEUE_NAME);
//建立目的地
Topicqueue=session.createTopic(TOPIC_NAME);
//建立消費者
MessageConsumerconsumer=session.createConsumer(queue);
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
if(null!=message&&messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("訊息內容---->"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
}
}

TOPIC和QUEUE的區別

比較專案TOPIC模式佇列QUEUE模式佇列
工作模式訂閱-釋出模式如果當前沒有訂閱者會丟棄,如果有訂閱者,那麼這些訂閱者會受到訊息負載均衡模式如果當前沒有消費者模式,訊息不會丟棄,如果有多個消費者,那麼一條訊息也會發送給其中一個消費者,並且要求消費者ACK訊息
有無狀態無狀態QUEUE資料預設會在mq伺服器上以檔案形式儲存
傳遞完整性如果沒有訂閱者訊息會丟棄訊息不會丟棄
處理效率由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨著訂閱者的增加而明顯由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會明顯降低

Spring整合ActtiveMQ

applicationContext-consumer.xml配置檔案

<?xml version="1.0"encoding="UTF-8"?>
"http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amp="http://activemq.apache.org/schema/core"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms.xsd">id="connectionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>"cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">"targetConnectionFactory"ref="connectionFactory"/>"sessionCacheSize"value="5"/>package="com.yang.springcloud"/>"cachingConnectionFactory"destination-type="queue">"spring_queue"ref="queueListener"/>"cachingConnectionFactory"destination-type="topic">"spring_topic"ref="topicListener"/>

applicationContext-producer.xml配置檔案

<?xml version="1.0"encoding="UTF-8"?>
"http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:amp="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://activemq.apache.org/schema/corehttp://activemq.apache.org/schema/core/activemq-core.xsd">id="connetionFactory"
brokerURL="tcp://127.0.0.1:61616"
userName="admin"
password="admin"
/>"cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">"targetConnectionFactory"ref="connetionFactory"/>"sessionCacheSize"value="5"/>"jmsQueueTemplate"class="org.springframework.jms.core.JmsTemplate">"connectionFactory"ref="cachingConnectionFactory"/>"pubSubDomain"value="false"/>"jmsTopicTemplate"class="org.springframework.jms.core.JmsTemplate">"connectionFactory"ref="cachingConnectionFactory"/>"pubSubDomain"value="true"/>

訊息佇列模型的程式碼:

packagecom.yang.springcloud;


importorg.springframework.stereotype.Component;
importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;

/**
*點對點
*/
@Component//放入SpringIIOC容器,名稱queueListener
publicclassQueueListenerimplementsMessageListener{
//用於接收訊息
@Override
publicvoidonMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("queue介面訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
}
}

訂閱-釋出模型的程式碼:

packagecom.yang.springcloud;

importorg.springframework.stereotype.Component;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.MessageListener;
importjavax.jms.TextMessage;


/**
*釋出訂閱
*/
@Component//放入SpringIIOC容器,名稱queueListener
publicclassTopicListenerimplementsMessageListener{
@Override
publicvoidonMessage(Messagemessage){
if(messageinstanceofTextMessage){

TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("topic介面訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}

}
}
}

演示Spring整合ActiveMQ具體程式碼:

packagecom.yang.springcloud.producter;


importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Qualifier;
importorg.springframework.jms.core.JmsTemplate;
importorg.springframework.jms.core.MessageCreator;
importorg.springframework.test.context.ContextConfiguration;
importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.Session;
importjavax.jms.TextMessage;
/**
*演示Spring與ActiveMQ整合
*/
@RunWith(SpringJUnit4ClassRunner.class)//junit與spring整合
@ContextConfiguration("classpath:applicationContext-producer.xml")//載入spring配置檔案publicclassSpringProducer{
//點對點模式
@Autowired
@Qualifier("jmsQueueTemplate")
privateJmsTemplatejmsQueueTemplate;

//釋出訂閱模式
@Autowired
@Qualifier("jmsTopicTemplate")
privateJmsTemplatejmsTopicTemplate;


/**
*點對點發送
*/
@Test
publicvoidptpSender(){
/**
*引數一:指定佇列的名稱
*引數二:MessageCreator介面,我們需要提供該介面的匿名內部實現
*/
jmsQueueTemplate.send("spring_queue",newMessageCreator(){
//我們只需要返回傳送的訊息內容即可
@Override
publicMessagecreateMessage(Sessionsession)throwsJMSException{
//建立文字訊息
TextMessagetextMessage=session.createTextMessage("springtestmessage");
returntextMessage;
}
});
System.out.println("訊息傳送已完成");
}

/**
*釋出訂閱傳送
*/
@Test
publicvoidpsSender(){
jmsTopicTemplate.send("spring_topic",newMessageCreator(){
@Override
publicMessagecreateMessage(Sessionsession)throwsJMSException{
//建立文字訊息
TextMessagetextMessage=session.createTextMessage("springtestmessage--topic");
returntextMessage;
}
});
System.out.println("訊息傳送已完成");
}
}

用於消費方啟動監聽類

packagecom.yang.springcloud.consumer;

importorg.springframework.context.support.ClassPathXmlApplicationContext;

importjava.io.IOException;

/**
*用於啟動消費方監聽
*/
publicclassSpringConsuer{

publicstaticvoidmain(String[]args)throwsIOException{
//1.載入spring配置
ClassPathXmlApplicationContext
cxt=newClassPathXmlApplicationContext("classpath:applicationContext-consumer.xml");
//2.啟動
cxt.start();

//3.阻塞方法,讓程式一直處於等待狀態
System.in.read();

}

}

SpringBoot整合ActiveMQ

匯入所需依賴


org.springframework.bootspring-boot-starterorg.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-activemqorg.springframework.bootspring-boot-starter-testtest

SpringBoot整合ActiveMQ生產者

程式碼示例:

application.yml配置檔案

server:
port:9001#埠
spring:
application:
name:activemq-producer#服務名稱

#springboot與activemq整合配置
activemq:
broker-url:tcp://127.0.0.1:61616#連線地址
user:admin#activemq使用者名稱
password:admin#activemq密碼

#指定傳送模式(點對點false,topic釋出訂閱true)
jms:
pub-sub-domain:true

#自己定義目標名稱(佇列或主題)
activemq:
name:springboot_topic

生產者的程式碼示例:

packagecom.yang.springcloud;


importcom.yang.producter.ProducerApplication;
importorg.junit.Test;
importorg.junit.runner.RunWith;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.beans.factory.annotation.Value;
importorg.springframework.boot.test.context.SpringBootTest;
importorg.springframework.jms.core.JmsMessagingTemplate;
importorg.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes=ProducerApplication.class)publicclassSpringBootProducer{
//JmsMessagingTemplate:用於工具類傳送訊息
@Autowired
privateJmsMessagingTemplatejmsMessagingTemplate;

@Value("${activemq.name}")
privateStringname;

@Test
publicvoidptpSender(){
/**
*引數一:佇列的名稱或主題名稱
*引數二:訊息內容
*/
jmsMessagingTemplate.convertAndSend(name,"springbootmessage--topic,我收到了");
System.out.println("完成了");
}
}

啟動類

packagecom.yang.producter;


importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;

/***生產者啟動類*/
@SpringBootApplication
publicclassProducerApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(ProducerApplication.class,args);
}
}

SpringBoot整合ActiveMQ消費者

具體程式碼示例:

application.yml配置檔案

server:
port:9002#埠
spring:
application:
name:activemq-consumer#服務名稱

#springboot與activemq整合配置
activemq:
broker-url:tcp://127.0.0.1:61616#連線地址
user:admin#activemq使用者名稱
password:admin#activemq密碼

#指定傳送模式(點對點false,釋出訂閱true)
jms:
pub-sub-domain:true

activemq:
name:springboot_topic

消費者類

packagecom.yang.consumer.listener;
importorg.springframework.jms.annotation.JmsListener;
importorg.springframework.stereotype.Component;

importjavax.jms.JMSException;
importjavax.jms.Message;
importjavax.jms.TextMessage;

/**
*用於監聽訊息類(既可以用於佇列的監聽,也可以用於主題監聽)
*/
@Component//放入IOC容器
publicclassMsgListener{
/**
*用於接收訊息的方法
*destination:佇列的名稱或主題的名稱
*/
@JmsListener(destination="${activemq.name}")
publicvoidreceiveMessage(Messagemessage){
if(messageinstanceofTextMessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("接收訊息:"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}

}
}
}

消費者啟動類

packagecom.yang.consumer;


importorg.springframework.boot.SpringApplication;
importorg.springframework.boot.autoconfigure.SpringBootApplication;

/**
*訊息消費者啟動類
*/
@SpringBootApplication
publicclassConsumerApplication{
publicstaticvoidmain(String[]args){
SpringApplication.run(ConsumerApplication.class,args);
}
}
                      END結束

●前端必須要掌握的優秀框架React ● 百分之八十都懂得RabbitMQ,三萬字一篇詳細給你搞定 ● Dubbo專欄:SpringBoot整合Dubbo服務降級與豪豬斷路器二 ●Dubbo專欄:SpringBoot整合Dubbo服務降級與豪豬斷路器一 ●Dubbo專欄:Dubbo分散式系統入門

喜歡的可以收藏下和點點關注,分享出去