1. 程式人生 > >ActiveMQ訊息佇列的使用和應用

ActiveMQ訊息佇列的使用和應用

一、什麼是ActiveMQ

AciveMQ是Apache出品的目前最流行,能力強勁的開源訊息匯流排

訊息列隊有兩種訊息模式,一種是點對點的訊息模式,還有一種就是訂閱的模式.

主要功能:

  1. 解決伺服器之間的耦合性
  2. 使用訊息佇列,增加系統併發處理量

主要應用場景:

  1. 當系統使用簡訊平臺、郵件平臺的時候
  2. 當系統使用搜索平臺、快取平臺的時候你

二、使用外接ActiveMQ流程:

1.官網地址:http://activemq.apache.org/

2.安裝包下載完成後解壓後 就是這個樣子了 (注意一定要解壓到全英文路徑下的包內)

3.開啟你的bin目錄,開啟你係統對應位數的目錄

4.雙擊啟動activemq.bat

5.點完之後會自動彈出來doc啟動,稍等一下 如果你最後跟我的一樣 說明啟動成功了

6.啟動成功,在瀏覽器中訪問http://localhost:8161/ 就能訪問到activemq的頁面

7.登入成功後(使用者名稱和密碼都是admin),會有兩個佇列:topicqueue (後臺建立佇列,這邊會實時顯示)

三、兩種訊息模式

1.點對點的實現程式碼

專案使用MAVEN來構建(pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.itsiji</groupId>
  <artifactId>activemq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <!--activemq-->
  <dependencies>
  	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-client</artifactId>
		<version>5.13.4</version>
	 </dependency>
  </dependencies>
  
  <build>
	<plugins>			
		<!-- java編譯外掛 -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.2</version>
			<configuration>
				<source>1.7</source>
				<target>1.7</target>
				<encoding>UTF-8</encoding>
			</configuration>
	     </plugin>
	</plugins>
  </build>
</project>

1.1點對點的傳送方

需要注意的:連線地址的ip和埠號是 127.0.0.1:61616

                      在頁面管理控制檯檢視訊息佇列的ip和埠號是 127.0.0.1:8161

package com.itsiji.test.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestQueueSend {
	

	
	    //連線賬號
	    private String userName = "";
	    //連線密碼
	    private String password = "";
	    //連線地址
	    private String brokerURL = "tcp://127.0.0.1:61616";
	    //connection的工廠
	    private ConnectionFactory factory;
	    //連線物件
	    private Connection connection;
	    //一個操作會話
	    private Session session;
	    //目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
	    private Destination destination;
	    //生產者,就是產生資料的物件
	    private MessageProducer producer;
	    
	    public static void main(String[] args) {
	         TestQueueSend send = new TestQueueSend();
	        send.start();
	    }
	    
	    public void start(){
	        try {
	            //根據使用者名稱,密碼,url建立一個連線工廠
	            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
	            //從工廠中獲取一個連線
	            connection = factory.createConnection();
	            //測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
	            connection.start();
	            //建立一個session
	            //第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
	            //第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
	            //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
	            //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
	            //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
	            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	            //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
	            destination = session.createQueue("text-msg");
	            //從session中,獲取一個訊息生產者
	            producer = session.createProducer(destination);
	            //設定生產者的模式,有兩種可選
	            //DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存
	            //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空
	            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	            
	            //建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來
	            TextMessage textMsg = session.createTextMessage("呵呵");
	            for(int i = 0 ; i < 100 ; i ++){
	                //傳送一條訊息
	                producer.send(textMsg);
	            }
	            
	            System.out.println("傳送訊息成功");
	            //即便生產者的物件關閉了,程式還在執行哦
	            producer.close();
	            
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }
	    }
	}

1.2點對點的接收方

package com.itsiji.test.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestQueueReceive {
	

	
	    //連線賬號
	    private String userName = "";
	    //連線密碼
	    private String password = "";
	    //連線地址
	    private String brokerURL = "tcp://127.0.0.1:61616";
	    //connection的工廠
	    private ConnectionFactory factory;
	    //連線物件
	    private Connection connection;
	    //一個操作會話
	    private Session session;
	    //目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
	    private Destination destination;
	    //消費者,就是接收資料的物件
	    private MessageConsumer consumer;
	    public static void main(String[] args) {
	        TestQueueReceive receive = new TestQueueReceive();
	        receive.start();
	    }
	    
	    public void start(){
	        try {
	            //根據使用者名稱,密碼,url建立一個連線工廠
	            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
	            //從工廠中獲取一個連線
	            connection = factory.createConnection();
	            //測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
	            connection.start();
	            //建立一個session
	            //第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
	            //第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
	            //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
	            //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
	            //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
	            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	            //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
	            destination = session.createQueue("text-msg");
	            //根據session,建立一個接收者物件
	            consumer = session.createConsumer(destination);
	            
	            
	            //實現一個訊息的監聽器
	            //實現這個監聽器後,以後只要有訊息,就會通過這個監聽器接收到
	            consumer.setMessageListener(new MessageListener() {
	                @Override
	                public void onMessage(Message message) {
	                    try {
	                        //獲取到接收的資料
	                        String text = ((TextMessage)message).getText();
	                        System.out.println(text);
	                    } catch (JMSException e) {
	                        e.printStackTrace();
	                    }
	                }
	            });
	            //關閉接收端,也不會終止程式哦
//	            consumer.close();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }
	    }
	}

2.訂閱/釋出模式的實現程式碼

2.1訂閱模式的釋出方

package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestTopicSend {
	
//連線賬號
private String userName = "";
//連線密碼
private String password = "";
//連線地址
private String brokerURL = "tcp://127.0.0.1:61616";
//connection的工廠
private ConnectionFactory factory;
//連線物件
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//生產者,就是產生資料的物件
private MessageProducer producer;

public static void main(String[] args) {
    TestTopicSend send = new TestTopicSend();
    send.start();
}

public void start(){
    try {
        //根據使用者名稱,密碼,url建立一個連線工廠
        factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        //從工廠中獲取一個連線
        connection = factory.createConnection();
        //測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
        connection.start();
        //建立一個session
        //第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
        //第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
        //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
        //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
        //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
        
        
        
        //=======================================================
        //點對點與訂閱模式唯一不同的地方,就是這一行程式碼,點對點建立的是Queue,而訂閱模式建立的是Topic
        destination = session.createTopic("topic-text");
        //=======================================================
        
        
        
        
        //從session中,獲取一個訊息生產者
        producer = session.createProducer(destination);
        //設定生產者的模式,有兩種可選
        //DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存
        //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來
        TextMessage textMsg = session.createTextMessage("哈哈");
        long s = System.currentTimeMillis();
        for(int i = 0 ; i < 100 ; i ++){
            //傳送一條訊息
            producer.send(textMsg);
        }
        long e = System.currentTimeMillis();
        System.out.println("傳送訊息成功");
        System.out.println(e - s);
        //即便生產者的物件關閉了,程式還在執行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}

2.2訂閱模式的接收方

package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
public class TestTopicReceive {

    //連線賬號
    private String userName = "";
    //連線密碼
    private String password = "";
    //連線地址
    private String brokerURL = "tcp://127.0.0.1:61616";
    //connection的工廠
    private ConnectionFactory factory;
    //連線物件
    private Connection connection;
    //一個操作會話
    private Session session;
    //目的地,其實就是連線到哪個佇列,如果是點對點,那麼它的實現是Queue,如果是訂閱模式,那它的實現是Topic
    private Destination destination;
    //生產者,就是產生資料的物件
    private MessageProducer producer;
    
    public static void main(String[] args) {
        TestTopicReceive send = new TestTopicReceive();
        send.start();
    }
    
    public void start(){
        try {
            //根據使用者名稱,密碼,url建立一個連線工廠
            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            //從工廠中獲取一個連線
            connection = factory.createConnection();
            //測試過這個步驟不寫也是可以的,但是網上的各個文件都寫了
            connection.start();
            //建立一個session
            //第一個引數:是否支援事務,如果為true,則會忽略第二個引數,被jms伺服器設定為SESSION_TRANSACTED
            //第二個引數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
            //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常傳送成功。
            //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會當作傳送成功,並刪除訊息。
            //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個佇列吧,這裡就是連線了一個名為"text-msg"的佇列,這個會話將會到這個佇列,當然,如果這個佇列不存在,將會被建立
            
            
            
            //=======================================================
            //點對點與訂閱模式唯一不同的地方,就是這一行程式碼,點對點建立的是Queue,而訂閱模式建立的是Topic
            destination = session.createTopic("topic-text");
            //=======================================================
            
            
            
            
            //從session中,獲取一個訊息生產者
            producer = session.createProducer(destination);
            //設定生產者的模式,有兩種可選
            //DeliveryMode.PERSISTENT 當activemq關閉的時候,佇列資料將會被儲存
            //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,佇列裡面的資料將會被清空
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            
            //建立一條訊息,當然,訊息的型別有很多,如文字,位元組,物件等,可以通過session.create..方法來創建出來
            TextMessage textMsg = session.createTextMessage("哈哈");
            long s = System.currentTimeMillis();
            for(int i = 0 ; i < 100 ; i ++){
                //傳送一條訊息
                textMsg.setText("哈哈" + i);
                producer.send(textMsg);
            }
            long e = System.currentTimeMillis();
            System.out.println("傳送訊息成功");
            System.out.println(e - s);
            //即便生產者的物件關閉了,程式還在執行哦
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
	
}

3.傳送訊息的資料型別

javax.jms.Message 這個介面,只要是這個介面的資料,都可以被髮送

//純字串的資料
session.createTextMessage();
//序列化的物件
session.createObjectMessage();
//流,可以用來傳遞檔案等
session.createStreamMessage();
 //用來傳遞位元組
session.createBytesMessage();
//這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
session.createMapMessage();
//這個方法,拿到的是javax.jms.Message,是所有message的介面
session.createMessage();