訊息中介軟體學習二 --ActiveMQ
1.概述
ActiveMQ是由Apache出品的,一款最流行的,能力強勁的開源訊息中介軟體。
ActiveMQ是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,
它非常快速,支援多種語言的客戶端和協議,
而且可以非常容易的嵌入到企業的應用環境中,並有許多高階功能。
2.安裝
下載地址:http://activemq.apache.org/download.html
執行服務:
第一步:解壓後找到指令碼目錄
第二步:在伺服器上執行下面命令
注:
訪問預設頁面,地址http://127.0.0.1:8161/admin/
預設頁面訪問埠是8161
預設使用者和密碼都是admin
3.特性
(1)多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。
應用協議: OpenWire,STOMP REST,MQTT,AMQP
(2)完全支援JMS1.1和J2EE 1.4規範 (持久化,分散式事務訊息,事務)
(3)對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的框架中去
(4)連線模式多樣化,支援多種傳輸協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
(5)支援通過JDBC和journal提供快速的訊息持久化
( 6)為高效能叢集,客戶端-伺服器,點對點通訊場景而設計
(7)可以輕鬆地與CXF、Axis等Web Service技術整合
(8)可以被作為記憶體中的JMS提供者,非常適合JMS單元測試
(9)提供了REST API介面
(10)支援以AJAX方式呼叫
4.使用場景
(1)多個專案之間整合,跨平臺 ,多語言
(2)降低系統間模組的耦合度,解耦
(3)系統前後端隔離
5.java使用
點對點模式
public static void main(String[] args) {
//1.建立工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory
(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
Connection connection = null;
Session session = null;
try {
//2.建立連線
connection = connectionFactory.createConnection();
connection.start();
//3.建立session
//Session.AUTO_ACKNOWLEDGE: 接收到訊息時,自動ack確認訊息
//Session.CLIENT_ACKNOWLEDGE:接收到訊息時,客戶端呼叫message.acknowledge(),顯式對訊息確認。
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//4.建立Destination
Destination destination = session.createQueue("hello");
//5.建立生產者/消費者
MessageProducer messageProducer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
//6.設定持久化方式
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//7.定義訊息物件併發送/接受
TextMessage textMessage = session.createTextMessage();
textMessage.setText("helloworld");
messageProducer.send(textMessage);
TextMessage receiveMessage = (TextMessage) messageConsumer.receive();
System.out.println(receiveMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(session!=null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
釋出訂閱模式
public class TestPublish {
private static final String TOPIC = "TestPublish.";
private static final String[] topics = {"A", "B", "C"};
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.setTopics(topics);
for (int i = 0; i < 10; i++) {
publisher.sendMessage(topics);
}
publisher.close();
}
public static class Listener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage)message;
String msg = map.getString("msg");
double version = map.getDouble("version");
System.out.println(msg+" version is "+version);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class Customer{
public static void main(String[] args) {
Customer customer = new Customer();
try {
for (String topic : topics) {
Destination destination = customer.getSession().createTopic(TOPIC + topic);
MessageConsumer messageConsumer = customer.getSession().createConsumer(destination);
messageConsumer.setMessageListener(new Listener());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
private Connection connection = null;
private Session session;
public Customer() {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
e.printStackTrace();
}
}
public Session getSession() {
return session;
}
}
public static class Publisher {
private Connection connection = null;
private Destination[] destinations;
private Session session;
private MessageProducer messageProducer;
public Publisher() {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
messageProducer = session.createProducer(null);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void setTopics(String[] topics) {
destinations = new Destination[topics.length];
for (int i = 0; i < topics.length; i++) {
try {
destinations[i] = session.createTopic(TOPIC + topics[i]);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private Message createTestMessage(String msg, Session session) {
MapMessage message = null;
try {
message = session.createMapMessage();
message.setString("msg", msg);
message.setDouble("version", 1.00);
} catch (JMSException e) {
e.printStackTrace();
}
return message;
}
public void sendMessage(String[] msgs) {
for (int i = 0; i < msgs.length; i++) {
Message message = createTestMessage(msgs[i], session);
try {
messageProducer.send(destinations[i], message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public void close() {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
6.配置詳情
jetty-realm.properties:該配置檔案主要用於配置MQ登入頁面的使用者和密碼
jetty.xml:ActiveMQ內建jetty啟動,該配置檔案包含了管理功能的相關配置
activemq.xml
7.activemq.xml配置檔案包含了很多主要功能的配置
1).配置傳輸連線:
ActiveMQ提供了廣泛的連線模式,包括HTTP/S、JGroups、 JXTA、muticast、SSL、TCP、UDP、XMPP等 注:5.13.0+ 版本後,將OpenWire, STOMP, AMQP,MQTT 這四種主要協議的埠監聽進行了合併,並使用auto關鍵字進行表示。如果想給網路通訊提供安全支援,則可以在uri中使用"auto+ssl"字首
如果想提高吞吐性,則可以在uri中使用"auto+nio"字首
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
高階協議
Failover:一種重連機制,工作於上面介紹的連線協議之上,用於建立可靠的傳輸
配置語法:
failover:(tcp://localhost/61616,tcp://remotehost:61616)?initialReconnectDelay=100
Fanout:一種重連和複製機制,也是工作於其他連線協議之上, 採用複製的方式把訊息複製到多臺訊息伺服器上
配置語法:
fanout:(tcp://localhost/61616,tcp://localhost:61617,tcp://localhost:61618)
2).持久化儲存模式:
1.AMQ訊息儲存—5.0及以前預設的訊息儲存,存放在Data Log中
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
2.KahaDB訊息儲存—比AMQ訊息儲存更好的可擴充套件性和可恢復性—5.3以後推薦使用的訊息儲存
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
3.LevelDB訊息儲存—5.6以後推出的,使用自定義的索引代替了常用的BTree索引,效能高於KahaDB
<persistenceAdapter>
<levelDB directory="${activemq.data}/levelDB"/>
</persistenceAdapter>
4.JDBC訊息儲存—基於JDBC儲存
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> <!--with ActiveMQ Journal--> <!--克服了JDBC Store的不足,使用快速的快取寫入技術,大大提高了效能。--> <!--<journalPersistenceAdapterFactory journalLogFiles="4"
journalLogFileSize="32768" useJournal="true"
useQuickJournal="true" dataSource="#mysql_ds"
dataDirectory="activemq-data">--> </persistenceAdapter> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
5.Memory訊息儲存—基於記憶體的訊息儲存
<broker persistent="false"> </broker>
6.其他配置詳細
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 訂閱/釋出-->
<policyEntry topic=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
<!--
訊息限制策略,面向Slow Consumer的
此策略只對Topic有效,只對nondurable訂閱者有效,當通道中有大量的訊息積壓時,broker可以保留的訊息量。
為了防止Topic中有慢速消費者,導致整個通道訊息積壓。(對於Topic而言,一條訊息只有所有的訂閱者都消費才
會被刪除)
-->
<pendingMessageLimitStrategy>
<!--
ConstantPendingMessageLimitStrategy: 保留固定條數的訊息,如果訊息量超過limit,將使用
“MessageEvictionStrategy”移除訊息
PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍數條訊息。
-->
<!-- 如果prefetchSize為100,則保留10 * 100條訊息 -->
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<!--
訊息剔除策略 面向Slow Consumer的
配合PendingMessageLimitStrategy,只對Topic有效,只對nondurable訂閱者有效。當PendingMessage的數量超過
限制時,broker該如何剔除多餘的訊息。當Topic接收到資訊訊息後,會將訊息“Copy”給每個訂閱者,在儲存這
個訊息時(儲存策略"PendingSubscriberMessageStoragePolicy"),將會檢測pendingMessages的數量是否超過限
制(由"PendingMessageLimitStrategy"來檢測),如果超過限制,將會在pendingMessages中使用
MessageEvicationStrategy移除多餘的訊息,此後將新訊息儲存在PendingMessages中。
-->
<messageEvictionStrategy>
<!--
OldestMessageEvictionStrategy: 移除舊訊息,預設策略。
OldestMessageWithLowestPriorityEvictionStrategy: 舊資料中權重較低的訊息,將會被移除。
UniquePropertyMessageEvictionStrategy: 移除具有指定property的舊訊息。開發者可以指定property的名稱
,從此屬性值相同的訊息列表中移除較舊的(根據訊息的建立時間)。
-->
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<!--
慢速消費者策略
Broker將如何處理慢消費者。Broker將會啟動一個後臺執行緒用來檢測所有的慢速消費者,並定期關閉關閉它們。
-->
<slowConsumerStrategy>
<!--
AbortSlowConsumerStrategy: 中斷慢速消費者,慢速消費將會被關閉。abortConnection是否關閉連線
AbortSlowConsumerStrategy: 如果慢速消費者最後一個ACK距離現在的時間間隔超過閥maxTimeSinceLastAck,
則中斷慢速消費者。
-->
<abortSlowConsumerStrategy abortConnection="false"/><!-- 不關閉底層連結 -->
</slowConsumerStrategy>
<!--轉發策略 將訊息轉發給消費者的方式-->
<dispatchPolicy>
<!--
RoundRobinDispatchPolicy: “輪詢”,訊息將依次傳送給每個“訂閱者”。“訂閱者”列表預設按照訂閱的先後
順序排列,在轉發訊息時,對於匹配訊息的第一個訂閱者,將會被移動到“訂閱者
”列表的尾部,這也意味著“下一條”訊息,將會較晚的轉發給它。
StrictOrderDispatchPolicy: 嚴格有序,訊息依次傳送給每個訂閱者,按照“訂閱者”訂閱的時間先後。它和
RoundRobin最大的區別是,沒有移動“訂閱者”順序的操作。
PriorityDispatchPolicy: 基於“property”權重對“訂閱者”排序。它要求開發者首先需要對每個訂閱者指定
priority,預設每個consumer的權重都一樣。
SimpleDispatchPolicy: 預設值,按照當前“訂閱者”列表的順序。其中PriorityDispatchPolicy是其子類。
-->
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!--恢復策略 ActiveMQ重啟如何恢復資料-->
<subscriptionRecoveryPolicy>
<!--
FixedSizedSubscriptionRecoveryPolicy: 儲存一定size的訊息,broker將為此Topic開闢定額的RAM用來儲存
最新的訊息。使用maximumSize屬性指定儲存的size數量
FixedCountSubscriptionRecoveryPolicy: 儲存一定條數的訊息。 使用maximumSize屬性指定儲存的size數量
LastImageSubscriptionRecoveryPolicy: 只保留最新的一條資料
QueryBasedSubscriptionRecoveryPolicy: 符合置頂selector的訊息都將被儲存,具體能夠“恢復”多少訊息
,由底層儲存機制決定;比如對於非持久化訊息,只要記憶體中還
存在,則都可以恢復。
TimedSubscriptionRecoveryPolicy: 保留最近一段時間的訊息。使用recoverDuration屬性指定儲存時間 單位
毫秒
NoSubscriptionRecoveryPolicy: 關閉“恢復機制”。預設值。
-->
<!--恢復最近30分鐘內的資訊-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<!--"死信"策略 如何處理過去訊息
預設死信佇列(Dead Letter Queue)叫做ActiveMQ.DLQ;所有的未送達訊息都會被髮送到這個佇列,以致會非常
難於管理。 預設情況下,無論是Topic還是Queue,broker將使用Queue來儲存DeadLeader,即死信通道通常為
Queue;不過開發者也可以指定為Topic。
-->
<deadLetterStrategy>
<!--
IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,queuePrefix自定義死信字首
,useQueueForQueueMessages使用佇列儲存死信,還有一個屬性為“useQueueForTopicMessages”,此值表示是否
將Topic的DeadLetter儲存在Queue中,預設為true。
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
SharedDeadLetterStrategy: 將所有的DeadLetter儲存在一個共享的佇列中,這是ActiveMQ broker端預設的策略
。共享佇列預設為“ActiveMQ.DLQ”,可以通過“deadLetterQueue”屬性來設定。還有2個很重要的可選引數
,“processExpired”表示是否將過期訊息放入死信佇列,預設為true;“processNonPersistent”表示是否將“
非持久化”訊息放入死信佇列,預設為false。
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
DiscardingDeadLetterStrategy: broker將直接拋棄DeadLeatter。如果開發者不需要關心DeadLetter,可以使用
此策略。AcitveMQ提供了一個便捷的外掛:DiscardingDLQBrokerPlugin,來拋棄DeadLetter。
下面這個必須配置plugins節點中才對,
丟棄所有死信
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
丟棄指定死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"
/>
使用丟棄正則匹配到死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}"
reportInterval="3000" />
-->
<individualDeadLetterStrategy queuePrefix="DLQ.TOPIC." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--非耐久待處理訊息處理策略 類似於:pendingQueuePolicy(在下面自己找找)-->
<pendingSubscriberPolicy>
<!--支援三種策略:storeCursor, vmCursor和fileCursor。-->
<fileCursor/>
</pendingSubscriberPolicy>
<!--耐久待處理訊息處理策略 類似於:pendingQueuePolicy(在下面自己找找)-->
<pendingDurableSubscriberPolicy>
<!--支援三種策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。-->
<storeDurableSubscriberCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
<!--訊息佇列-->
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb">
<pendingMessageLimitStrategy>
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<messageEvictionStrategy>
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ.QUEUE." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--
pendingQueuePolicy 待消費訊息策略
通道中有大量Slow Consumer時,Broker該如何優化訊息的轉發,以及在此情況下,“非持久化”訊息達到記憶體
限制時該如何處理。
當Broker接受到訊息後,通常將最新的訊息寫入記憶體以提高訊息轉發的效率,提高訊息ACK的效率,減少對對底
層Store的操作;如果Consumer非常快速,那麼訊息將會立即轉發給Consumer,不需要額外的操作;但當遇到
Slow Consumer時,情況似乎並沒有那麼美好。
持久化訊息,通常為:寫入Store->執行緒輪詢,從Store中pageIn資料到PendingStorage->轉發給Consumer->從
PendingStorage中移除->訊息ACK後從Store中移除。
對於非持久化資料,通常為:寫入記憶體->如果記憶體足夠,則PendingStorage直接以記憶體中的訊息轉發->如果內
存不足,則將記憶體中的訊息swap到臨時檔案中->從臨時檔案中pageIn到記憶體,轉發給Consumer。
AcitveMQ提供了幾個的Cursor機制,它就是用來儲存Pending Messages。
1) vmQueueCursor: 將待轉發訊息儲存在額外的記憶體(JVM linkeList)的儲存結構中。是“非持久化訊息”的默
認設定,如果Broker不支援Persistent,它是任何型別訊息的預設設定。有OOM風險。
2) fileQueueCursor: 將訊息儲存到臨時檔案中。檔案儲存方式有broker的tempDataStore屬性決定。是“持久
化訊息”的預設設定。
3) storeCursor: “綜合”設定,對於非持久化訊息,將採用vmQueueCursor儲存,對於持久化訊息採用
fileQueueCursor。這是強烈推薦的策略,也是效率最好的策略。
-->
<pendingQueuePolicy>
<storeCursor>
<nonPersistent>
<fileQueueCursor/>
</nonPersistent>
</storeCursor>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
8.部署模式
Master-Slave部署方式
(1). Shared Filesystem Master-Slave方式
支援N個AMQ例項組網,但由於他是基於kahadb儲存策略,
亦可以部署在分散式檔案系統上,應用靈活、高效且安全。
(2). Shared Database Master-Slave方式
與shared filesystem方式類似,只是共享的儲存介質由檔案系統改成了資料庫而已,
支援N個AMQ例項組網,但他的效能會受限於資料庫。
(3). Replicated LevelDB Store方式
ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。
被選擇的master broker node開啟並接受客戶端連線。其他node轉入slave模式,
連線master並同步他們的儲存狀態。slave不接受客戶端連線。
所有的儲存操作都將被複制到連線至Master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。
Broker-Cluster部署方式
(1). Static Broker-Cluster部署
(2). Dynamic Broker-Cluster部署
9. 示例(基於Shared Filesystem Master-Slave方式)
第一步:修改jetty.xml 和ActiveMQ.xml,
保證在同一臺機器上部署不會衝突,
並建立共享資料夾,然後修改預設kahaDB配置
<!--jetty-->
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
<!--activeMQ-->
<transportConnectors>
<transportConnector name="openwire"
uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600;
wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/>
</transportConnectors>
<persistenceAdapter>
<kahaDB directory="D:\software\MqSharedData" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/>
</persistenceAdapter>
<!--客戶端配置檔案修改-->
brokerURL="failover:(Master,Slave1,Slave2)?randomize=false"
<!--策略配置:從5.6版本起,在destinationPolicy上新增的選擇replayWhenNoConsumers,
這個選項使得broker2上有需要轉發的訊息但是沒有消費者時,把訊息迴流到它原來的broker1上,
同時需要把enableAudit設定為false,為了防止訊息迴流後被當做重複訊息而不被分發-->
<!--這裡一定要注意空格和換行符的格式,否則無法啟動,而且必須每個都要配置,否則無法迴流-->
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
<!--Static Broker-Cluster-->
<!--duplex=true配置允許雙向連線-->
<!--備註:也可以在兩個broker中都配置,使得其可以相互通訊,則上面的配置就可以不需要了,但是未驗證-->
<!--static:靜態協議。用於為一個網路中的多個代理建立靜態配置,這種配置協議支援複合的URL。
multicast:多點傳送協議。訊息伺服器會廣播自己的服務,也會定位其他代理服務。-->
<networkConnectors>
<!--備用broker埠-->
<networkConnector uri="static:(tcp:// 127.0.0.1:61617)"duplex="false"/>
</networkConnectors>
<!--Dynamic Broker-Cluster-->
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
10.ActiveMQ與Spring整合例項
第一步:引入jar(pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>springmvc</groupId>
<artifactId>springmvc</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<properties>
<springframework>4.1.8.RELEASE</springframework>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!-- JSP相關 -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${springframework}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${springframework}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${springframework}</version>
</dependency>
<!-- xbean 如<amq:connectionFactory /> -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.6</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.11.1</version>
</dependency>
<!--activeMQ5.14以上版本中集成了spring一些包,導致衝突。-->
<!--解決方法:使用activeMQ5.11.1及以下版本即可。-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-web</artifactId>
<version>5.11.1</version>
</dependency>
<!-- 自用jar包,可以忽略-->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
</project>
第二步:配置applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd">
<!-- 指定spring元件掃描的基本包路徑 -->
<context:component-scan base-package="com.micheal" >
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"></context:exclude-filter>
</context:component-scan>
<mvc:annotation-driven />
</beans>
第三步:配置applicationContext-ActiveMQ.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.11.1.xsd">
<!--<amq:connectionFactory id="amqConnectionFactory"-->
<!--brokerURL="failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=false"/>-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="failover:(tcp://localhost:61616)?randomize=false"/>
<!-- 配置JMS連線工長 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定義訊息佇列(Queue) -->
<bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 設定訊息佇列的名字 -->
<constructor-arg>
<value>demotest</value>
</constructor-arg>
</bean>
<bean id="demoTopicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 設定訊息佇列的名字 -->
<constructor-arg>
<value>chat1</value>
</constructor-arg>
</bean>
<!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它傳送、接收訊息。 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="demoTopicDestination" />
<property name="receiveTimeout" value="10000" />
<!-- true是topic,false是queue,預設是false,此處顯示寫出false -->
<property name="pubSubDomain" value="true" />
</bean>
<!-- 配置訊息佇列監聽者(Queue) -->
<bean id="queueMessageListener" class="com.micheal.mq.listener.QueueMessageListener" />
<!-- 配置訊息佇列監聽者(Topic) -->
<bean id="topicMessageListener" class="com.micheal.mq.listener.TopicMessageListener" />
<!--<!– 顯示注入訊息監聽容器(Queue),配置連線工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 –>-->
<!--<bean id="queueListenerContainer"-->
<!--class="org.springframework.jms.listener.DefaultMessageListenerContainer">-->
<!--<property name="connectionFactory" ref="connectionFactory" />-->
<!--<property name="destination" ref="demoQueueDestination" />-->
<!--<property name="messageListener" ref="queueMessageListener" />-->
<!--</bean>-->
<!-- 顯示注入訊息監聽容器(Topic),配置連線工廠,demoTopicDestination,監聽器是上面定義的監聽器 -->
<bean id="topicListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoTopicDestination" />
<property name="messageListener" ref="topicMessageListener" />
</bean>
<bean id="topicListenerContainer1"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="demoQueueDestination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>
</beans>
第四步:配置spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 指定spring元件掃描的基本包路徑 -->
<context:component-scan base-package="com.micheal" >
<context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"></context:include-filter>
</context:component-scan>
<mvc:annotation-driven>
<!--支援非同步,用來實現前端ajax傳送監聽訊息功能-->
<mvc:async-support default-timeout="5000"/>
</mvc:annotation-driven>
<!-- 檢視解析器配置-->
<bean id="viewResolver" class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/views/" />
<property name="suffix" value=".jsp" />
<!-- 定義其解析檢視的order順序為1 -->
<property name="order" value="1" />
</bean>
</beans>
第五步:配置web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
version="3.1">
<display-name>springMvc</display-name>
<welcome-file-list>
<welcome-file>/views/index.jsp</welcome-file>
</welcome-file-list>
<servlet-mapping>
<servlet-name>default</servlet-name>
<url-pattern>*.js</url-pattern>
</servlet-mapping>
<!--用於實現ajax前端呼叫ActiveMQ,必須要要支援非同步-->
<context-param>
<param-name>org.apache.activemq.brokerURL</param-name>
<param-value>tcp://localhost:61616</param-value>
</context-param>
<servlet>
<servlet-name>AjaxServlet</servlet-name>
<servlet-class>org.apache.activemq.web.AjaxServlet</servlet-class>
<load-on-startup>2</load-on-startup>
<async-supported>true</async-supported>
</servlet>
<servlet-mapping>
<servlet-name>AjaxServlet</servlet-name>
<url-pattern>/views/amq/*</url-pattern>
</servlet-mapping>
<!-- 載入spring的配置檔案 -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:applicationContext*.xml;
</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<servlet>
<servlet-name>springMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
<async-supported>true</async-supported>
</servlet>
<servlet-mapping>
<servlet-name>springMVC</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- 處理編碼格式 -->
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<async-supported>true</async-supported>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
</web-app>
第六步:相關程式碼
/**
* @description:生產者
* @author: micheal
* @date:2018/7/8
*/
@Service("producerService")
public class ProducerService {
@Resource
private JmsTemplate jmsTemplate;
public void sendMessage(final String msg){
Destination destination = jmsTemplate.getDefaultDestination();
System.out.println(Thread.currentThread().getName()+"向預設佇列"+destination+"傳送訊息");
jmsTemplate.send(session -> session.createTextMessage(msg));
}
}
/**
* @description:消費者
* @author: micheal
* @date:2018/7/8
*/
@Service("customerService")
public class CustomerService {
@Resource
private JmsTemplate jmsTemplate;
public TextMessage receive(){
Destination destination = jmsTemplate.getDefaultDestination();
TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
return textMessage;
}
}
/**
* @author micheal
*/
public class TopicMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("從佇列:"+message.getJMSDestination()+"獲取到訊息:"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
第七步:前端程式碼(實現前端頁面聊天室)
<%--
Created by IntelliJ IDEA.
User: michael
Date: 2018/7/10
Time: 01:26
To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
%>
<html>
<head>
<meta charset="UTF-8">
<title>訊息接受頁面</title>
<%-- ActiveMQ的demo下有需要引入的js,複製過來即可--%>
<script type="text/javascript" src="../js/jquery-1.4.2.min.js"></script>
<script type="text/javascript" src="../js/amq_jquery_adapter.js"></script>
<script type="text/javascript" src="../js/amq.js"></script>
<script>
$(function(){
var amq = org.activemq.Amq;
var myDestination='topic://chat1';
amq.init({
uri: '/views/amq', //AjaxServlet所配置對應的URL
logging: true,//啟用日誌記錄
timeout: 20,//保持連線時長,單位為秒
clientId:(new Date()).getTime().toString() //防止多個瀏覽器視窗標籤共享同一個JSESSIONID
});
//傳送訊息
$("#sendBtn").click(function(){
var msg=$("#msg").val();
var name=$("#name").val();
amq.sendMessage(myDestination, "<message name='"+name+"' msg='"+msg+"'/>");
$("#msg").val("");
});
//接收訊息
var myHandler =
{
rcvMessage: function(message)
{
//alert("received "+message);
$("#distext").append(message.getAttribute('name')+":"+message.getAttribute('msg')+"\n");
}
};
amq.addListener('handler',myDestination,myHandler.rcvMessage);
});
</script>
</head>
<body>
<h1>傳送 ajax JMS 訊息</h1>
訊息視窗<br>
<textarea rows="10" cols="50" id="distext" readonly="readonly"></textarea>
<br>
<br/>
暱稱:<input type="text" id="name"><br/>
訊息:<input type="text" id="msg">
<input type="button" value="傳送訊息" id="sendBtn"/>
</body>
</html>
學習來源:https://www.jianshu.com/p/684cf74ab443