JMS-ActiveMQ:Java訊息服務
阿新 • • 發佈:2019-02-10
導讀:
JMS:Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通.本文接受了JMS的基礎知識,業務需求以及功能實現等。
JMS基礎---》需求----》過程-----》安裝-----》程式碼實現
一、JMS基礎:
1.連線工廠(JMS connectionFactory)
連線工廠是客戶用來建立連線的物件。根據JNDI來查詢。
2.連線(connection)
JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連線。
3.會話(session)
JMS Session是生產訊息和消費訊息的一個單執行緒上下文。會話用於建立訊息生產者(producer)、訊息消費者(consumer)和訊息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組傳送和接收被組合到了一個原子操作中。
4.目的地(destination)
目的地是客戶用來指定它生產的訊息的目標和它消費的訊息的來源的物件。
訊息傳遞域:1 Point-to-Point 訊息(P2P) 點對點;
2 Publish Subscribe messaging(Pub/Sub)釋出/訂閱訊息
在點對點訊息傳遞域中,目的地被稱為佇列(queue);在釋出/訂閱訊息傳遞域中,目的地被稱為主題(topic)。
5.1訊息生產者(producer)
訊息生產者是會話建立的一個物件,用於把訊息傳送到一個目的地。
5.2訊息消費者(consumer)
訊息消費者是由會話建立的一個物件,它用於接收發送到目的地的訊息。
同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。
非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。消費者類必須實現MessageListener介面,然後在onMessage方法中監聽訊息的到達並處理。
6.訊息(message)
JMS訊息由以下三部分組成:
訊息頭:每個訊息頭欄位都有相應的getter和setter方法。
訊息屬性:如果需要除訊息頭欄位以外的值,那麼可以使用訊息屬性。
訊息體:JMS定義的訊息型別有,簡單文字(TextMessage)、可序列化的物件(ObjectMessage)、屬性集合(MapMessage)、位元組流(BytesMessage)、原始值流(StreamMessage)
JMS應用場合:如果有更新,伺服器端傳送更新資訊到客戶端;廣播
二、需求:
一個電子商務系統,擁有大量使用者。當用戶下單後,需要簡訊或郵件通知對方。
三、過程:
在系統架構中,核心業務系統(A)負責處理使用者訂單,但訂單成功生成後,核心業務傳送通知到訊息驅動的子系統(B)。B系統接到通知後,負責傳送簡訊或電子郵件,傳送成功後告知A系統。
1.A傳送通知--->2.B監聽,並受到訊息--->3.B處理訊息,併發送回復----->4.A監聽回覆,接受到回覆訊息,將回復訊息記錄到資料庫。
四、安裝、啟動、測試:
安裝:在http://activemq.apache.org/download.html 下載5.0.0發行包,解壓即可,
啟動:window環境執行解壓目錄下的/bin/activemq.bat
檢視:http://127.0.0.1:8161/admin
五、擴充套件:一個生產者,多個消費者
第二個消費者也需要實現listener,和第一個消費者一樣,只是需要指定不同的clientId和消費者名:
connection.setClientID("MyClient2");
TopicSubscriber consumer = jmsSession.createDurableSubscriber(
(Topic) envContext.lookup("jms/topic/MyTopic"), "MySub2");
六、程式碼實現:傳送訊息---》接受訊息---》伺服器配置
//1 傳送訊息(接受回覆訊息)
public class SenderMessageService {
//釋出指定訊息到指定地址(在釋出之前,建議將訊息儲存到資料庫)
public void publish(String type, Object object) {
try {
InitialContext initCtx = new InitialContext();
//1
Context envContext = (Context) initCtx.lookup("java:comp/env");
//2
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/NormalConnectionFactory");
//3
Connection connection = connectionFactory.createConnection();
//4
Session jmsSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5,6 Destination:需指定其對應的主題(subject)名稱
MessageProducer producer = jmsSession
.createProducer((Destination) envContext
.lookup("jms/topic/MyTopic"));
// 設定持久方式:根據Destination建立MessageProducer物件,同時設定其持久模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//7
Message message = jmsSession.createMessage();
message.setObjectProperty(type, object);
Topic topic = jmsSession.createTopic("jms/topic/MyTopic");
//8
message.setJMSReplyTo(topic);
//傳送訊息
producer.send(message);
//9 接受回覆的訊息
MessageConsumer consumer = jmsSession.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
String messageReceived = null;
try {
messageReceived = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out
.println("reply message received from customer1:"
+ messageReceived);
}
}
});
connection.start();
// 釋出重新整理帖子訊息
// testMessage.clearProperties();
// testMessage.setStringProperty("RefreshThreadId", "331");
// producer.send(testMessage);
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
//2接受訊息(傳送回覆)
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import com.brightmart.MessageAction;
import com.brightmart.SM;
import com.util.mail.TestSendMail;
// 初始化jms連線,建立topic監聽器;指定接收訊息時候,做的對應處理
public class JMSListener extends HttpServlet implements MessageListener {
private static final long serialVersionUID = 3963233366687996777L;
//初始化jms連線,建立topic監聽器
public void init(ServletConfig config) throws ServletException {
try {
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");// 1
// 根據JNDI獲取
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/FailoverConnectionFactory");// 2
Connection connection = connectionFactory.createConnection();// 3
// 給connection設定一個clientId
connection.setClientID("MyClient");
// 會話:兩個參,事務和應答模式
Session jmsSession = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);// 4 AUTO_ACKNOWLEDGE
// 普通訊息訂閱者,無法接收持久訊息// MessageConsumer consumer =
// jmsSession.createConsumer((Destination)//envContext.lookup("jms/topic/MyTopic"));
// 基於Topic建立持久的訊息訂閱者,前提:Connection必須指定一個唯一的clientId,當前為MyClient
TopicSubscriber consumer = jmsSession.createDurableSubscriber(
(Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");// 5
consumer.setMessageListener(this);
connection.start();
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
//接收訊息,做相應處理
public void onMessage(Message message) {
System.out.println("message in coustomer1.");
if (message == null) {
return;
}
try {
if (message.getObjectProperty("email") != null) {
String emailAddress = (String) message
.getObjectProperty("email");
TestSendMail sendMail = new TestSendMail();
sendMail.sendMail(emailAddress);
message.acknowledge();
Destination d = message.getJMSReplyTo();
Session sessionn = getConnection().createSession(false,
Session.CLIENT_ACKNOWLEDGE);
MessageProducer p = sessionn.createProducer(d);
TextMessage tm = sessionn
.createTextMessage("ustomer1 RECEIVED a email type message");
System.out
.println("customer1 RECEIVED a email type message");
p.send(tm);
} else if (message.getObjectProperty("message") != null) {
MessageAction m = new MessageAction();
SM sm = new SM();
sm.setDestTermId((String) message.getObjectProperty("message"));
sm.setMsgContent("分散式JMS-ActiveMQ系統測試");
m.addSM(sm);
message.acknowledge();
} else {
System.out.println("接收普通訊息,不做任何處理!");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public Connection getConnection() {
InitialContext initCtx;
try {
initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
// 根據JNDI、url、user、password獲取
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/FailoverConnectionFactory");
Connection connection = connectionFactory.createConnection();
return connection;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
//3 context.xml
Context中新增配置:
<Resource
name="jms/FailoverConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource
name="jms/NormalConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="tcp://localhost:61616"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource name="jms/topic/MyTopic"
auth="Container"
type="org.apache.activemq.command.ActiveMQTopic"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO"/>
<Resource name="jms/queue/MyQueue"
auth="Container"
type="org.apache.activemq.command.ActiveMQQueue"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO.QUEUE"/>
4.傳送簡訊或郵件
請參考本部落格前兩篇文章
注:
1.系統下載ActiveMQ,並允許;
2.ActiveMQ需要融合web伺服器,如可以配置tomcat伺服器的context.xml;
JMS:Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通.本文接受了JMS的基礎知識,業務需求以及功能實現等。
JMS基礎---》需求----》過程-----》安裝-----》程式碼實現
一、JMS基礎:
1.連線工廠(JMS connectionFactory)
連線工廠是客戶用來建立連線的物件。根據JNDI來查詢。
2.連線(connection)
JMS Connection封裝了客戶與JMS提供者之間的一個虛擬的連線。
3.會話(session)
JMS Session是生產訊息和消費訊息的一個單執行緒上下文。會話用於建立訊息生產者(producer)、訊息消費者(consumer)和訊息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組傳送和接收被組合到了一個原子操作中。
4.目的地(destination)
目的地是客戶用來指定它生產的訊息的目標和它消費的訊息的來源的物件。
訊息傳遞域:1 Point-to-Point 訊息(P2P) 點對點;
2 Publish Subscribe messaging(Pub/Sub)釋出/訂閱訊息
在點對點訊息傳遞域中,目的地被稱為佇列(queue);在釋出/訂閱訊息傳遞域中,目的地被稱為主題(topic)。
5.1訊息生產者(producer)
訊息生產者是會話建立的一個物件,用於把訊息傳送到一個目的地。
5.2訊息消費者(consumer)
訊息消費者是由會話建立的一個物件,它用於接收發送到目的地的訊息。
同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。
非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。消費者類必須實現MessageListener介面,然後在onMessage方法中監聽訊息的到達並處理。
6.訊息(message)
JMS訊息由以下三部分組成:
訊息頭:每個訊息頭欄位都有相應的getter和setter方法。
訊息屬性:如果需要除訊息頭欄位以外的值,那麼可以使用訊息屬性。
訊息體:JMS定義的訊息型別有,簡單文字(TextMessage)、可序列化的物件(ObjectMessage)、屬性集合(MapMessage)、位元組流(BytesMessage)、原始值流(StreamMessage)
JMS應用場合:如果有更新,伺服器端傳送更新資訊到客戶端;廣播
二、需求:
一個電子商務系統,擁有大量使用者。當用戶下單後,需要簡訊或郵件通知對方。
三、過程:
在系統架構中,核心業務系統(A)負責處理使用者訂單,但訂單成功生成後,核心業務傳送通知到訊息驅動的子系統(B)。B系統接到通知後,負責傳送簡訊或電子郵件,傳送成功後告知A系統。
1.A傳送通知--->2.B監聽,並受到訊息--->3.B處理訊息,併發送回復----->4.A監聽回覆,接受到回覆訊息,將回復訊息記錄到資料庫。
四、安裝、啟動、測試:
安裝:在http://activemq.apache.org/download.html 下載5.0.0發行包,解壓即可,
啟動:window環境執行解壓目錄下的/bin/activemq.bat
檢視:http://127.0.0.1:8161/admin
五、擴充套件:一個生產者,多個消費者
第二個消費者也需要實現listener,和第一個消費者一樣,只是需要指定不同的clientId和消費者名:
connection.setClientID("MyClient2");
TopicSubscriber consumer = jmsSession.createDurableSubscriber(
(Topic) envContext.lookup("jms/topic/MyTopic"), "MySub2");
六、程式碼實現:傳送訊息---》接受訊息---》伺服器配置
//1 傳送訊息(接受回覆訊息)
public class SenderMessageService {
//釋出指定訊息到指定地址(在釋出之前,建議將訊息儲存到資料庫)
public void publish(String type, Object object) {
try {
InitialContext initCtx = new InitialContext();
//1
Context envContext = (Context) initCtx.lookup("java:comp/env");
//2
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/NormalConnectionFactory");
//3
Connection connection = connectionFactory.createConnection();
//4
Session jmsSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//5,6 Destination:需指定其對應的主題(subject)名稱
MessageProducer producer = jmsSession
.createProducer((Destination) envContext
.lookup("jms/topic/MyTopic"));
// 設定持久方式:根據Destination建立MessageProducer物件,同時設定其持久模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//7
Message message = jmsSession.createMessage();
message.setObjectProperty(type, object);
Topic topic = jmsSession.createTopic("jms/topic/MyTopic");
//8
message.setJMSReplyTo(topic);
//傳送訊息
producer.send(message);
//9 接受回覆的訊息
MessageConsumer consumer = jmsSession.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
String messageReceived = null;
try {
messageReceived = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out
.println("reply message received from customer1:"
+ messageReceived);
}
}
});
connection.start();
// 釋出重新整理帖子訊息
// testMessage.clearProperties();
// testMessage.setStringProperty("RefreshThreadId", "331");
// producer.send(testMessage);
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
//2接受訊息(傳送回覆)
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import com.brightmart.MessageAction;
import com.brightmart.SM;
import com.util.mail.TestSendMail;
// 初始化jms連線,建立topic監聽器;指定接收訊息時候,做的對應處理
public class JMSListener extends HttpServlet implements MessageListener {
private static final long serialVersionUID = 3963233366687996777L;
//初始化jms連線,建立topic監聽器
public void init(ServletConfig config) throws ServletException {
try {
InitialContext initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");// 1
// 根據JNDI獲取
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/FailoverConnectionFactory");// 2
Connection connection = connectionFactory.createConnection();// 3
// 給connection設定一個clientId
connection.setClientID("MyClient");
// 會話:兩個參,事務和應答模式
Session jmsSession = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);// 4 AUTO_ACKNOWLEDGE
// 普通訊息訂閱者,無法接收持久訊息// MessageConsumer consumer =
// jmsSession.createConsumer((Destination)//envContext.lookup("jms/topic/MyTopic"));
// 基於Topic建立持久的訊息訂閱者,前提:Connection必須指定一個唯一的clientId,當前為MyClient
TopicSubscriber consumer = jmsSession.createDurableSubscriber(
(Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");// 5
consumer.setMessageListener(this);
connection.start();
} catch (NamingException e) {
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
}
}
//接收訊息,做相應處理
public void onMessage(Message message) {
System.out.println("message in coustomer1.");
if (message == null) {
return;
}
try {
if (message.getObjectProperty("email") != null) {
String emailAddress = (String) message
.getObjectProperty("email");
TestSendMail sendMail = new TestSendMail();
sendMail.sendMail(emailAddress);
message.acknowledge();
Destination d = message.getJMSReplyTo();
Session sessionn = getConnection().createSession(false,
Session.CLIENT_ACKNOWLEDGE);
MessageProducer p = sessionn.createProducer(d);
TextMessage tm = sessionn
.createTextMessage("ustomer1 RECEIVED a email type message");
System.out
.println("customer1 RECEIVED a email type message");
p.send(tm);
} else if (message.getObjectProperty("message") != null) {
MessageAction m = new MessageAction();
SM sm = new SM();
sm.setDestTermId((String) message.getObjectProperty("message"));
sm.setMsgContent("分散式JMS-ActiveMQ系統測試");
m.addSM(sm);
message.acknowledge();
} else {
System.out.println("接收普通訊息,不做任何處理!");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public Connection getConnection() {
InitialContext initCtx;
try {
initCtx = new InitialContext();
Context envContext = (Context) initCtx.lookup("java:comp/env");
// 根據JNDI、url、user、password獲取
ConnectionFactory connectionFactory = (ConnectionFactory) envContext
.lookup("jms/FailoverConnectionFactory");
Connection connection = connectionFactory.createConnection();
return connection;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
//3 context.xml
Context中新增配置:
<Resource
name="jms/FailoverConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource
name="jms/NormalConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="tcp://localhost:61616"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource name="jms/topic/MyTopic"
auth="Container"
type="org.apache.activemq.command.ActiveMQTopic"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO"/>
<Resource name="jms/queue/MyQueue"
auth="Container"
type="org.apache.activemq.command.ActiveMQQueue"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO.QUEUE"/>
4.傳送簡訊或郵件
請參考本部落格前兩篇文章
注:
1.系統下載ActiveMQ,並允許;
2.ActiveMQ需要融合web伺服器,如可以配置tomcat伺服器的context.xml;
3在專案中,需要引入ActiveMQ的jar.