1. 程式人生 > >淘淘商城24_ActiveMq訊息佇列01

淘淘商城24_ActiveMq訊息佇列01

一、什麼是訊息佇列MQ?

舉個例子:們去銀行視窗辦理業務,經常會遇到有好多人都在辦業務,這個時候呢,就需要排隊,等待視窗喊號:

                  001號顧客請到1號視窗辦理業務

                  002號顧客請到2號視窗辦理業務

通過以上例子就體現出了一種訊息佇列的作用:排隊

Mq解決了排隊和高併發的問題

1. MQ的應用場景

@Test
	public void testTopicConsumer() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(topic);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {

			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					// 取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
           //這裡啟動三個消費者
		System.out.println("topic的消費端03。。。。。");
		// 等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

大數量的提交,例如提交訂單,秒殺

2. 同類型的產品

rubbitMQ, rackMQ(阿里的產品), activeMQ, kafaka(大資料裡hadoop會用到)

二、ActiveMq

1. 什麼是activeMq?

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

主要特點:

1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)

3. 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性

4. 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上

5. 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支援通過JDBC和journal提供高速的訊息持久化

7. 從設計上保證了高效能的叢集,客戶端-伺服器,點對點

8. 支援Ajax

9. 支援與Axis的整合

10. 可以很容易得呼叫內嵌JMS provider,進行測試

2. ActiveMQ的訊息形式

對於訊息的傳遞有兩種型別:

一種是點對點的,即一個生產者和一個消費者一一對應;-------->q

另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。-------->topic

JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。

· StreamMessage -- Java原始值的資料流

· MapMessage-- 一套名稱-值對

· TextMessage-- 一個字串物件

· ObjectMessage--一個序列化的 Java物件

· BytesMessage--一個位元組的資料流

三、ActiveMQ的安裝

百度網盤:activeMq的安裝包

連結:https://pan.baidu.com/s/171HOqMN1aoqAZMaszKFUrg 
提取碼:z16n 

 

1. 進入http://activemq.apache.org/下載ActiveMQ

我們使用的版本是5.12.0

 

2. 安裝環境:

因為之前在dubbo這個工程中新增過jdk,所以我現在就在taotao-dubbo這個裡面新增activeMQ了

  1. 需要jdk (不需要贅述了,之前安裝過了)
  2. 安裝Linux系統。生產環境都是Linux系統。

3. 安裝步驟

第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。

第二步:解壓縮。

第三步:啟動。

使用bin目錄下的activemq命令啟動:

[[email protected] bin]# ./activemq start

關閉:

[[email protected] bin]# ./activemq stop

檢視狀態:

[[email protected] bin]# ./activemq status

 

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2

 

四、進入管理後臺:埠號為8161

埠號說明:

前端頁面埠號為:8161

後臺開發埠號為:61616

http://172.18.34.94:8161/admin

使用者名稱:admin

密碼:admin

五、Queue

生產者:生產訊息,傳送端。

把jar包新增到工程中。使用5.11.2版本的jar包。

1. 新增依賴

2. Producer

2.1 寫一個測試類ActiveMqTest.java

步驟:

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。

第六步:使用Session物件建立一個Producer物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。

package com.taotao.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

public class ActiveMqTest {
	
	@Test
	public void queueProducerTest() throws Exception{
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		//brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
		//引數:佇列的名稱。
		Queue queue = session.createQueue("test-queue");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(queue);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*TextMessage message = new ActiveMQTextMessage();
		message.setText("hello activeMq,this is my first test.");*/
		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}
}

2.2. 測試

2.3 Consumer

消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。

第二步:從ConnectionFactory物件中獲得一個Connection物件。

第三步:開啟連線。呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。

第六步:使用Session物件建立一個Consumer物件。

第七步:接收訊息。

第八步:列印訊息。

第九步:關閉資源

/**
	 * 消費者
	 * @throws Exception
	 */
	@Test
	public void queueConsumerTest() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
		Queue queue = session.createQueue("test-queue");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(queue);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					//取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		//等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

此時生產者釋出到activeMq的訊息,已經被消費者接受,所以頁面發生變化,如下圖:  

六、topic(先啟動消費者,再啟動生產者)

1. Producer

這種釋出服務的方式是預設不在伺服器端進行快取的,不持久化,

使用步驟:

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。

第二步:使用ConnectionFactory物件建立一個Connection物件。

第三步:開啟連線,呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Topic物件。

第六步:使用Session物件建立一個Producer物件。

第七步:建立一個Message物件,建立一個TextMessage物件。

第八步:使用Producer物件傳送訊息。

第九步:關閉資源。

@Test
	public void testTopicProducer() throws Exception {
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		// brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.104:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		// 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		// 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
		// 引數:話題的名稱。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(topic);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*
		 * TextMessage message = new ActiveMQTextMessage(); message.setText(
		 * "hello activeMq,this is my first test.");
		 */
		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}

2. Consumer

這種Topic模式和Queue模式的區別在於,Topic釋出的服務沒有消費者消費的情況下是不會在伺服器端進行快取的,直接就會找不到了,但是Queue這種模式如果消費端沒有消費的話,是直都會儲存到伺服器端的.

消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。

第二步:從ConnectionFactory物件中獲得一個Connection物件。

第三步:開啟連線。呼叫Connection物件的start方法。

第四步:使用Connection物件建立一個Session物件。

第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。

第六步:使用Session物件建立一個Consumer物件。

第七步:接收訊息。

第八步:列印訊息。

第九步:關閉資源

可以看到,在後臺啟動的3個消費者服務,都消費了生產者釋出的訊息:hello activeMq,this is my topic test