JMS訊息傳送機制ActiveMQ
JMS(Java Message Service) 訊息服務是一個面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或者分散式系統中傳送訊息消費訊息,進行非同步通訊,資料互動。JMS提供的API與具體廠商無關,抽象出介面提供給廠商去實現。類似於JDBC是可以用來訪問許多不同關係資料庫的 API.JMS 使您能夠通過訊息收發服務(有時稱為訊息中介程式或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送訊息,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發。
訊息:
訊息是一種型別的物件,由兩部分組成:報文頭和訊息主體。報頭由路由資訊以及有關該訊息的元資料組成。訊息主體則攜帶著應用程式的資料。
物件模型:
JMS連線工廠:(ConnectionFactory)
JMS連線:(Connection)
JMS會話:(Session)
JMS目的地:(Destination)
JMS生產者:(Message Procedur)
JMS消費者:(Message Consumer)
模型:
JMS 支援兩類訊息傳送模型(訊息傳送域):點對點模型和釋出/訂閱模型
點對點(Point-to-Point)。在點對點的訊息系統中,訊息分發給一個單獨的使用者。點對點訊息往往與佇列(javax.jms.Queue)相關聯,是一對一的訊息模型。
釋出/訂閱(Publish/Subscribe)。釋出/訂閱訊息系統支援一個事件驅動模型,訊息生產者和消費者都參與訊息的傳遞。生產者釋出事件,而使用者訂閱感興趣的事件,並使用事件。該型別訊息一般與特定的主題(javax.jms.Topic)關聯,是一對多的模型.
ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
整體架構
ActiveMQ主要涉及到5個方面:
- 傳輸協議
訊息之間的傳遞,無疑需要協議進行溝通,啟動一個ActiveMQ打開了一個監聽埠, ActiveMQ提供了廣泛的連線模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ預設的使用的協議是openWire,埠號:61616;
- 訊息域
ActiveMQ主要包含Point-to-Point (點對點),Publish/Subscribe Model (釋出/訂閱者),其中在Publich/Subscribe 模式下又有Nondurable subscription和 durable subscription (持久化訂閱)2種訊息處理方式
- 訊息儲存
在訊息傳遞過程中,部分重要的訊息可能需要儲存到資料庫或檔案系統中,當中介崩潰時,資訊不回丟失
- Cluster (叢集)
最常見到 叢集方式包括network of brokers和Master Slave;
- Monitor (監控)
ActiveMQ一般由jmx來進行監控;
安裝執行
ActiveMQ HelloWord:
生產者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Send
{
public static final String QueueName = "MessageQueue";
public static void main(String[] args) throws JMSException
{
//1. 建立ConnctionFactroy 工廠物件,需要使用者名稱 密碼 連結的地址
ConnectionFactory facotry = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");
//2. 通過ConnectionFactory工廠物件建立Connection,並且呼叫start()方法開啟連線,預設是關閉狀態
Connection connection = facotry.createConnection();
connection.start();
//3. 通過Connection建立會話,用於傳送、接收訊息。args0(是否開啟事務) args1(簽收的模式)
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4 .建立Destination 物件, 生產訊息目標或消費訊息來源的的物件,在PTP中是Queue物件,在Pusb/Sec 是Topic物件
Destination destination = session.createQueue(QueueName);
//⑤ 通過Session建立生產者或者消費者
MessageProducer producer = session.createProducer(destination);
//⑥ 設定訊息的持久化或非持久化 持久化方式(KaHaDB JDBC[MYSQL ORACLE] )
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//⑦ 建立訊息
TextMessage message = session.createTextMessage();
message.setText("hello ActiveMQ");
int i = 0;
while (i < 7)
{
producer.send(message);
i++;
}
if (connection != null)
{
connection.close();
}
}
}
消費者:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Reveice
{
public static void main(String[] args) throws Exception
{
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD);
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(Send.QueueName);
MessageConsumer consumer = session.createConsumer(destination);
while (true)
{
TextMessage message = (TextMessage) consumer.receive();
System.out.println(message.getText());
}
}
}
本章完結