1. 程式人生 > >ActiveMQ5.15 - 入門 demo

ActiveMQ5.15 - 入門 demo

第一步:先下載activeMQ http://activemq.apache.org/ 我下載的是5.12,最新

匯入pom依賴:

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.3</version>
 </dependency>

第二部:解壓檔案apache-activemq-5.15.3-bin.zip 後啟動apache-activemq-5.15.3\bin\win64\activemq.bat

第三部:訪問http://localhost:8161/admin/  提示登陸:輸入使用者:admin,密碼:admin ,檢視你的訊息

接下來說重點,生產者producer和消費者consumer,producer生成訊息放到指定的佇列中,consumer從指定的佇列中消費訊息。

程式碼:

生產者:

package com.test;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
	static String NAME = ActiveMQConnection.DEFAULT_USER;//admin
	static String PSWD = ActiveMQConnection.DEFAULT_PASSWORD;//admin
	static String URL = "tcp://localhost:61616";
	
	static ConnectionFactory connectionFactory = null;
	static Connection connection = null;
	static Session session;
	static Queue queue;
	static MessageProducer producer;
	static Integer index = 0;

	public static void main(String[] args) {
	    
        //連結工廠,用來建立連結
		connectionFactory = new ActiveMQConnectionFactory(Producer.NAME, Producer.PSWD, Producer.URL);
		
		try {
			//建立連結
			connection = connectionFactory.createConnection();
			//啟動連結
			connection.start();
			//建立會話
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			//建立列隊
			queue = session.createQueue("cmjQueue");
			
			//建立生產者,將資訊生產到queue中
			producer = session.createProducer(queue);
			//設定生成資訊的策略.不序列化,實際看自己的專案業務情況而定,這裡只是學習
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			while(true){
				index++;
				//建立資訊
				MapMessage mapMessage = session.createMapMessage();
				mapMessage.setString("name", "曹明傑"+index);
				mapMessage.setString("age", "23");
				//傳送資訊
				System.out.println("訊息已傳送");
				producer.send(mapMessage);
				Thread.sleep(3000);
				session.commit();//提交,資訊真正的傳送
			}
		} catch (Exception e) {
			// TODO: handle exception
		}finally {
			try {
				if (null != connection)	 connection.close(); //關閉  
			 } catch (Throwable ignore){
				 
			 }
		}
				

	}

}


消費者:

package com.test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
	static String NAME = ActiveMQConnection.DEFAULT_USER;
	static String PSWD = ActiveMQConnection.DEFAULT_PASSWORD;
	static String URL = "tcp://localhost:61616";
	
	static ConnectionFactory connectionFactory = null;
	static Connection connection = null;
	static Session session;
	static Queue queue;
	static MessageConsumer consumer;

	public static void main(String[] args) {
		//建立連結
		connectionFactory = new ActiveMQConnectionFactory(NAME, PSWD, URL);
		try {
			 //建立連結
			connection = connectionFactory.createConnection();
			//啟動
			connection.start();
			//建立會話
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			//建立目的地即queue 
			queue = session.createQueue("cmjQueue");
			//建立消費者
			consumer = session.createConsumer(queue);
			System.out.println("開始消費");
			while(true){
				//等待3秒向訊息佇列中檢視是否有訊息,有就開始消費
				 MapMessage receive = (MapMessage)consumer.receive(3000);
				
				 System.out.println("消費資訊為:");
				 if(receive!=null){
					System.out.println(receive.getString("name"));
					System.out.println(receive.getString("age"));
				 }else{
					
				 }
			}
			
		} catch (Exception e) {
			// TODO: handle exception
		} finally {
			 try { 
				  if (null != connection) connection.close();
		     } catch (Throwable ignore){}
		}

	}

}

我們可以多起幾個執行緒進行消費。並檢視console輸出。

總結:

         queue是一對一的,即一個訊息只能被一個consumer消費,但是一個queue可以關聯好多個消費者,這時消費者也要排隊,先來者進行消費然後是下一個消費者。queue中的訊息被consumer申請並消費(自動拉取),息不能被重複消費,

         topic是廣播模式。即一對多,多個consumer同時進行消費,不過是topic主動的推送給消費者(自動推送),就好像我們訂閱的新聞資訊,自動推送給使用者檢視,資訊可以重複消費。即一個訊息傳送給多個consumer。