架構設計:系統間通訊(21)——ActiveMQ的安裝與使用
1、前言
之前我們通過兩篇文章(架構設計:系統間通訊(19)——MQ:訊息協議(上)、架構設計:系統間通訊(20)——MQ:訊息協議(下))從理論層面上為大家介紹了訊息協議的基本定義,並花了較大篇幅向讀者介紹了三種典型的訊息協議:XMPP協議、Stomp協議和AMQP協議。本小節開始,我們基於之前的知識點講解這些協議在具體的“訊息佇列中介軟體”中是如何被我們操作的。由於本人在實際工作中經常使用ActiveMQ和RabbitMQ,所以就選取這兩個“訊息佇列中介軟體”進行講解。如果讀者可以補充其他“訊息佇列中介軟體”的使用,那當然是再好不過了。
2、ActiveMQ的安裝和使用
ActiveMQ是Apache軟體基金會的開源產品,支援AMQP協議、MQTT協議(和XMPP協議作用類似)、Openwire協議和Stomp協議等多種訊息協議。並且ActiveMQ完整支援JMS API介面規範(當然Apache也提供多種其他語言的客戶端,例如:C、C++、C#、Ruby、Perl)。
2-1、ActiveMQ的安裝
在本文釋出之時,ActiveMQ最新的版本號是5.13.2(版本號升級很快,不過並不推薦使用最新的版本)。由ActiveMQ的安裝是很簡單,所以這個過程並不值得我們花很大篇幅進行討論。具體的過程就是:下載->解壓->配置環境變數->執行:
- 下載軟體
- 解壓安裝
將下載的安裝包放置在root使用者的home目錄內,解壓即可(當然您可以根據自己的需要加壓到不同的檔案路徑下)。如下所示:
[root@localhost ~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz
以上解壓使用的是root使用者,這是為了演示方便。正式環境中還是建議禁用root使用者,為activeMQ的執行專門建立一個使用者和使用者組。
- 配置環境變數(不是必須的)
如果您只是在測試環境使用Apache ActiveMQ,以便熟悉訊息中介軟體本身的特性和使用方式。那麼您無需對解壓後的軟體進行任何配置,所有可執行的命令都在軟體安裝目錄的./bin目錄下。為了使用方便,最好配置一下環境變數,如下所示(注意,根據您自己的軟體安裝位置,環境變數的設定是不一樣的,請不要盲目貼上複製):
設定該次會話的環境變數:
[root@localhost ~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;
永久設定環境變數:
[root@localhost ~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 針對作業系統進行了更深入的優化,所以您可以看到./bin目錄下,有一個針對32位Linux執行命令的./linux-x86-32目錄,和針對64位Linux執行命令的./linux-x86-64目錄。請按照您自己的情況進行環境變數設定和命令執行。
- 執行程式
現在您可以在任何目錄,執行activemq命令了。注意activemq命令一共有6個引數(console | start | stop | restart | status | dump),啟動Apache ActiveMQ使用的命令是activemq start:
[root@localhost ~]# activemq start
如果啟動成功,就可以在瀏覽器上訪問服務節點在8161埠的管理頁面了(例如http://localhost:8161):
點選‘manage ActiveMQ broker’連線,可以進入管理主介面(預設的使用者和密碼都是admin)。以上就是Apache ActiveMQ訊息中介軟體最簡的安裝和執行方式。在後續的文章中,我們會陸續討論ActiveMQ的叢集和高效能優化,那時會介紹對應的ActiveMQ的配置問題。
2-2、ActiveMQ的其他命令引數
如同上文講到的,activemq命令除了start引數用於啟動activemq程式以外,還有另外5個引數可以使用:console | stop | restart | status | dump。他們代表的使用意義是:
stop:停止當前ActiveMQ節點的執行。
restart:重新啟動當前ActiveMQ節點。
status:檢視當前ActiveMQ節點的執行狀態。如果當前ActiveMQ節點沒有執行,那麼將返回“ActiveMQ Broker is not running”的提示資訊。注意,status命令只能告訴開發人員當前節點時停止的還是執行的,除此之外不能從status命令獲取更多的資訊。例如,ActiveMQ為什麼建立Queue失敗?當前ActiveMQ使用了多少記憶體?而要獲取這些資訊,需要使用以下引數啟動ActiveMQ節點。
console:使用控制檯模式啟動ActiveMQ節點;在這種模式下,開發人員可以除錯、監控當前ActivieMQ節點的實時情況,並獲取實時狀態。
dump:如果您採用console模式執行ActiveMQ,那麼就可以使用dump引數,在console控制檯上獲取當前ActiveMQ節點的執行緒狀態快照。
2-3、在ActiveMQ中傳遞Stomp訊息
好吧,既然我們已經討論過如何安裝和執行ActiveMQ,也討論了Stomp協議的組織結構,為什麼我們不立即動手試一試操作ActiveMQ承載Stomp協議的訊息呢?
下面我們使用ActiveMQ提供的JAVA 客戶端(實際上就是ActiveMQ對JMS規範的實現),向ActiveMQ中的Queue(示例程式碼中將這個Queue命名為’test’)傳送一條Stomp協議訊息,然後再使用JAVA語言的客戶端,從ActiveMQ上接受這條訊息:
- 使用ActiveMQ的API傳送Stomp協議訊息:
package mq.test.stomp;
import java.net.Socket;
import java.util.Date;
import org.apache.activemq.transport.stomp.StompConnection;
// 訊息生產者
public class TestProducer {
public static void main(String[] args) {
try {
// 建立Stomp協議的連線
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
// 注意,協議版本可以是1.2,也可以是1.1
con.setVersion("1.2");
// 使用者名稱和密碼,這個不必多說了
con.connect("admin", "admin");
// 以下發送一條資訊(您也可以使用“事務”方式)
con.send("/test", "234543" + new Date().getTime());
} catch(Exception e) {
e.printStackTrace(System.out);
}
}
}
- 使用ActiveMQ的API接收Stomp協議訊息:
package mq.test.stomp;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 建立連線
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
con.setVersion("1.2");
con.connect("admin", "admin");
String ack = "client";
con.subscribe("/test", "client");
// 接受訊息(使用迴圈進行)
for(;;) {
StompFrame frame = null;
try {
// 注意,如果沒有接收到訊息,
// 這個消費者執行緒會停在這裡,直到本次等待超時
frame = con.receive();
} catch(SocketTimeoutException e) {
continue;
}
// 列印本次接收到的訊息
System.out.println("frame.getAction() = " + frame.getAction());
Map<String, String> headers = frame.getHeaders();
String meesage_id = headers.get("message-id");
System.out.println("frame.getBody() = " + frame.getBody());
System.out.println("frame.getCommandId() = " + frame.getCommandId());
// 在ack是client標記的情況下,確認訊息
if("client".equals(ack)) {
con.ack(meesage_id);
}
}
}
}
以上分別是使用Activie提供的Stomp協議的訊息生產端和Stomp協議的訊息消費端的程式碼(如果您不清楚Stomp協議的細節,可以參考我另一篇文章:《架構設計:系統間通訊(19)——MQ:訊息協議(上)》)。請注意在程式碼片段中,並沒有出現任何一個帶有jms名稱的包或者類——這是因為ActiveMQ為Stomp協議提供的JAVA API在內部進行了JMS規範的封裝。
您可以檢視activemq-stomp中關於協議轉換部分的原始碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級介面:org.apache.activemq.transport.stomp.FrameTranslator來驗證這件事情(關於ActiveMQ對JMS規範的實現設計,如果後續有時間再回頭進行講解)。
以下是Stomp協議的消費者端的執行效果(在生產者端已經向ActiveMQ插入了一條訊息之後):
frame.getAction() = MESSAGE
frame.getBody() = 2345431458460073204
frame.getCommandId() = 0
注意,由於訊息體中插入了一個時間戳,所以您複製貼上程式碼後執行效果並不會和我的演示程式完全一致。
2-4、ActiveMQ中的Queue和Topics
如果您細心的話,在ActiveMQ提供的管理頁面上已經看到有兩個功能頁面:Queue和Topic。Queue和Topic是JMS為開發人員提供的兩種不同工作機制的訊息佇列。 在ActiveMQ官方的解釋是:
- Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
中文的可以譯做:JMS-Topic 佇列基於“訂閱-釋出”模式,當操作者釋出一條訊息後,所有對這條訊息感興趣的訂閱者都可以收到它——也就是說這條訊息會被拷貝成多份,進行分發。只有當前“活動的”訂閱者能夠收到訊息(換句話說,如果當前JMS-Topic佇列中沒有訂閱者,這條訊息將被丟棄)。
- Queue
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的可以譯做:JMS-Queue是一種“負載均衡模式”的實現。一個訊息能且只能被一個消費者接受。如果當前JMS-Queue中沒有任何的消費者,那麼這條訊息將會被Queue儲存起來(實際應用中可以儲存在磁碟上,也可以儲存在資料庫中,看軟體的配置),直到有一個消費者連線上。另外,如果消費者在接受到訊息後,在他斷開與JMS-Queue連線之前,沒有傳送ack資訊(可以是客戶端手動傳送,也可以是自動傳送),那麼這條訊息將被髮送給其他消費者。
以下表格摘自網際網路上的資料,基本上把Queue和Topic這兩種佇列的不同特性說清楚了:
比較專案 | Topic 模式佇列 | Queue 模式佇列 |
---|---|---|
工作模式 | “訂閱-釋出”模式,如果當前沒有訂閱者,訊息將會被丟棄。如果有多個訂閱者,那麼這些訂閱者都會收到訊息 | “負載均衡”模式,如果當前沒有消費者,訊息也不會丟棄;如果有多個消費者,那麼一條訊息也只會傳送給其中一個消費者,並且要求消費者ack資訊。 |
有無狀態 | 無狀態 | Queue資料預設會在mq伺服器上以檔案形式儲存,比如Active MQ一般儲存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB儲存。 |
傳遞完整性 | 如果沒有訂閱者,訊息會被丟棄 | 訊息不會丟棄 |
處理效率 | 由於訊息要按照訂閱者的數量進行復制,所以處理效能會隨著訂閱者的增加而明顯降低,並且還要結合不同訊息協議自身的效能差異 | 由於一條訊息只發送給一個消費者,所以就算消費者再多,效能也不會有明顯降低。當然不同訊息協議的具體效能也是有差異的 |
2-5、JMS和協議間轉換
上文已經說到,JMS這套面向訊息通訊的 JAVA API 是一個和廠商無關的規範。通過JMS,我們能實現不同訊息中介軟體廠商、不同協議間的轉換和互動。這一小節我們就來討論一下這個問題。如果用一張圖來表示JMS在訊息中介軟體中的作用話,那麼就可以這麼來畫:
首先您使用的MQ訊息中介軟體需要實現了JMS規範;那麼通過JMS規範,開發人員可以忽略各種訊息協議的細節,只要訊息在同一佇列中,就能夠保證各種訊息協議間實現互相轉換。下面我們首先來看一個使用JMS API在ActiveMQ中操作openwire協議訊息的簡單示例,然後再給出一個通過JMS,實現Stomp訊息協議和Openwire訊息協議間的互轉示例。
2-5-1、JMS操作
- 以下程式碼使用向某個Queue(命名為test)中傳送一條訊息:
package jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 測試使用JMS API連線ActiveMQ
* @author yinwenjie
*/
public class JMSProducer {
/**
* 由於是測試程式碼,這裡忽略了異常處理。
* 正是程式碼可不能這樣做
* @param args
* @throws RuntimeException
*/
public static void main (String[] args) throws Exception {
// 定義JMS-ActiveMQ連線資訊(預設為Openwire協議)
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
Session session = null;
Destination sendQueue;
Connection connection = null;
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
//建立會話(設定一個帶有事務特性的會話)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//建立queue(當然如果有了就不會重複建立)
sendQueue = session.createQueue("/test");
//建立訊息傳送者物件
MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("這是傳送的訊息內容");
//傳送(JMS是支援事務的)
sender.send(outMessage);
session.commit();
//關閉
sender.close();
connection.close();
}
}
當以上程式碼執行到“start”的位置時,我們可以通過觀察ActiveMQ管理介面中connection列表中的連線資訊,發現訊息生產者已經建立了一個Openwire協議的連線:
從而確定我們通過JMS API建立了一個openwire協議的通訊連線。接著我們使用以下程式碼,建立一個基於openwire協議的“消費者”。注意:訊息生產者和訊息消費者,對映的佇列必須一致。(在示例程式碼中,它們都對映名稱為test的JMS-Queue)
- 以下程式碼使用JMS從某個Queue中接收訊息:
package jms;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 測試使用JMS API連線ActiveMQ
* @author yinwenjie
*/
public class JMSConsumer {
/**
* 由於是測試程式碼,這裡忽略了異常處理。
* 正是程式碼可不能這樣做
* @param args
* @throws RuntimeException
*/
public static void main (String[] args) throws Exception {
// 定義JMS-ActiveMQ連線資訊
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.61.138:61616");
Session session = null;
Destination sendQueue;
Connection connection = null;
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
//建立會話(設定為自動ack)
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立Queue(當然如果有了就不會重複建立)
sendQueue = session.createQueue("/test");
//建立訊息傳送者物件
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message arg0) {
// 接收到訊息後,不需要再發送ack了。
System.out.println("Message = " + arg0);
}
});
synchronized (JMSConsumer.class) {
JMSConsumer.class.wait();
}
//關閉
consumer.close();
connection.close();
}
}
當以上“消費者”程式碼執行到start的位置時,我們通過ActiveMQ提供的管理介面可以看到,基於Openwire協議的連線增加到了兩條:
注意,您在執行以上測試程式碼時,不用和我的執行順序一致。由於Queue模式的佇列是要進行訊息狀態儲存的,所以無論您是先執行“消費者”端,還是先執行“生產者”端,最後“消費者”都會收到一條訊息。類似如下的效果:
Message = ActiveMQTextMessage {commandId = 6, responseRequired = false, messageId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination = queue:///test, transactionId = TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration = 0, timestamp = 1458617840154, arrival = 0, brokerInTime = 1458617840166, brokerOutTime = 1458617840187, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 這是傳送的訊息內容}
2-5-2、協議間轉換
下面我們將Openwire協議的訊息通過JMS送入Queue佇列,並且讓基於Stomp協議的消費者接收到這條訊息。為了節約篇幅,基於Openwire協議的生產者的程式碼請參考上一小節2-5-1中“生產者”的程式碼片段。這裡只列出Stomp訊息的接受者程式碼(實際上這段程式碼在上文中也可以找到):
- Stomp協議的訊息消費者(訊息接收者):
package mq.test.stomp;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Map;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;
public class TestConsumer {
public static void main(String[] args) throws Exception {
// 建立連線(注意,Stomp協議的連線埠是61613)
StompConnection con = new StompConnection();
Socket so = new Socket("192.168.61.138", 61613);
con.open(so);
con.setVersion("1.2");
con.connect("admin", "admin");
String ack = "client";
con.subscribe("/test", "client");
// 接受訊息(使用迴圈進行)
for(;;) {
StompFrame frame = null;
try {
// 注意,如果沒有接收到訊息,
// 這個消費者執行緒會停在這裡,直到本次等待超時
frame = con.receive();
} catch(SocketTimeoutException e) {
continue;
}
// 列印本次接收到的訊息
System.out.println("frame.getAction() = " + frame.getAction());
Map<String, String> headers = frame.getHeaders();
String meesage_id = headers.get("message-id");
System.out.println("frame.getBody() = " + frame.getBody());
System.out.println("frame.getCommandId() = " + frame.getCommandId());
// 在ack是client模式的情況下,確認訊息
if("client".equals(ack)) {
con.ack(meesage_id);
}
}
}
}
當您同時執行Openwire訊息傳送者和Stomp訊息接收者時,您可以在ActiveMQ的管理介面看到這兩種協議的連線資訊:
以下是Stomp協議消費者接收到的訊息內容(經過轉換的openwire協議訊息):
frame.getAction() = MESSAGE
frame.getBody() = 這是傳送的訊息內容
frame.getCommandId() = 0
接下文