JMS : Java Message Service (Java訊息服務)
1 引言
1.1 編寫目的
本文作為B2bi專案中開源產品JORAM的使用指導文件,旨在幫助專案組人員方便明瞭的進行JMS模組的詳細設計和開發工作。本文件主要包含建設銀行EAI平臺B2Bi子系統中使用的開源JMS產品——JORAM的使用說明
1.2 名詞解釋
B2Bi:
Business to Business integration (企業間整合)
JMS:
Java Message Service (Java訊息服務)
JORAM:
ObjectWeb的Java開源專案
JNDI:
Java命名和目錄介面
1.3 參考資料
《Joram-4.3-en.pdf》——JORAM使用手冊(英文)
《Joram4_0_SAMPLES.pdf》——JORAM使用舉例(英文)
《Joram4_0_ADMIN.pdf》——JORAM管理員手冊(英文)
2 JMS簡介
2.1 JMS基本概念
JMS(Java Message Service)是訪問企業訊息系統的標準API,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。
JMS應用由以下幾部分組成:
JMS provider :是一個訊息系統,它實現了JMS 介面並提供管理和控制的功能。
JMS clients :是用Java語言寫的一些程式和元件,它們產生和使用訊息。
Messages :是在JMS clients之間傳遞的訊息的物件。
Administered objects :是由使用JMS clients 的人生成的預選設定好的JMS 物件。有兩種這樣的物件:destinations和connection factories。
2.2 JMS基本功能
JMS是用於和麵向訊息的中介軟體相互通訊的應用程式介面。它既支援點對點(point-to-point)的域,又支援釋出/訂閱 (publish/subscribe)型別的域,並且提供對下列型別的支援:經認可的訊息傳遞,事務型訊息的傳遞,一致性訊息和具有永續性的訂閱者支 持。JMS還提供了另一種方式來對您的應用與舊的後臺系統相整合。
2.3 訊息服務型別
1) point-to-point (PTP)方式:點到點的模型。訊息由一個JMS客戶機(釋出者)傳送到伺服器上的一個目的地,即一個佇列(queue)。而另一個JMS客戶機(訂閱者)則可以訪問這個佇列,並從該伺服器獲取這條訊息。
point-to-point (PTP)方式有以下特點:
a) 每一個message只有一個使用者。
b) 一個message的sender和receiver沒有時間上的依賴關係。無論sendere有沒有在執行,Receiver都可提取message。
c) Receiver完成對message處理這後,發出確認。
d) 當你所發出的每一個訊息必須由一個使用者成功處理的情況下,使用 PTP messaging機制。
2) publish/subscribe (pub/sub)方式:釋出-訂閱模型。這裡仍然是由一個JMS客戶機將一條訊息釋出到伺服器上的一個目的地上,但是這次這個目的地叫做一個主題 (topic),可有多個訂閱者去訪問該訊息。訊息將一直維持在主題中,直到這個主題的所有訂閱者都取走了該訊息的一個副本。訊息也包括了一個引數,用於 定義了該訊息的耐久性(它能夠在伺服器上等待訂閱者多長時間)。
Pub/sub messaging有如下的特點:
a) 每一個message可以有多個使用者;
b) Publishers和subscribers在時間上有依賴關係。一個訂閱了某一個topic的客戶,只能使用在它生成訂閱之後釋出的message, 並且subscriber必須一直保持活動狀態。
c) JMS API允許客戶生成永續性的訂閱,從而在某種程度上放寬了這種時間上的依賴關係,提高了靈活性處可靠性。
3) Messaging的使用
Messaging本身是非同步的,使message的使用者之間沒有時間上的依賴關係。但是,JMS規範給出了更精確的定義,使Message可以以兩種方式被使用:
a) Synchronously同步:subscriber或receiver可以通過呼叫receive方法實時地從destination上提取message。Receive方法在收到一個 message後結束,或當message 在一定的時間限制內沒有收到時超時結束。
b) Asynchronously非同步:客戶可以為某一個使用者註冊一個message listener。message listener和event listener很相似。當一個message到達了destination, JMS provider通過呼叫listener的onMessage方法將message傳遞過去,由onMessage方法負責處理message。
更詳細的JMS規範可參考SUN相關文件。
2.4 JMS介面類圖
2.5 JMS基本檢視
1.JMS介面描述
JMS 支援兩種訊息型別PTP 和Pub/Sub,分別稱作:PTP Domain 和Pub/Sub Domain,這兩種介面都繼承統一的JMS父介面,JMS 主要介面如下所示:
MS父介面 PTP Pub/Sub
ConnectionFactory QueueConnectionFactory TopicConnectionFactory
Connection QueueConnection TopicConnection
Destination Queue Topic
Session QueueSession TopicSession
MessageProducer QueueSender TopicPublisher
MessageConsumer QueueReceiver,QueueBrowse r TopicSubscriber
ConnectionFactory :連線工廠,JMS 用它建立連線
Connection :JMS 客戶端到JMS Provider 的連線
Destination :訊息的目的地
Session: 一個傳送或接收訊息的執行緒
MessageProducer: 由Session 物件建立的用來發送訊息的物件
MessageConsumer: 由Session 物件建立的用來接收訊息的物件
2.JMS訊息模型
JMS 訊息由以下幾部分組成:訊息頭,屬性,訊息體。
2.1 訊息頭(Header) - 訊息頭包含訊息的識別資訊和路由資訊,訊息頭包含一些標準的屬性如:JMSDestination,JMSMessageID 等。
訊息頭 由誰設定
JMSDestination send 或 publish 方法
JMSDeliveryMode send 或 publish 方法
JMSExpiration send 或 publish 方法
JMSPriority send 或 publish 方法
JMSMessageID send 或 publish 方法
JMSTimestamp send 或 publish 方法
JMSCorrelationID 客戶
JMSReplyTo 客戶
JMSType 客戶
JMSRedelivered JMS Provider
2.2 屬性(Properties) - 除了訊息頭中定義好的標準屬性外,JMS 提供一種機制增加新屬性到訊息頭 中,這種新屬性包含以下幾種:
1. 應用需要用到的屬性;
2. 訊息頭中原有的一些可選屬性;
3. JMS Provider 需要用到的屬性。
標準的JMS 訊息頭包含以下屬性:
JMSDestination 訊息傳送的目的地
JMSDeliveryMode 傳遞模式, 有兩種模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示該訊息一定要被送到目的地,否則會導致應用錯誤。NON_PERSISTENT 表示偶然丟失該訊息是被允許的,這兩種模式使開發者可以在訊息傳遞的可靠性和吞吐量之間找到平衡點。
JMSMessageID 唯一識別每個訊息的標識,由JMS Provider 產生。
JMSTimestamp 一個訊息被提交給JMS Provider 到訊息被髮出的時間。
JMSCorrelationID 用來連線到另外一個訊息,典型的應用是在回覆訊息中連線到原訊息。
JMSReplyTo 提供本訊息回覆訊息的目的地址
JMSRedelivered 如果一個客戶端收到一個設定了JMSRedelivered 屬性的訊息,則表示可能該客戶端曾經在早些時候收到過該訊息,但並沒有簽收(acknowledged)。
JMSType 訊息型別的識別符。
JMSExpiration 訊息過期時間,等於QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上傳送時刻的GMT 時間值。如果timeToLive值等於零,則JMSExpiration 被設為零,表示該訊息永不過期。如果傳送後,在訊息過期時間之後訊息還沒有被髮送到目的地,則該訊息被清除。
JMSPriority 訊息優先順序,從0-9 十個級別,0-4 是普通訊息,5-9 是加急訊息。JMS 不要求JMS Provider 嚴格按照這十個優先順序傳送訊息,但必須保證加急訊息要先於普通訊息到達。
2.3 訊息體(Body) - JMS API 定義了5種訊息體格式,也叫訊息型別,你可以使用不同形式傳送接收 資料並可以相容現有的訊息格式,下面描述這5種類型:
訊息型別 訊息體
TextMessage java.lang.String物件,如xml檔案內容
MapMessage 名/值對的集合,名是String物件,值型別可以是Java任何基本型別
BytesMessage 位元組流
StreamMessage Java中的輸入輸出流
ObjectMessage Java中的可序列化物件
Message 沒有訊息體,只有訊息頭和屬性
下例演示建立併發送一個TextMessage到一個佇列:
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);
下例演示接收訊息並轉換為合適的訊息型別:
Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}
3. 訊息的同步非同步接收
訊息的同步接收是指客戶端主動去接收訊息,JMS 客戶端可以採用MessageConsumer 的receive方法去接收下一個訊息。
訊息的非同步接收是指當訊息到達時,主動通知客戶端。JMS 客戶端可以通過註冊一個實 現MessageListener 介面的物件到MessageConsumer,這樣,每當訊息到達時,JMS Provider 會呼叫MessageListener中的onMessage 方法。
4. PTP模型
PTP(Point-to-Point)模型是基於佇列的,傳送方發訊息到佇列,接收方從佇列接收訊息,佇列的存在使得訊息的非同步傳輸成為可能。和郵件系統中的郵箱一樣,佇列可以包含各種訊息,JMS Provider 提 供工具管理佇列的建立、刪除。JMS PTP 模型定義了客戶端如何向佇列傳送訊息,從佇列接收訊息,瀏覽佇列中的訊息。
下面描述JMS PTP 模型中的主要概念和物件:
名稱 描述
Queue 由JMS Provider 管理,佇列由佇列名識別,客戶端可以通過JNDI 介面用佇列名得到一個佇列物件。
TemporaryQueue 由QueueConnection 建立,而且只能由建立它的QueueConnection 使用。
QueueConnectionFactory 客戶端用QueueConnectionFactory 建立QueueConnection 物件。
QueueConnection 一個到JMS PTP provider 的連線,客戶端可以用QueueConnection 建立QueueSession 來發送和接收訊息。
QueueSession 提供一些方法建立QueueReceiver 、QueueSender、QueueBrowser 和TemporaryQueue。如果在QueueSession 關閉時,有一些訊息已經被收到,但還沒有被簽收(acknowledged),那麼,當接收者下次連線到相同的佇列時,這些訊息還會被再次接收。
QueueReceiver 客戶端用QueueReceiver 接收佇列中的訊息,如果使用者在QueueReceiver 中設定了訊息選擇條件,那麼不符合條件的訊息會留在佇列中,不會被接收到。
QueueSender 客戶端用QueueSender 傳送訊息到佇列。
QueueBrowser 客戶端可以QueueBrowser 瀏覽佇列中的訊息,但不會收走訊息。
QueueRequestor JMS 提供QueueRequestor 類簡化訊息的收發過程。QueueRequestor 的建構函式有兩個引數:QueueSession 和queue,QueueRequestor 通過建立一個臨時佇列來完成最終的收發訊息請求。
可靠性(Reliability) 佇列可以長久地儲存訊息直到接收者收到訊息。接收者不需要因為擔心訊息會丟失而時刻和佇列保持啟用的連線狀態,充分體現了非同步傳輸模式的優勢。
5. PUB/SUB模型
JMS Pub/Sub 模型定義瞭如何向一個內容節點發布和訂閱訊息,這些節點被稱作主題(topic)。
主題可以被認為是訊息的傳輸中介,釋出者(publisher)釋出訊息到主題,訂閱者(subscribe)從主題訂閱訊息。主題使得訊息訂閱者和訊息釋出者保持互相獨立,不需要接觸即可保證訊息的傳送。
下面描述JMS Pub/Sub 模型中的主要概念和物件:
名稱 描述
訂閱(subscription) 訊息訂閱分為非持久訂閱(non-durable subscription)和持久訂閱(durable subscrip-tion),非持久訂閱只有當客戶端處於啟用狀態,也就是和JMS Provider 保持連線狀態才能收到傳送到某個主題的訊息,而當客戶端處於離線狀態,這個時間段發到主題的訊息將會丟失,永遠不會收到。持久訂閱時,客戶端向JMS 註冊一個識別自己身份的ID,當這個客戶端處於離線時,JMS Provider 會為這個ID 儲存所有傳送到主題的訊息,當客戶再次連線到JMS
Provider時,會根據自己的ID 得到所有當自己處於離線時傳送到主題的訊息。
Topic 主題由JMS Provider 管理,主題由主題名識別,客戶端可以通過JNDI 介面用主題名得到一個主題物件。JMS 沒有給出主題的組織和層次結構的定義,由JMS Provider 自己定義。
TemporaryTopic 臨時主題由TopicConnection 建立,而且只能由建立它的TopicConnection 使用。臨時主題不能提供持久訂閱功能。
TopicConnectionFactory 客戶端用TopicConnectionFactory 建立TopicConnection 物件。
TopicConnection TopicConnection 是一個到JMS Pub/Sub provider 的連線,客戶端可以用TopicConnection建立TopicSession 來發布和訂閱訊息。
TopicSession TopicSession 提供一些方法建立TopicPublisher、TopicSubscriber、TemporaryTopic 。它還提供unsubscribe 方法取消訊息的持久訂閱。
TopicPublisher 客戶端用TopicPublisher 釋出訊息到主題。
TopicSubscriber 客戶端用TopicSubscriber 接收發布到主題上的訊息。可以在TopicSubscriber 中設定訊息過濾功能,這樣,不符合要求的訊息不會被接收。
Durable TopicSubscriber 如果一個客戶端需要持久訂閱訊息,可以使用Durable TopicSubscriber,TopSession 提供一個方法createDurableSubscriber建立Durable TopicSubscriber 物件。
恢復和重新派送(Recovery and Redelivery) 非持久訂閱狀態下,不能恢復或重新派送一個未簽收的訊息。只有持久訂閱才能恢復或重新派送一個未簽收的訊息。
TopicRequestor JMS 提供TopicRequestor 類簡化訊息的收發過程。TopicRequestor 的建構函式有兩個引數:TopicSession 和topic。TopicRequestor 通過建立一個臨時主題來完成最終的釋出和接收訊息請求。
可靠性(Reliability) 當所有的訊息必須被接收,則用持久訂閱模式。當丟失訊息能夠被容忍,則用非持久訂閱模式。
3 JMS API程式設計模型
一個JMS應用由以下幾個模組組成:
3.1 Administered Objects
JMS應用的destinations和connection factories最後是通過管理而不是程式設計來使用,因為不同的provider使用他們的方法不一樣。
JMS 客戶應該使用統一的介面得到這些objects,從而使用JMS應用可以執行在不同provider上,而不需要修改或修改很少。通常管理員在JNDI上設定administered objects, 然後JMS clients 在JNDI上look up這些物件。
a) Connection Factories:
connection factory 是client用來生成與provider的connection的物件。connection factory封裝了一套由管理員定義的connection configuration引數。每個connection factory 是一個QueueConnectionFactory 或 TopicConnectionFactory介面的例項。
在JMS 客戶程式中, 通常先執行connection factory 的JNDI API lookup。 如下例:
Context ctx = new InitialContext();
QueueConnectionFactory queueConnectionFactory =
(QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
TopicConnectionFactory topicConnectionFactory =
(TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
如果呼叫不帶引數的InitialContext的lookup方法,就在當前classpath 找jndi.properties檔案。
b) Destinations:
Destination 是一個物件用於定義使用者產生的messages 的去向或使用者使用的messages 的來源。
在PTP裡,destinations被稱作queues, 可以用下面的命令來生成它們:
Queue queue = (Queue) Queue.create("queue");
在pub/sub裡, destinations被稱為topics, 可以用下面的J2EE SDK command 來生成它們:
Topic topic = (Topic) Topic.create("topic");
一個JMS應用可以同時使用多個queues 和/或topics。
除了lookup connection factory, 也常要lookup destination。例如:
Topic myTopic = (Topic) ctx.lookup("MyTopic");
Queue myQueue = (Queue) ctx.lookup("MyQueue");
3.2 Connection
Connection封裝了一個與JMS provider的虛擬連線。Connection表示在client和provider service daemon之間開啟的TCP/IP socket。可以用connection 生成一個或多個sessions。
就象connection factories, connections有兩種方式:實現QueueConnection或TopicConnection介面。例如, 當有一個QueueConnectionFactory 或TopicConnectionFactory物件, 可以用他們來創造一個connection:
QueueConnection queueConnection =
queueConnectionFactory.createQueueConnection();
TopicConnection topicConnection =
topicConnectionFactory.createTopicConnection();
注意:當應用程式完成後, 必須關閉你所建立的connections。否則JMS provider 無法釋放資源。關閉了connection同時也關閉了sessions和message產生者和message使用者。
queueConnection.close();
topicConnection.close();
在使用messages前, 必須呼叫connection的start方法。如果要暫時停止傳送message而不關閉connection, 可以呼叫stop方法。
connection factory 是client用來生成與provider的connection的物件。connection factory封裝了一套由管理員定義的connection configuration引數。每個connection factory 是一個QueueConnectionFactory 或 TopicConnectionFactory介面的例項。
3.3 Session
Session是單執行緒的context用於產生和使用messages。用Session建立 message producers、message consumers和messages。Session管理message listeners的執行順序。
Ssession提供事務模式,用於將一系列的sends和receives動作組合在一個工作單元裡。
Session被標記為事務模式的話,確認訊息就通過確認和校正來自動地處理。如果session沒有標記為事務模式,有三個用於訊息確認的選項:
• AUTO_ACKNOWLEDGE session將自動地確認收到一則訊息。
• CLIENT_ACKNOWLEDGE 客戶端程式將確認收到一則訊息,呼叫這則訊息的確認方法。
• DUPS_OK_ACKNOWLEDGE 這個選項命令session“懶散的”確認訊息傳遞,可以想到,這將導致訊息提供者傳遞的一些複製訊息可能會出錯。這種確認的方式只應當用於訊息消費程式可以容忍潛在的副本訊息存在的情況。
Sessions, 就象connections, 也有兩種方式:實現QueueSession或TopicSession介面。例如:
TopicSession topicSession =
topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
第一個引數表示sessiong不實現事務處理; 第二個引數表示當session成功收到messages後自動確認。
相同的,可以用QueueConnection物件建立QueueSession:
QueueSession queueSession =
queueConnection.createQueueSession(true, 0);
這裡, 第一個引數表示session實現事務處理; 第二個引數表示當session成功收到messages後不自動確認。
3.4 Message Producers—訊息生產者
message producer是由session建立的一個物件,用於將messages傳遞到目的地。PTP方式的message producer實現QueueSender介面。 pub/sub方式的message producer實現TopicPublisher介面。
例如::
QueueSender queueSender = queueSession.createSender(myQueue);
TopicPublisher topicPublisher = topicSession.createPublisher(myTopic);
用null作為createSender或createPublisher的引數,可以建立一個不確定的producer。用不確定的producer, 可以等到真正send或publish message的時候才指定destination
當建立了一個message producer, 就可以用它來發送messages。例如:
queueSender.send(message);
topicPublisher.publish(message);
3.5 Message Consumer—訊息消費者
message consumer也是由session建立的一個物件,用於接收發送到目的地的訊息。message consumer允許JMS client到JMS provider註冊感興趣的目的地。JMS provider管理messages從destination到註冊了這個destination的consumers之間的傳送。
PTP方式的message consumer實現QueueReceiver介面。pub/sub方式的message consumer實現TopicSubscriber介面。
例如:
QueueReceiver queueReceiver = queueSession.createReceiver(myQueue);
TopicSubscriber topicSubscriber = topicSession.createSubscriber(myTopic);
可以用TopicSession.createDurableSubscriber方法建立一個durable topic subscriber(非同步訊息訂閱 )。
當建立了一個message consumer, 它就是活動的,就可以用它接收messages。可以用QueueReceiver 或TopicSubscriber的close方法把message consumer變成非活動的。Message的傳送在呼叫了connection的start方法後才開始。
不論是QueueReceiver或TopicSubscriber, 都可以用receive方法來同步consume message。 可以在呼叫start方法後的任何時間呼叫它:
queueConnection.start();
Message m = queueReceiver.receive();
topicConnection.start();
Message m = topicSubscriber.receive(1000); // time out after a second
非同步consume message, 可以使用message listener。
a) Message Listeners
message listener是一個物件,用作充當messages的非同步事件處理器。它實現了MessageListener介面, 它只有一個方法:onMessage。 在onMessage方法內, 可以定義當收到一個message後做的事情。
用setMessageListener方法在某個QueueReceiver 或TopicSubscriber裡註冊message listener。例如:
TopicListener topicListener = new TopicListener();
topicSubscriber.setMessageListener(topicListener);
當註冊了message listener, 呼叫QueueConnection或TopicConnection的方法來開始傳送message。
當message開始傳送, 當有message送來,message consumer自動呼叫message listener的 onMessage方法。onMessage方法只有一個Message型別的引數。
message listener並不對應特定的destination型別. 相同的listener可以從queue或topic上得到message, 這取決於listener是由QueueReceiver還是 TopicSubscriber物件設定的。然而message listener通常對應某一個message型別或格式, 如果要回應messages, message listener必須建立一個message producer。
onMessage方法應該處理所有的exceptions。
Session負責管理message listeners的執行順序。任何時候,只有一個message listeners在執行。
b) Message Selectors
如果你的訊息應用程式需要過濾收到的messages, 可以用JMS API中的message selector來讓message consumer定義它所感興趣的messages。Message selectors負責過濾到JMS provider的message,而不是到應用程式的。
message selector是一個含有表示式的字串。表示式的語法是SQL92 conditional expression syntax的一個子集。當建立message consumer時, createReceiver, createSubscriber, 和createDurableSubscriber方法都可以定義某個message selector作為引數。
message consumer只接收headers和properties與selector匹配的messages。message selector不能根據message body的內容進行選擇。
3.6 Message—訊息組成
JMS 訊息由以下幾部分組成:訊息頭,屬性,訊息體
訊息頭(Header) - 訊息頭包含訊息的識別資訊和路由資訊,訊息頭包含一些標準的屬性如:JMSDestination,JMSMessageID 等。
訊息頭 由誰設定
JMSDestination send 或 publish 方法
JMSDeliveryMode send 或 publish 方法
JMSExpiration send 或 publish 方法
JMSPriority send 或 publish 方法
JMSMessageID send 或 publish 方法
JMSTimestamp send 或 publish 方法
JMSCorrelationID 客戶
JMSReplyTo 客戶
JMSType 客戶
JMSRedelivered JMS Provider
屬性(Properties) - 除了訊息頭中定義好的標準屬性外,JMS 提供一種機制增加新屬性到訊息頭中,這種新屬性包含以下幾種:
1. 應用需要用到的屬性;
2. 訊息頭中原有的一些可選屬性;
3. JMS Provider 需要用到的屬性。
標準的JMS 訊息頭包含以下屬性:
JMSDestination --訊息傳送的目的地
JMSDeliveryMode --傳遞模式, 有兩種模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示該訊息一定要被送到目的地,否則會導致應用錯誤。NON_PERSISTENT 表示偶然丟失該訊息是被允許的,這兩種模式使開發者可以在訊息傳遞的可靠性和吞吐量之間找到平衡點。
JMSMessageID 唯一識別每個訊息的標識,由JMS Provider 產生。
JMSTimestamp 一個訊息被提交給JMS Provider 到訊息被髮出的時間。
JMSCorrelationID 用來連線到另外一個訊息,典型的應用是在回覆訊息中連線到原訊息。
JMSReplyTo 提供本訊息回覆訊息的目的地址。
JMSRedelivered 如果一個客戶端收到一個設定了JMSRedelivered 屬性的訊息,則表示可能該客戶端曾經在早些時候收到過該訊息,但並沒有簽收(acknowledged)。
JMSType 訊息型別的識別符。
JMSExpiration 訊息過期時間,等於QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上傳送時刻的GMT 時間值。如果timeToLive值等於零,則JMSExpiration 被設為零,表示該訊息永不過期。如果傳送後,在訊息過期時間之後訊息還沒有被髮送到目的地,則該訊息被清除。
JMSPriority 訊息優先順序,從0-9 十個級別,0-4 是普通訊息,5-9 是加急訊息。JMS 不要求JMS Provider 嚴格按照這十個優先順序傳送訊息,但必須保證加急訊息要先於普通訊息到達。
訊息體(Body) - JMS API 定義了5種訊息體格式,也叫訊息型別,你可以使用不同形式傳送接收資料並可以相容現有的訊息格式,下面描述這5種類型:
訊息型別 訊息體
TextMessage java.lang.String物件,如xml檔案內容
MapMessage 名/值對的集合,名是String物件,值型別可以是Java任何基本型別
BytesMessage 位元組流
StreamMessage Java中的輸入輸出流
ObjectMessage Java中的可序列化物件
Message 沒有訊息體,只有訊息頭和屬性。
JMS API為每一種messages都提供了一種create方法。例如:
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);
在使用者一端, 必須將收到的Message 按照適當的message型別處理。 例如:
Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}
3.7 Exception Handling
JMS API的方法的Exception的根類是JMSException。JMSException類包括如下子類:
IllegalStateException
InvalidClientIDException
InvalidDestinationException
InvalidSelectorException
JMSSecurityException
MessageEOFException
MessageFormatException
MessageNotReadableException
MessageNotWriteableException
ResourceAllocationException
TransactionInProgressException
TransactionRolledBackException
4 JORAM產品介紹
JORAM是法國電信研究院支援的objectweb.org的開源JMS訊息中介軟體產品。objectweb是一個非常活躍的開源中介軟體團體。JORAM支援sun規範,實現JMS1.1。
4.1 Joram4.3包括
一個Message Server(訊息伺服器),提供Java的訊息服務功能。
一個JNDI Server。
一個客戶端類,可以訪問Message Server上的訊息。
一個圖形化的管理介面,配置管理Joram平臺。
4.2 系統要求
硬體要求:
2000年後的intel架構pc機
256M RAM 5G硬碟
支援TCP/IP的網路硬體裝置
軟體要求:
作業系統:Linux, Windows 2000 and XP, 等
網路協議:TCP/IP
Java執行環境:JDK1.4
4.3 Joram產品包目錄結構
doc/ ——Joram產品幫助文件
samples/ ——Joram產品使用舉例
bin/... ——Joram例子執行程式
config/... ——Joram例子配置檔案
class/... ——Joram例子class檔案
run/... ——Joram例子執行目錄
src/ ——Joram產品例子原始碼
joram/... ——J2EE環境的Joram原始碼
kjoram/… ——J2ME環境的Joram原始碼
ship/lib/... ——Joram所用的lib包
ship/licenses/... ——Joram的許可證
5 JORAM Classic Samples使用說明
Joram classic samples使用簡單的配置建立一個Message Server,釋出一個Queue和一個Topic,並允許匿名使用者訪問。其中包口:一個訊息傳送的例子、一個訊息釋出的例子、一個標準的訊息生產者的例 子、一個訊息接收的例子、一個訊息訂閱的例子、一個標準的訊息消費者的例子、一個瀏覽Queue的例子。Joram平臺執行在穩定模式。
可以利用ant工具直接執行Joram的上述例子
起動Joram訊息Server:
ant single_server
也可以用命令列啟動Joram Server,進入$JoramHome/samples/bin/目錄,執行single_server.bat(win)或single_server.sh(unix)。以下操作均有類似的命令列工具。
建立Queue和Topic:
ant classic_admin
執行Sender傳送訊息例子:
ant sender
執行Receiver接受訊息例子:
ant receiver
執行Browser 瀏覽Queue例子:
ant browser
執行訊息訂約的例子:
ant subscriber
執行訊息釋出的例子:
ant publisher
執行訊息消費的例子:
ant consumer
執行訊息生產的例子:
ant producer
以上操作也可以利用Eclipse3.1整合開發工具,將Joram例子直接匯入成Java工程,然後利用例子中的build.xml檔案執行。如圖:
右擊build.xml檔案選擇Run As – Ant Build...
彈出視窗如下:每個操作選項後面都有英文的註釋資訊,說明操作的內容,可以按照前面執行例子的順序,依次執行即可。
Joram Server也可以通過命令列啟動:
進入Joram產品的$JORAM_HOME/samples/bin目錄,有下列批處理命令,其中.bat檔案是windows環境下的命令,.sh檔案是unix環境下的命令。
admin.bat
--啟動Joram圖形控制檯介面
admin.sh
classadmin.bat --在啟動的Joram Server上建立去queue和topic
clean.bat --清理Joram的執行目錄
clean.sh
jmsclient.bat --啟動一個JMS客戶端
jmsclient.sh
receive.bat --啟動一個接收訊息程序
send.bat --啟動一個傳送訊息程序
single_server.bat --啟動Joram Server
single_server.sh
sslsingle_server.bat
sslsingle_server.sh
server.sh
6 JORAM使用程式碼舉例
6.1 建立訊息載體Queue和Topic並繫結Jndi
--用root使用者連線Joram Server
AdminModule.connect("root", "root", 60);
--建立Queue和Topic
Queue queue = (Queue) Queue.create("queue");
Topic topic = (Topic) Topic.create("topic");
--建立匿名訪問使用者
User user = User.create("anonymous", "anonymous");
--設定Queue和Topic為可讀、可寫
queue.setFreeReading();
topic.setFreeReading();
queue.setFreeWriting();
topic.setFreeWriting();
--建立連線工廠
javax.jms.ConnectionFactory cf =
TcpConnectionFactory.create("localhost", 16010);
javax.jms.QueueConnectionFactory qcf =
QueueTcpConnectionFactory.create("localhost", 16010);
javax.jms.TopicConnectionFactory tcf =
TopicTcpConnectionFactory.create("localhost", 16010);
--邦定Jndi
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
jndiCtx.bind("cf", cf);
jndiCtx.bind("qcf", qcf);
jndiCtx.bind("tcf", tcf);
jndiCtx.bind("queue", queue);
jndiCtx.bind("topic", topic);
jndiCtx.close();
--關閉連線
AdminModule.disconnect();
另一種建立Queue和Topic的方法:
可以將Queue和Topic的具體資訊寫入一個joramAdmin.XML配置檔案,然後在程式中讀取這個XML配置檔案即可建立。舉例如下:
joramAdmin.XML:
==============================================
<?xml version="1.0"?>
<JoramAdmin>
<AdminModule>
<connect host="localhost"
port="16010"
name="root"
password="root"/>
</AdminModule>
<InitialContext>
<property name="java.naming.factory.initial"
value="fr.dyade.aaa.jndi2.client.NamingContextFactory"/>
<property name="java.naming.factory.host" value="localhost"/>
<property name="java.naming.factory.port" value="16400"/>
</InitialContext>
<ConnectionFactory className="org.objectweb.joram.client.jms.tcp.TcpConnectionFactory">
<tcp host="localhost"
port="16010"/>
<jndi name="cf"/>
</ConnectionFactory>
<ConnectionFactory className="org.objectweb.joram.client.jms.tcp.TopicTcpConnectionFactory">
<tcp host="localhost"
port="16010"/>
<jndi name="tcf"/>
</ConnectionFactory>
<ConnectionFactory className="org.objectweb.joram.client.jms.tcp.QueueTcpConnectionFactory">
<tcp host="localhost"
port="16010"/>
<jndi name="qcf"/>
</ConnectionFactory>
<User name="anonymous"
password="anonymous"/>
<Queue name="queue">
<freeReader/>
<freeWriter/>
<jndi name="queue"/>
</Queue>
<Topic name="topic">
<freeReader/>
<freeWriter/>
<jndi name="topic"/>
</Topic>
</JoramAdmin>
==============================================
程式碼舉例:***代表配置檔案的具體路徑
AdminModule.connect("root", "root", 60);
AdminModule.executeXMLAdmin("***","joramAdmin.xml");
AdminModule.disconnect();
6.2 傳送訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Queue物件
Queue queue = (Queue) ictx.lookup("queue");
--通過Jndi邦定建立好的Queue建立連線工廠
QueueConnectionFactory qcf = (QueueConnectionFactory) ictx
.lookup("qcf");
ictx.close();
--建立Queue連線
QueueConnection qc = qcf.createQueueConnection();
--建立Queue會話
QueueSession qs = qc.createQueueSession(true, 0);
注意:此處建立的Session使用了事務模式,因此在執行具體的傳送訊息程式碼即qsend.send()方法後必須要提交Session事務,即:qs.commit(),否則訊息不會發送。其他舉例程式碼類似,不再逐一複述。
--建立QueueSender
QueueSender qsend = qs.createSender(queue);
--建立文字訊息
TextMessage msg = qs.createTextMessage();
int i;
for (i = 0; i < 10; i++) {
--設定訊息具體內容
msg.setText("Test number " + i);
--傳送的訊息可以設定具體的業務種類,或其他的型別標誌:
msg.setIntProperty("ywlx",i);
msg.setIntProperty("****",*)
--傳送訊息
qsend.send(msg);
}
--提交Queue會話
qs.commit();
System.out.println(i + " messages sent.");
--關閉Queue連線
qc.close();
訊息的主體也可以是Java物件:
--建立物件訊息
ObjectMessage omsg = qs.createObjectMessage();
omsg.clearBody();
--例項化並傳送的Java物件
TestObj to = new TestObj();
omsg.setObject(to);
qsend.send(omsg);
qs.commit();
qc.close();
6.3 接收訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Queue物件
Queue queue = (Queue) ictx.lookup("queue");
--通過Jndi邦定建立好的Queue建立連線工廠
QueueConnectionFactory qcf = (QueueConnectionFactory) ictx
.lookup("qcf");
ictx.close();
--建立Queue連線
QueueConnection qc = qcf.createQueueConnection();
--建立Queue會話
QueueSession qs = qc.createQueueSession(true, 0);
--建立QueueReceiver例項
QueueReceiver qrec = qs.createReceiver(queue);
--建立訊息接收體
Message msg;
TestObj to;
--起動Queue連線
qc.start();
int i;
for (i = 0; i < 10; i++) {
--接收訊息
msg = qrec.receive();
--接收文字訊息
if (msg instanceof TextMessage)
System.out.println("Msg received: "
+ ((TextMessage) msg).getText());
--接收Java物件訊息
else if (msg instanceof ObjectMessage) {
to = (TestObj) ((ObjectMessage) msg).getObject();
to.TestFunc();
} else
System.out.println("Msg received: " + msg);
}
--提交Queue會話
qs.commit();
System.out.println();
System.out.println(i + " messages received.");
--關閉Queue連線
qc.close();
接收指定業務種類的訊息:
--指定接收訊息的具體型別
此處用到了Message Selector來建立QueueReceiver物件,詳細介紹請參看前面的介紹。
String strMsgSelect = "ywlx=0";
QueueConnection qc = (QueueConnection) qcf.createQueueConnection();
QueueSession qs = (QueueSession) qc.createQueueSession(true, 0);
--用MesageSelecter建立QueueReceiver物件
QueueReceiver qrec = (QueueReceiver) qs.createReceiver(queue,
strMsgSelect);
--接收指定的訊息;
msg = (Message) qrec.receive();
6.4 釋出訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Topic物件
Topic topic = (Topic) ictx.lookup("topic");
--通過Jndi邦定建立好的Topic建立連線工廠
TopicConnectionFactory tcf = (TopicConnectionFactory) ictx.lookup("tcf");
ictx.close();
--建立Topic連線
TopicConnection tc = tcf.createTopicConnection();
--建立Topic會話
TopicSession ts = tc.createTopicSession(true, 0);
--建立訊息釋出例項
TopicPublisher tpub = ts.createPublisher(topic);
--建立文字訊息
TextMessage msg = ts.createTextMessage();
int i;
for (i = 0; i < 10; i++) {
msg.setText("Test number " + i);
--釋出訊息
tpub.publish(msg);
}
--提交Topic會話
ts.commit();
System.out.println(i + " messages published.");
--關閉Topic連線
tc.close();
6.5 訂閱訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Topic物件
Topic topic = (Topic) ictx.lookup("topic");
--通過Jndi邦定建立好的Topic建立連線工廠
TopicConnectionFactory tcf = (TopicConnectionFactory) ictx.lookup("tcf");
ictx.close();
--建立Topic連線
TopicConnection tc = tcf.createTopicConnection();
--建立Topic會話
TopicSession ts = tc.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
--建立訊息訂閱例項
TopicSubscriber tsub = ts.createSubscriber(topic);
--訂閱訊息
tsub.setMessageListener(new MsgListener());
--起動Topic連線
tc.start();
System.in.read();
--關閉Topoc連線
tc.close();
其中MsgListener類是一個實現了MessageListener介面的具體類,它實現了onMessage方法,具體處理監聽到的訊息。監聽啟動後,一旦佇列中出現了被監聽的訊息,系統會自動呼叫MsgListener的onMessage方法去處理訊息。
MsgListener類:
public class MsgListener implements MessageListener{
String ident = null;
public MsgListener(){}
public MsgListener(String ident){
this.ident = ident;
}
public void onMessage(Message msg){
try {--根據不同的訊息型別接收訊息
if (msg instanceof TextMessage) {
if (ident == null)
System.out.println(((TextMessage) msg).getText());
else
System.out.println(ident + ": " + ((TextMessage) msg).getText());
}
else if (msg instanceof ObjectMessage) {
if (ident == null)
System.out.println(((ObjectMessage) msg).getObject());
else
System.out.println(ident + ": " + ((ObjectMessage) msg).getObject());
}
}catch (JMSException jE) {
System.err.println("Exception in listener: " + jE);
}
}
}
6.6 產生訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Queue物件
Queue queue = (Queue) ictx.lookup("queue");
--例項化Topic物件
Topic topic = (Topic) ictx.lookup("topic");
--通過Jndi邦定建立連線工廠
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf");
ictx.close();
--建立連線
Connection cnx = cf.createConnection();
--建立會話
Session sess = cnx.createSession(true, 0);
--建立產生訊息例項
MessageProducer producer = sess.createProducer(null);
--建立文字訊息
TextMessage msg = sess.createTextMessage();
int i;
for (i = 0; i < 10; i++) {
msg.setText("Test number " + i);
--在Queue上產生訊息
producer.send(queue, msg);
--在Topic上產生訊息
producer.send(topic, msg);
}
--提交會話
sess.commit();
System.out.println(i + " messages sent.");
--關閉連線
cnx.close();
6.7 消費訊息
--初始化上下文空間
ictx = new InitialContext();
--例項化Queue物件
Queue queue = (Queue) ictx.lookup("queue");
--例項化Topic物件
Topic topic = (Topic) ictx.lookup("topic");
--通過Jndi邦定建立連線工廠
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf");
ictx.close();
--建立連線
Connection cnx = cf.createConnection();
--建立會話
Session sess = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
--建立消費訊息例項
MessageConsumer recv = sess.createConsumer(queue);
MessageConsumer subs = sess.createConsumer(topic);
--訂閱Queue和Topic上的訊息
recv.setMessageListener(new MsgListener("Queue listener"));
subs.setMessageListener(new MsgListener("Topic listener"));
--起動連線
cnx.start();
System.in.read();
cnx.close();
System.out.println();
System.out.println("Consumer closed.");
6.8 對Queue的清理操作
--建立連線
AdminModule.connect("root", "root", 60);
--讀取server.properties具體配置資訊
ictx = new InitialContext(JMSConstant.getProp());
Queue queue = (Queue) ictx.lookup("queue");
--清理Queue
queue.clear();
6.9 Connectiion的注意事項:
在建立jms客戶端的時候.有一個問題值得注意.在一個jms運用當中,至少需要兩個客戶端(訊息傳送端和訊息接手端).每個客戶端都需通過jndi去找連線工廠.並且建立一個連線。
訊息傳送端:
………………………………………………………………………………………….
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf");
Connection cn = cf.createConnection();
……………………………………………………………………………………………...
訊息接收端
…………………………………………………………………………………………...
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf");
Connection cn = cf.createConnection();
…………………………………………………………………………………………..
兩個客戶端都享有一個連線工廠,而且很可能佔有同一個連線,如果當接收端需要呼叫cn.close()關閉連線.cn.start ();開啟連線,或者傳送段cn.close()關閉連線.cn.start ();就會發生不可預料的情況.
要區別連線是很必要的.所以我們建立2個User.分別如下:
//User user= User.create(String username,PassWord password,Int serverID);
User user1=User.create(“user1”,”user1”,0);
User user2=User.create(“user2”,”user2”,0);
然後根據不同的使用者,來建立一個不同的連線.
訊息傳送端
…………………………………………………………………………………….
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf");
Connection cn = cf.createConnection(“user1”,”user1”);
………………………………………………………………...
訊息接收端
………………………………………………………………….
ConnectionFactory cf= (ConnectionFactory) ictx.lookup(“cf”);
Connection cn = cf.createConnection(“user2”,”user2”);
6.10設定訊息監聽
在訊息的接收客戶端(包括接收者和訂閱者以及訊息消費者)都可以設定訊息監聽,當訊息監聽監聽到訊息目的地有訊息,就接收訊息,注意監聽和接收在程式級別上是不可分隔的。JMS提供了一個訊息監聽介面,只需實現訊息監聽的onMessage(Message Msg)方法。
程式碼如下:
建立一個訊息監聽實現類:
Public class MsgListener implments MessageListener{
Public void onMessage(Message Msg){
…………….//實現程式碼,處理資料。
}
};
P2p:receiver.setMessageListener(new MsgListener());
p/s: subscriber.setMessageListener(new MsgListener());
注意:一個訊息接收端設定一個監聽,如果共用一個監聽,可能會出現資源同步佔用。
onmessage由於
6.11訊息接收的過期
在訊息接收當中,可以在receiver(Long TimeOut)設定接收訊息的過期時間,如果在取訊息的時候沒有符合過濾器相對應的訊息,那麼在時間過期後返回一個null值,我們可以對這個null做判斷,來關閉連線.這個方法可用於各種訊息模式中.
注意:receive()這個動作是一次性的,一個recevie()動作只接收一個訊息.訊息的單位數也取決與send()這個動作的次數.既是,send()動作一次,那麼所傳送的訊息個數為一.
程式碼如下:
Session se =new Session();
Receiver re= se.create(Queue queue,String messageselector );
Message msg;
msg = qrec.receive(3000);;//設定訊息接收過期時間3秒.
………………………//訊息處理
Msg.getText();//如果訊息在過期時間內沒有接受訊息,那麼返回一個null值.
如果在接收的時候,不需要等待,那麼可以呼叫recevieNoWait();那麼如果在queue沒有相應的訊息,立即返回一個null.
程式碼如下:
Session se =new Session();
Receiver re= se.create(Queue queue,String messageselector );
Message msg;
msg = qrec.receiveNoWait();;// 接收訊息不等待.
………………………//訊息處理
Msg.getText();//如果訊息在過期時間內沒有接受訊息,那麼返回一個null值.
6.12.多執行緒的訊息傳送
程式碼說明:首先要在主類裡建立一個內部類繼承thread的run方法,以實現多執行緒的傳送,在主類裡構建一個建構函式,把一些傳送訊息的預備工作初始化,從而讓多執行緒只處理髮送的動作,節省每次執行緒初始傳送的預備動作,來達到快速的傳送訊息.
這個示例是建立一個訊息傳送器傳送兩個分別設定不同屬性的訊息構造器
public class Sendert {
private static transient final Log log = LogFactory.getLog(Sendert.class);
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueSender queueSender = null;
final String MSG_TEXT1 =
new String("Here is a client-acknowledge message 4tetrtretertert4t45ewrwerwrgergrt4rqwerew5t479347597505890 34535&%^*&$&^$^%$&%&*)_*)&^*%*^$&$^(*&(*&(*%*%&^%*%^%*&^*&^&*^*&^*&^");
final String MSG_TEXT2 =
new String("Here is a Message 327893475984 ryeirrfr9urtyureyt&&&&&&&)))__^&&&))gerg)%%^%^%^%&(*(*(*)*(**)(*)*)*)T^HJCJBDKJFBKJJDOAIWEU(Q�(&E(*�Q*E&QEUODIJEDLJLHDKGDUWTE&QE");
TextMessage message1 = null;
TextMessage message2 = null;
static long time1;
static long time4;
public Sendert() throws JMSException, NamingException {
//javax.naming.Context ictx = new InitialContext(JMSConstant.getProp());
Properties pr =new Properties();
pr.put("82.0.176.214","16400");
javax.naming.Context ictx = new InitialContext(pr);
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) ictx.lookup("qcf");
queueConnection = queueConnectionFactory.createQueueConnection("user1","user1");
queueSession = queueConnection.createQueueSession(false,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue) ictx.lookup("queue");
ictx.close();
queueSender = queueSession.createSender(queue);//建立訊息傳送器
message1 = queueSession.createTextMessage();//建立訊息生成器
message2 = queueSession.createTextMessage();//建立訊息生成器
message1.setIntProperty("sessionID", 1);//設定訊息屬性.以便訊息接收的過濾
message2.setIntProperty("sessionID", 2);//設定訊息屬性, 以便訊息接收的過濾
}
class AsynSender extends Thread {
public void run () {
try {
for(int i=0;i<100;i++){
message1.setText("i"+MSG_TEXT1);//設定訊息
message2.setText("i"+MSG_TEXT2);//設定訊息
System.out.println(" message1: " + message1.getText());
System.out.println(" message2: " + message2.getText());
queueSender.send(message1);//傳送訊息
queueSender.send(message2);//傳送訊息
}
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
}
}
}
// finally {
// if(queueSender!=null){
// try{queueSender.close();
//
// }catch(JMSException e){}
// if(queueSession!=null)
// {
// try{
// queueSession.commit();
// }catch(JMSException e){}
// }
// if (queueConnection != null)
// {
// try {
// queueConnection.close();
// } catch (JMSException e) {}
// }
// }
// }
public void run_threads() {
AsynSender[] tt = new AsynSender[100];
for (int i = 0; i < tt.length; i++)
{
tt[i] = new AsynSender();
tt[i].start();
}
for (int i = 0; i < tt.length; i++) {
System.out.println("go1");
try {
tt[i].join();//讓子執行緒傳送訊息完畢後,才執行主控執行緒,關閉傳送器,連線,會話物件
} catch (InterruptedException je)
{
Je.printstacktrace();}
}
}
void close() throws JMSException {
queueSender.close();
queueSession.close();
queueConnection.close();
}
public static void main(String[] args) throws JMSException, NamingException, IOException {
System.out.println(" sender: Created client-acknowledge session");
Sendert se = new Sendert();
se.run_threads();
se.close();//傳送訊息完畢關閉訊息傳送器,會話,連線物件.
}
}
6.13 joram遠端部署.
Joarm的遠端部署,即jms伺服器和jms客戶端不在同一ip主機上,首先我們選定一伺服器主機名是KBF_BCR, ip地址是82.0.176.214.在這臺主機上安裝joram,配置好伺服器的a3server.xml.檔案如下:
<?xml version="1.0"?>
<config>
<property name="Transaction" value="fr.dyade.aaa.util.NullTransaction"/>
<server id="0" name="S0" hostname="KBF_BCR">
<service class="org.objectweb.joram.mom.proxies.ConnectionManager"
args="root root"/>
<service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService"
args="16010"/>
<service class="fr.dyade.aaa.jndi2.server.JndiServer" args="16400"/>
</server>
</config>
啟動伺服器.就ok了!
二:接下來是在選定另一臺主機做為jms的客戶端:我們執行如下程式碼,在伺服器下建立queue,topic,user,和jndi對它們的綁頂。程式碼如下:
public class Amdinx {
public static void main(String args[])throws Exception{
System.out.println("begin the admin test!");
//connecting to JORAM server
AdminModule.connect("82.0.176.214",16010,"root", "root", 60);
// AdminModule連線伺服器,引數為伺服器ip地址,連線工廠的埠,登陸使用者名稱,密碼和每
//秒嘗試連線的次數.
Queue queue = (Queue) Queue.create(0);//引數0為伺服器id,與a3server.xml對應.
User user1 = User.create("user1","user1",0); //引數0為伺服器id,與a3server.xml對應.
User user2 = User.create("user2","user2",0); //引數0為伺服器id,與a3server.xml對應.
User user3 = User.create("user3","user3",0); //引數0為伺服器id,與a3server.xml對應.
queue.setFreeReading(user1);//只讓user1讀
queue.setFreeWriting(user1);//只讓user1寫
javax.jms.QueueConnectionFactory qcf =
QueueTcpConnectionFactory.create("82.0.176.214", 16010);//伺服器的連線工廠,引數為伺服器ip,和連線工廠的埠號.
Properties pr =new Properties();
pr.put("82.0.176.214","16400");
javax.naming.Context jndiCtx = new InitialContext(pr);
// javax.naming.Context jndiCtx = new javax.naming.InitialContext(JMSConstant.getProp());
jndiCtx.bind("qcf", qcf);
jndiCtx.bind("queue", queue);
jndiCtx.close();//關閉上下文目錄
AdminModule.disconnect();//關閉AdminModule連線
System.out.println("close the admin!");
}
}
這樣就執行程式碼後,就在伺服器上建立了使用者,目的地,和對它們的jndi繫結.
三:然後執行多執行緒的傳送訊息的程式碼:(見6.12多執行緒的訊息傳送)
四:多執行緒的接收訊息,程式碼如下.
public class Receiver1
{
QueueReceiver qrec1=null;
QueueSession qs =null;
QueueConnection qc=null;
public Receiver1() throws NamingException, JMSException{
//ictx = new InitialContext(JMSConstant.getProp());
Properties pr =new Properties();
pr.put("82.0.176.214","16400");
Context ictx = new InitialContext(pr);
Queue queue = (Queue) ictx.lookup("queue");
QueueConnectionFactory qcf = (QueueConnectionFactory) ictx.lookup("qcf");
ictx.close();
qc = qcf.createQueueConnection("user1","user1");//建立只屬於user1的連線,以區別其他連線.
qs = qc.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
qrec1 = qs.createReceiver(queue,"sessionID=2");
}
class Brun extends Thread{
public void run(){
try{
qrec1.setMessageListener(new MsgListener1());
qc.start();//把連線開始應該放在