【AMQ】 二:點對點模式Dome
阿新 • • 發佈:2018-12-14
AMQ通訊分為兩種,一種是點對點模式,另一種是釋出訂閱模式,本文主要介紹點對點模式和簡單實現。
什麼是點對點模式? 點對點模式是AMQ的一種通過佇列方式通訊的模式, 即生產者會把生產的訊息放在某個佇列中,消費者從佇列中取得訊息進行通訊的方式。
基本實現:
生產者:
package www.amp.com; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * Created by wangpengzhi1 on 2018/10/11. */ public class QueueProducer { /** 預設使用者名稱 */ public static final String USERNAME = "admin"; /** 預設密碼 */ public static final String PASSWORD = "admin"; /** 預設連線地址(格式如:tcp://IP:61616) */ public static final String BROKER_URL = "tcp://192.168.198.138:61616"; /** 佇列名稱 */ public static final String QUEUE_NAME = "hello amq"; // 連線工廠(在AMQ中由ActiveMQConnectionFactory實現) private ConnectionFactory connectionFactory; // 連線物件 private Connection connection; // 會話物件 private Session session; // 訊息目的地(對於點對點模型,是Queue物件;對於釋出訂閱模型,是Topic物件;它們都繼承或實現了該介面) private Destination destination; // 訊息傳送(生產)者 private MessageProducer messageProducer; public static void main(String[] args) { QueueProducer producer = new QueueProducer(); producer.doSend(); } public void doSend() { try { /** * 1.建立連線工廠<br> * 建構函式有多個過載,預設連線本地MQ伺服器,也可以手動設定使用者名稱、密碼、連線地址資訊<br> * new ActiveMQConnectionFactory(userName, password, brokerURL) */ connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKER_URL); /** * 2.建立連線 */ connection = connectionFactory.createConnection(); /** * 3.啟動連線 */ connection.start(); /** * 4.建立會話<br> * param1:是否支援事務,若為true,則會忽略第二個引數,預設為SESSION_TRANSACTED<br> * param2:確認訊息模式,若第一個引數為false時,該引數有以下幾種狀態<br> * -Session.AUTO_ACKNOWLEDGE:自動確認。客戶端傳送和接收訊息不需要做額外的工作,即使接收端發生異常, * 也會被當作正常傳送成功 <br> * -Session.CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到訊息後,必須呼叫message. * acknowledge() 方法給予收到反饋,JMS伺服器才會把該訊息當做傳送成功,並刪除<br> * -Session.DUPS_OK_ACKNOWLEDGE:副本確認。一旦接收端應用程式的方法呼叫從處理訊息處返回, * 會話物件就會確認訊息的接收,而且允許重複確認。 */ session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); /** * 5.建立(傳送)訊息目的地,即佇列,引數為佇列名稱 */ destination = session.createQueue(QUEUE_NAME); /** * 6.建立一個訊息生產者,並指定目的地 */ messageProducer = session.createProducer(destination); /** * 其他操作: 設定生產者的生產模式,預設為持久化<br> * 引數有以下兩種狀態:<br> * -DeliveryMode.NON_PERSISTENT:訊息不持久化,訊息被消費之後或者超時之後將從佇列中刪除 * -DeliveryMode.PERSISTENT:訊息會持久化,即使接收端消費訊息之後仍然會儲存 */ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); /** * 其他操作:設定訊息的存活時間(單位:毫秒) */ messageProducer.setTimeToLive(60000); for (int i = 0; i < 5; i++) { /** * 7.建立文字訊息<br> * 此外,還有多種型別的訊息如物件,位元組……都可以通過session.createXXXMessage()方法建立 */ TextMessage message = session.createTextMessage("send content:" + i); /** * 8. 傳送 */ messageProducer.send(message); } System.out.println("訊息傳送完成!"); /** * 如果有事務操作也可以提交事務 */ session.commit(); /** * 9.關閉生產者物件(即使關閉了程式也在執行) */ messageProducer.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { /** * 10.關閉連線(將會關閉程式) */ connection.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
消費者端:
package www.amp.com; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class QueueConsumer { private ConnectionFactory connectionFactory; private Connection connection; private Session session; private Destination destination; // 注意這裡是訊息接收(消費)者 private MessageConsumer messageConsumer; /** 預設使用者名稱 */ public static final String USERNAME = "admin"; /** 預設密碼 */ public static final String PASSWORD = "admin"; /** 預設連線地址(格式如:tcp://IP:61616) */ public static final String BROKER_URL = "tcp://192.168.198.138:61616"; public static void main(String[] args) { QueueConsumer consumer = new QueueConsumer(); consumer.doReceive(); } public void doReceive() { try { connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(QueueProducer.QUEUE_NAME); /** * 注意:這裡要建立一個訊息消費,並指定目的地(即訊息源佇列) */ messageConsumer = session.createConsumer(destination); // 方式一:監聽接收 receiveByListener(); // 方式二:阻塞接收 // receiveByManual(); /** * 注意:這裡不能再關閉物件了 */ // messageConsumer.close(); } catch (Exception e) { e.printStackTrace(); } finally { /** * 注意:這裡不能再關閉Connection了 */ // connection.close(); } } /** * 通過註冊監聽器的方式接收訊息,屬於被動監聽 */ private void receiveByListener() { try { messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { try { TextMessage msg = (TextMessage) message; System.out.println("Received:“" + msg.getText()+ "”"); // 可以通過此方法反饋訊息已收到 msg.acknowledge(); } catch (Exception e) { e.printStackTrace(); } } } }); } catch (Exception e) { e.printStackTrace(); } } /** * 通過手動去接收訊息的方式,屬於主動獲取 */ private void receiveByManual() { while (true) { try { /** * 通過receive()方法阻塞接收訊息,引數為超時時間(單位:毫秒) */ TextMessage message = (TextMessage) messageConsumer.receive(60000); if (message != null) { System.out.println("Received:“" + message.getText() + "”"); } } catch (Exception e) { e.printStackTrace(); } } } }