1. 程式人生 > >JMS介紹+訊息組成+訊息型別+體系架構+模型+ActiveMQ演示

JMS介紹+訊息組成+訊息型別+體系架構+模型+ActiveMQ演示

1JMS

JMS:Java訊息服務(Java Message Service)應用程式介面。

是一面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務平臺無關,絕大多數MOM提供商都對JMS提供支援。

JMS與廠商無關,用來訪問訊息收發系統訊息,JMS 使您能夠通過訊息收發服務(有時稱為訊息中介程式或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送訊息。

訊息組成:

報頭:由路由資訊以及有關該訊息的元資料組成。

訊息主體:攜帶著應用程式的資料或有效負載。

 

訊息型別(有效負載的型別來劃分)

  • 簡單文字(TextMessage)
  • 可序列化的物件 (ObjectMessage)
  • 屬性集合 (MapMessage)
  • 位元組流 (BytesMessage)
  • 原始值流 (StreamMessage)
  • 還有無有效負載的訊息 (Message)。

 

體系架構

JMS由以下元素組成。

JMS提供者provider:連接面向訊息中介軟體的,JMS介面的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向訊息中介軟體的介面卡。

JMS客戶:生產或消費基於訊息的Java的應用程式或物件。

JMS生產者:建立併發送訊息的JMS客戶。

JMS消費者:接收訊息的JMS客戶。

JMS訊息:包括可以在JMS客戶之間傳遞的資料的物件

JMS佇列:一個容納那些被髮送的等待閱讀的訊息的區域。與佇列名字所暗示的意思不同,訊息的接受順序並不一定要與訊息的傳送順序相同。一旦一個訊息被閱讀,該訊息將被從佇列中移走。

JMS主題:一種支援傳送訊息給多個訂閱者的機制

Java訊息服務應用程式結構支援兩種模型

1點對點或佇列模型

在點對點或佇列模型下,一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。

這種模式被概括為:

只有一個消費者將獲得訊息

生產者不需要在接收者消費該訊息期間處於執行狀態,接收者也同樣不需要在訊息傳送時處於執行狀態。

每一個成功處理的訊息都由接收者簽收

2、釋出者/訂閱者模型

釋出者/訂閱者模型支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。

這種模式被概括為:

多個消費者可以獲得訊息。

在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠訂閱。

訂閱者必須保持持續的活動狀態以接收訊息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連線時釋出的訊息將在訂閱者重新連線時重新發布。

2 ActiveMQ演示

安裝(window)

解壓縮apache-activemq-5.9.0-bin.zip

修改配置檔案activeMQ.xml,將0.0.0.0修改為localhost

        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616"/>
            <transportConnector name="ssl"     uri="ssl://localhost:61617"/>
            <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
            <transportConnector uri="http://localhost:8081"/>
            <transportConnector uri="udp://localhost:61618"/>
        </transportConnectors>

雙擊bin\activemq.bat執行ActiveMQ程式。


啟動ActiveMQ以後,登陸:http://localhost:8161/admin/  就可以訪問了

0

 

 

2程式碼設定

 

生產者,消費者模式

package cn.itcast_03_mq.queue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerTool implements MessageListener,ExceptionListener {
	private String user = ActiveMQConnection.DEFAULT_USER;
	private String password = ActiveMQConnection.DEFAULT_PASSWORD;
	private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
	private String subject = "myqueue";
	private Destination destination = null;
	private Connection connection = null;
	private Session session = null;
	private MessageConsumer consumer = null;
	private ActiveMQConnectionFactory connectionFactory=null;
	public static Boolean isconnection=false;
	// 初始化
	private void initialize() throws JMSException {
			connectionFactory= new ActiveMQConnectionFactory(
				user, password, url);
			connection = connectionFactory.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue(subject);
			consumer = session.createConsumer(destination);
	}

	// 消費訊息
	public void consumeMessage() throws JMSException {
			initialize();
			//啟動連線
			connection.start();
			//設定監聽,監聽來自MQ佇列的訊息
			consumer.setMessageListener(this);
			//設定異常檢測
			connection.setExceptionListener(this);
			System.out.println("消費者開始監聽");
			isconnection=true;
			// 開始監聽
			Message message = consumer.receive();
			System.out.println("消費者--訊息ID:"+message.getJMSMessageID());
	}

	// 關閉連線
	public void close() throws JMSException {
			System.out.println("消費者關閉連線");
			if (consumer != null)
				consumer.close();
			if (session != null)
				session.close();
			if (connection != null)
				connection.close();
	}

	// 訊息處理函式
	public void onMessage(Message message) {
		try {
			if (message instanceof TextMessage) {
				TextMessage txtMsg = (TextMessage) message;
				String msg = txtMsg.getText();
				System.out.println("消費者接受訊息: " + msg);
			} else {
				System.out.println("消費者接受訊息: " + message);
			}
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void onException(JMSException arg0){
		isconnection=false;
	}
}
package cn.itcast_03_mq.queue;


public class Consumer implements Runnable {
	static Thread t1 = null;
	public void run() {
		try {
			ConsumerTool consumer = new ConsumerTool();
			consumer.consumeMessage();
		} catch (Exception e) {
		}

	}
	
	public static void main(String[] args) throws InterruptedException {

		t1 = new Thread(new Consumer());
		t1.start();
		while (true) {
			System.out.println("t1執行緒是否存活:"+t1.isAlive());
			//放線上程結束
			if (!t1.isAlive()) {
				t1 = new Thread(new Consumer());
				t1.start();
				System.out.println("重新啟動t1執行緒");
			}
			//方便測試
			Thread.sleep(5000);
		}
	}
}

 

package cn.itcast_03_mq.queue;
import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      
     
import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
     
public class Producer {        
    private String user = ActiveMQConnection.DEFAULT_USER;         
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;       
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       
    private String subject = "myqueue";      
    private Destination destination = null;      
    private Connection connection = null;      
    private Session session = null;      
    private MessageProducer producer = null;
    // 初始化   
    private void initialize() throws JMSException, Exception {      
    	//獲取工廠物件
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                user, password, url);     
        //獲取連線
        connection = connectionFactory.createConnection();      
        //獲取session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
//        建立主題佇列
        destination = session.createQueue(subject);    
        //建立生產者
        producer = session.createProducer(destination);      
        //設定不做持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      
    }
 // 傳送訊息          
    public void produceMessage(String message) throws JMSException, Exception {      
        initialize();      
        TextMessage msg = session.createTextMessage(message);      
        connection.start();      
        System.out.println("生產者開始傳送訊息: " + message);      
        producer.send(msg);      
        System.out.println("生產者傳送訊息結束");      
    }
    // 關閉連線      
    public void close() throws JMSException {      
        System.out.println("生產者關閉連線");      
        if (producer != null)      
            producer.close();      
        if (session != null)      
            session.close();      
        if (connection != null)      
            connection.close();      
    }      
    
    public static void main(String[] args) throws JMSException, Exception{      
    	Producer producer = new Producer();     
        producer.produceMessage("Hello, world22!");      
        producer.close();
    }    
}      

 

 

先開啟消費者,在開啟生產者