ActiveMQ訊息佇列的使用和應用
阿新 • • 發佈:2018-12-30
一、什麼是ActiveMQ
AciveMQ是Apache出品的目前最流行,能力強勁的開源訊息匯流排
訊息列隊有兩種訊息模式,一種是點對點的訊息模式,還有一種就是訂閱的模式.
主要功能:
- 解決伺服器之間的耦合性
- 使用訊息佇列,增加系統併發處理量
主要應用場景:
- 當系統使用簡訊平臺、郵件平臺的時候
- 當系統使用搜索平臺、快取平臺的時候你
二、使用外接ActiveMQ流程:
1.官網地址:http://activemq.apache.org/
2.安裝包下載完成後解壓後 就是這個樣子了 (注意一定要解壓到全英文路徑下的包內)
4.雙擊啟動activemq.bat
5.點完之後會自動彈出來doc啟動,稍等一下 如果你最後跟我的一樣 說明啟動成功了
6.啟動成功,在瀏覽器中訪問http://localhost:8161/ 就能訪問到activemq的頁面
7.登入成功後(使用者名稱和密碼都是admin),會有兩個佇列:topic和queue (後臺建立佇列,這邊會實時顯示)
三、兩種訊息模式
1.點對點的實現程式碼
專案使用MAVEN來構建(pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itsiji</groupId> <artifactId>activemq</artifactId> <version>0.0.1-SNAPSHOT</version> <!--activemq--> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> </dependencies> <build> <plugins> <!-- java編譯外掛 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
1.1點對點的傳送方
需要注意的:連線地址的ip和埠號是 127.0.0.1:61616
在頁面管理控制檯檢視訊息佇列的ip和埠號是 127.0.0.1:8161
package com.itsiji.test.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TestQueueSend { //連線賬號 private String userName = ""; //連線密碼 private String password = ""; //連線地址 private String brokerURL = "tcp://127.0.0.1:61616"; //connection的工廠 private ConnectionFactory factory; //連線物件 private Connection connection; //一個操作會話 private Session session; //目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic private Destination destination; //生產者,就是產生資料的物件 private MessageProducer producer; public static void main(String[] args) { TestQueueSend send = new TestQueueSend(); send.start(); } public void start(){ try { //根據使用者名稱,密碼,url建立一個連線工廠 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //從工廠中獲取一個連線 connection = factory.createConnection(); //測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了 connection.start(); //建立一個session //第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED //第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。 //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。 //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。 //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立 destination = session.createQueue("text-msg"); //從session中,獲取一個訊息生產者 producer = session.createProducer(destination); //設定生產者的模式,有兩種可選 //DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存 //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來 TextMessage textMsg = session.createTextMessage("呵呵"); for(int i = 0 ; i < 100 ; i ++){ //傳送一條訊息 producer.send(textMsg); } System.out.println("傳送訊息成功"); //即便生產者的物件關閉了,程式還在執行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
1.2點對點的接收方
package com.itsiji.test.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestQueueReceive {
//連線賬號
private String userName = "";
//連線密碼
private String password = "";
//連線地址
private String brokerURL = "tcp://127.0.0.1:61616";
//connection的工廠
private ConnectionFactory factory;
//連線物件
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//消費者,就是接收資料的物件
private MessageConsumer consumer;
public static void main(String[] args) {
TestQueueReceive receive = new TestQueueReceive();
receive.start();
}
public void start(){
try {
//根據使用者名稱,密碼,url建立一個連線工廠
factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
//從工廠中獲取一個連線
connection = factory.createConnection();
//測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
connection.start();
//建立一個session
//第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
//第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
destination = session.createQueue("text-msg");
//根據session,建立一個接收者物件
consumer = session.createConsumer(destination);
//實現一個訊息的監聽器
//實現這個監聽器後,以後只要有訊息,就會通過這個監聽器接收到
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//獲取到接收的資料
String text = ((TextMessage)message).getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//關閉接收端,也不會終止程式哦
// consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.訂閱/釋出模式的實現程式碼
2.1訂閱模式的釋出方
package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestTopicSend {
//連線賬號
private String userName = "";
//連線密碼
private String password = "";
//連線地址
private String brokerURL = "tcp://127.0.0.1:61616";
//connection的工廠
private ConnectionFactory factory;
//連線物件
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//生產者,就是產生資料的物件
private MessageProducer producer;
public static void main(String[] args) {
TestTopicSend send = new TestTopicSend();
send.start();
}
public void start(){
try {
//根據使用者名稱,密碼,url建立一個連線工廠
factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
//從工廠中獲取一個連線
connection = factory.createConnection();
//測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
connection.start();
//建立一個session
//第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
//第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
//=======================================================
//點對點與訂閱模式唯一不同的地方,就是這一行程式碼,點對點建立的是Queue,而訂閱模式建立的是Topic
destination = session.createTopic("topic-text");
//=======================================================
//從session中,獲取一個訊息生產者
producer = session.createProducer(destination);
//設定生產者的模式,有兩種可選
//DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存
//DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來
TextMessage textMsg = session.createTextMessage("哈哈");
long s = System.currentTimeMillis();
for(int i = 0 ; i < 100 ; i ++){
//傳送一條訊息
producer.send(textMsg);
}
long e = System.currentTimeMillis();
System.out.println("傳送訊息成功");
System.out.println(e - s);
//即便生產者的物件關閉了,程式還在執行哦
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.2訂閱模式的接收方
package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestTopicReceive {
//連線賬號
private String userName = "";
//連線密碼
private String password = "";
//連線地址
private String brokerURL = "tcp://127.0.0.1:61616";
//connection的工廠
private ConnectionFactory factory;
//連線物件
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//生產者,就是產生資料的物件
private MessageProducer producer;
public static void main(String[] args) {
TestTopicReceive send = new TestTopicReceive();
send.start();
}
public void start(){
try {
//根據使用者名稱,密碼,url建立一個連線工廠
factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
//從工廠中獲取一個連線
connection = factory.createConnection();
//測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
connection.start();
//建立一個session
//第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
//第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
//=======================================================
//點對點與訂閱模式唯一不同的地方,就是這一行程式碼,點對點建立的是Queue,而訂閱模式建立的是Topic
destination = session.createTopic("topic-text");
//=======================================================
//從session中,獲取一個訊息生產者
producer = session.createProducer(destination);
//設定生產者的模式,有兩種可選
//DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存
//DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來
TextMessage textMsg = session.createTextMessage("哈哈");
long s = System.currentTimeMillis();
for(int i = 0 ; i < 100 ; i ++){
//傳送一條訊息
textMsg.setText("哈哈" + i);
producer.send(textMsg);
}
long e = System.currentTimeMillis();
System.out.println("傳送訊息成功");
System.out.println(e - s);
//即便生產者的物件關閉了,程式還在執行哦
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3.傳送訊息的資料型別
javax.jms.Message 這個介面,只要是這個介面的資料,都可以被髮送
//純字串的資料
session.createTextMessage();
//序列化的物件
session.createObjectMessage();
//流,可以用來傳遞檔案等
session.createStreamMessage();
//用來傳遞位元組
session.createBytesMessage();
//這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
session.createMapMessage();
//這個方法,拿到的是javax.jms.Message,是所有message的介面
session.createMessage();