【EJB學習筆記】——JMS和訊息驅動Bean
認識訊息驅動Bean之前,先了解一下JMS。
JMS
JMS(Java Message Service):java訊息服務,客戶端與服務端之間可以通過JSM服務進行訊息的非同步傳輸(訊息的傳送和訊息的接收不是同時進行的,即傳送了訊息後,不需要等待訊息的返回就可以繼續執行),客戶端只管傳送,不需要考慮服務端什麼時候處理。因此,如果客戶端與服務端對訊息傳送和接收對時間相關不是很嚴格的話,用JMS可以很大程度上提高效能。
JMS支援兩種訊息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub)。
點對點模型(P2P)
生產者(傳送者)非同步把訊息傳送到佇列,消費者(接受者)從佇列中獲取訊息。訊息在被消費或超時之前,始終保持在訊息佇列中。
特點:
1、生產者和消費者之間沒有時間依賴性,無論消費者是否收到訊息,都不影響生產者傳送訊息;
2、消費者收到訊息後需要向佇列反饋;
3、適用於每條訊息都需要被消費者消費的場景。
釋出/訂閱模型(Pub/Sub)
與P2P不同的是,一個生產者把訊息釋出後,這些訊息可以傳送給多個消費者。
特點:每條訊息可以有多個消費者。
訊息驅動Bean(以下簡稱MDB)
在上面的JMS介紹中瞭解了非同步訊息,訊息驅動Bean可以看做是非同步訊息的消費者。
實現訊息驅動Bean,需要在JBoss的安裝目錄(jboss-5.0.1.GA\server\default\deploy)下新增一個配置檔案:
xxx-service.xml
<?xml version="1.0" encoding="UTF-8" ?>
<server>
<!-- Queue,name:Queue的名稱 -->
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" >
<!-- JNDI名稱 -->
<attribute name="JNDIName">queue/myqueue</attribute >
<depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
</mbean>
<!-- Topic,name:Topic的名稱-->
<mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.org.destination:server=Topic,name=mytopic" >
<!-- JNDI名稱 -->
<attribute name="JNDIName">topic/mytopic</attribute>
<depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>
</mbean>
</server>
實現P2P模式的訊息驅動Bean
服務端
MyQueueMDBBean.java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination",propertyValue="queue/myqueue")
}
)
public class MyQueueMDBBean implements MessageListener{
static int i=0;
@Override
public void onMessage(Message msg) {
TextMessage textMessage=(TextMessage)msg;
try {
System.out.println("【MyQueueMDBBean】訊息"+(i++)+"被接收了。TextMessage:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
對以上程式碼的說明:
用@MessageDriven註解來定義訊息驅動Bean,如果檢視EJB的原始碼會發現,MessageDriven中有一個數組型別的變數activationConfig:
ActivationConfigProperty[] activationConfig() default {};
所以這裡需要為activationConfig賦值:
activationConfig={
@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
}
destinationType屬性值為javax.jms.Topic說明此MDB實現的是P2P模式的訊息服務;destination屬性值為topic/mytopic表示此MDB的訊息來源,也表示生產者的傳送訊息的目的地,jndi地址為topic/mytopic,這個可以在xxx-service.xml中自定義。
客戶端
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
public class MyQueueMDBBeanClient {
public static void main(String[] args) throws Exception {
InitialContext context=new InitialContext();
//獲取QueueConnectionFactory
QueueConnectionFactory factory=(QueueConnectionFactory)context.lookup("ConnectionFactory");
//建立QueueConnection物件
QueueConnection connection=factory.createQueueConnection();
//建立QueueSession物件,第一個引數表示事務自動提交,第二個引數表示一旦訊息被正確送達,將自動發回響應
QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
//獲取Destination物件
Queue queue=(Queue)context.lookup("queue/myqueue");
//建立文字訊息
TextMessage msg=session.createTextMessage("Hello world!");
//建立傳送者
QueueSender sender=session.createSender(queue);
//傳送資訊
for(int i=0;i<10;i++){
sender.send(msg);
System.out.println("訊息"+i+"已經發送");
}
//關閉會話
session.close();
connection.close();
}
}
客戶端執行結果
訊息0已經發送
訊息1已經發送
訊息2已經發送
訊息3已經發送
訊息4已經發送
訊息5已經發送
訊息6已經發送
訊息7已經發送
訊息8已經發送
訊息9已經發送
EJB執行結果
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息4被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息6被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息8被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息9被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息2被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息7被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息5被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息0被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息1被接收了。TextMessage:Hello world!
13:39:30,045 INFO [STDOUT] 【MyQueueMDBBean】訊息3被接收了。TextMessage:Hello world!
從結果可以看出,傳送訊息的時候是有序的,但是MDB接收訊息不一定是有序的。
實現Pub/Sub模式的訊息驅動Bean
服務端
MyTopicMDBBean1.java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(
activationConfig={
@ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
@ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
}
)
public class MyTopicMDBBean1 implements MessageListener{
@Override
public void onMessage(Message msg) {
TextMessage textMessage=(TextMessage)msg;
try {
System.out.println("【MyTopicMDBBean1】訊息被接收了。TextMessage:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
MyTopicMDBBean2.java、MyTopicMDBBean3.java
MyTopicMDBBean2.java、MyTopicMDBBean3.java的程式碼與MyTopicMDBBean1.java的實現一模一樣。
客戶端
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
public class MyTopicMDBBeanClient {
public static void main(String[] args) throws Exception {
InitialContext context=new InitialContext();
//獲取QueueConnectionFactory
TopicConnectionFactory factory=(TopicConnectionFactory)context.lookup("ConnectionFactory");
//建立QueueConnection物件
TopicConnection connection=factory.createTopicConnection();
//建立QueueSession物件,第一個引數表示事務自動提交,第二個引數表示一旦訊息被正確送達,將自動發回響應
TopicSession session=connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
//獲取Destination物件
Topic topic=(Topic)context.lookup("topic/mytopic");
//建立文字訊息
TextMessage msg=session.createTextMessage("Hello world!");
//建立釋出者
TopicPublisher publisher=session.createPublisher(topic);
//釋出資訊
publisher.publish(msg);
System.out.println("訊息已經發布");
//關閉會話
session.close();
connection.close();
}
}
客戶端執行結果
訊息已經發布
EJB執行結果
14:16:24,287 INFO [STDOUT] 【MyTopicMDBBean01】訊息被接收了。TextMessage:Hello world!
14:16:24,287 INFO [STDOUT] 【MyTopicMDBBean03】訊息被接收了。TextMessage:Hello world!
14:16:24,288 INFO [STDOUT] 【MyTopicMDBBean02】訊息被接收了。TextMessage:Hello world!
這種場景類似於,我新發表了一篇部落格,訂閱我部落格的人都會收到RSS推送。