topic 和queue的區別和應用 activeMQ
剛接觸activeMQ,在這裡分析一下topic和queue的區別,在我這的理解,最基本的區別便是queue是佇列,訊息可以存在佇列裡面,只要觀察者去接收,這個訊息就被接收到了,就消失了,就像生產啤酒,生產了很多,看你什麼時候運走,然後topic就相當是廣播功能,在某個時刻廣播一條訊息,不論多少觀察者都能接收這條訊息,但是如果沒有一個觀察者,訊息就丟失了,所以queue是永遠只給一個人。
topic是併發的
queue是非同步的
給出topic的實現步驟:
傳送訊息:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendTopic {
private static final int SEND_NUMBER = 5;
public static void sendMessage(Session session, MessageProducer producer) throws JMSException {
for ( int i = 1; i <= SEND_NUMBER; i++) {
TextMessage message = session.createTextMessage("ActiveMq傳送的訊息" + i);
//傳送訊息到目的地方
System. out.println("傳送訊息:" + "ActiveMq 傳送的訊息" + i);
producer.send(message);
}
}
public static void main(String[] args) {
// ConnectionFactory:連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連線
Connection connection = null;
// Session:一個傳送或接收訊息的執行緒
Session session;
// Destination:訊息的目的地;訊息傳送給誰.
Destination destination;
// MessageProducer:訊息傳送者
MessageProducer producer;
// TextMessage message;
//構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection. DEFAULT_USER,
ActiveMQConnection. DEFAULT_PASSWORD,
"tcp://192.168.1.16:61616");
try {
//構造從工廠得到連線物件
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連線
session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);
//獲取session注意引數值FirstTopic是一個伺服器的topic(與queue訊息的傳送相比,這裡是唯一的不同)
destination = session.createTopic("FirstTopic");
//得到訊息生成者【傳送者】
producer = session.createProducer(destination);
//設定不持久化,此處學習,實際根據專案決定
producer.setDeliveryMode(DeliveryMode. PERSISTENT);
//構造訊息,此處寫死,專案就是引數,或者方法獲取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if ( null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
接收訊息
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ReceiveTopic implements Runnable {
private String threadName;
ReceiveTopic(String threadName) {
this.threadName = threadName;
}
public void run() {
// ConnectionFactory:連線工廠,JMS用它建立連線
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的連線
Connection connection = null;
// Session:一個傳送或接收訊息的執行緒
Session session;
// Destination:訊息的目的地;訊息傳送給誰.
Destination destination;
//消費者,訊息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection. DEFAULT_USER,
ActiveMQConnection. DEFAULT_PASSWORD,"tcp://192.168.1.16:61616");
try {
//構造從工廠得到連線物件
connection = connectionFactory.createConnection();
//啟動
connection.start();
//獲取操作連線,預設自動向伺服器傳送接收成功的響應
session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);
//獲取session注意引數值FirstTopic是一個伺服器的topic
destination = session.createTopic("FirstTopic");
consumer = session.createConsumer(destination);
while ( true) {
//設定接收者接收訊息的時間,為了便於測試,這裡設定為100s
TextMessage message = (TextMessage) consumer
.receive(100 * 1000);
if ( null != message) {
System. out.println("執行緒"+threadName+"收到訊息:" + message.getText());
} else {
continue;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if ( null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
public static void main(String[] args) {
//這裡啟動3個執行緒來監聽FirstTopic的訊息,與queue的方式不一樣三個執行緒都能收到同樣的訊息
ReceiveTopic receive1= new ReceiveTopic("thread1");
ReceiveTopic receive2= new ReceiveTopic("thread2");
ReceiveTopic receive3= new ReceiveTopic("thread3");
Thread thread1= new Thread(receive1);
Thread thread2= new Thread(receive2);
Thread thread3= new Thread(receive3);
thread1.start();
thread2.start();
thread3.start();
}
}