1. 程式人生 > >ActiveMQ API 詳解

ActiveMQ API 詳解

4.1 開發JSM的步驟

廣義上說,一個JMS 應用是幾個JMS 客戶端交換訊息,開發JMS 客戶端應用由以下幾步構成:     用JNDI 得到ConnectionFactory 物件;     用ConnectionFactory 建立Connection 物件;     用Connection 物件建立一個或多個JMS Session;

用JNDI 得到目標佇列或主題物件,即Destination 物件;    用Session 和Destination 建立MessageProducer 和MessageConsumer;     通知Connection 開始傳送訊息。

4.2 程式設計模版

4.2.1 ConnectionFactory

要初始化JMS,則需要使用連線工廠。客戶端通過建立ConnectionFactory建立到ActveMQ的連線,一個連線工廠封裝了一組連線配置引數,這組引數在配置ActiveMQ時已經定義,例如brokerURL引數,此引數傳入的是ActiveMQ服務地址和埠,支援openwire協議的預設連線為tcp://localhost:61616,支援stomp協議的預設連線為tcp://localhost:61613。

注:由於C++客戶端暫時僅支援stomp協議,所以需要使用tcp://localhost:61613。

ConnectionFactory支援併發。

Java客戶端:

ActiveMQConnectionFactory構造方法:

ActiveMQConnectionFactory();

ActiveMQConnectionFactory(String brokerURL);

ActiveMQConnectionFactory(String userName, String password, String brokerURL) ;      

ActiveMQConnectionFactory(String userName, String password, URI brokerURL) ;

ActiveMQConnectionFactory(URI brokerURL);

其中brokerURL為ActiveMQ服務地址和埠。

例如:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.135:61616");

或者

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");

4.2.2 Connection

在成功建立正確的ConnectionFactory後,下一步將是建立一個連線,它是JMS定義的一個介面。ConnectionFactory負責返回可以與底層訊息傳遞系統進行通訊的Connection實現。通常客戶端只使用單一連線。根據JMS文件,Connection的目的是“利用JMS提供者封裝開放的連線”,以及表示“客戶端與提供者服務例程之間的開放TCP/IP套接字”。該文件還指出Connection應該是進行客戶端身份驗證的地方,除了其他一些事項外,客戶端還可以指定惟一標誌符。

當一個Connection被建立時,它的傳輸預設是關閉的,必須使用start方法開啟。

一個Connection可以建立一個或多個的Session。    當一個程式執行完成後,必須關閉之前建立的Connection,否則ActiveMQ不能釋放資源,關閉一個Connection同樣也關閉了Session,MessageProducer和MessageConsumer。

Connection支援併發。

4.2.2.1 建立ConnectionJava客戶端:

ActiveMQConnectionFactory方法:

Connection createConnection();

Connection createConnection(String userName, String password);

例如:

Connection connection = connectionFactory.createConnection();

4.2.2.2 開啟ConnectionJava客戶端:

ActiveMQConnection方法:

void start();

例如:

Connection.start();

4.2.2.3  關閉ConnectionJava客戶端:

ActiveMQConnection方法:

void close();

例如:

Connection.close();

4.2.3 Session

一旦從ConnectionFactory中獲得一個Connection,就必須從Connection中建立一個或者多個Session。Session是一個傳送或接收訊息的執行緒,可以使用Session建立MessageProducer,MessageConsumer和Message。

Session可以被事務化,也可以不被事務化,通常,可以通過向Connection上的適當建立方法傳遞一個布林引數對此進行設定。

Java客戶端:

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted為使用事務標識,acknowledgeMode為簽收模式。

例如:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

4.2.4 Destination

Destination是一個客戶端用來指定生產訊息目標和消費訊息來源的物件。

在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題。在程式中可以使用多個Queue和Topic。

Java客戶端:

ActiveMQSession方法:

Queue createQueue(String queueName);

TemporaryQueue createTemporaryQueue();

Topic createTopic(String topicName);

TemporaryTopic createTemporaryTopic();

例如:

Destination destination = session.createQueue("TEST.FOO");

或者

Destination destination = session.createTopic("TEST.FOO");

4.2.5 MessageProducer

MessageProducer是一個由Session建立的物件,用來向Destination傳送訊息。

4.2.5.1 建立MessageProducerJava客戶端:

ActiveMQSession方法:

MessageProducer createProducer(Destination destination);

例如:

MessageProducer producer = session.createProducer(destination);

4.2.5.2 傳送訊息Java客戶端:

ActiveMQMessageProducer方法:

void send(Destination destination, Message message);

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode為傳送模式,priority為訊息優先順序,timeToLive為訊息過期時間。

例如:

producer.send(message);

4.2.6 MessageConsumer

MessageConsumer是一個由Session建立的物件,用來從Destination接收訊息。

4.2.6.1 建立MessageConsumerJava客戶端:

ActiveMQSession方法:

MessageConsumer createConsumer(Destination destination);

MessageConsumer createConsumer(Destination destination, String messageSelector); 

MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);

TopicSubscriber createDurableSubscriber(Topic topic, String name);

TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

其中messageSelector為訊息選擇器;noLocal標誌預設為false,當設定為true時限制消費者只能接收和自己相同的連線(Connection)所釋出的訊息,此標誌只適用於主題,不適用於佇列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時需要設定此引數。

例如:

MessageConsumer consumer = session.createConsumer(destination);

4.2.6.2訊息的同步和非同步接收訊息的同步接收是指客戶端主動去接收訊息,客戶端可以採用MessageConsumer 的receive方法去接收下一個訊息。    訊息的非同步接收是指當訊息到達時,ActiveMQ主動通知客戶端。客戶端可以通過註冊一個實現MessageListener 介面的物件到MessageConsumer。MessageListener只有一個必須實現的方法 —— onMessage,它只接收一個引數,即Message。在為每個傳送到Destination的訊息實現onMessage時,將呼叫該方法。

Java客戶端:

ActiveMQMessageConsumer方法:

Message receive()

Message receive(long timeout)

Message receiveNoWait()

其中timeout為等待時間,單位為毫秒。

或者

實現MessageListener介面,每當訊息到達時,ActiveMQ會呼叫MessageListener中的onMessage 函式。

例如:

Message message = consumer.receive();

4.2.6.3訊息選擇器JMS提供了一種機制,使用它,訊息服務可根據訊息選擇器中的標準來執行訊息過濾。生產者可在訊息中放入應用程式特有的屬性,而消費者可使用基於這些屬性的選擇標準來表明對訊息是否感興趣。這就簡化了客戶端的工作,並避免了向不需要這些訊息的消費者傳送訊息的開銷。然而,它也使得處理選擇標準的訊息服務增加了一些額外開銷

訊息選擇器是用於MessageConsumer的過濾器,可以用來過濾傳入訊息的屬性和訊息頭部分(但不過濾訊息體),並確定是否將實際消費該訊息。按照JMS文件的說法,訊息選擇器是一些字串,它們基於某種語法,而這種語法是SQL-92的子集。可以將訊息選擇器作為MessageConsumer建立的一部分。

Java客戶端:

例如:

public final String SELECTOR = “JMSType = ‘TOPIC_PUBLISHER’”;

該選擇器檢查了傳入訊息的JMSType屬性,並確定了這個屬性的值是否等於TOPIC_PUBLISHER。如果相等,則訊息被消費;如果不相等,那麼訊息會被忽略。

4.2.7 Message

JMS程式的最終目的是生產和消費的訊息能被其他程式使用,JMS的 Message是一個既簡單又不乏靈活性的基本格式,允許建立不同平臺上符合非JMS程式格式的訊息。Message由以下幾部分組成:訊息頭,屬性和訊息體。

Java客戶端:

ActiveMQSession方法:

BlobMessage createBlobMessage(File file)

BlobMessage createBlobMessage(InputStream in)

BlobMessage createBlobMessage(URL url)

BlobMessage createBlobMessage(URL url, boolean deletedByBroker)

BytesMessage createBytesMessage()

MapMessage createMapMessage()

Message createMessage()

ObjectMessage createObjectMessage()

ObjectMessage createObjectMessage(Serializable object)

TextMessage createTextMessage()

TextMessage createTextMessage(String text)

例如:

下例演示建立併發送一個TextMessage到一個佇列:

TextMessage message = queueSession.createTextMessage();

message.setText(msg_text); // msg_text is a String

queueSender.send(message);

下例演示接收訊息並轉換為合適的訊息型別:

Message m = queueReceiver.receive();

if (m instanceof TextMessage) {

               TextMessage message = (TextMessage) m;               System.out.println("Reading message: " + message.getText());} else {               // Handle error}

4.3 可靠性機制

傳送訊息最可靠的方法就是在事務中傳送永續性的訊息,ActiveMQ預設傳送永續性訊息。結束事務有兩種方法:提交或者回滾。當一個事務提交,訊息被處理。如果事務中有一個步驟失敗,事務就回滾,這個事務中的已經執行的動作將被撤銷。

接收訊息最可靠的方法就是在事務中接收資訊,不管是從PTP模式的非臨時佇列接收訊息還是從Pub/Sub模式持久訂閱中接收訊息。

對於其他程式,低可靠性可以降低開銷和提高效能,例如傳送訊息時可以更改訊息的優先順序或者指定訊息的過期時間。

訊息傳送的可靠性越高,需要的開銷和頻寬就越多。效能和可靠性之間的折衷是設計時要重點考慮的一個方面。可以選擇生成和使用非永續性訊息來獲得最佳效能。另一方面,也可以通過生成和使用永續性訊息並使用事務會話來獲得最佳可靠性。在這兩種極端之間有許多選擇,這取決於應用程式的要求。

4.3.1 基本可靠性機制

4.3.1.1 控制訊息的簽收(Acknowledgment)客戶端成功接收一條訊息的標誌是這條訊息被簽收。成功接收一條訊息一般包括如下三個階段:

1.客戶端接收訊息;

2.客戶端處理訊息;

3.訊息被簽收。簽收可以由ActiveMQ發起,也可以由客戶端發起,取決於Session簽收模式的設定。

在帶事務的Session中,簽收自動發生在事務提交時。如果事務回滾,所有已經接收的訊息將會被再次傳送。

在不帶事務的Session中,一條訊息何時和如何被簽收取決於Session的設定。

1.Session.AUTO_ACKNOWLEDGE

當客戶端從receive或onMessage成功返回時,Session自動簽收客戶端的這條訊息的收條。在AUTO_ACKNOWLEDGE的Session中,同步接收receive是上述三個階段的一個例外,在這種情況下,收條和簽收緊隨在處理訊息之後發生。

2.Session.CLIENT_ACKNOWLEDGE

    客戶端通過呼叫訊息的acknowledge方法簽收訊息。在這種情況下,簽收發生在Session層面:簽收一個已消費的訊息會自動地簽收這個Session所有已消費訊息的收條。

3.Session.DUPS_OK_ACKNOWLEDGE

   此選項指示Session不必確保對傳送訊息的簽收。它可能引起訊息的重複,但是降低了Session的開銷,所以只有客戶端能容忍重複的訊息,才可使用(如果ActiveMQ再次傳送同一訊息,那麼訊息頭中的JMSRedelivered將被設定為true)。

Java客戶端:

簽收模式分別為:

1.  Session.AUTO_ACKNOWLEDGE

2.  Session.CLIENT_ACKNOWLEDGE

3.  Session.DUPS_OK_ACKNOWLEDGE

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

例如:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

對佇列來說,如果當一個Session終止時它接收了訊息但是沒有簽收,那麼ActiveMQ將保留這些訊息並將再次傳送給下一個進入佇列的消費者。

對主題來說,如果持久訂閱使用者終止時,它已消費未簽收的訊息也將被保留,直到再次傳送給這個使用者。對於非持久訂閱,AtiveMQ在使用者Session關閉時將刪除這些訊息。

如果使用佇列和持久訂閱,並且Session沒有使用事務,那麼可以使用Session的recover方法停止Session,再次啟動後將收到它第一條沒有簽收的訊息,事實上,重啟後Session一系列訊息的傳送都是以上一次最後一條已簽收訊息的下一條為起點。如果這時有訊息過期或者高優先順序的訊息到來,那麼這時訊息的傳送將會和最初的有所不同。對於非持久訂閱使用者,重啟後,ActiveMQ有可能刪除所有沒有簽收的訊息。

4.3.1.2 指定訊息傳送模式ActiveMQ支援兩種訊息傳送模式:PERSISTENT和NON_PERSISTENT兩種。

1.PERSISTENT(永續性訊息)

        這是ActiveMQ的預設傳送模式,此模式保證這些訊息只被傳送一次和成功使用一次。對於這些訊息,可靠性是優先考慮的因素。可靠性的另一個重要方面是確保永續性訊息傳送至目標後,訊息服務在向消費者傳送它們之前不會丟失這些訊息。這意味著在永續性訊息傳送至目標時,訊息服務將其放入永續性資料儲存。如果訊息服務由於某種原因導致失敗,它可以恢復此訊息並將此訊息傳送至相應的消費者。雖然這樣增加了訊息傳送的開銷,但卻增加了可靠性。

2.NON_PERSISTENT(非永續性訊息)

    保證這些訊息最多被傳送一次。對於這些訊息,可靠性並非主要的考慮因素。此模式並不要求永續性的資料儲存,也不保證訊息服務由於某種原因導致失敗後訊息不會丟失。

有兩種方法指定傳送模式:

1.使用setDeliveryMode方法,這樣所有的訊息都採用此傳送模式;

2.使用send方法為每一條訊息設定傳送模式;

Java客戶端:

傳送模式分別為:

1.  DeliveryMode.PERSISTENT

2.  DeliveryMode.NON_PERSISTENT

ActiveMQMessageProducer方法:

void setDeliveryMode(int newDeliveryMode);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode為傳送模式,priority為訊息優先順序,timeToLive為訊息過期時間。

例如:

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

如果不指定傳送模式,那麼預設是永續性訊息。如果容忍訊息丟失,那麼使用非永續性訊息可以改善效能和減少儲存的開銷。

4.3.1.3 設定訊息優先順序通常,可以確保將單個會話向目標傳送的所有訊息按其傳送順序傳送至消費者。然而,如果為這些訊息分配了不同的優先順序,訊息傳送系統將首先嚐試傳送優先順序較高的訊息。

有兩種方法設定訊息的優先順序:

1.使用setDeliveryMode方法,這樣所有的訊息都採用此傳送模式;

2.使用send方法為每一條訊息設定傳送模式;

Java客戶端:

ActiveMQMessageProducer方法:

void setPriority(int newDefaultPriority);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode為傳送模式,priority為訊息優先順序,timeToLive為訊息過期時間。

例如:

producer.setPriority(4);

訊息優先順序從0-9十個級別,0-4是普通訊息,5-9是加急訊息。如果不指定優先順序,則預設為4。JMS不要求嚴格按照這十個優先順序傳送訊息,但必須保證加急訊息要先於普通訊息到達。

4.3.1.4 允許訊息過期預設情況下,訊息永不會過期。如果訊息在特定週期內失去意義,那麼可以設定過期時間。

有兩種方法設定訊息的過期時間,時間單位為毫秒:

1.使用setTimeToLive方法為所有的訊息設定過期時間;

2.使用send方法為每一條訊息設定過期時間;

Java客戶端:

ActiveMQMessageProducer方法:

void setTimeToLive(long timeToLive);

或者

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

其中deliveryMode為傳送模式,priority為訊息優先順序,timeToLive為訊息過期時間。

例如:

producer.setTimeToLive(1000);

訊息過期時間,send 方法中的timeToLive 值加上傳送時刻的GMT 時間值。如果timeToLive值等於零,則JMSExpiration 被設為零,表示該訊息永不過期。如果傳送後,在訊息過期時間之後訊息還沒有被髮送到目的地,則該訊息被清除。

4.3.1.5 建立臨時目標ActiveMQ通過createTemporaryQueue和createTemporaryTopic建立臨時目標,這些目標持續到建立它的Connection關閉。只有建立臨時目標的Connection所建立的客戶端才可以從臨時目標中接收訊息,但是任何的生產者都可以向臨時目標中傳送訊息。如果關閉了建立此目標的Connection,那麼臨時目標被關閉,內容也將消失。

Java客戶端:

ActiveMQSession方法:

TemporaryQueue createTemporaryQueue();

TemporaryTopic createTemporaryTopic();

某些客戶端需要一個目標來接收對傳送至其他客戶端的訊息的回覆。這時可以使用臨時目標。Message的屬性之一是JMSReplyTo屬性,這個屬性就是用於這個目的的。可以建立一個臨時的Destination,並把它放入Message的JMSReplyTo屬性中,收到該訊息的消費者可以用它來響應生產者。

Java客戶端:

如下所示程式碼段,將建立臨時的Destination,並將它放置在TextMessage的JMSReplyTo屬性中:

// Create a temporary queue for replies...

Destination tempQueue = session.createTemporaryQueue();

// Set ReplyTo to temporary queue...

msg.setJMSReplyTo(tempQueue);

消費者接收這條訊息時,會從JMSReplyTo欄位中提取臨時Destination,並且會通過應用程式構造一個MessageProducer,以便將響應訊息傳送回生產者。這展示瞭如何使用JMS Message的屬性,並顯示了私有的臨時Destination的有用之處。它還展示了客戶端可以既是訊息的生產者,又可以是訊息的消費者。

// Get the temporary queue from the JMSReplyTo

// property of the message...

Destination tempQueue = msg.getJMSReplyTo();

...

// create a Sender for the temporary queue

MessageProducer Sender = session. createProducer(tempQueue);

TextMessage msg = session.createTextMessage();

msg.setText(REPLYTO_TEXT);

...

// Send the message to the temporary queue...

sender.send(msg);

4.3.2 高階可靠性機制

4.3.2.1 建立持久訂閱通過為釋出者設定PERSISTENT傳送模式,為訂閱者時使用持久訂閱,這樣可以保證Pub/Sub程式接收所有釋出的訊息。

訊息訂閱分為非持久訂閱(non-durable subscription)和持久訂閱(durable subscription),非持久訂閱只有當客戶端處於啟用狀態,也就是和ActiveMQ保持連線狀態才能收到傳送到某個主題的訊息,而當客戶端處於離線狀態,這個時間段發到主題的訊息將會丟失,永遠不會收到。持久訂閱時,客戶端向ActiveMQ註冊一個識別自己身份的ID,當這個客戶端處於離線時,ActiveMQ會為這個ID 儲存所有傳送到主題的訊息,當客戶端再次連線到ActiveMQ時,會根據自己的ID 得到所有當自己處於離線時傳送到主題的訊息。持久訂閱會增加開銷,同一時間在持久訂閱中只有一個啟用的使用者。

建立持久訂閱的步驟:

1.  為連線設定一個客戶ID;

2.  為訂閱的主題指定一個訂閱名稱;

上述組合必須唯一。

4.3.2.1.1 建立持久訂閱Java客戶端:

ActiveMQConnection方法:

void setClientID(String newClientID)

ActiveMQSession方法:

TopicSubscriber createDurableSubscriber(Topic topic, String name)

TopicSubscriber createDurableSubscriber(Topic topic, String name, String

messageSelector, boolean noLocal)

其中messageSelector為訊息選擇器;noLocal標誌預設為false,當設定為true時限制消費者只能接收和自己相同的連線(Connection)所釋出的訊息,此標誌只適用於主題,不適用於佇列;name標識訂閱主題所對應的訂閱名稱,持久訂閱時需要設定此引數。

4.3.2.1.2 刪除持久訂閱Java客戶端:

ActiveMQSession方法:

void unsubscribe(String name);

4.3.2.2 使用本地事務在事務中生成或使用訊息時,ActiveMQ跟蹤各個傳送和接收過程,並在客戶端發出提交事務的呼叫時完成這些操作。如果事務中特定的傳送或接收操作失敗,則出現異常。客戶端程式碼通過忽略異常、重試操作或回滾整個事務來處理異常。在事務提交時,將完成所有成功的操作。在事務進行回滾時,將取消所有成功的操作。

本地事務的範圍始終為一個會話。也就是說,可以將單個會話的上下文中執行的一個或多個生產者或消費者操作組成一個本地事務。

不但單個會話可以訪問 Queue 或 Topic (任一型別的 Destination ),而且單個會話例項可以用來操縱一個或多個佇列以及一個或多個主題,一切都在單個事務中進行。這意味著單個會話可以(例如)建立佇列和主題中的生產者,然後使用單個事務來同時傳送佇列和主題中的訊息。因為單個事務跨越兩個目標,所以,要麼佇列和主題的訊息都得到傳送,要麼都未得到傳送。類似地,單個事務可以用來接收佇列中的訊息並將訊息傳送到主題上,反過來也可以。

由於事務的範圍只能為單個的會話,因此不存在既包括訊息生成又包括訊息使用的端對端事務。(換句話說,至目標的訊息傳送和隨後進行的至客戶端的訊息傳送不能放在同一個事務中。)

4.3.2.2.1 使用事務Java客戶端:

ActiveMQConnection方法:

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted為使用事務標識,acknowledgeMode為簽收模式。

例如:

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

4.3.2.2.2 提交Java客戶端:

ActiveMQSession方法:

void commit();

例如:

try {

        producer.send(consumer.receive());

        session.commit();

}

catch (JMSException ex) {

        session.rollback();

}

4.3.2.2.3 回滾Java客戶端:

ActiveMQSession方法:

void rollback();

4.4 高階特徵

4.4.1 非同步傳送訊息

ActiveMQ支援生產者以同步或非同步模式傳送訊息。使用不同的模式對send方法的反應時間有巨大的影響,反映時間是衡量ActiveMQ吞吐量的重要因素,使用非同步傳送可以提高系統的效能。

在預設大多數情況下,AcitveMQ是以非同步模式傳送訊息。例外的情況:在沒有使用事務的情況下,生產者以PERSISTENT傳送模式傳送訊息。在這種情況下,send方法都是同步的,並且一直阻塞直到ActiveMQ發回確認訊息:訊息已經儲存在永續性資料儲存中。這種確認機制保證訊息不會丟失,但會造成生產者阻塞從而影響反應時間。

高效能的程式一般都能容忍在故障情況下丟失少量資料。如果編寫這樣的程式,可以通過使用非同步傳送來提高吞吐量(甚至在使用PERSISTENT傳送模式的情況下)。

Java客戶端:

使用Connection URI配置非同步傳送:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

在ConnectionFactory層面配置非同步傳送:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

在Connection層面配置非同步傳送,此層面的設定將覆蓋ConnectionFactory層面的設定:

((ActiveMQConnection)connection).setUseAsyncSend(true);

4.4.2 消費者特色

4.4.2.1 消費者非同步分派在ActiveMQ4中,支援ActiveMQ以同步或非同步模式向消費者分派訊息。這樣的意義:可以以非同步模式向處理訊息慢的消費者分配訊息;以同步模式向處理訊息快的消費者分配訊息。

ActiveMQ預設以同步模式分派訊息,這樣的設定可以提高效能。但是對於處理訊息慢的消費者,需要以非同步模式分派。

Java客戶端:

在ConnectionFactory層面配置同步分派:

((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);

在Connection層面配置同步分派,此層面的設定將覆蓋ConnectionFactory層面的設定:

((ActiveMQConnection)connection).setDispatchAsync(false);

在消費者層面以Destination URI配置同步分派,此層面的設定將覆蓋ConnectionFactory和Connection層面的設定:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");

consumer = session.createConsumer(queue);

4.4.2.2 消費者優先順序在ActveMQ分散式環境中,在有消費者存在的情況下,如果更希望ActveMQ傳送訊息給消費者而不是其他的ActveMQ到ActveMQ的傳送,可以如下設定:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prority=10");

consumer = session.createConsumer(queue);

4.4.2.3 獨佔的消費者ActiveMQ維護佇列訊息的順序並順序把訊息分派給消費者。但是如果建立了多個Session和MessageConsumer,那麼同一時刻多個執行緒同時從一個佇列中接收訊息時就並不能保證處理時有序。

有時候有序處理訊息是非常重要的。ActiveMQ4支援獨佔的消費。ActiveMQ挑選一個MessageConsumer,並把一個佇列中所有訊息按順序分派給它。如果消費者發生故障,那麼ActiveMQ將自動故障轉移並選擇另一個消費者。可以如下設定:

Java客戶端:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

consumer = session.createConsumer(queue);

4.4.2.4 再次傳送策略在以下三種情況中,訊息會被再次傳送給消費者:

1.在使用事務的Session中,呼叫rollback()方法;

2.在使用事務的Session中,呼叫commit()方法之前就關閉了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE簽收模式,並且呼叫了recover()方法。

可以通過設定ActiveMQConnectionFactory和ActiveMQConnection來定製想要的再次傳送策略。

屬性

預設值

描述

collisionAvoidanceFactor

0.15

The percentage of range of collision avoidance if enabled

maximumRedeliveries

6

Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue

initialRedeliveryDelay

1000L

The initial redelivery delay in milliseconds

useCollisionAvoidance

false

Should the redelivery policy use collision avoidance

useExponentialBackOff

false

Should exponential back-off be used (i.e. to exponentially increase the timeout)

backOffMultiplier

5

The back-off multiplier

4.4.3 目標特色

4.4.3.1 複合目標在1.1版本之後,ActiveMQ支援混合目標技術。它允許在一個JMS目標中使用一組JMS目標。

例如可以利用混合目標在同一操作中用向12個佇列傳送同一條訊息或者在同一操作中向一個主題和一個佇列傳送同一條訊息。

在混合目標中,通過“,”來分隔不同的目標。

Java客戶端:

例如:

// send to 3 queues as one logical operation

Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");

producer.send(queue, someMessage);

如果在一個目標中混合不同類別的目標,可以通過使用“queue://”和“topic://”字首來識別不同的目標。

例如:

// send to queues and topic one logical operation

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

producer.send(queue, someMessage);

4.4.3.2 目標選項

屬性

預設值

描述

consumer.prefetchSize

variable

The number of message the consumer will .

consumer.maximumPendingMessageLimit

0

Use to control if messages are dropped if a situation exists.

consumer.noLocal

false

Same as the noLocal flag on a Topic consumer. Exposed here so that it can be used with a queue.

consumer.dispatchAsync

false

consumer.retroactive

false

consumer.selector

null

JMS Selector used with the consumer.

consumer.exclusive

false

consumer.priority

0

Java客戶端:

例如:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

4.4.4 訊息預取

ActiveMQ的目標之一就是高效能的資料傳送,所以ActiveMQ使用“預取限制”來控制有多少訊息能及時的傳送給任何地方的消費者。

一旦預取數量達到限制,那麼就不會有訊息被分派給這個消費者直到它發回簽收訊息(用來標識所有的訊息已經被處理)。

可以為每個消費者指定訊息預取。如果有大量的訊息並且希望更高的效能,那麼可以為這個消費者增大預取值。如果有少量的訊息並且每條訊息的處理都要花費很長的時間,那麼可以設定預取值為1,這樣同一時間,ActiveMQ只會為這個消費者分派一條訊息。

Java客戶端:

在ConnectionFactory層面為所有消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.all=50

在ConnectionFactory層面為佇列消費者配置預取值:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

使用“目標選項”為一個消費者配置預取值:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

4.4.5 配置連線URL

ActiveMQ支援通過Configuration URI明確的配置連線屬性。

例如:當要設定非同步傳送時,可以通過在Configuration URI中使用jms.$PROPERTY來設定。

tcp://localhost:61616?jms.useAsyncSend=true

以下的選項在URI必須以“jms.”為字首。

屬性

預設值

描述

alwaysSessionAsync

true

If this flag is set then a seperate thread is not used for dispatching messages for each Session in the Connection. However, a separate thread is always used if there is more than one session, or the session isn't in auto acknowledge or dups ok mode

clientID

null

Sets the JMS clientID to use for the connection

closeTimeout

15000 (milliseconds)

Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker.

copyMessageOnSend

true

Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. This is enabled by default to be compliant with the JMS specification. You can disable it if you do not mutate JMS messages after they are sent for a performance boost.

disableTimeStampsByDefault

false

Sets whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost.

dispatchAsync

false

nestedMapAndListEnabled

true

Enables/disables whether or not are supported so that Message properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards

objectMessageSerializationDefered

false

When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk.

optimizeAcknowledge

false

Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNINGenabling this issue could cause some issues with auto-acknowledgement on reconnection

optimizedMessageDispatch

true

If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers 

useAsyncSend

false

Forces the use of  which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss.

useCompression

false

Enables the use of compression of the message bodies

useRetroactiveConsumer

false

Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.

4.5 優化

    優化部分請參閱:http://devzone.logicblaze.com/site/how-to-tune-activemq.html

5. ActiveMQ配置

5.1 配置檔案

ActiveMQ配置檔案:$AcrtiveMQ/conf/activemq.xml

5.2 配置ActiveMQ服務IP和埠

    <transportConnectors>

       <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>

       <transportConnector name="ssl"     uri="ssl://localhost:61617"/>

       <transportConnector name="stomp"   uri="stomp://localhost:61613"/>

    </transportConnectors>

在transportConnectors標識中配置ActiveMQ服務IP和埠,其中name屬性指定協議的名稱,uri屬性指定協議所對應的協議名,IP地址和埠號。上述IP地址和埠可以根據實際需要指定。Java客戶端預設使用openwire協議,所以ActiveMQ服務地址為tcp://localhost:61616;目前C++客戶端僅支援stomp協議,所以ActiveMQ服務地址為tcp://localhost:61613。

5.3 分散式部署

分散式部署請參閱:http://activemq.apache.org/networks-of-brokers.html

5.4 監控ActiveMQ

本節將使用JXM和JXM控制檯(JDK1.5控制檯)監控ActiveMQ。

5.4.1 配置JXM

<broker brokerName="emv219" useJmx="true" xmlns="http://activemq.org/config/1.0">

<managementContext>

          <managementContext connectorPort="1099" jmxDomainName="org.apache.activemq"/>

</managementContext>

</broker>

配置JXM步驟如下:

1. 設定broker標識的useJmx屬性為true;

2. 取消對managementContext標識的註釋(系統預設註釋managementContext標識),監控的預設埠為1099。

5.4.2 在Windows平臺監控進入%JAVA_HOME%/bin,雙擊jconsole.exe即出現如下畫面,在對話方塊中輸入ActiveMQ服務主機的地址,JXM的埠和主機登陸帳號。 

6. 目前存在問題

6.1 C++客戶端丟失訊息問題ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT

C++客戶端版本:ActiveMQ CPP 1.1 Release

測試中發現,當C++客戶端異常退出時(即沒有正常呼叫close函式關閉連線),ActiveMQ並不能檢測到C++客戶端的連線已經中斷,這時如果向佇列中傳送訊息,那麼第一條訊息就會丟失,這時ActiveMQ才能檢測到這個連線是中斷的。

在ActiveMQ論壇反應此問題後,開發人員答覆並建議使用CLIENT_ACKNOWLEDGE簽收模式。但是此模式會造成訊息重複接收。

    測試ActiveMQ 4.2SNAPSHOT時並未發現上述問題。

6.2 佇列訊息堆積過多後有可能阻塞程式

預設activemq.xml中配置的記憶體是20M,這就意味著當訊息堆積超過20M後,程式可能出現問題。在mial list中其他使用者對此問題的描述是:send方法會阻塞或丟擲異常。ActiveMQ開發人員的答覆:The memory model is different for ActiveMQ 4.1 in that for Queues, only small references to the Queue messages are held in memory. This means that the Queue depth can be considerably bigger than for ActiveMQ 3.2.x.However, our next major release (5.0 nee 4.2) has a more robust model in that Queue messages are paged in from storage only when space is available - hence Queue depth is now limited by how much disk space you have.

6.3 目前版本的C++客戶端僅支援stomp協議

目前版本的C++客戶端程式(ActiveMQ CPP 1.1 Release)僅支援stomp協議,因此傳輸訊息的速度應該沒有使用openwire協議的Java客戶端快。ActiveMQ網站顯示不久將會有支援openwire協議的C++客戶端程式釋出。

6.4 分散式部署問題

ActiveMQ版本:ActiveMQ 4.1.1SNAPSHOT和ActiveMQ 4.2SNAPSHOT

測試選用上述兩個未正式釋出的版本,未選用正式釋出的ActiveMQ 4.1.0 Release版本是因為此版本bug較多。

在測試中發現,如果重啟其中一臺機器上的ActiveMQ,其他機器的ActiveMQ有可能會列印:

    java.io.EOFException

        at java.io.DataInputStream.readInt(DataInputStream.java:358)

        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267)

        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)

        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)

        at java.lang.Thread.run(Thread.java:595)

WARN  TransportConnection            - Unexpected extra broker info command received: BrokerInfo {commandId = 6, responseRequired = false, brokerId = ID:emv219n-33945-1174458770157-1:0, brokerURL = tcp://emv219n:61616, slaveBroker = false, masterBroker = false, faultTolerantConfiguration = false, networkConnection = false, duplexConnection = false, peerBrokerInfos = [], brokerName = emv219, connectionId = 0}.

INFO  FailoverTransport