1. 程式人生 > >ActiveMQ使用與部署

ActiveMQ使用與部署

1JMS與MQ 
1.1JMS
JMS(Java Messaging Service)是Java平臺上有關面向訊息中介軟體(MOM)的技術規範,它便於訊息系統中的Java應用程式進行訊息交換,並且通過提供標準的產生、傳送、接收訊息的介面簡化企業應用的開發,中文稱Java訊息服務。JMS是一種與廠商無關的 API,用來訪問訊息收發系統訊息;就像JDBC(Java Database Connectivity)可以用來訪問許多不同關係資料庫一樣,JMS則提供同樣與廠商無關的訪問方法,以訪問訊息收發服務。
JMS是在Java標準化組織(JCP)內開發的標準(代號JSR 914)。2001年6月25日,釋出了JMS 1.0.2b,2002年3月18日釋出了JMS1.1,統一了訊息域。
JMS規範約定了兩種訊息方式:P2P(Point To Point)和釋出/訂閱(Publish/Subscribe)。P2P是用來進行兩個節點之間的點對點通訊;釋出/訂閱則是用於一個釋出者和多個訂閱者之間的通訊。
1.2MQ
MQ是Message Queue的縮寫,中文稱訊息佇列,MQ是在訊息的傳輸過程中儲存訊息的容器。訊息管理器在將訊息從它的源中繼到它的目標時充當中間人;佇列的主要目的是提供路由並保證訊息的傳遞;如果傳送訊息時接收者不可用,訊息佇列會保留訊息,直到可以成功地傳遞它。
MQ有很多不同廠商和語言實現的產品,如Sun MQ、Microsoft MQ、IBM MQ的等商業產品;Java語言實現的MQ,則有ActiveMQ、基於JMS標準的OpenJMS,以及新近的遷移專案Jafka等開源專案。
1.3二者關係
JMS是一個用於提供訊息服務的技術規範,TA制定了在整個訊息服務提供過程中的所有資料結構和互動流程。而MQ則是訊息佇列服務,是面向訊息中介軟體(MOM)的最終實現,是真正的服務提供者;MQ的實現可以基於JMS,也可以基於其他規範或標準。

2MQ產品介紹

2.1ActiveMQ
ActiveMQ是Apache的一個頂級Java開源專案,也是目前最流行的,能力強勁的開源訊息佇列。ActiveMQ是比較老牌的MQ,2004年由Apache開源孵化,2007年成為Apache頂級專案。最新版本已經更新到5.8.0,TA擁有眾多特性:
支援Java,C/C++,C#,Ruby,Perl,Python,PHP等多種語言實現客戶端和協議。
完全支援企業整合模式。
支援訊息分組、虛擬目標及複合目標等高階特性。
完全支援JMS1.1,和J2EE1.4(持久化、事務,及XA訊息)。
提供對Spring的支援,可以很容易內嵌到使用Spring的系統中。
通過了常規的J2EE伺服器(如TomEE、Geronimo、Jboss、GlassFish、WebLogic等)的測試。
支援多種傳輸協議,如:in-VM、TCP、SSL、NIO、UDP、multicast、Jgroups、JXTA等。
支援通過JDBC和journal提供高速的訊息持久化。
從設計上保證了高效能的叢集、客戶端-伺服器、點對點通訊。
支援基於Web的API及其他方式的REST呼叫。
支援Ajax。
支援CXF和Axis。
支援用作內嵌JMS provider,進行測試。
2.2其他產品
2.2.1OpenJMS
OpenJMS是完全基於JMS1.1規範實現的JMS provider,有以下特性:
支援通過JDBC提供訊息持久化。
支援Applet。
能夠與Jakarta Tomcat這樣的Servlet容器結合。
支援RMI、TCP、HTTP與SSL協議。
提供可靠訊息傳輸、事務和訊息過濾。
2.2.2Jafka
基於scala語言開發的分散式釋出訂閱訊息系統Kafka的Java移植版,功能比較簡單,但其最大的特色分散式和高效能:
訊息持久化非常快,服務端儲存訊息的開銷為O(1)。
基於檔案系統,能夠持久化TB級的訊息而不損失效能。
吞吐量很大,同等機器配置下,Jafka吞吐量比同類MQ均高。
完全的分散式系統,broker、producer、consumer都原生自動支援分散式;自動實現複雜均衡。
核心非常小,內部機制簡潔,適合進行內嵌或者二次開發。
訊息格式以及通訊機制非常簡單,適合進行跨語言開發。

3ActiveMQ的使用

ActiveMQ的使用分為兩種,一種是嵌入式,即把ActiveMQ作為內嵌的JMS provider整合到Tomcat或其他Web伺服器中;另一種是獨立部署,即ActiveMQ單獨部署,獨立執行,通過ActiveMQ提供的API進行程序外訪問。
我們採用獨立部署的方式使用ActiveMQ,這樣的做法更有利於業務的解耦和工程部署結構的分離;也有利於對ActiveMQ進行配置定製和優化。
我們知道JMS規範中約定了兩種訊息傳送方式:P2P和Publish/Subscribe。根據業務的需求,我們採用P/S方式更為合理,也能更好的發揮ActiveMQ本身的優勢。
3.1訊息的釋出
// 連線到ActiveMQ伺服器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic("myTopic.messages");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

while(true) {
TextMessage message = session.createTextMessage();
message.setText("TIME:" + (new Date()).toLocaleString());
// 釋出主題訊息
producer.send(message);
System.out.println("Sent message: " + message.getText());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
3.2訊息的訂閱
// 連線到ActiveMQ伺服器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic("myTopic.messages");
// 建立訂閱
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
// 訂閱接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
3.3訊息的持久訂閱
// 連線到ActiveMQ伺服器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = factory.createConnection();
String ip = "";
try {
// 獲取本機IP
InetAddress addr = InetAddress.getLocalHost();
ip = addr.getHostAddress().toString();
} catch (UnknownHostException ex) {
ex.printStackTrace();
ip = "";
}
if(!"".equals(ip)) {
System.out.println("CLIENT: " + ip);
// 設定訂閱客戶端ID
connection.setClientID(ip);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題
Topic topic = session.createTopic("myTopic.messages");
// 建立訂閱
MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
consumer.setMessageListener(new MessageListener() {
// 訂閱接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage) message;
try {
System.out.println("Received message: " + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
3.4注意事項
普通訂閱的情況下,客戶端只有在連線到伺服器的情況下,才能接收伺服器上的主題訊息。
持久訂閱後,當客戶端線上時,伺服器端會把客戶端在上次下線之後到本次上線之間的所有訊息一併推送給客戶端;這樣就保證了客戶端不會有丟失的訊息。
持久訂閱會引發另一個問題:當新增一個訂閱客戶端時,這個客戶端會收到伺服器上該主題下的所有未過期訊息。

4ActiveMQ的部署

4.1下載
下載地址:http://activemq.apache.org/activemq-580-release.html
4.2安裝
安裝JDK(1.4以上即可)。
設定JAVA_HOME環境變數。
直接解壓ActiveMQ壓縮包。
預設配置下,執行解壓目錄下bin/activemq.bat即可正常執行。之後,可以通過訪問http://localhost:8161/admin檢視ActiveMQ的執行情況(預設使用者名稱和密碼為admin/admin)。
4.3配置
ActiveMQ的配置存放在安裝目錄的conf/activemq.xml檔案中。
因為ActiveMQ採用了Jetty作為容器,因此Jetty相關的配置在conf/jetty.xml檔案中。

5其他

5.1訊息持久化
預設的情況下,ActiveMQ的訊息持久化是基於檔案系統的KahaDB。我們可以通過配置,讓ActiveMQ使用MySQL實現訊息持久化:
將MySQL的jar包複製到安裝目錄的lib下。
修改配置檔案:
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds" />
</persistenceAdapter>
增加節點(與broker節點同級):
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" />
<property name="username" value="activemq" />
<property name="password" value="activemq" />
<property name="maxActive" value="200" />
<property name="poolPreparedStatements" value="true" />
</bean>
然後,重啟ActiveMQ即可。
5.2叢集
ActiveMQ的叢集由伺服器端和客戶端共同完成。伺服器端通過部署Master/Slaver機制,通過進行分散式部署,以實現伺服器叢集的平行擴充套件。而客戶端則採取靜態地址發現,或者動態地址發現的方式,實現伺服器的負載均衡選擇。
5.2.1伺服器端的部署
ActiveMQ支援Master/Slaver機制,但簡單Master/Slaver方式有一定的侷限性,不適合伺服器叢集的平行擴充套件(當然,簡單Master/Slaver已經足夠支撐一般的商業應用)。因此,ActiveMQ提供了支援大併發請求的叢集方式:共享檔案系統的叢集,以及基於JDBC的叢集。
共享檔案系統的叢集
實際上就是基於檔案系統進行叢集部署(前面提到過,ActiveMQ預設的訊息儲存就是基於檔案系統的),可以通過分散式儲存系統或共享資料目錄來實現。這種方式只需要修改conf/activemq.xml:
<persistenceAdapter>
<journaledJDBC dataDirectory="/sharedFileSystem/broker"/>
</persistenceAdapter>
基於JDBC的叢集
原理與共享檔案系統一致,只不過把檔案系統換成了資料庫平臺。即:多臺ActiveMQ連線同一個資料庫,從而實現ActiveMQ的伺服器叢集。配置同5.1。
5.2.2客戶端的使用
伺服器端的叢集對客戶端而言是透明的,但如果客戶端希望得到叢集和負載均衡的功能支援,則必須在程式碼中有所體現。
最常規的方法就是failover協議,fileover支援客戶端在當前伺服器斷開的情況下,自動重新連線到新的伺服器上,而新的伺服器地址可以來源於靜態地址列表,也可以來源於動態地址廣播。
靜態地址發現的常規用法
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false");
動態地址發現的常規用法
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(multicast://host:6255)");
當然,ActiveMQ還支援更多的協議,如:fanout、discovery等。
5.3管理與監控
ActiveMQ提供了一個Web後臺用於檢視伺服器執行狀態,並提供了對訊息佇列、主題及訂閱者等進行管理的功能。
另外,ActiveMQ也可以通過配置支援Nagios的整合監控。