1. 程式人生 > >ActiveMQ(中文)參考手冊

ActiveMQ(中文)參考手冊

1 JMS

在介紹ActiveMQ之前,首先簡要介紹一下JMS規範。

1.1 JMS的基本構件

1.1.1 連線工廠

連線工廠是客戶用來建立連線的物件,例如ActiveMQ提供的ActiveMQConnectionFactory。

1.1.2 連線

JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連線。

1.1.3 會話

JMS Session是生產和消費訊息的一個單執行緒上下文。會話用於建立訊息生產者(producer)、訊息消費者(consumer)和訊息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組傳送和接收被組合到了一個原子操作中。

1.1.4 目的地

目的地是客戶用來指定它生產的訊息的目標和它消費的訊息的來源的物件。JMS1.0.2規範中定義了兩種訊息傳遞域:點對點(PTP)訊息傳遞域和釋出/訂閱訊息傳遞域。 點對點訊息傳遞域的特點如下:
• 每個訊息只能有一個消費者。
• 訊息的生產者和消費者之間沒有時間上的相關性。無論消費者在生產者傳送訊息的時候是否處於執行狀態,它都可以提取訊息。
釋出/訂閱訊息傳遞域的特點如下:
• 每個訊息可以有多個消費者。
• 生產者和消費者之間有時間上的相關性。訂閱一個主題的消費者只能消費自它訂閱之後釋出的訊息。JMS規範允許客戶建立持久訂閱,這在一定程度上放鬆了時間上的相關性要求。持久訂閱允許消費者消費它在未處於啟用狀態時傳送的訊息。
在點對點訊息傳遞域中,目的地被成為佇列(queue);在釋出/訂閱訊息傳遞域中,目的地被成為主題(topic)。

 1.1.5 訊息生產者

訊息生產者是由會話建立的一個物件,用於把訊息傳送到一個目的地。

1.1.6 訊息消費者

訊息消費者是由會話建立的一個物件,它用於接收發送到目的地的訊息。訊息的消費可以採用以下兩種方法之一:
• 同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。
• 非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。

1.1.7 訊息

JMS訊息由以下三部分組成的:
• 訊息頭。每個訊息頭欄位都有相應的getter和setter方法。
• 訊息屬性。如果需要除訊息頭欄位以外的值,那麼可以使用訊息屬性。
• 訊息體。JMS定義的訊息型別有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

1.2JMS的可靠性機制

1.2.1 確認JMS訊息

只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段:客戶接收訊息、客戶處理訊息和訊息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式(acknowledgement mode)。該引數有以下三個可選值:
• Session.AUTO_ACKNOWLEDGE。當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。
• Session.CLIENT_ACKNOWLEDGE。客戶通過訊息的acknowledge方法確認訊息。需要注意的是,在這種模式中,確認是在會話層上進行:確認一個被消費的訊息將自動確認所有已被會話消費的訊息。例如,如果一個訊息消費者消費了10個訊息,然後確認第5個訊息,那麼所有10個訊息都被確認。
• Session.DUPS_ACKNOWLEDGE。該選擇只是會話遲鈍的確認訊息的提交。如果JMS Provider失敗,那麼可能會導致一些重複的訊息。如果是重複的訊息,那麼JMS Provider必須把訊息頭的JMSRedelivered欄位設定為true。

1.2.2 永續性

JMS 支援以下兩種訊息提交模式:
• PERSISTENT。指示JMS Provider持久儲存訊息,以保證訊息不會因為JMS Provider的失敗而丟失。
• NON_PERSISTENT。不要求JMS Provider持久儲存訊息。

1.2.3 優先順序

可以使用訊息優先順序來指示JMS Provider首先提交緊急的訊息。優先順序分10個級別,從0(最低)到9(最高)。如果不指定優先順序,預設級別是4。需要注意的是,JMS Provider並不一定保證按照優先順序的順序提交訊息。

1.2.4 訊息過期

可以設定訊息在一定時間後過期,預設是永不過期。

1.2.5 臨時目的地

可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來建立臨時目的地。它們的存在時間只限於建立它們的連線所保持的時間。只有建立該臨時目的地的連線上的訊息消費者才能夠從臨時目的地中提取訊息。

1.2.6 持久訂閱

首先訊息生產者必須使用PERSISTENT提交訊息。客戶可以通過會話上的createDurableSubscriber方法來建立一個持久訂閱,該方法的第一個引數必須是一個topic,第二個引數是訂閱的名稱。 JMS Provider會儲存釋出到持久訂閱對應的topic上的訊息。如果最初建立持久訂閱的客戶或者任何其它客戶使用相同的連線工廠和連線的客戶ID、相同的主題和相同的訂閱名再次呼叫會話上的createDurableSubscriber方法,那麼該持久訂閱就會被啟用。JMS Provider會象客戶傳送客戶處於非啟用狀態時所釋出的訊息。 持久訂閱在某個時刻只能有一個啟用的訂閱者。持久訂閱在建立之後會一直保留,直到應用程式呼叫會話上的unsubscribe方法。

1.2.7 本地事務

在一個JMS客戶端,可以使用本地事務來組合訊息的傳送和接收。JMS Session介面提供了commit和rollback方法。事務提交意味著生產的所有訊息被髮送,消費的所有訊息被確認;事務回滾意味著生產的所有訊息被銷燬,消費的所有訊息被恢復並重新提交,除非它們已經過期。 事務性的會話總是牽涉到事務處理中,commit或rollback方法一旦被呼叫,一個事務就結束了,而另一個事務被開始。關閉事務性會話將回滾其中的事務。 需要注意的是,如果使用請求/回覆機制,即傳送一個訊息,同時希望在同一個事務中等待接收該訊息的回覆,那麼程式將被掛起,因為知道事務提交,傳送操作才會真正執行。 需要注意的還有一個,訊息的生產和消費不能包含在同一個事務中。

1.3JMS 規範的變遷

JMS的最新版本的是1.1。它和同1.0.2版本之間最大的差別是,JMS1.1通過統一的訊息傳遞域簡化了訊息傳遞。這不僅簡化了JMS API,也有利於開發人員靈活選擇訊息傳遞域,同時也有助於程式的重用和維護。 以下是不同訊息傳遞域的相應介面:
JMS公共                              點對點域                                                 釋出/訂閱域
ConnectionFactory            QueueConnectionFactory                 TopicConnectionFactory
Connection                          QueueConnection                              TopicConnection
Destination                          Queue                                                   Topic
Session                                QueueSession                                    TopicSession
MessageProducer             QueueSender                                                TopicPublisher
MessageConsumer           QueueReceiver                                    TopicSubscriber

2.ActiveMQ

2.1Broker

2.1.1 執行Broker

ActiveMQ 5.0 的二進位制釋出包中bin目錄中包含一個名為activemq的指令碼,直接執行這個指令碼就可以啟動一個broker。 此外也可以通過Broker Configuration URI或Broker XBean URI對broker進行配置,以下是一些命令列引數的例子:

Example

Description

activemq

Runs a broker using the default 'xbean:activemq.xml' as the broker configuration file.

activemq xbean:myconfig.xml

Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.

activemq xbean:file:./conf/broker1.xml

Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml

activemq xbean:file:C:/ActiveMQ/conf/broker2.xml

Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml

activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true

Runs a broker with two transport connectors and JMX enabled.

activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false

Runs a broker with 1 transport connector and 1 network connector with persistence disabled.

2.1.2 嵌入式Broker

可以通過在應用程式中以編碼的方式啟動broker,例如:
Java程式碼
1. BrokerService broker = new BrokerService();
2. broker.addConnector("tcp://localhost:61616");
3. broker.start();
如果需要啟動多個broker,那麼需要為broker設定一個名字。例如:
Java程式碼
1. BrokerService broker = new BrokerService();
2. broker.setName("fred");
3. broker.addConnector("tcp://localhost:61616");
4. broker.start();
如果希望在同一個JVM內訪問這個broker,那麼可以使用VM Transport,URI是:vm://brokerName。關於更多的broker屬性,可以參考Apache的官方文件。 此外,也可以通過BrokerFactory來建立broker,例如:
Java程式碼
1. BrokerService broker = BrokerFactory.createBroker(new URI(someURI));
someURI的可選值如下:

URI scheme

Example

Description

xbean:

xbean:activemq.xml

Searches the classpath for an XML document with the given URI (activemq.xml in this case) which will then be used as the Xml Configuration

file:

file:foo/bar/activemq.xml

Loads the given file (in this example foo/bar/activemq.xml) as the Xml Configuration

broker:

broker:tcp://localhost:61616

Uses the Broker Configuration URI to configure the broker

當使用XBean的配置方式的時候,需要指定一個xml配置檔案,例如:
Java程式碼
1. BrokerService broker = BrokerFactory.createBroker(newURI("xbean:com/test/activemq.xml"));
使用Spring的配置方式如下:
Xml程式碼
1. <bean id="broker"class="org.apache.activemq.xbean.BrokerFactoryBean">
2.     <propertyname="config"value="classpath:org/apache/activemq/xbean/activemq.xml" />
3.     <property name="start"value="true" />
4. </bean>

2.1.3 監控Broker

2.1.3.1 JMX

在使用JMX監控broker之前,首先要啟用broker的JMX監控功能,例如在配置檔案中設定useJmx="true",如下:
Xml程式碼
1. <broker useJmx="true" brokerName="broker1”>
2. <managementContext>
3. <managementContext createConnector="true"/>
4. </managementContext>
5. ...
6. </broker>
接下來執行JDK自帶的jconsole。在運行了jconsole後,它會彈出對話方塊來選擇需要連線到的agent。如果是在啟動broker的主機上執行jconsole,那麼ActiveMQ broker會出現在jconsole的Local 標籤中。如果要連線到遠端的broker,那麼可以在Advanced標籤中指定JMX URL,以下是一個連線到本機的JMX URL: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
在jconsole的MBeans標籤中,可以檢視詳細資訊,也可以執行相應的operation。需要注意的是,在jconsole連線到broker的時候,並不需要輸入使用者名稱和密碼,如果這存在潛在的安全問題,那麼就需要為JMX Connector配置密碼保護(需要使用1.5以上版本的JDK)。
首先要禁止ActiveMQ建立自己的connector,例如:
Xml程式碼
1. <broker xmlns="http://activemq.org/config/1.0"brokerName="localhost"useJmx="true">
2. <managementContext>
3. <managementContext createConnector="false"/>
4. </managementContext>
5. </broker>
然後在ActiveMQ的conf目錄下建立一個訪問控制檔案和密碼檔案,如下:

conf/jmx.access:

# The "monitorRole" role has readonlyaccess.

# The "controlRole" role has readwriteaccess.

monitorRole readonly

controlRole readwrite
conf/jmx.password:

# The "monitorRole" role has password"abc123".

# The "controlRole" role has password"abcd1234".

monitorRole abc123

controlRole abcd1234
然後修改ActiveMQ的bin目錄下activemq的啟動指令碼,查詢包含"SUNJMX="的一行如下:
REM set SUNJMX=-Dcom.sun.management.jmxremote.port=1616

-Dcom.sun.management.jmxremote.authenticate=false

-Dcom.sun.management.jmxremote.ssl=false

把它替換成

setSUNJMX=-Dcom.sun.management.jmxremote.port=1616

-Dcom.sun.management.jmxremote.authenticate=true

-Dcom.sun.management.jmxremote.ssl=false

-Dcom.sun.management.jmxremote.password.file=%ACTIVEMQ_BASE%/conf/jmx.password

-Dcom.sun.management.jmxremote.access.file=%ACTIVEMQ_BASE%/conf/jmx.access

最後重啟ActiveMQ和jconsole,這時候需要強制login。如果在啟動activemq的過程中出現以下錯誤,那麼需要為這個檔案增加訪問控制。Windows平臺上的具體解決方法請參考如下網址:

Error: Password file read access must berestricted:

D:\apache-activemq-5.0.0\bin\../conf/jmx.password

2.1.3.2 Web Console

Web Console被整合到了ActiveMQ的二進位制釋出包中,因此預設訪問http://localhost:8161/admin即可訪問Web Console。 在配置檔案中,可以通過修改nioConnector的port屬性來修改Web console的預設埠:
Xml程式碼
1. <jetty xmlns="http://mortbay.com/schemas/jetty/1.0">
2. <connectors>
3. <nioConnector port="8161" />
4. </connectors>
5. ...
6. </jetty>
出於安全性或者可靠性的考慮,Web Console 可以被部署到不同於ActiveMQ的程序中。例如把activemq-web-console.war部署到一個單獨的web容器中(Tomcat,Jetty等)。在ActiveMQ5.0的二進位制釋出包中不包含activemq-web-console.war,因此需要下載ActiveMQ的原始碼,然後進入到${activemq.base}/src/activemq-web-console目錄中執行mvn instanll。如果一切正常,那麼預設會在${activemq.base}/src/activemq-web-console/target目錄中生成activemq-web-console-5.0.0.war。然後將activemq-web-console-5.0.0.war拷貝到Tomcat的webapps目錄中,並重命名成activemq-web-console.war。
需要注意的是,要將activemq-all-5.0.0.jar拷貝到WEB-INF\lib目錄中(可能還需要拷貝jms.jar)。還要為Tomcat設定以下五個系統屬性(修改catalina.bat檔案):
set JAVA_OPTS=%JAVA_OPTS% -Dwebconsole.type="properties"

set JAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jms.url="tcp://localhost:61616"

set JAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.url="service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"

set JAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.role=""

set JAVA_OPTS=%JAVA_OPTS%-Dwebconsole.jmx.password=""
如果JMX沒有配置密碼保護,那麼webconsole.jmx.role和webconsole.jmx.password設定成""即可。如果broker被配置成了Master/Slave模式,那麼可以配置成使用failover transport,例如:
-Dwebconsole.jms.url=failover:(tcp://serverA:61616,tcp://serverB:61616)
順便說一下,由於webconsole.type 屬性是properties,因此實際上起作用的Web Console的配置檔案是WEB-INF/ webconsole-properties.xml。最後啟動被監控的ActiveMQ,訪問http://localhost:8080/activemq-web-console/,檢視顯示是否正常。

2.1.3.3 Advisory Message

ActiveMQ 支援Advisory Messages,它允許你通過標準的JMS 訊息來監控系統。目前的Advisory Messages支援:
• consumers, producers and connections starting and stopping
• temporary destinations being created and destroyed
• messages expiring on topics and queues
• brokers sending messages to destinations with no consumers.
• connections starting and stopping
Advisory Messages可以被想象成某種的管理通道,通過它你可以得到關於JMS Provider、producers、consumers和destinations的資訊。Advisory topics都使用ActiveMQ.Advisory.這個字首,以下是目前支援的topics:
Client based advisories

Advisory Topics

Description

ActiveMQ.Advisory.Connection

Connection start & stop messages

ActiveMQ.Advisory.Producer.Queue

Producer start & stop messages on a Queue

ActiveMQ.Advisory.Producer.Topic

Producer start & stop messages on a Topic

ActiveMQ.Advisory.Consumer.Queue

Consumer start & stop messages on a Queue

ActiveMQ.Advisory.Consumer.Topic

Consumer start & stop messages on a Topic

在消費者啟動/停止的Advisory Messages的訊息頭中有個consumerCount屬性,他用來指明目前desination上活躍的consumer的數量。
Destination and Message based advisories

Advisory Topics

Description

ActiveMQ.Advisory.Queue

Queue create & destroy

ActiveMQ.Advisory.Topic

Topic create & destroy

ActiveMQ.Advisory.TempQueue

Temporary Queue create & destroy

ActiveMQ.Advisory.TempTopic

Temporary Topic create & destroy

ActiveMQ.Advisory.Expired.Queue

Expired messages on a Queue

ActiveMQ.Advisory.Expired.Topic

Expired messages on a Topic

ActiveMQ.Advisory.NoConsumer.Queue

No consumer is available to process messages being sent on a Queue

ActiveMQ.Advisory.NoConsumer.Topic

No consumer is available to process messages being sent on a Topic

以上的這些destnations都可以用來作為字首,在其後面追加其它的重要資訊,例如topic、queue、clientID、producderID和consumerID等。這令你可以利用Wildcards 和 Selectors 來過濾Advisory Messages(關於Wildcard和Selector會在稍後介紹)。
例如,如果你希望訂閱FOO.BAR這個queue上Consumer的start/stop的訊息,那麼可以訂閱ActiveMQ.Advisory.Consumer.Queue.FOO.BAR;如果希望訂閱所有queue上的start/stop訊息,那麼可以訂閱ActiveMQ.Advisory.Consumer.Queue.>;如果希望訂閱所有queue或者topic上的start/stop訊息,那麼可以訂閱ActiveMQ.Advisory.Consumer. >。
org.apache.activemq.advisory.AdvisorySupport類上有如下的helper methods,用來在程式中得到advisory destination objects。
Java程式碼
1. AdvisorySupport.getConsumerAdvisoryTopic()
2. AdvisorySupport.getProducerAdvisoryTopic()
3. AdvisorySupport.getDestinationAdvisoryTopic()
4. AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
5. AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
6. AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
7. AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
以下是段使用Advisory Messages的程式程式碼:
Java程式碼
1. Destination advisoryDestination =AdvisorySupport.getProducerAdvisoryTopic(destination)
2. MessageConsumer consumer = session.createConsumer(advisoryDestination);
3. consumer.setMessageListener(this);
4. ...
5. public void onMessage(Message msg){
6. if (msg instanceof ActiveMQMessage){
7. try {
8. ActiveMQMessage aMsg = (ActiveMQMessage)msg;
9. ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
10. } catch (JMSException e) {
11. log.error("Failed to process message: " + msg);
12. }
13. }
14.}

2.1.3.4 Command Agent

在介紹CommandAgent前首先簡要介紹一下XMPP(Jabber)協議,XMPP是一種基於XML的即時通訊協議,它由Jabber軟體基金會開發。在配置檔案中通過增加transportConnector來支援XMPP協議:
Xml程式碼
1. <broker xmlns="http://activemq.org/config/1.0">
2. <transportConnectors>
3. ...
4. <transportConnector name="xmpp"uri="xmpp://localhost:61222"/>
5. </transportConnectors>
6. </broker>
ActiveMQ提供了ActiveMQmessages和XMPP之間的雙向橋接:
• 如果客戶加入了一個聊天室,那麼這個聊天室的名字會被對映到一個JMS topic。
• 嘗試在聊天室內傳送訊息會導致一個JMS訊息被髮送到這個topic。
• 呆在一個聊天室中意味著這將保持一個對相應JMS topic的訂閱。因此傳送到這個topic的JMS訊息也會被髮送到聊天室。
推薦XMPP客戶端Spark(http://www.igniterealtime.org/)。
從4.2版本起,ActiveMQ支援Command Agent。在配置檔案中,通過設定commandAgent來啟用Command Agent:
Xml程式碼
1. <beans>
2. <broker useJmx="true" xmlns="http://activemq.org/config/1.0">
3. ...
4. </broker>
5. <commandAgent xmlns="http://activemq.org/config/1.0"/>
6. </beans>
啟用了Command Agent的broker上會有一個來自Command Agent的連線,它同時訂閱topic: ActiveMQ.Agent。在你啟動XMPP客戶端,加入到ActiveMQ.Agent聊天室後,就可以同broker進行交談了。通過在XMPP客戶端中鍵入help,可以得到幫助資訊。 需要注意的是,ActiveMQ5.0版本有個小bug,如果broker沒有采用預設的使用者名稱和密碼,那麼Command Agent便無法正常啟動。Apache官方文件說,此bug已經被修正,預定在5.2.0版本上體現。修改方式如下:
Xml程式碼
1. <commandAgent xmlns="http://activemq.org/config/1.0"brokerUser="user" brokerPassword="passward"/>

2.1.3.5 Visualization Plugin

ActiveMQ支援以broker外掛的形式生成DOT檔案(可以用agrviewer來檢視),以圖表的方式描述connections、sessions、producers、consumers、destinations等資訊。配置方式如下:
Xml程式碼
1. <broker xmlns="http://activemq.org/config/1.0"brokerName="localhost" useJmx="true">
2. ...
3. <plugins>
4. <connectionDotFilePlugin file="connection.dot"/>
5. <destinationDotFilePlugin file="destination.dot"/>
6. </plugins>
7. </broker>
需要注意的是,筆者認為ActiveMQ5.0版本的Visualization Plugin尚不穩定,存在諸多問題。例如:如果使用connectionDotFilePlugin,那麼brokerName必須是localhost;如果使用destinationDotFilePlugin可能會導致ArrayStoreException。

2.2 Transport

ActiveMQ目前支援的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下簡單介紹其中的幾種,更多請參考Apache官方文件。

2.2.1 VM Transport

VM transport允許在VM內部通訊,從而避免了網路傳輸的開銷。這時候採用的連線不是socket連線,而是直接地方法呼叫。 第一個建立VM 連線的客戶會啟動一個embed VM broker,接下來所有使用相同的broker名稱的VM連線都會使用這個broker。當這個broker上所有的連線都關閉的時候,這個broker也會自動關閉。 以下是配置語法:
vm://brokerName?transportOptions
例如:vm://broker1?marshal=false&broker.persistent=false
Transport Options的可選值如下:

Option Name

Default Value

Description

Marshal

false

If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat

wireFormat

default

The name of the WireFormat to use

wireFormat.*

All the properties with this prefix are used to configure the wireFormat

create

true

If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1

broker.*

All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information

以下是高階配置語法:
vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions
vm:broker:(tcp://localhost)?brokerOptions
例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false
Transport Options的可選值如下:

Option Name

Default Value

Description

marshal

false

If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat

wireFormat

default

The name of the WireFormat to use

wireFormat.*

All the propertieswith this prefix are used to configure the wireFormat

使用配置檔案的配置語法: vm://localhost?brokerConfig=xbean:activemq.xml

例如:vm://localhost?brokerConfig=xbean:com/test/activemq.xml
使用Spring的配置:
Xml程式碼
1. <bean id="broker"class="org.apache.activemq.xbean.BrokerFactoryBean">
2.  <property name="config"value="classpath:org/apache/activemq/xbean/activemq.xml" />
3.  <property name="start"value="true" />
4. </bean>
5.
6. <bean id="connectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"depends-on="broker">
7.     <propertyname="brokerURL" value="vm://localhost"/>
8. </bean>
如果persistent是true,那麼ActiveMQ會在當前目錄下建立一個預設值是activemq-data的目錄用於持久化儲存資料。需要注意的是,如果程式中啟動了多個不同名字的VM broker,那麼可能會有如下警告:Failed to start jmx connector: Cannot bind to URL[rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通過在transportOptions中追加broker.useJmx=false來禁用JMX來避免這個警告。

2.2.2 TCP Transport

TCP transport 允許客戶端通過TCP socket連線到遠端的broker。以下是配置語法: tcp://hostname:port?transportOptions TransportOptions的可選值如下:

Option Name

Default Value

Description

minmumWireFormatVersion

0

The minimum version wireformat that is allowed

trace

false

Causes all commands that are sent over the transport to be logged

useLocalHost

true

When true, it causes the local machines name to resolve to "localhost".

socketBufferSize

64 * 1024

Sets the socket buffer size in bytes

soTimeout

0

sets the socket timeout in milliseconds

connectionTimeout

30000

A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored.

wireFormat

default

The name of the WireFormat to use

wireFormat.*

All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information

例如:tcp://localhost:61616?trace=false

2.2.3 Failover Transport

Failover Transport是一種重新連線的機制,它工作於其它transport的上層,用於建立可靠的傳輸。它的配置語法允許制定任意多個複合的URI。Failover transport會自動選擇其中的一個URI來嘗試建立連線。如果沒有成功,那麼會選擇一個其它的URI來建立一個新的連線。以下是配置語法:

failover:(uri1,...,uriN)?transportOptions

failover:uri1,...,uriN

Transport Options的可選值如下:

Option Name

Default Value

Description

initialReconnectDelay

10

How long to wait before the first reconnect attempt (in ms)

maxReconnectDelay

30000

The maximum amount of time we ever wait between reconnect attempts (in ms)

useExponentialBackOff

true

Should an exponential backoff be used between reconnect attempts

backOffMultiplier

2

The exponent used in the exponential backoff attempts

maxReconnectAttempts

0

If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

randomize

true

use a random algorithm to choose the URI to use for reconnect from the list provided

backup

false

initialize and hold a second transport connection - to enable fast failover

例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

2.2.4 Discovery transport

Discovery transport是可靠的tranport。它使用Discovery transport來定位用來連線的URI列表。以下是配置語法:

discovery:(discoveryAgentURI)?transportOptions

discovery:discoveryAgentURI

Transport Options的可選值如下:

Option Name

Default Value

Description

initialReconnectDelay

10

How long to wait before the first reconnect attempt

maxReconnectDelay

30000

The maximum amount of time we ever wait between reconnect attempts

useExponentialBackOff

true

Should an exponential backoff be used btween reconnect attempts

backOffMultiplier

2

The exponent used in the exponential backoff attempts

maxReconnectAttempts

0

If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

例:discovery:(multicast://default)?initialReconnectDelay=100為了使用Discovery來發現broker,需要為broker啟用discovery agent。 以下是XML配置檔案中的一個例子:
Xml程式碼
1. <broker name="foo">
2. <transportConnectors>
3. <transportConnector uri="tcp://localhost:0"discoveryUri="multicast://default"/>
4. </transportConnectors>
5. ...
6. </broker>
在使用Failover Transport或Discovery transport等能夠自動重連的transport的時候,需要注意的是:設想有兩個broker,它們都啟用AMQ Message
Store作為持久化儲存,有一個producer和一個consumer連線到某個queue。當因其中一個broker失效時而切換到另一個broker的時候,如果失效的broker的queue中還有未被consumer消費的訊息,那麼這個queue裡的訊息仍然滯留在失效broker的中,直到失效的broker被修復並重新切換回這個被修復的broker後,之前被保留的訊息才會被consumer消費掉。如果被處理的訊息有時序限制,那麼應用程式就需要處理這個問題。另外也可以通過ActiveMQ叢集來解決這個問題。
在transport重連的時候,可以在connection上註冊TransportListener來獲得回撥,例如:
Java程式碼
1. (ActiveMQConnection)connection).addTransportListener(new TransportListener(){
2. public void onCommand(Object cmd) {
3. }
4.
5. public void onException(IOException exp) {
6. }
7.
8. public void transportInterupted() {
9. // The transport has suffered an interruption from which it hopes torecover.
10. }
11.
12. public void transportResumed() {
13. // The transport has resumed after an interruption.
14. }
15.});

2.3 持久化

2.3.1 AMQ Message Store

AMQ Message Store是ActiveMQ5.0預設的持久化儲存。Message commands被儲存到transactional journal(由rolling data logs組成)。Messages被儲存到data logs中,同時被reference store進行索引以提高存取速度。Date logs由一些單獨的data log檔案組成,預設的檔案大小是32M,如果某個訊息的大小超過了data log檔案的大小,那麼可以修改配置以增加data log檔案的大小。如果某個data log檔案中所有的訊息都被成功消費了,那麼這個data log檔案將會被標記,以便在下一輪的清理中被刪除或者歸檔。以下是其配置的一個例子:
Xml程式碼
1. <broker brokerName="broker" persistent="true"useShutdownHook="false">
2. <persistenceAdapter>
3. <amqPersistenceAdapter directory="${activemq.base}/data"maxFileLength="32mb"/>
4. </persistenceAdapter>
5. </broker>

Property Name

Default Value

Comments

directory

activemq-data

the path to the directory to use to store the message store data and log files

useNIO

true

use NIO to write messages to the data logs

syncOnWrite

false

sync every write to disk

maxFileLength

32mb

a hint to set the maximum size of the message data logs

persistentIndex

true

use a persistent index for the message logs. If this is false, an in-memory structure is maintained

maxCheckpointMessageAddSize

4kb

the maximum number of messages to keep in a transaction before automatically committing

cleanupInterval

30000

time (ms) before checking for a discarding/moving message data logs that are no longer used

indexBinSize

1024

default number of bins used by the index. The bigger the bin size -the better the relative performance of the index

indexKeySize

96

the size of the index key - the key is the message id

indexPageSize

16kb

the size of the index page -the bigger the page - the better the write performance of the index

directoryArchive

archive

the path to the directory to use to store discarded data logs

archiveDataLogs

false

if true data logs are moved to the archive directory instead of being deleted

2.3.2 Kaha Persistence

Kaha Persistence 是一個專門針對訊息持久化的解決方案。它對典型的訊息使用模式進行了優化。在Kaha中,資料被追加到data logs中。當不再需要log檔案中的資料的時候,log檔案會被丟棄。以下是其配置的一個例子:
Xml程式碼
1. <broker brokerName="broker" persistent="true"useShutdownHook="false">
2. <persistenceAdapter>
3. <kahaPersistenceAdapter directory="activemq-data"maxDataFileLength="33554432"/>
4. </persistenceAdapter>
5. </broker>

2.3.3 JDBC Persistence

目前支援的資料庫有ApacheDerby, Axion, DB2, HSQL, Informix, MaxDB, MySQL, Oracle, Postgresql, SQLServer,Sybase。 如果你使用的資料庫不被支援,那麼可以調整StatementProvider 來保證使用正確的SQL方言(flavour of SQL)。通常絕大多數資料庫支援以下adaptor:
• org.activemq.store.jdbc.adapter.BlobJDBCAdapter
• org.activemq.store.jdbc.adapter.BytesJDBCAdapter
• org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
• org.activemq.store.jdbc.adapter.ImageJDBCAdapter
也可以在配置檔案中直接指定JDBC adaptor,例如:
Xml程式碼
1. <jdbcPersistenceAdapteradapterClass="org.apache.activemq.store.jdbc.adapter.ImageBasedJDBCAdaptor"/>以下是其配置的一個例子:
Xml程式碼
1. <persistence>
2. <jdbcPersistence dataSourceRef=" mysql-ds"/>
3. </persistence>
4.
5. <bean id="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close">
6. <property name="driverClassName"value="com.mysql.jdbc.Driver"/>
7. <property name="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
8. <property name="username" value="activemq"/>
9. <property name="password" value="activemq"/>
10. <property name="poolPreparedStatements"value="true"/>
11.</bean>
需要注意的是,如果使用MySQL,那麼需要設定relaxAutoCommit 標誌為true。

2.3.4 Disable Persistence

以下是其配置的一個例子:
Xml程式碼
1. <broker persistent="false">
2. </broker>

2.4 安全機制

ActiveMQ支援可插拔的安全機制,用以在不同的provider之間切換。

2.4.1 Simple Authentication Plugin

Simple Authentication Plugin適用於簡單的認證需求,或者用於建立測試環境。它允許在XML配置檔案中指定使用者、使用者組和密碼等資訊。以下是ActiveMQ配置的一個例子:
Xml程式碼
1. <plugins>
2. ...
3. <simpleAuthenticationPlugin>
4. <users>
5. <authenticationUser username="system"password="manager" groups="users,admins"/>
6. <authenticationUser username="user" password="password"groups="users"/>
7. <authenticationUser username="guest"password="password" groups="guests"/>
8. </users>
9. </simpleAuthenticationPlugin>
10.</plugins>

2.4.2 JAAS Authentication Plugin

JAAS Authentication Plugin依賴標準的JAAS機制來實現認證。通常情況下,你需要通過設定java.security.auth.login.config系統屬性來配置login modules的配置檔案。如果沒有指定這個系統屬性,那麼JAAS Authentication Plugin會預設使用login.config作為檔名。以下是一個login.config檔案的例子:
activemq-domain {

org.apache.activemq.jaas.PropertiesLoginModulerequired debug=true

org.apache.activemq.jaas.properties.user="users.properties"

org.apache.activemq.jaas.properties.group="groups.properties";

};

這個login.config檔案中設定了兩個屬性:org.apache.activemq.jaas.properties.user和org.apache.activemq.jaas.properties.group分別用來指向user.properties和group.properties檔案。需要注意的是,PropertiesLoginModule使用本地檔案的查詢方式,而且查詢時採用的base directory是login.config檔案所在的目錄。因此這個login.config說明user.properties和group.properties檔案存放在跟login.config檔案相同的目錄裡。 以下是ActiveMQ配置的一個例子:
Xml程式碼
1. <plugins>
2. ...
3. <jaasAuthenticationPlugin configuration="activemq-domain" />
4. </plugins>
基於以上的配置,在JAAS的LoginContext中會使用activemq-domain中配置的PropertiesLoginModule來進行登陸。 ActiveMQ JAAS還支援LDAPLoginModule、CertificateLoginModule、TextFileCertificateLoginModule等login module。

2.4.3 Custom Authentication Implementation

可以通過編碼的方式為ActiveMQ增加認證功能。例如編寫一個類繼承自XBeanBrokerService。
Java程式碼
1. package com.yourpackage;
2.
3. import java.net.URI;
4. import java.util.HashMap;
5. import java.util.Map;
6.
7. import org.apache.activemq.broker.Broker;
8. import org.apache.activemq.broker.BrokerFactory;
9. import org.apache.activemq.broker.BrokerService;
10.import org.apache.activemq.security.SimpleAuthenticationBroker;
11.import org.apache.activemq.xbean.XBeanBrokerService;
12.
13.public class SimpleAuthBroker extends XBeanBrokerService {
14. //
15. private String user;
16. private String password;
17.
18. @SuppressWarnings("unchecked")
19. protected Broker addInterceptors(Broker broker) throws Exception {
20. broker = super.addInterceptors(broker);
21. Map passwords = new HashMap();
22. passwords.put(getUser(), getPassword());
23. broker = new SimpleAuthenticationBroker(broker, passwords, new HashMap());
24. return broker;
25. }
26.
27. public String getUser() {
28. return user;
29. }
30.
31. public void setUser(String user) {
32. this.user = user;
33. }
34.
35. public String getPassword() {
36. return password;
37. }
38.
39. public void setPassword(String password) {
40. this.password = password;
41. }
42.}
以下是ActiveMQ配置檔案的一個例子:
Xml程式碼
1. <beans>
2. …
3. <auth:SimpleAuthBroker
4. xmlns:auth="java://com.yourpackage"
5. xmlns="http://activemq.org/config/1.0"brokerName="SimpleAuthBroker1" user="user"password="password" useJmx="true">
6.
7. <transportConnectors>
8. <transportConnector uri="tcp://localhost:61616"/>
9. </transportConnectors>
10. </auth:SimpleAuthBroker>
11. …
12.</beans>
在這個配置檔案中增加了一個namespace auth,用於指向之前編寫的哪個類。同時為SimpleAuthBroker注入了兩個屬性值user和password,因此在被SimpleAuthBroker改寫的addInterceptors方法裡,可以使用這兩個屬性進行認證了。ActiveMQ提供的SimpleAuthenticationBroker類繼承自BrokerFilter(可以簡單的看成是Broker的Adaptor),它的建構函式中的兩個Map分別是userPasswords和userGroups。 SimpleAuthenticationBroker在 addConnection方法中使用userPasswords進行認證,同時會把userGroups的資訊儲存到ConnectionContext中 。

2.4.4 Authorization Plugin

可以通過AuthorizationPlugin為認證後的使用者授權,以下ActiveMQ配置檔案的一個例子:
Xml程式碼
1. <plugins>
2. <jaasAuthenticationPlugin configuration="activemq-domain"/>
3.
4. <authorizationPlugin>
5. <map>
6. <authorizationMap>
7. <authorizationEntries>
8. <authorizationEntry queue=">" read="admins"write="admins" admin="admins" />
9. <authorizationEntry queue="USERS.>" read="users"write="users" admin="users" />
10. <authorizationEntry queue="GUEST.>" read="guests"write="guests,users" admin="guests,users" />
11.
12. <authorizationEntry topic=">" read="admins"write="admins" admin="admins" />
13. <authorizationEntry topic="USERS.>" read="users"write="users" admin="users" />
14. <authorizationEntry topic="GUEST.>" read="guests"write="guests,users" admin="guests,users" />
15.
16. <authorizationEntry topic="ActiveMQ.Advisory.>"read="guests,users" write="guests,users"admin="guests,users"/>
17. </authorizationEntries>
18. </authorizationMap>
19. </map>
20. </authorizationPlugin>
21.</plugins>

2.5 Clustering

ActiveMQ從多種不同的方面提供了叢集的支援。

2.5.1 Queue consumer clusters

ActiveMQ支援訂閱同一個queue的consumers上的叢集。如果一個consumer失效,那麼所有未被確認(unacknowledged)的訊息都會被髮送到這個queue上其它的consumers。如果某個consumer的處理速度比其它consumers更快,那麼這個consumer就會消費更多的訊息。 需要注意的是, 筆者發現AcitveMQ5.0版本的Queue consumer clusters存在一個bug: 採用AMQMessage Store,執行一個producer,兩個consumer,並採用如下的配置檔案:
Xml程式碼
1. <beans>
2. <broker xmlns="http://activemq.org/config/1.0"brokerName="BugBroker1" useJmx="true">
3.
4. <transportConnectors>
5. <transportConnector uri="tcp://localhost:61616"/>
6. </transportConnectors>
7.
8. <persistenceAdapter>
9. <amqPersistenceAdapter directory="activemq-data/BugBroker1"maxFileLength="32mb"/>
10. </persistenceAdapter>
11.
12. </broker>
13.</beans>
那麼經過一段時間後可能會報出如下錯誤:

ERROR [ActiveMQ Transport: tcp:///127.0.0.1:1843 -RecoveryListenerAdapter.java:58 - RecoveryListenerAdapter]

Message idID:versus-1837-1203915536609-0:2:1:1:419 could not be recovered from the datastore! Apache官方文件說,此bug已經被修正,預定在5.1.0版本上體現。

2.5.2 Broker clusters

一個常見的場景是有多個JMS broker,有一個客戶連線到其中一個broker。如果這個broker失效,那麼客戶會自動重新連線到其它的broker。在ActiveMQ中使用failover:// 協議來實現這個功能。ActiveMQ3.x版本的reliable://協議已經變更為failover://。 如果某個網路上有多個brokers而且客戶使用靜態發現(使用Static Transport或Failover Transport)或動態發現(使用Discovery Transport),那麼客戶可以容易地在某個broker失效的情況下切換到其它的brokers。然而,stand alone brokers並不瞭解其它brokers上的consumers,也就是說如果某個broker上沒有consumers,那麼這個broker上的訊息可能會因得不到處理而積壓起來。目前的解決方案是使用Network of brokers,以便在broker之間儲存轉發訊息。ActiveMQ在未來會有更好的特性,用來在客戶端處理這個問題。 從ActiveMQ1.1版本起,ActiveMQ支援networks of brokers。它支援分散式的queues和topics。一個broker會相同對待所有的訂閱(subscription):不管他們是來自本地的客戶連線,還是來自遠端broker,它都會遞送有關的訊息拷貝到每個訂閱。遠端broker得到這個訊息拷貝後,會依次把它遞送到其內部的本地連線上。有兩種方式配置Network of brokers,一種是使用static transport,如下:
Xml程式碼
1. <broker brokerName="receiver" persistent="false"useJmx="false">
2.     <transportConnectors>
3.              <transportConnectoruri="tcp://localhost:62002"/>
4.     </transportConnectors>
5.     <networkConnectors>
6.              <networkConnectoruri="static:( tcp://localhost:61616,tcp://remotehost:61616)"/>
7.     </networkConnectors>
8. …
9. </broker>

另外一種是使用multicastdiscovery,如下:
Xml程式碼
1. <broker name="sender" persistent="false"useJmx="false">
2.     <transportConnectors>
3.              <transportConnectoruri="tcp://localhost:0" discoveryUri="multicast://default"/>
4.     </transportConnectors>
5.     <networkConnectors>
6.              <networkConnectoruri="multicast://default"/>
7.     </networkConnectors>
8. ...
9. </broker>

Network Connector有以下屬性:

Property

Default Value

Description

name

bridge

name of the network - for more than one network connector between the same two brokers -use different names

dynamicOnly

false

if true, only forward messages if a consumer is active on the connected broker

decreaseNetworkConsumerPriority

false

decrease the priority for dispatching to a Queue consumer the further away it is (in network hops) from the producer

networkTTL

1

the number of brokers in the network that messages and subscriptions can pass through

conduitSubscriptions

true

multiple consumers subscribing to the same destination are treated as one consumer by the network

excludedDestinations

empty

destinations matching this list won't be forwarded across the network

dynamicallyIncludedDestinations

empty

destinations that match this list will be forwarded across the network n.b. an empty list means all destinations not in the excluded list will be forwarded

staticallyIncludedDestinations

empty

destinations that match will always be passed across the network -even if no consumers have ever registered an interest

duplex

false

if true, a network connection will be used to both produce AND Consume messages. This is useful for hub and spoke scenarios when the hub is behind a firewall etc.

關於conduitSubscriptions屬性,這裡稍稍說明一下。設想有兩個brokers,分別是brokerA和brokerB,它們之間用forwarding bridge連線。有一個consumer連線到brokerA並訂閱queue:Q.TEST。有兩個consumers連線到brokerB,也是訂閱queue:Q.TEST。這三個consumers有相同的優先順序。然後啟動一個producer,它傳送了30條訊息到brokerA。如果conduitSubscriptions=true,那麼brokerA上的consumer會得到15條訊息, 另外15條訊息會發送給brokerB。此時負載並不均衡,因為此時brokerA將brokerB上的兩個consumers視為一個;如果conduitSubscriptions=false,那麼每個consumer上都會收到10條訊息。以下是關於NetworkConnector屬性的一個例子:
Xml程式碼
1. <networkConnectors>
2. <networkConnector uri="static://(tcp://localhost:61617)"
3.              name="bridge"dynamicOnly="false" conduitSubscriptions="true"
4.              decreaseNetworkConsumerPriority="false">
5.     <excludedDestinations>
6.              <queuephysicalName="exclude.test.foo"/>
7.              <topicphysicalName="exclude.test.bar"/>
8.     </excludedDestinations>
9.     <dynamicallyIncludedDestinations>
10.            <queuephysicalName="include.test.foo"/>
11.            <topicphysicalName="include.test.bar"/>
12.   </dynamicallyIncludedDestinations>
13.   <staticallyIncludedDestinations>
14.            <queuephysicalName="always.include.queue"/>
15.            <topicphysicalName="always.include.topic"/>
16.   </staticallyIncludedDestinations>
17. </networkConnector>
18.</networkConnectors>

2.5.3 Master Slave

在一個網路內執行多個brokers或者standalone brokers時存在一個問題,這就是訊息在物理上只被一個broker持有,因此當某個broker失效,那麼你只能等待直到它重啟後,這個broker上的訊息才能夠被繼續傳送(如果沒有設定持久化,那麼在這種情況下,訊息將會丟失)。Master Slave 背後的想法是,訊息被複制到slave broker,因此即使master broker遇到了像硬體故障之類的錯誤,你也可以立即切換到slave broker而不丟失任何訊息。 Master Slave是目前ActiveMQ推薦的高可靠性和容錯的解決方案。以下是幾種不同的型別:

Master Slave Type

Requirements

Pros

Cons

Pure Master Slave

None

No central point of failure

Requires manual restart to bring back a failed master and can only support 1 slave

Shared File System Master Slave

A Shared File system such as a SAN

Run as many slaves as required. Automatic recovery of old masters

Requires shared file system

JDBC Master Slave

A Shared database

Run as many slaves as required. Automatic recovery of old masters

Requires a shared database. Also relatively slow as it cannot use the high performance journal

2.5.3.1 Pure Master Slave的工作方式
• Slave broker消費masterbroker上所有的訊息狀態,例如訊息、確認和事務狀態等。只要slave broker連線到了master broker,它不會(也不被允許)啟動任何network connectors或者transport connectors,所以唯一的目的就是複製master broker的狀態。
• Master broker只有在訊息成功被複制到slave broker之後才會響應客戶。例如,客戶的commit請求只有在master broker和slave broker都處理完畢commit請求之後才會結束。
• 當masterbroker失效的時候,slavebroker有兩種選擇,一種是slavebroker啟動所有的networkconnectors和transportconnectors,這允許客戶端切換到slave broker;另外一種是slave broker停止。這種情況下,slave broker只是複製了master broker的狀態。
• 客戶應該使用failovertransport並且應該首先嚐試連線master broker。例如:failover://(tcp://masterhost:61616,tcp://slavehost:61616)?randomize=false 設定randomize為false就可以讓客戶總是首先嚐試連線master broker(slave broker並不會接受任何連線,直到它成為了master broker)。
Pure Master Slave具有以下限制:
• 只能有一個slavebroker連線到masterbroker。
• 在因masterbroker失效而導致slavebroker成為master之後,之前的master broker只有在當前的master broker(原slave broker)停止後才能重新生效。
• Master broker失效後而切換到slavebroker後,最安全的恢復masterbroker的方式是人工處理。首先要停止slave broker(這意味著所有的客戶也要停止)。然後把slave broker的資料目錄中所有的資料拷貝到master broker的資料目錄中。然後重啟master broker和slave broker。
Master broker不需要特殊的配置。Slavebroker需要進行以下配置
Xml程式碼
1. <broker masterConnectorURI="tcp://masterhost:62001"shutdownOnMasterFailure="false">
2. ...
3. <transportConnectors>
4. <transportConnector uri="tcp://slavehost:61616"/>
5. </transportConnectors>
6. </broker>
其中的masterConnectorURI用於指向master broker,shutdownOnMasterFailure用於指定slave broker在master broker失效的時候是否需要停止。此外,也可以使用如下配置:
Xml程式碼
1. <broker brokerName="slave" useJmx="false"deleteAllMessagesOnStartup="true" xmlns="http://activemq.org/config/1.0">
2. ...
3. <services>
4. <masterConnector remoteURI= "tcp://localhost:62001"userName="user" password="password"/>
5. </services>
6. </broker>
需要注意的是,筆者認為ActiveMQ5.0版本的Pure Master Slave仍然不夠穩定。

2.5.3.2 Shared File System Master Slave

如果你使用SAN或者共享檔案系統,那麼你可以使用Shared File System Master Slave。基本上,你可以執行多個broker,這些broker共享資料目錄。當第一個broker得到檔案上的排他鎖之後,其它的broker便會在迴圈中等待獲得這把鎖。客戶端使用failover transport來連線到可用的broker。當master broker失效的時候會釋放這把鎖,這時候其中一個slave broker會得到這把鎖從而成為master broker。以下是ActiveMQ配置的一個例子:
Xml程式碼
1. <broker useJmx="false" xmlns="http://activemq.org/config/1.0">
2. <persistenceAdapter>