java使用activemq示例程式碼
阿新 • • 發佈:2018-11-12
一、點對點通訊
1、訊息傳送者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ProducerTest { private static final int SENDNUM = 10; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory;//連線工程 Connection connection = null;//連線 Session session;//會話 結束或簽字傳送訊息的執行緒 Destination destination;//訊息的目的地 MessageProducer messageProducer;//訊息生產者 try { //例項化連線工廠 connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616"); //通過連線工程獲取連線 connection = connectionFactory.createConnection(); connection.start();//啟動連線 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session destination = session.createQueue("FirstQueue1");//建立佇列 messageProducer = session.createProducer(destination);//建立訊息生產者 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection != null) connection.close(); } } public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < SENDNUM; i++) { TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i); messageProducer.send(message); } } }
2、消費者
(1)非監聽器模式
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 訊息消費者 * @author Administrator * */ public class JMSConsumer { private static final String USERNAME="admin"; // 預設的連線使用者名稱 private static final String PASSWORD="admin"; // 預設的連線密碼 private static final String BROKEURL = "tcp://ip:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory; // 連線工廠 Connection connection = null; // 連線 Session session; // 會話 接受或者傳送訊息的執行緒 Destination destination; // 訊息的目的地 MessageConsumer messageConsumer; // 訊息的消費者 // 例項化連線工廠 connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通過連線工廠獲取連線 connection.start(); // 啟動連線 session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session destination=session.createQueue("FirstQueue1"); // 建立連線的訊息佇列 messageConsumer=session.createConsumer(destination); // 建立訊息消費者 while(true){ TextMessage textMessage=(TextMessage)messageConsumer.receive(100000); if(textMessage!=null){ System.out.println("收到的訊息:"+textMessage.getText()); }else{ break; } } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
(2)、監聽器模式,消費者會實時訊息監聽,若有訊息則立即消費
a、監聽器:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 訊息監聽 * @author Administrator * */ public class Listener implements MessageListener{ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("收到的訊息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
b、消費者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 訊息消費者
* @author Administrator
*
*/
public class JMSConsumer2 {
private static final String USERNAME="admin"; // 預設的連線使用者名稱
private static final String PASSWORD="admin"; // 預設的連線密碼
private static final String BROKEURL="tcp://ip:61616"; // 預設的連線地址
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 連線工廠
Connection connection = null; // 連線
Session session; // 會話 接受或者傳送訊息的執行緒
Destination destination; // 訊息的目的地
MessageConsumer messageConsumer; // 訊息的消費者
// 例項化連線工廠
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通過連線工廠獲取連線
connection.start(); // 啟動連線
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
destination=session.createQueue("FirstQueue1"); // 建立連線的訊息佇列
messageConsumer=session.createConsumer(destination); // 建立訊息消費者
messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
二、訂閱模式
一個釋出多個訂閱者都會受到訊息,先訂閱後釋出
1、訊息生產者
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTest2 {
private static final int SENDNUM = 10;
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory;//連線工程
Connection connection = null;//連線
Session session;//會話 結束或簽字傳送訊息的執行緒
Topic createTopic;//訊息的目的地
MessageProducer messageProducer;//訊息生產者
try {
//例項化連線工廠
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://ip:61616");
//通過連線工程獲取連線
connection = connectionFactory.createConnection();
connection.start();//啟動連線
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立session
createTopic = session.createTopic("FirstTopic1");
messageProducer = session.createProducer(createTopic);//建立訊息生產者
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null)
connection.close();
}
}
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < 1; i++) {
TextMessage message = session.createTextMessage("ActiveMq 傳送訊息"+i);
messageProducer.send(message);
}
}
}
2、消費者(這邊就採用監聽器模式)
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 訊息消費者,訂閱模式,一個釋出多個訂閱者都會受到訊息,先訂閱後釋出
* @author Administrator
*
*/
public class JMSConsumer2 {
private static final String USERNAME="admin"; // 預設的連線使用者名稱
private static final String PASSWORD="admin"; // 預設的連線密碼
private static final String BROKEURL="tcp://ip:61616"; // 預設的連線地址
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 連線工廠
Connection connection = null; // 連線
Session session; // 會話 接受或者傳送訊息的執行緒
Destination destination; // 訊息的目的地
MessageConsumer messageConsumer; // 訊息的消費者
// 例項化連線工廠
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通過連線工廠獲取連線
connection.start(); // 啟動連線
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
destination=session.createTopic("FirstTopic1");
messageConsumer=session.createConsumer(destination); // 建立訊息消費者
messageConsumer.setMessageListener(new Listener()); // 註冊訊息監聽
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
訂閱模式需要先訂閱才能消費到資訊,也就是若先啟動生產者進行生產訊息,在用消費者是無法接收到資訊,要先使用消費者訂閱完,再使用生產者才行
生產者釋出一個訊息,能夠被多個訂閱的消費者接收到