JMS介紹+訊息組成+訊息型別+體系架構+模型+ActiveMQ演示
1JMS
JMS:Java訊息服務(Java Message Service)應用程式介面。
是一面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務平臺無關,絕大多數MOM提供商都對JMS提供支援。
JMS與廠商無關,用來訪問訊息收發系統訊息,JMS 使您能夠通過訊息收發服務(有時稱為訊息中介程式或路由器)從一個 JMS 客戶機向另一個 JMS客戶機發送訊息。
訊息組成:
報頭:由路由資訊以及有關該訊息的元資料組成。
訊息主體:攜帶著應用程式的資料或有效負載。
訊息型別(有效負載的型別來劃分)
- 簡單文字(TextMessage)
- 可序列化的物件 (ObjectMessage)
- 屬性集合 (MapMessage)
- 位元組流 (BytesMessage)
- 原始值流 (StreamMessage)
- 還有無有效負載的訊息 (Message)。
體系架構
JMS由以下元素組成。
JMS提供者provider:連接面向訊息中介軟體的,JMS介面的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向訊息中介軟體的介面卡。
JMS客戶:生產或消費基於訊息的Java的應用程式或物件。
JMS生產者:建立併發送訊息的JMS客戶。
JMS消費者:接收訊息的JMS客戶。
JMS訊息:包括可以在JMS客戶之間傳遞的資料的物件
JMS佇列:一個容納那些被髮送的等待閱讀的訊息的區域。與佇列名字所暗示的意思不同,訊息的接受順序並不一定要與訊息的傳送順序相同。一旦一個訊息被閱讀,該訊息將被從佇列中移走。
JMS主題:一種支援傳送訊息給多個訂閱者的機制
Java訊息服務應用程式結構支援兩種模型
1點對點或佇列模型
在點對點或佇列模型下,一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。
這種模式被概括為:
只有一個消費者將獲得訊息
生產者不需要在接收者消費該訊息期間處於執行狀態,接收者也同樣不需要在訊息傳送時處於執行狀態。
每一個成功處理的訊息都由接收者簽收
2、釋出者/訂閱者模型
釋出者/訂閱者模型支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。
這種模式被概括為:
多個消費者可以獲得訊息。
在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠訂閱。
訂閱者必須保持持續的活動狀態以接收訊息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連線時釋出的訊息將在訂閱者重新連線時重新發布。
2 ActiveMQ演示
安裝(window)
解壓縮apache-activemq-5.9.0-bin.zip
修改配置檔案activeMQ.xml,將0.0.0.0修改為localhost
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
雙擊bin\activemq.bat執行ActiveMQ程式。
啟動ActiveMQ以後,登陸:http://localhost:8161/admin/ 就可以訪問了
0
2程式碼設定
生產者,消費者模式
package cn.itcast_03_mq.queue;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener,ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private ActiveMQConnectionFactory connectionFactory=null;
public static Boolean isconnection=false;
// 初始化
private void initialize() throws JMSException {
connectionFactory= new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
// 消費訊息
public void consumeMessage() throws JMSException {
initialize();
//啟動連線
connection.start();
//設定監聽,監聽來自MQ佇列的訊息
consumer.setMessageListener(this);
//設定異常檢測
connection.setExceptionListener(this);
System.out.println("消費者開始監聽");
isconnection=true;
// 開始監聽
Message message = consumer.receive();
System.out.println("消費者--訊息ID:"+message.getJMSMessageID());
}
// 關閉連線
public void close() throws JMSException {
System.out.println("消費者關閉連線");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
// 訊息處理函式
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("消費者接受訊息: " + msg);
} else {
System.out.println("消費者接受訊息: " + message);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public void onException(JMSException arg0){
isconnection=false;
}
}
package cn.itcast_03_mq.queue;
public class Consumer implements Runnable {
static Thread t1 = null;
public void run() {
try {
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
} catch (Exception e) {
}
}
public static void main(String[] args) throws InterruptedException {
t1 = new Thread(new Consumer());
t1.start();
while (true) {
System.out.println("t1執行緒是否存活:"+t1.isAlive());
//放線上程結束
if (!t1.isAlive()) {
t1 = new Thread(new Consumer());
t1.start();
System.out.println("重新啟動t1執行緒");
}
//方便測試
Thread.sleep(5000);
}
}
}
package cn.itcast_03_mq.queue;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producer {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageProducer producer = null;
// 初始化
private void initialize() throws JMSException, Exception {
//獲取工廠物件
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
//獲取連線
connection = connectionFactory.createConnection();
//獲取session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題佇列
destination = session.createQueue(subject);
//建立生產者
producer = session.createProducer(destination);
//設定不做持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
// 傳送訊息
public void produceMessage(String message) throws JMSException, Exception {
initialize();
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("生產者開始傳送訊息: " + message);
producer.send(msg);
System.out.println("生產者傳送訊息結束");
}
// 關閉連線
public void close() throws JMSException {
System.out.println("生產者關閉連線");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
public static void main(String[] args) throws JMSException, Exception{
Producer producer = new Producer();
producer.produceMessage("Hello, world22!");
producer.close();
}
}
先開啟消費者,在開啟生產者