1. 程式人生 > >JMS訊息傳送機制ActiveMQ

JMS訊息傳送機制ActiveMQ

JMS(Java Message Service) 訊息服務是一個面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或者分散式系統中傳送訊息消費訊息,進行非同步通訊,資料互動。JMS提供的API與具體廠商無關,抽象出介面提供給廠商去實現。類似於JDBC是可以用來訪問許多不同關係資料庫的 API.JMS 使您能夠通過訊息收發服務(有時稱為訊息中介程式或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送訊息,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。

訊息:

訊息是一種型別的物件,由兩部分組成:報文頭和訊息主體。報頭由路由資訊以及有關該訊息的元資料組成。訊息主體則攜帶著應用程式的資料。

物件模型:

JMS連線工廠:(ConnectionFactory)

JMS連線:(Connection)

JMS會話:(Session)

JMS目的地:(Destination)

JMS生產者:(Message Procedur)

JMS消費者:(Message Consumer)

模型:

JMS 支援兩類訊息傳送模型(訊息傳送域):點對點模型和釋出/訂閱模型

點對點(Point-to-Point)。在點對點的訊息系統中,訊息分發給一個單獨的使用者。點對點訊息往往與佇列(javax.jms.Queue)相關聯,是一對一的訊息模型。

釋出/訂閱(Publish/Subscribe)。釋出/訂閱訊息系統支援一個事件驅動模型,訊息生產者和消費者都參與訊息的傳遞。生產者釋出事件,而使用者訂閱感興趣的事件,並使用事件。該型別訊息一般與特定的主題(javax.jms.Topic)關聯,是一對多的模型.

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

整體架構

ActiveMQ主要涉及到5個方面:

  • 傳輸協議

訊息之間的傳遞,無疑需要協議進行溝通,啟動一個ActiveMQ打開了一個監聽埠, ActiveMQ提供了廣泛的連線模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ預設的使用的協議是openWire,埠號:61616;

  • 訊息域

ActiveMQ主要包含Point-to-Point (點對點),Publish/Subscribe Model (釋出/訂閱者),其中在Publich/Subscribe 模式下又有Nondurable subscription和     durable subscription (持久化訂閱)2種訊息處理方式

  • 訊息儲存

在訊息傳遞過程中,部分重要的訊息可能需要儲存到資料庫或檔案系統中,當中介崩潰時,資訊不回丟失

  • Cluster  (叢集)

最常見到 叢集方式包括network of brokers和Master Slave;

  • Monitor (監控)

ActiveMQ一般由jmx來進行監控;

安裝執行

  

 

ActiveMQ HelloWord:

 生產者:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Send
{
	public static final String QueueName = "MessageQueue";

	public static void main(String[] args) throws JMSException
	{
		//1. 建立ConnctionFactroy 工廠物件,需要使用者名稱 密碼 連結的地址
		ConnectionFactory facotry = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
		//2. 通過ConnectionFactory工廠物件建立Connection,並且呼叫start()方法開啟連線,預設是關閉狀態
		Connection connection = facotry.createConnection();
		connection.start();
		//3. 通過Connection建立會話,用於傳送、接收訊息。args0(是否開啟事務) args1(簽收的模式)
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//4 .建立Destination 物件, 生產訊息目標或消費訊息來源的的物件,在PTP中是Queue物件,在Pusb/Sec 是Topic物件
		Destination destination = session.createQueue(QueueName);
		//⑤ 通過Session建立生產者或者消費者
		MessageProducer producer = session.createProducer(destination);
		//⑥ 設定訊息的持久化或非持久化 持久化方式(KaHaDB JDBC[MYSQL ORACLE] ) 
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//⑦ 建立訊息
		TextMessage message = session.createTextMessage();
		message.setText("hello ActiveMQ");
		int i = 0;
		while (i < 7)
		{
			producer.send(message);
			i++;
		}

		if (connection != null)
		{
			connection.close();
		}
	}
}

消費者:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Reveice
{
	public static void main(String[] args) throws Exception
	{
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		Connection connection = factory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD);
		connection.start();
		Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

		Destination destination = session.createQueue(Send.QueueName);

		MessageConsumer consumer = session.createConsumer(destination);

		while (true)
		{
			TextMessage message = (TextMessage) consumer.receive();
			System.out.println(message.getText());
		}
	}
}


 

本章完結