訊息佇列activemq入門教程
阿新 • • 發佈:2019-02-10
關於web系統中佇列的使用參看文章http://jinnianshilongnian.iteye.com/blog/2321715
前言:
訊息佇列僅僅是佇列中的一個分支應用,使用場景:各個微服務之間的通訊,包括資料非同步同步等等場合,比如下訂單業務,可能涉及到使用者中心服務,訂單服務,財務服務,產品服務,倉庫服務等微服務;傳統做法是寫大量的介面,各個服務間呼叫介面來進行建立訂單、付款、支付、扣減庫存等等操作,一方面大量介面後期難以維護,可能改一個介面會導致整個訂單流程掛掉;另一方面介面之間會出現呼叫失敗、相應超時等問題。在這種情況下,引入訊息佇列,可以解決這些問題。通過訊息佇列可實現非同步處理、系統解耦。
1.下載ActiveMQ
http://activemq.apache.org/
通過url可以看出來,apache出品~
2.安裝使用
解壓下載的zip檔案,執行bin目錄的activemq.bat。啟動後,登陸:http://localhost:8161/admin/,使用者名稱和密碼都是admin
3.例項
程式需要的jar包如下:
activemq-client-5.9.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
log4j-1.2.17.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
我這裡模擬生產者/消費者,或者可以叫生產者/接受者。
先寫傳送模組
接收模組import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 測試activeMQ訊息 * * @author *** * */ public class Sender { private static final int SEND_NUMBER = 5; public void sendMessage() { // 連線工廠,JMS 用它建立連線 ConnectionFactory connectionFactory; // JMS 客戶端到JMS Provider 的連線 Connection connection = null; // 一個傳送或接收訊息的執行緒 Session session; // 訊息的目的地 Destination destination; // 訊息傳送者 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠得到連線物件 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連線 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 獲取session,注意引數值,接收要同樣的Queue destination = session.createQueue("FirstQueue"); // 得到訊息生成者 producer = session.createProducer(destination); // 設定不持久化,此處學習,實際根據專案決定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 構造訊息,此處寫死,專案就是引數,或者方法獲取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } private void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 傳送的訊息" + i); producer.send(message); } } }
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] args) {
// 連線工廠,JMS 用它建立連線
ConnectionFactory connectionFactory;
// JMS 客戶端到JMS Provider 的連線
Connection connection = null;
// 一個傳送或接收訊息的執行緒
Session session;
// 訊息的目的地
Destination destination;
// 訊息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
try {
// 構造從工廠得到連線物件
connection = connectionFactory.createConnection();
// 啟動
connection.start();
// 獲取操作連線
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 獲取session
destination = session.createQueue("FirstQueue");
consumer = session.createConsumer(destination);
while (true) {
// 設定接收者接收訊息的時間
// 0意思是程式會一直執行
TextMessage message = (TextMessage) consumer.receive(0);
if (null != message) {
System.out.println("收到訊息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
測試的時候,可以在Sender.java中增加一個main方法,先呼叫sendMessage(),然後啟動Receiver.java,可以在控制檯看到打印出了接受的值。
也可以登入http://localhost:8161/admin/檢視傳送接收的訊息。
我自己覺得activemq就是一個轉發伺服器,具體技術細節待後續研究。