訊息中介軟體ActiveMQ(一)HelloWorld入門例項
1、JMS訊息傳送模式
- 在點對點或佇列模型:
一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。這種模式被概括為:只有一個消費者將獲得訊息。生產者不需要在接收者消費該訊息期間處於執行狀態,接收者也同樣不需要在訊息傳送時處於執行狀態。每一個成功處理的訊息都由接收者簽收。
- 釋出者/訂閱者模型:
支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得訊息.在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收訊息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連線時釋出的訊息將在訂閱者重新連線時重新發布。
2、JMS應用程式介面
- ConnectionFactory 介面(連線工廠)
使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。
- Connection 介面(連線)
連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。
- Destination 介面
目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。
- MessageConsumer 介面(訊息消費者)
由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。
- MessageProducer 介面(訊息生產者)
由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。
- Message 介面(訊息)
是在消費者和生產者之間傳送的物件,也就是說從一個應用程式創送到另一個應用程式。一個訊息有三個主要部分:
訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。
一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。
一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。
訊息介面非常靈活,並提供了許多方式來定製訊息的內容。
- Session 介面(會話)
表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。
3、訊息佇列
把ActiveMQ依賴的jar包新增到工程中:activemq-all-5.12.0.jar
使用maven工程,則新增jar包的依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
3.1、Producer:
public class QueueSender {
public static void main(String[] args) {
//建立一個連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//從工廠物件中獲得連線
Connection connection = connectionFactory.createConnection();
//開啟連線
connection.start();
/*
connection.createSession(paramA, paramB)
A)paramA設定為true時:
paramB的值忽略, acknowledgment mode被jms伺服器設定 SESSION_TRANSACTED 。
當一個事務被提交的時候,訊息確認就會自動發生。
B) paramA設定為false時:
Session.AUTO_ACKNOWLEDGE為自動確認,當客戶成功的從receive方法返回的時候,或者從
MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。
Session.CLIENT_ACKNOWLEDGE 為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的
acknowledge方法。jms伺服器才會刪除訊息。(預設是批量確認)
*/
//開啟一個回話,第一個引數指定不使用事務,第二個引數指定客戶端接收訊息的確認方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一目的地Queue或者是Topic
Queue queue = session.createQueue("mytestqueue");
//建立一個生產者
MessageProducer producer = session.createProducer(queue);
//建立message
TextMessage message = new ActiveMQTextMessage();
message.setText("hello");
//傳送訊息
producer.send(message);
//關閉
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3.2、Consumer:
消費者有兩種消費方法:
1、同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。
2、非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。實現MessageListener介面,在MessageListener()方法中實現訊息的處理邏輯。
3.2.1、同步消費:
public class QueueConsumer {
public static void main(String[] args) {
//建立一連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//建立一個連線
Connection connection = connectionFactory.createConnection();
//開啟連線
connection.start();
//建立一個回話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個目的地Destination
Queue queue = session.createQueue("mytestqueue");
//建立一個消費者
MessageConsumer consumer = session.createConsumer(queue);
while(true) {
//設定接收者接收訊息的時間,為了便於測試,這裡定為100s
Message message = consumer.receive(100000);
if (message != null) {
System.out.println(message);
} else {
//超時結束
break;
}
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.2.2、非同步消費:
public class QueueConsumer {
public static void main(String[] args) {
//建立一連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//建立一個連線
Connection connection = connectionFactory.createConnection();
//開啟連線
connection.start();
//建立一個回話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個目的地Destination
Queue queue = session.createQueue("mytestqueue");
//建立一個消費者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
String text = "";
try {
text = ((TextMessage)message).getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(text);
}
}
});
System.in.read();
//關閉
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
4、釋出者/訂閱者
4.1、Producer:
public class TopicProducer {
public static void main(String[] args) {
//建立連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//建立連線
Connection connection = connectionFactory.createConnection();
//開啟連線
connection.start();
//建立一個回話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個Destination,queue或者Topic
Topic topic = session.createTopic("mytopic");
//建立一個生成者
MessageProducer producer = session.createProducer(topic);
//建立一個訊息
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello my topic");
//傳送訊息
producer.send(textMessage);
//關閉
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
4.2、Consumer:
public class TopicConsumer {
public static void main(String[] args) {
//建立連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//建立連線
Connection connection = connectionFactory.createConnection();
connection.start();
//建立一個會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個目標
Destination destination = session.createTopic("mytopic");
//建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
//接收訊息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
//暫停
System.in.read();
//關閉
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}