ActiveMQ訊息中介軟體簡單配置
@目錄
訊息中介軟體
什麼是訊息中介軟體
概述
訊息中介軟體可以理解成就是一個服務軟體,儲存資訊的容器,比如生活中的快遞雲櫃.
我們把資料放到訊息中介軟體當中, 然後通知對應的服務進行獲取
訊息中介軟體是在訊息的傳輸過程中儲存資訊的容器
訊息中介軟體應用場景
- 使用訊息伺服器當做大的佇列使用, 先進先出, 來處理高併發寫入操作
- 使用訊息伺服器可以將業務系統的序列執行改為並行執行, 處理效率高, 更合理的榨取伺服器的效能.
同步與非同步技術
同步技術
dubbo是一中同步技術, 實時性高, controller呼叫service專案, 呼叫就執行,
如果service專案中的程式碼沒有執行完, controller裡面的程式碼一致等待結果.
非同步技術
mq訊息中介軟體技術(jms) 是一種非同步技術, 訊息傳送方, 將訊息傳送給訊息伺服器,
訊息伺服器未必立即處理.什麼時候去處理, 主要看訊息伺服器是否繁忙,
訊息進入伺服器後會進入佇列中, 先進先出.實時性不高.
JMS
概述:
jms的全稱叫做Java message service (Java訊息服務) jms是jdk底層定義的規範
各大廠商都是實現這個規範的技術
jms訊息伺服器同類型技術
ActiveMQ:是apache的一個比較老牌的訊息中介軟體, 它比較均衡, 既不是最安全的, 也不是最快的.
RabbitMQ:是阿里巴巴的一個訊息中介軟體, 更適合金融類業務, 它對資料的安全性比較高.能夠保證資料不丟失.
Kafka:Apache下的一個子專案。特點:高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;適合處理海量資料。
JMS中支援的訊息型別:
TextMessage: 一個字串物件
MapMessage:key-value
ObjectMessage:一個序列化的 Java 物件
BytesMessage:一個位元組的資料流
StreamMessage:Java 原始值的資料流
JMS中的兩種傳送模式
點對點模式
一個傳送方, 一個接收方. 也可以多個傳送方, 一個接收方, 主要是接收方必須是第一個.
訂閱釋出模式
一個傳送方, 多個接收方. 傳送方也可以是多個, 主要看接收方, 接收方必須是多個
ActiveMQ安裝
連結: https://pan.baidu.com/s/1B0ZW3_Z3xcamUCniNjd10Q 提取碼: avr2
- 將apache-activemq-5.12.0-bin.tar.gz 上傳至Linux伺服器
- 解壓此檔案
tar zxvf apache-activemq-5.12.0-bin.tar.gz - 為apache-activemq-5.12.0目錄賦權
chmod 777 apache-activemq-5.12.0 - 進入apache-activemq-5.12.0\bin目錄賦與執行許可權
cd /usr/local/apache-activemq-5.12.0/bin
chmod 755 activemq - 啟動
./activemq start - 在瀏覽器當中輸入http://192.168.0.106:8161/ ( ip:8161)
- 進入管理頁面
- 使用者名稱和密碼都是 admin
說明
- Number Of Pending Messages :等待消費的訊息 這個是當前未出佇列的數量。
- Number Of Consumers :消費者 這個是消費者端的消費者數量
- Messages Enqueued :進入佇列的訊息 進入佇列的總數量,包括出佇列的。
- Messages Dequeued :出了佇列的訊息 可以理解為是消費這消費掉的數量。
訊息伺服器小例子
- 建立普通maven Jar工程
- 引入pom依賴
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
點對點模式Queue
建立QueueProducer
public class QueueProducer {
public static void main(String[] args) throws Exception{
//1.建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.獲取連線
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立佇列物件, 指定傳送的佇列名稱, 佇列名稱可以隨意起名, 但是傳送到哪裡, 就要從哪裡去接收
Queue queue = session.createQueue("test-queue");
//6.建立訊息生產者
MessageProducer producer = session.createProducer(queue);
//7.建立訊息
TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
//8.傳送訊息
producer.send(textMessage);
//9.關閉資源
producer.close();
session.close();
connection.close();
}
}
建立QueueConsumer
public class QueueConsumer {
public static void main(String[] args) throws Exception{
//1.建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.獲取連線
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立佇列物件
Queue queue = session.createQueue("test-queue");
//6.建立訊息消費
MessageConsumer consumer = session.createConsumer(queue);
//7.監聽訊息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到訊息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
}
執行QueueProducer後執行QueueConsumer
訂閱釋出模式Topic
TopicConsumer1
public class TopicConsumer1 {
public static void main(String[] args) throws Exception {
//1.建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.獲取連線
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主題物件
Topic topic = session.createTopic("test-topic");
//6.建立訊息消費
MessageConsumer consumer = session.createConsumer(topic);
//7.監聽訊息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到訊息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
}
TopicConsumer2
public class TopicConsumer2 {
public static void main(String[] args) throws Exception {
//1.建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.獲取連線
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主題物件
Topic topic = session.createTopic("test-topic");
//6.建立訊息消費
MessageConsumer consumer = session.createConsumer(topic);
//7.監聽訊息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到訊息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//8.等待鍵盤輸入
System.in.read();
//9.關閉資源
consumer.close();
session.close();
connection.close();
}
}
TopicProducer
public class TopicProducer {
public static void main(String[] args) throws Exception {
//1.建立連線工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.106:61616");
//2.獲取連線
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.獲取session (引數1:是否啟動事務,引數2:訊息確認模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立主題物件
Topic topic = session.createTopic("test-topic");
//6.建立訊息生產者
MessageProducer producer = session.createProducer(topic);
//7.建立訊息
TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
//8.傳送訊息
producer.send(textMessage);
//9.關閉資源
producer.close();
session.close();
connection.close();
}
}