1. 程式人生 > >自己動手實現訊息佇列之JMS

自己動手實現訊息佇列之JMS

什麼是JMS?JMS的誕生史?

在JMS還沒有誕生前,每個企業都會有自己的一套內部訊息系統,比如專案組A需要呼叫到專案組B的系統,專案組B也有可能會呼叫到專案組C的系統。這樣每個公司都有自己的一套實現。很不規範,所以Apache基金會,為企業訊息產品專門定義了一套規範。我們可以把JMS當作是一系列介面及相關語義的集合,通過這些介面和語義定義了JSM客戶端如何去訪問訊息系統。簡單點來說就是JMS主要乾了兩件事,定義通用的訊息格式,和訊息傳遞的模式。

體系結構

JMS由以下元素組成。[1] JMS提供者 連接面向訊息中介軟體的,JMS介面的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向訊息中介軟體的介面卡。 JMS客戶 生產或消費基於訊息的Java的應用程式或物件。 JMS生產者 建立併發送訊息的JMS客戶。 JMS消費者 接收訊息的JMS客戶。 JMS訊息 包括可以在JMS客戶之間傳遞的資料的物件 JMS佇列 一個容納那些被髮送的等待閱讀的訊息的區域。與佇列名字所暗示的意思不同,訊息的接受順序並不一定要與訊息的傳送順序相同。一旦一個訊息被閱讀,該訊息將被從佇列中移走。 JMS主題 一種支援傳送訊息給多個訂閱者的機制。

物件模型

JMS物件模型包含如下幾個要素:[2] 1)連線工廠。連線工廠(ConnectionFactory)是由管理員建立,並繫結到JNDI樹中。客戶端使用JNDI查詢連線工廠,然後利用連線工廠建立一個JMS連線。 2)JMS連線。JMS連線(Connection)表示JMS客戶端和伺服器端之間的一個活動的連線,是由客戶端通過呼叫連線工廠的方法建立的。 3)JMS會話。JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀態。JMS會話建立在JMS連線上,表示客戶與伺服器之間的一個會話執行緒。 4)JMS目的。JMS目的(Destination),又稱為訊息佇列
,是實際的訊息源。 5)JMS生產者和消費者。生產者(Message Producer)和消費者(Message Consumer)物件由Session物件建立,用於傳送和接收訊息。 6)JMS訊息通常有兩種型別: ① 點對點(Point-to-Point)。在點對點的訊息系統中,訊息分發給一個單獨的使用者。點對點訊息往往與佇列(javax.jms.Queue)相關聯。 ② 釋出/訂閱(Publish/Subscribe)。釋出/訂閱訊息系統支援一個事件驅動模型,訊息生產者和消費者都參與訊息的傳遞。生產者釋出事件,而使用者訂閱感興趣的事件,並使用事件。該型別訊息一般與特定的主題(javax.jms.Topic)關聯。 上面都是一些概念型的東西,有個大概就行了,真正想理解還得一邊寫程式碼一遍思考;

下面將簡單模擬一下一個基於JMS規範的訊息佇列:

定義JMS訊息

JMS 定義了5中訊息型別: TextMessage、MapMessage、BytesMessage、

StreamMessage和ObjectMessage。

  • TextMessage(文字訊息)

將資料作為簡單字串存放在主體中(XML就可以作為字串發)

  • MapMessage(對映訊息)

使用一張對映表來存放其主體內容(參照Jms API)

  • BytesMessage(位元組訊息)

將位元組流存放在訊息主體中。適合於下列情況:必須壓縮傳送的大量資料、需要與現有

訊息格式保持一致等(參照Jms API)

  • StreamMessage(流訊息)

用於處理原語型別。這裡也支援屬性欄位和MapMessage所支援的資料型別。使用這種

訊息格式時,收發雙方事先協商好欄位的順序,以保證寫讀順序相同(參照Jms API)

  • ObjectMessage(物件訊息)

用於往訊息中寫入可序列化的物件。

訊息中可以存放一個物件,如果要存放多個物件,需要建立一個物件集合,然後把這個

集合寫入訊息。

這裡簡單定義一下TextMessage(文字訊息)。在我們的程式中,傳送和接收訊息的都只是一個字串而已,所以定義很簡單。String就OK。

JMS佇列:

這裡使用我們Java的 LinkedList佇列集合去實現。

[java] view plain copy  print?在CODE上檢視程式碼片派生到我的程式碼片
  1. // 訊息佇列
  2. privatestatic LinkedList<String> jmsQueue=new LinkedList<String>();   
JMS客戶,生產者,消費者

在JMS裡面的客戶,並不是我們的消費者,這裡指的是基於訊息的Java的應用程式或者物件。這句話什麼意思啦,按照我的理解來說,就是我有一個程式用到了基於消費者產生的資料,那麼我就是消費者的客戶,因為我在使用消費者。這個很類似於我們的客戶端,比較簡單。

下面主要講講生產者和消費者:

這裡主要有兩種訊息訂閱模型,一種是點對點,即一個消費者和一個生產者之間進行傳輸訊息,一個是多對多,即會有多個消費者和生產者之間進行訊息傳輸。

Point-to-Point(P2P)
Publish/Subscribe(Pub/Sub)

  1. P2P

    1. P2P模式圖 
      這裡寫圖片描述
    2. 涉及到的概念 
      1. 訊息佇列(Queue)
      2. 傳送者(Sender)
      3. 接收者(Receiver)
      4. 每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
    3. P2P的特點

      1. 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
      2. 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
      3. 接收者在成功接收訊息之後需向佇列應答成功

      如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模式。

  1. Pub/Sub

    1. Pub/Sub模式圖 
      這裡寫圖片描述
    2. 涉及到的概念 
      1. 主題(Topic)
      2. 釋出者(Publisher)
      3. 訂閱者(Subscriber) 
        客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
    3. Pub/Sub的特點

      1. 每個訊息可以有多個消費者
      2. 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息,而且為了消費訊息,訂閱者必須保持執行的狀態。
      3. 為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。

      如果你希望傳送的訊息可以不被做任何處理、或者被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型

下面建立一下這幾個模型:

首先建立一個消費者生產者模式的緩衝區來做我們的多執行緒訊息佇列管理,用來接收資料和傳輸資料;

[java] view plain copy  print?在CODE上檢視程式碼片派生到我的程式碼片
  1. /** 
  2.  * 實現緩衝區 
  3.  *  
  4.  * @author gh 
  5.  *  
  6.  */
  7. publicclass JmsBuffer {  
  8.     // 佇列 最大儲存量
  9.     privatefinalstaticint  MAX_SIZE = 100;  
  10.     // 訊息佇列
  11.     privatestatic LinkedList<String> jmsQueue=new LinkedList<String>();   
  12.     static JmsBuffer buffer;  
  13.      // 生產訊息
  14.     publicstaticvoid produce(String str)    
  15.     {    
  16.         // 同步程式碼段  
  17.         synchronized (jmsQueue)    
  18.         {    
  19.             // 如果倉庫剩餘容量不足  
  20.             while (jmsQueue.size()> MAX_SIZE)    
  21.             {    
  22.                 System.out.println("你要生產的訊息為" + "/t【庫存量】:"
  23.                         + jmsQueue.size() + "/t暫時不能執行生產任務!");    
  24.                 try
  25.                 {    
  26.                     // 由於條件不滿足,生產阻塞  
  27.                     jmsQueue.wait();    
  28.                 }    
  29.                 catch (InterruptedException e)    
  30.                 {    
  31.                     e.printStackTrace();    
  32.                 }    
  33.             }    
  34.             // 生產條件滿足情況下,生產訊息  
  35.                 jmsQueue.add(str);    
  36.            System.out.println("已經生產該訊息" + str + "/t【現倉儲量為】:" + jmsQueue.size());    
  37.             jmsQueue.notifyAll();    
  38.         }    
  39.     }    
  40.     // 消費訊息
  41.     publicstatic String consume()    
  42.     {    
  43.         // 同步程式碼段  
  44.         synchronized (jmsQueue)    
  45.         {    
  46.             // 如果倉庫儲存量不足  
  47.             while (jmsQueue.size() > MAX_SIZE)    
  48.             {    
  49.                 System.out.println("【訊息庫存量】:"
  50.                         + jmsQueue.size() + "/t暫時不能執行生產任務!");    
  51.                 try
  52.                 {    
  53.                     // 由於條件不滿足,消費阻塞  
  54.                     jmsQueue.wait();    
  55.                 }    
  56.                 catch (InterruptedException e)    
  57.                 {    
  58.                     e.printStackTrace();    
  59.                 }    
  60.             }    
  61.             // 消費條件滿足情況下,消費該訊息 
  62.                 String