1. 程式人生 > >【EJB學習筆記】——JMS和訊息驅動Bean

【EJB學習筆記】——JMS和訊息驅動Bean


  認識訊息驅動Bean之前,先了解一下JMS。


JMS

  JMS(Java Message Service):java訊息服務,客戶端與服務端之間可以通過JSM服務進行訊息的非同步傳輸(訊息的傳送和訊息的接收不是同時進行的,即傳送了訊息後,不需要等待訊息的返回就可以繼續執行),客戶端只管傳送,不需要考慮服務端什麼時候處理。因此,如果客戶端與服務端對訊息傳送和接收對時間相關不是很嚴格的話,用JMS可以很大程度上提高效能。

  JMS支援兩種訊息模型:Point-to-Point(P2P)和Publish/Subscribe(Pub/Sub)。

  點對點模型(P2P)

這裡寫圖片描述

  生產者(傳送者)非同步把訊息傳送到佇列,消費者(接受者)從佇列中獲取訊息。訊息在被消費或超時之前,始終保持在訊息佇列中。

  特點:
  1、生產者和消費者之間沒有時間依賴性,無論消費者是否收到訊息,都不影響生產者傳送訊息;
  2、消費者收到訊息後需要向佇列反饋;
  3、適用於每條訊息都需要被消費者消費的場景。



  釋出/訂閱模型(Pub/Sub)

這裡寫圖片描述

  與P2P不同的是,一個生產者把訊息釋出後,這些訊息可以傳送給多個消費者。

  特點:每條訊息可以有多個消費者。


訊息驅動Bean(以下簡稱MDB)

  在上面的JMS介紹中瞭解了非同步訊息,訊息驅動Bean可以看做是非同步訊息的消費者。

  實現訊息驅動Bean,需要在JBoss的安裝目錄(jboss-5.0.1.GA\server\default\deploy)下新增一個配置檔案:

  xxx-service.xml

<?xml version="1.0" encoding="UTF-8" ?> 
<server> 
   <!-- Queue,name:Queue的名稱 -->
   <mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.org.destination:server=Queue,name=myqueue" > 
     <!-- JNDI名稱 -->
     <attribute name="JNDIName">queue/myqueue</attribute
>
<depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> </mbean> <!-- Topic,name:Topic的名稱--> <mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.org.destination:server=Topic,name=mytopic" > <!-- JNDI名稱 --> <attribute name="JNDIName">topic/mytopic</attribute> <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends> </mbean> </server>


  實現P2P模式的訊息驅動Bean

  服務端

  MyQueueMDBBean.java

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
    activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="queue/myqueue")
    }
)
public class MyQueueMDBBean implements MessageListener{
    static int i=0;
    @Override
    public void onMessage(Message msg) {

        TextMessage textMessage=(TextMessage)msg;
        try {
            System.out.println("【MyQueueMDBBean】訊息"+(i++)+"被接收了。TextMessage:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

  對以上程式碼的說明:

  用@MessageDriven註解來定義訊息驅動Bean,如果檢視EJB的原始碼會發現,MessageDriven中有一個數組型別的變數activationConfig:

ActivationConfigProperty[] activationConfig() default {}; 

  所以這裡需要為activationConfig賦值:

activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Queue"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
    }

  destinationType屬性值為javax.jms.Topic說明此MDB實現的是P2P模式的訊息服務;destination屬性值為topic/mytopic表示此MDB的訊息來源,也表示生產者的傳送訊息的目的地,jndi地址為topic/mytopic,這個可以在xxx-service.xml中自定義。


  客戶端

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;

public class MyQueueMDBBeanClient {
     public static void main(String[] args) throws Exception {
        InitialContext context=new InitialContext();

        //獲取QueueConnectionFactory
        QueueConnectionFactory factory=(QueueConnectionFactory)context.lookup("ConnectionFactory");

        //建立QueueConnection物件
        QueueConnection connection=factory.createQueueConnection();

        //建立QueueSession物件,第一個引數表示事務自動提交,第二個引數表示一旦訊息被正確送達,將自動發回響應
        QueueSession session=connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

        //獲取Destination物件
        Queue queue=(Queue)context.lookup("queue/myqueue");

        //建立文字訊息
        TextMessage msg=session.createTextMessage("Hello world!");

        //建立傳送者
        QueueSender sender=session.createSender(queue);

        //傳送資訊
        for(int i=0;i<10;i++){
            sender.send(msg);
            System.out.println("訊息"+i+"已經發送");
        }

        //關閉會話
        session.close();
        connection.close();
    }
}


  客戶端執行結果

訊息0已經發送
訊息1已經發送
訊息2已經發送
訊息3已經發送
訊息4已經發送
訊息5已經發送
訊息6已經發送
訊息7已經發送
訊息8已經發送
訊息9已經發送

  EJB執行結果

13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息4被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息6被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息8被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息9被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息2被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息7被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息5被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息0被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息1被接收了。TextMessage:Hello world!
13:39:30,045 INFO  [STDOUT]MyQueueMDBBean】訊息3被接收了。TextMessage:Hello world!

  從結果可以看出,傳送訊息的時候是有序的,但是MDB接收訊息不一定是有序的。


  實現Pub/Sub模式的訊息驅動Bean

  服務端

  MyTopicMDBBean1.java

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(
    activationConfig={
            @ActivationConfigProperty(propertyName="destinationType",propertyValue="javax.jms.Topic"),
            @ActivationConfigProperty(propertyName="destination",propertyValue="topic/mytopic")
    }
)
public class MyTopicMDBBean1 implements MessageListener{
    @Override
    public void onMessage(Message msg) {    
        TextMessage textMessage=(TextMessage)msg;
        try {
            System.out.println("【MyTopicMDBBean1】訊息被接收了。TextMessage:"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  MyTopicMDBBean2.java、MyTopicMDBBean3.java

  MyTopicMDBBean2.java、MyTopicMDBBean3.java的程式碼與MyTopicMDBBean1.java的實現一模一樣。

  客戶端

import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;

public class MyTopicMDBBeanClient {
     public static void main(String[] args) throws Exception {
            InitialContext context=new InitialContext();

            //獲取QueueConnectionFactory
            TopicConnectionFactory factory=(TopicConnectionFactory)context.lookup("ConnectionFactory");

            //建立QueueConnection物件
            TopicConnection connection=factory.createTopicConnection();

            //建立QueueSession物件,第一個引數表示事務自動提交,第二個引數表示一旦訊息被正確送達,將自動發回響應
            TopicSession session=connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);

            //獲取Destination物件
            Topic topic=(Topic)context.lookup("topic/mytopic");

            //建立文字訊息
            TextMessage msg=session.createTextMessage("Hello world!");

            //建立釋出者
            TopicPublisher publisher=session.createPublisher(topic);

            //釋出資訊
            publisher.publish(msg);
            System.out.println("訊息已經發布");           

            //關閉會話
            session.close();    
            connection.close();
        }
}


  客戶端執行結果

訊息已經發布

  EJB執行結果

14:16:24,287 INFO  [STDOUT]MyTopicMDBBean01】訊息被接收了。TextMessage:Hello world!
14:16:24,287 INFO  [STDOUT]MyTopicMDBBean03】訊息被接收了。TextMessage:Hello world!
14:16:24,288 INFO  [STDOUT]MyTopicMDBBean02】訊息被接收了。TextMessage:Hello world!

  這種場景類似於,我新發表了一篇部落格,訂閱我部落格的人都會收到RSS推送。