1. 程式人生 > >java使用activemq示例程式碼

java使用activemq示例程式碼

一、點對點通訊

1、訊息傳送者    

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTest {

	private static final int SENDNUM = 10;
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory;//連線工程
		Connection connection = null;//連線
		Session session;//會話 結束或簽字傳送訊息的執行緒
		Destination destination;//訊息的目的地
		MessageProducer messageProducer;//訊息生產者
		try {
			
			//例項化連線工廠
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
			//通過連線工程獲取連線
			connection = connectionFactory.createConnection();
			connection.start();//啟動連線
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session
			destination = session.createQueue("FirstQueue1");//建立佇列
			messageProducer = session.createProducer(destination);//建立訊息生產者
			
			sendMessage(session, messageProducer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null)
				connection.close();
		}
		
	}
	
	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
		for (int i = 0; i < SENDNUM; i++) {
			TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i);
			messageProducer.send(message);
		}
	}
}

2、消費者

  (1)非監聽器模式

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息消費者
 * @author Administrator
 *
 */
public class JMSConsumer {

	private static final String USERNAME="admin"; // 預設的連線使用者名稱
	private static final String PASSWORD="admin"; // 預設的連線密碼
	private static final String BROKEURL = "tcp://ip:61616";
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 連線工廠
		Connection connection = null; // 連線
		Session session; // 會話 接受或者傳送訊息的執行緒
		Destination destination; // 訊息的目的地
		MessageConsumer messageConsumer; // 訊息的消費者
		// 例項化連線工廠
		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
		try {
			connection=connectionFactory.createConnection();  // 通過連線工廠獲取連線
			connection.start(); // 啟動連線
			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
			destination=session.createQueue("FirstQueue1");  // 建立連線的訊息佇列
			messageConsumer=session.createConsumer(destination); // 建立訊息消費者
			while(true){
				TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
				if(textMessage!=null){
					System.out.println("收到的訊息:"+textMessage.getText());
				}else{
					break;
				}
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}
}

(2)、監聽器模式,消費者會實時訊息監聽,若有訊息則立即消費

     a、監聽器:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 訊息監聽
 * @author Administrator
 *
 */
public class Listener implements MessageListener{

	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		try {
			System.out.println("收到的訊息:"+((TextMessage)message).getText());
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

b、消費者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息消費者
 * @author Administrator
 *
 */
public class JMSConsumer2 {

	private static final String USERNAME="admin"; // 預設的連線使用者名稱
	private static final String PASSWORD="admin"; // 預設的連線密碼
	
	private static final String BROKEURL="tcp://ip:61616"; // 預設的連線地址
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 連線工廠
		Connection connection = null; // 連線
		Session session; // 會話 接受或者傳送訊息的執行緒
		Destination destination; // 訊息的目的地
		MessageConsumer messageConsumer; // 訊息的消費者
		
		// 例項化連線工廠
		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
				
		try {
			connection=connectionFactory.createConnection();  // 通過連線工廠獲取連線
			connection.start(); // 啟動連線
			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
			destination=session.createQueue("FirstQueue1");  // 建立連線的訊息佇列
			messageConsumer=session.createConsumer(destination); // 建立訊息消費者
			messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}
}

二、訂閱模式

一個釋出多個訂閱者都會受到訊息,先訂閱後釋出

1、訊息生產者


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerTest2 {

	private static final int SENDNUM = 10;
	public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory;//連線工程
		Connection connection = null;//連線
		Session session;//會話 結束或簽字傳送訊息的執行緒
		Topic createTopic;//訊息的目的地
		MessageProducer messageProducer;//訊息生產者
		try {
			
			//例項化連線工廠
			
			connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
			//通過連線工程獲取連線
			connection = connectionFactory.createConnection();
			connection.start();//啟動連線
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session
			createTopic = session.createTopic("FirstTopic1");
			messageProducer = session.createProducer(createTopic);//建立訊息生產者
			
			sendMessage(session, messageProducer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			if(connection != null)
				connection.close();
		}
		
	}
	
	public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
		for (int i = 0; i < 1; i++) {
			TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i);
			messageProducer.send(message);
		}
	}
}

2、消費者(這邊就採用監聽器模式)

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 訊息消費者,訂閱模式,一個釋出多個訂閱者都會受到訊息,先訂閱後釋出
 * @author Administrator
 *
 */
public class JMSConsumer2 {

	private static final String USERNAME="admin"; // 預設的連線使用者名稱
	private static final String PASSWORD="admin"; // 預設的連線密碼
	private static final String BROKEURL="tcp://ip:61616"; // 預設的連線地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 連線工廠
		Connection connection = null; // 連線
		Session session; // 會話 接受或者傳送訊息的執行緒
		Destination destination; // 訊息的目的地
		MessageConsumer messageConsumer; // 訊息的消費者
		
		// 例項化連線工廠
		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
				
		try {
			connection=connectionFactory.createConnection();  // 通過連線工廠獲取連線
			connection.start(); // 啟動連線
			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
			destination=session.createTopic("FirstTopic1"); 
			messageConsumer=session.createConsumer(destination); // 建立訊息消費者
			messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}
}

訂閱模式需要先訂閱才能消費到資訊,也就是若先啟動生產者進行生產訊息,在用消費者是無法接收到資訊,要先使用消費者訂閱完,再使用生產者才行

生產者釋出一個訊息,能夠被多個訂閱的消費者接收到