1. 程式人生 > >JMS(ActiveMQ)學習以及分析

JMS(ActiveMQ)學習以及分析

1.介紹

JMS 是 SUN 公司開發的一套訪問 MOM(Message-Oriented-Middleware) 訊息服務中介軟體的標準 API,MON 提供訊息接收和轉發的服務 , 對訊息進行快取和持久操作 , 保證訊息的安全性 ,JMS 讓開發都無須瞭解遠端過程呼叫的細節和網路通訊協議的細節就可以通過 JMS 向 MOM 傳送訊息 , 藉助訊息我們可以鬆散耦合的方式整合不同的應用。

ActiveMQ 是Apache出品,最流行的、功能強大的即時通訊和整合模式的開源伺服器。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現。提供客戶端支援跨語言和協議,帶有易於在充分支援JMS 1.1和1.4使用J2EE企業整合模式和許多先進的功能。(ActiveMQ只是JMS的一個實現服務,其他的有jboss等)

2.環境安裝

2、 解壓apache-activemq-5.8.0.zip即可完成ActiveMQ的安裝

3、 執行bin/activemq.bat即可

3.啟動服務

如果報這種異常:

Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind

說明mq預設使用的61616埠被佔用了,在大多數情況下,佔用61616埠的是Internet Connection Sharing (ICS) 這個Windows服務,你只需停止它就可以啟動ActiveMQ了。

如果報這種異常:

ERROR | Failed to start Apache ActiveMQ (localhost, ID:mac-4363-1389937469328-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX:  Failed to bind to server socket: amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: 
java.net.BindException: Address already in use: JVM_Bind
上面的我不知道是什麼原因引起的,在我的電腦上就是啟動不了,看著像是5672的埠被佔用了,但我的5672確實一個java在使用,具體是誰在使用還不知道,我停掉這個程序後,馬上又被建立了,所以我修改了conf/activemq.xml中的

<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>

就是把它註釋掉,這個看其他的文件,應該是一種連線方式,就像上面的tcp一樣(這裡不用,就不深究了)。

4.管理介面

啟動成功就可以訪問管理員介面:http://localhost:8161/admin,預設使用者名稱和密碼admin/admin。如果你想修改使用者名稱和密碼的話,在conf/jetty-realm.properties中修改即可。

image

其中在導航選單中,Queues是佇列方式訊息。Topics是主題方式訊息。Subscribers訊息訂閱監控查詢。Connections可以檢視連結數,分別可以檢視xmpp、ssl、stomp、openwire、ws和網路連結。Network是網路連結數監控。Send可以傳送訊息資料。

5.程式碼實現(僅僅用點對點queue實現)

先加入activemq.jar包

接收方:
package JMSTest;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSReceiver {
    public static void main(String args[]){
        try{
        ConnectionFactory factory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
        Connection connection=factory.createConnection();
        Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue=session.createQueue("SecondQueue");
//        Context xtx=new InitialContext();
//            System.out.println(xtx.lookup("jndi/jmsConn"));
        MessageConsumer consumer=session.createConsumer(queue);
        connection.start();
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message msg) {
                System.out.println("你好"+msg);
            }
        });
    }catch(Exception e){
            System.out.println(e);
    }
 }
}
傳送方:
package JMSTest;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class JMSSender {
    public static void main(String args[]){
        try{
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,ActiveMQConnection.DEFAULT_BROKER_URL);
        Connection connection=connectionFactory.createConnection();
        Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        Queue queue=session.createQueue("SecondQueue");
            MessageProducer messageProducer=session.createProducer(queue);
            TextMessage textMessage=session.createTextMessage("It is a test");
            messageProducer.send(textMessage);
        session.close();
        connection.close();
        }catch(Exception e){
            System.out.println(e);
        }
    }
}
上面的無所謂先啟動哪個,但是肯定要先啟動activemq.bat才行(相當於中間的伺服器)
6.程式碼分析
ConnectionFactory 介面(連線工廠) 使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。
Connection 介面(連線) 連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。
Destination 介面(目標) 目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過 JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。
MessageConsumer 介面(訊息消費者) 由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。
MessageProducer 介面(訊息生產者) 由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。
Message 介面(訊息) 是在消費者和生產者之間傳送的物件,也就是說從一個應用程式傳送到另一個應用程式。一個訊息有三個主要部分: 訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。 一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。 一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。 訊息介面非常靈活,並提供了許多方式來定製訊息的內容。
Session 介面(會話) 表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事 務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允 許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。

ActiviteMQ訊息有3中形式

JMS 公共

點對點域

釋出/訂閱域

ConnectionFactory

QueueConnectionFactory

TopicConnectionFactory

Connection

QueueConnection

TopicConnection

Destination

Queue

Topic

Session

QueueSession

TopicSession

MessageProducer

QueueSender

TopicPublisher

MessageConsumer

QueueReceiver

TopicSubscriber

(1)、點對點方式(point-to-point)

點對點的訊息傳送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存貯訊息,Sneder 傳送訊息,receive接收訊息.具體點就是Sender Client傳送Message Queue ,而 receiver Cliernt從Queue中接收訊息和"傳送訊息已接受"到Quere,確認訊息接收。訊息傳送客戶端與接收客戶端沒有時間上的依賴,傳送客戶端可以在任何時刻傳送資訊到Queue,而不需要知道接收客戶端是不是在執行

(2)、釋出/訂閱 方式(publish/subscriber Messaging)

釋出/訂閱方式用於多接收客戶端的方式.作為釋出訂閱的方式,可能存在多個接收客戶端,並且接收端客戶端與傳送客戶端存在時間上的依賴。一個接收端只能接收他建立以後傳送客戶端傳送的資訊。作為subscriber ,在接收訊息時有兩種方法,destination的receive方法,和實現message listener 介面的onMessage 方法。

傳送訊息的基本步驟:

(1)、建立連線使用的工廠類JMS ConnectionFactory

(2)、使用管理物件JMS ConnectionFactory建立連線Connection,並啟動

(3)、使用連線Connection 建立會話Session

(4)、使用會話Session和管理物件Destination建立訊息生產者MessageSender

(5)、使用訊息生產者MessageSender傳送訊息

訊息接收者從JMS接受訊息的步驟

(1)、建立連線使用的工廠類JMS ConnectionFactory

(2)、使用管理物件JMS ConnectionFactory建立連線Connection,並啟動

(3)、使用連線Connection 建立會話Session

(4)、使用會話Session和管理物件Destination建立訊息接收者MessageReceiver

(5)、使用訊息接收者MessageReceiver接受訊息,需要用setMessageListener將MessageListener介面繫結到MessageReceiver訊息接收者必須實現了MessageListener介面,需要定義onMessage事件方法。


上面獲得ConnectionFactory有三種方式:

1.當用spring容器時

ApplicationContext ctx = new FileSystemXmlApplicationContext("src/cn/com/snt/jms/applicationContext.xml");
 ConnectionFactory factory=(ConnectionFactory) ctx.getBean("connectionFactory");
2.當用jndi容器時
 
  Context ctx=new InitialContext();
 ConnectionFactory factory=(ConnectionFactory)ctx.lookup("jndi/jmsConn");
3.直接呼叫
ConnectionFactory connectionFactory=
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,
ActiveMQConnection.DEFAULT_BROKER_URL);
(上面的是用配置檔案預設的,最後的uri可以為failover://tcp://localhost:61616或者tcp://localhost:61616(當用tcp進行連線的時候),區別是加上failover可以進行備份連線,如第一次連線失敗,可以進行第二次連線failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false 

4.注意接受者中的connection.start();這個必須要有,最好放到消費者建立以後再開啟,為的就是節省資源。

5.接受者中,有兩種方式接受,一種是同步的方法consumer.receive()或者consumer.receive(long),上面的方法是阻塞式的,consumer.receive(long)可以設定等待多長時間,如果為0,則永不超時。第二種方式是非同步的,就是上面的consumer.setMessageListener(new MessageListener() {...}的方式,這種用的比較多。