ActiveMQ.xml 配置詳解
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <value>file:${activemq.conf}/credentials.properties</value> </property> </bean> <!-- Allows log searching in hawtio console --> <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"> </bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="127.0.0.1" dataDirectory="${activemq.data}">
<destinationPolicy> <policyMap> <policyEntries> <!-- 訂閱/釋出--> <policyEntry topic=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb"> <!-- 訊息限制策略,面向Slow Consumer的 此策略只對Topic有效,只對nondurable訂閱者有效,當通道中有大量的訊息積壓時,broker可以保留的訊息量。 為了防止Topic中有慢速消費者,導致整個通道訊息積壓。(對於Topic而言,一條訊息只有所有的訂閱者都消費才 會被刪除) --> <pendingMessageLimitStrategy> <!-- ConstantPendingMessageLimitStrategy: 保留固定條數的訊息,如果訊息量超過limit,將使用 “MessageEvictionStrategy”移除訊息 PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍數條訊息。 --> <!-- 如果prefetchSize為100,則保留10 * 100條訊息 --> <prefetchRatePendingMessageLimitStrategy multiplier="10"/> </pendingMessageLimitStrategy> <!-- 訊息剔除策略 面向Slow Consumer的 配合PendingMessageLimitStrategy,只對Topic有效,只對nondurable訂閱者有效。當PendingMessage的數量超過 限制時,broker該如何剔除多餘的訊息。當Topic接收到資訊訊息後,會將訊息“Copy”給每個訂閱者,在儲存這 個訊息時(儲存策略"PendingSubscriberMessageStoragePolicy"),將會檢測pendingMessages的數量是否超過限 制(由"PendingMessageLimitStrategy"來檢測),如果超過限制,將會在pendingMessages中使用 MessageEvicationStrategy移除多餘的訊息,此後將新訊息儲存在PendingMessages中。 --> <messageEvictionStrategy> <!-- OldestMessageEvictionStrategy: 移除舊訊息,預設策略。 OldestMessageWithLowestPriorityEvictionStrategy: 舊資料中權重較低的訊息,將會被移除。 UniquePropertyMessageEvictionStrategy: 移除具有指定property的舊訊息。開發者可以指定property的名稱 ,從此屬性值相同的訊息列表中移除較舊的(根據訊息的建立時間)。 --> <OldestMessageWithLowestPriorityEvictionStrategy /> </messageEvictionStrategy> <!-- 慢速消費者策略 Broker將如何處理慢消費者。Broker將會啟動一個後臺執行緒用來檢測所有的慢速消費者,並定期關閉關閉它們。 --> <slowConsumerStrategy> <!-- AbortSlowConsumerStrategy: 中斷慢速消費者,慢速消費將會被關閉。abortConnection是否關閉連線 AbortSlowConsumerStrategy: 如果慢速消費者最後一個ACK距離現在的時間間隔超過閥maxTimeSinceLastAck, 則中斷慢速消費者。 --> <abortSlowConsumerStrategy abortConnection="false"/><!-- 不關閉底層連結 --> </slowConsumerStrategy> <!--轉發策略 將訊息轉發給消費者的方式--> <dispatchPolicy> <!-- RoundRobinDispatchPolicy: “輪詢”,訊息將依次傳送給每個“訂閱者”。“訂閱者”列表預設按照訂閱的先後 順序排列,在轉發訊息時,對於匹配訊息的第一個訂閱者,將會被移動到“訂閱者 ”列表的尾部,這也意味著“下一條”訊息,將會較晚的轉發給它。 StrictOrderDispatchPolicy: 嚴格有序,訊息依次傳送給每個訂閱者,按照“訂閱者”訂閱的時間先後。它和 RoundRobin最大的區別是,沒有移動“訂閱者”順序的操作。 PriorityDispatchPolicy: 基於“property”權重對“訂閱者”排序。它要求開發者首先需要對每個訂閱者指定 priority,預設每個consumer的權重都一樣。 SimpleDispatchPolicy: 預設值,按照當前“訂閱者”列表的順序。其中PriorityDispatchPolicy是其子類。 --> <strictOrderDispatchPolicy/> </dispatchPolicy> <!--恢復策略 ActiveMQ重啟如何恢復資料--> <subscriptionRecoveryPolicy> <!-- FixedSizedSubscriptionRecoveryPolicy: 儲存一定size的訊息,broker將為此Topic開闢定額的RAM用來儲存 最新的訊息。使用maximumSize屬性指定儲存的size數量 FixedCountSubscriptionRecoveryPolicy: 儲存一定條數的訊息。 使用maximumSize屬性指定儲存的size數量 LastImageSubscriptionRecoveryPolicy: 只保留最新的一條資料 QueryBasedSubscriptionRecoveryPolicy: 符合置頂selector的訊息都將被儲存,具體能夠“恢復”多少訊息 ,由底層儲存機制決定;比如對於非持久化訊息,只要記憶體中還 存在,則都可以恢復。 TimedSubscriptionRecoveryPolicy: 保留最近一段時間的訊息。使用recoverDuration屬性指定儲存時間 單位 毫秒 NoSubscriptionRecoveryPolicy: 關閉“恢復機制”。預設值。 --> <!--恢復最近30分鐘內的資訊--> <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/> </subscriptionRecoveryPolicy> <!--"死信"策略 如何處理過去訊息 預設死信佇列(Dead Letter Queue)叫做ActiveMQ.DLQ;所有的未送達訊息都會被髮送到這個佇列,以致會非常 難於管理。 預設情況下,無論是Topic還是Queue,broker將使用Queue來儲存DeadLeader,即死信通道通常為 Queue;不過開發者也可以指定為Topic。 --> <deadLetterStrategy> <!-- IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,queuePrefix自定義死信字首 ,useQueueForQueueMessages使用佇列儲存死信,還有一個屬性為“useQueueForTopicMessages”,此值表示是否 將Topic的DeadLetter儲存在Queue中,預設為true。 <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> SharedDeadLetterStrategy: 將所有的DeadLetter儲存在一個共享的佇列中,這是ActiveMQ broker端預設的策略 。共享佇列預設為“ActiveMQ.DLQ”,可以通過“deadLetterQueue”屬性來設定。還有2個很重要的可選引數 ,“processExpired”表示是否將過期訊息放入死信佇列,預設為true;“processNonPersistent”表示是否將“ 非持久化”訊息放入死信佇列,預設為false。 <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/> DiscardingDeadLetterStrategy: broker將直接拋棄DeadLeatter。如果開發者不需要關心DeadLetter,可以使用 此策略。AcitveMQ提供了一個便捷的外掛:DiscardingDLQBrokerPlugin,來拋棄DeadLetter。 下面這個必須配置plugins節點中才對, 丟棄所有死信 <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> 丟棄指定死信 <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000" /> 使用丟棄正則匹配到死信 <discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000" /> --> <individualDeadLetterStrategy queuePrefix="DLQ.TOPIC." useQueueForQueueMessages="true"/> </deadLetterStrategy> <!--非耐久待處理訊息處理策略 類似於:pendingQueuePolicy(在下面自己找找)--> <pendingSubscriberPolicy> <!--支援三種策略:storeCursor, vmCursor和fileCursor。--> <fileCursor/> </pendingSubscriberPolicy> <!--耐久待處理訊息處理策略 類似於:pendingQueuePolicy(在下面自己找找)--> <pendingDurableSubscriberPolicy> <!--支援三種策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。--> <storeDurableSubscriberCursor/> </pendingDurableSubscriberPolicy> </policyEntry> <!--訊息佇列--> <policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true" memoryLimit="16mb"> <pendingMessageLimitStrategy> <prefetchRatePendingMessageLimitStrategy multiplier="10"/> </pendingMessageLimitStrategy> <messageEvictionStrategy> <OldestMessageWithLowestPriorityEvictionStrategy /> </messageEvictionStrategy> <slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/> </slowConsumerStrategy> <dispatchPolicy> <strictOrderDispatchPolicy/> </dispatchPolicy> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="1800000"/> </subscriptionRecoveryPolicy> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ.QUEUE." useQueueForQueueMessages="true"/> </deadLetterStrategy> <!-- pendingQueuePolicy 待消費訊息策略 通道中有大量Slow Consumer時,Broker該如何優化訊息的轉發,以及在此情況下,“非持久化”訊息達到記憶體 限制時該如何處理。 當Broker接受到訊息後,通常將最新的訊息寫入記憶體以提高訊息轉發的效率,提高訊息ACK的效率,減少對對底 層Store的操作;如果Consumer非常快速,那麼訊息將會立即轉發給Consumer,不需要額外的操作;但當遇到 Slow Consumer時,情況似乎並沒有那麼美好。 持久化訊息,通常為:寫入Store->執行緒輪詢,從Store中pageIn資料到PendingStorage->轉發給Consumer->從 PendingStorage中移除->訊息ACK後從Store中移除。 對於非持久化資料,通常為:寫入記憶體->如果記憶體足夠,則PendingStorage直接以記憶體中的訊息轉發->如果內 存不足,則將記憶體中的訊息swap到臨時檔案中->從臨時檔案中pageIn到記憶體,轉發給Consumer。 AcitveMQ提供了幾個的Cursor機制,它就是用來儲存Pending Messages。 1) vmQueueCursor: 將待轉發訊息儲存在額外的記憶體(JVM linkeList)的儲存結構中。是“非持久化訊息”的默 認設定,如果Broker不支援Persistent,它是任何型別訊息的預設設定。有OOM風險。 2) fileQueueCursor: 將訊息儲存到臨時檔案中。檔案儲存方式有broker的tempDataStore屬性決定。是“持久 化訊息”的預設設定。 3) storeCursor: “綜合”設定,對於非持久化訊息,將採用vmQueueCursor儲存,對於持久化訊息採用 fileQueueCursor。這是強烈推薦的策略,也是效率最好的策略。 --> <pendingQueuePolicy> <storeCursor> <nonPersistent> <fileQueueCursor/> </nonPersistent> </storeCursor> </pendingQueuePolicy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <!-- ActiveMQ的特性之一是很好的支援JMX。通過JMX MBeans可以很方便的監聽和控制ActiveMQ的broker。
<!--
官方網站提供的JMX特性說明對於遠端訪問的配置流程坑爹,如果想使用jconsole對ActiveMQ進行監控, 無密碼訪問> 需要在borker節點設定useJmx屬性為true,且managementContext節點的createConnector屬性為true。 通過jconsole訪問地址service:jmx:rmi:///jndi/rmi://ip:1099/jmxrmi進行連線, 預設埠為1099,可以通過connectorPort屬性修改連線埠,遠端訪問需要設定connectorHost屬性 為本機ip以供遠端訪問 有密碼訪問> 需要在borker節點設定useJmx屬性為true,且managementContext節點的createConnector屬性為false。 然後在${actviemq.base}/conf目錄下的jmx.access和jmx.password中新增使用者許可權和密碼, 最後修改${activemq.base}/bin/activemq檔案,找到下面的內容然後去掉註釋,儲存退出,重啟activemq即可 # ACTIVEMQ SUNJMX
--> <managementContext> <managementContext createConnector="false"/> </managementContext> <!--持久化儲存--> <persistenceAdapter> <!-- 官方預設的持久化方案 AMQ Message Store 是 ActiveMQ5.0 預設的持久化儲存。Message commands 被儲存到 transactional journal(由 rolling data logs 組成)。Messages 被儲存到 data logs 中,同時被 reference store 進行索引以提高存取速度。 Date logs由一些單獨的 data log 檔案組成,預設的檔案大小是 32M,如果某個訊息的大小超過了 data log 檔案的大 小,那麼可以修改配置以增加 data log 檔案的大小。 如果某個 data log 檔案中所有的訊息都被成功消費了,那麼這個 data log 檔案將會被標記,以便在下一輪的清理中 被刪除或者歸檔。 --> <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/> <!--Kaha Persistence 是一個專門針對訊息持久化的解決方案。它對典型的訊息使用模式進行了優化。在 Kaha 中,資料被 追加到 data logs 中。當不再需要 log檔案中的資料的時候,log 檔案會被丟棄。--> <!-- <kahaDB directory="${activemq.data}/kahadb"/>--> <!-- 支援的資料庫有Apache Derby,Axion,DB2,HSQL,Informix,MaxDB,MySQL,Oracle,Postgresql,SQLServer,Sybase。 如果你使用的資料庫不被支援,那麼可以調整 StatementProvider 來保證使 用正確的 SQL 方言(flavour of SQL)。通常絕大多數資料庫支援以下 adaptor:
1、org.activemq.store.jdbc.adapter.BlobJDBCAdapter 2、org.activemq.store.jdbc.adapter.BytesJDBCAdapter 3、org.activemq.store.jdbc.adapter.DefaultJDBCAdapter 4、org.activemq.store.jdbc.adapter.ImageJDBCAdapter 也可以在配置檔案中直接指定 JDBC adaptor 參考下面的的“jdbc持久化配置” -->
2、總結
想要使用非同步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true, 如果設定了alwaysSyncSend=true系統將會忽略useAsyncSend設定的值都採用同步:
1) 當alwaysSyncSend=false時,“NON_PERSISTENT”(非持久化)、事務中的訊息將使用“非同步傳送”
2) 當alwaysSyncSend=false時,如果指定了useAsyncSend=true,“PERSISTENT”持久化型別的訊息使用非同步傳送。如果useAsyncSend=false,“PERSISTENT”型別的訊息使用同步傳送。
預設情況(alwaysSyncSend=false,useAsyncSend=false),非持久化訊息、事務內的訊息均採用非同步傳送;對於持久化訊息採用同步傳送。