淘淘商城24_ActiveMq訊息佇列01
一、什麼是訊息佇列MQ?
舉個例子:們去銀行視窗辦理業務,經常會遇到有好多人都在辦業務,這個時候呢,就需要排隊,等待視窗喊號:
001號顧客請到1號視窗辦理業務
002號顧客請到2號視窗辦理業務
通過以上例子就體現出了一種訊息佇列的作用:排隊
Mq解決了排隊和高併發的問題
1. MQ的應用場景
@Test public void testTopicConsumer() throws Exception { // 第一步:建立一個ConnectionFactory物件。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616"); // 第二步:從ConnectionFactory物件中獲得一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線。呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session物件建立一個Consumer物件。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收訊息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取訊息的內容 text = textMessage.getText(); // 第八步:列印訊息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //這裡啟動三個消費者 System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); }
大數量的提交,例如提交訂單,秒殺
2. 同類型的產品
rubbitMQ, rackMQ(阿里的產品), activeMQ, kafaka(大資料裡hadoop會用到)
二、ActiveMq
1. 什麼是activeMq?
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
主要特點:
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
3. 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
4. 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
5. 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支援通過JDBC和journal提供高速的訊息持久化
7. 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
8. 支援Ajax
9. 支援與Axis的整合
10. 可以很容易得呼叫內嵌JMS provider,進行測試
2. ActiveMQ的訊息形式
對於訊息的傳遞有兩種型別:
一種是點對點的,即一個生產者和一個消費者一一對應;-------->q
另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。-------->topic
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。
· StreamMessage -- Java原始值的資料流
· MapMessage-- 一套名稱-值對
· TextMessage-- 一個字串物件
· ObjectMessage--一個序列化的 Java物件
· BytesMessage--一個位元組的資料流
三、ActiveMQ的安裝
百度網盤:activeMq的安裝包
連結:https://pan.baidu.com/s/171HOqMN1aoqAZMaszKFUrg
提取碼:z16n
1. 進入http://activemq.apache.org/下載ActiveMQ
我們使用的版本是5.12.0
2. 安裝環境:
因為之前在dubbo這個工程中新增過jdk,所以我現在就在taotao-dubbo這個裡面新增activeMQ了
- 需要jdk (不需要贅述了,之前安裝過了)
- 安裝Linux系統。生產環境都是Linux系統。
3. 安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啟動。
使用bin目錄下的activemq命令啟動:
[[email protected] bin]# ./activemq start
關閉:
[[email protected] bin]# ./activemq stop
檢視狀態:
[[email protected] bin]# ./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2
四、進入管理後臺:埠號為8161
埠號說明:
前端頁面埠號為:8161
後臺開發埠號為:61616
http://172.18.34.94:8161/admin
使用者名稱:admin
密碼:admin
五、Queue
生產者:生產訊息,傳送端。
把jar包新增到工程中。使用5.11.2版本的jar包。
1. 新增依賴
2. Producer
2.1 寫一個測試類ActiveMqTest.java
步驟:
第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
第六步:使用Session物件建立一個Producer物件。
第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。
package com.taotao.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class ActiveMqTest {
@Test
public void queueProducerTest() throws Exception{
// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
//brokerURL伺服器的ip及埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
// 第二步:使用ConnectionFactory物件建立一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線,呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
//引數:佇列的名稱。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session物件建立一個Producer物件。
MessageProducer producer = session.createProducer(queue);
// 第七步:建立一個Message物件,建立一個TextMessage物件。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer物件傳送訊息。
producer.send(textMessage);
// 第九步:關閉資源。
producer.close();
session.close();
connection.close();
}
}
2.2. 測試
2.3 Consumer
消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
第六步:使用Session物件建立一個Consumer物件。
第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源
/**
* 消費者
* @throws Exception
*/
@Test
public void queueConsumerTest() throws Exception {
// 第一步:建立一個ConnectionFactory物件。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線。呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session物件建立一個Consumer物件。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收訊息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取訊息的內容
text = textMessage.getText();
// 第八步:列印訊息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
// 第九步:關閉資源
consumer.close();
session.close();
connection.close();
}
此時生產者釋出到activeMq的訊息,已經被消費者接受,所以頁面發生變化,如下圖:
六、topic(先啟動消費者,再啟動生產者)
1. Producer
這種釋出服務的方式是預設不在伺服器端進行快取的,不持久化,
使用步驟:
第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
第二步:使用ConnectionFactory物件建立一個Connection物件。
第三步:開啟連線,呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Topic物件。
第六步:使用Session物件建立一個Producer物件。
第七步:建立一個Message物件,建立一個TextMessage物件。
第八步:使用Producer物件傳送訊息。
第九步:關閉資源。
@Test
public void testTopicProducer() throws Exception {
// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
// brokerURL伺服器的ip及埠號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
// 第二步:使用ConnectionFactory物件建立一個Connection物件。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連線,呼叫Connection物件的start方法。
connection.start();
// 第四步:使用Connection物件建立一個Session物件。
// 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
// 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
// 引數:話題的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session物件建立一個Producer物件。
MessageProducer producer = session.createProducer(topic);
// 第七步:建立一個Message物件,建立一個TextMessage物件。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
// 第八步:使用Producer物件傳送訊息。
producer.send(textMessage);
// 第九步:關閉資源。
producer.close();
session.close();
connection.close();
}
2. Consumer
這種Topic模式和Queue模式的區別在於,Topic釋出的服務沒有消費者消費的情況下是不會在伺服器端進行快取的,直接就會找不到了,但是Queue這種模式如果消費端沒有消費的話,是直都會儲存到伺服器端的.
消費者:接收訊息。
第一步:建立一個ConnectionFactory物件。
第二步:從ConnectionFactory物件中獲得一個Connection物件。
第三步:開啟連線。呼叫Connection物件的start方法。
第四步:使用Connection物件建立一個Session物件。
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
第六步:使用Session物件建立一個Consumer物件。
第七步:接收訊息。
第八步:列印訊息。
第九步:關閉資源
可以看到,在後臺啟動的3個消費者服務,都消費了生產者釋出的訊息:hello activeMq,this is my topic test