1. 程式人生 > >ActiveMQ高併發處理方案

ActiveMQ高併發處理方案

高併發傳送訊息異常解決方法:

現象:使用10個執行緒每100ms傳送一條訊息,大約3000多條後,出現異常,所有執行緒停

             止:javax.jms.JMSException:Could not connect to broker

URL: tcp://localhost:61616.Reason:java.net.BindException:     Addressalready in use: connect; nested exception is

java.net.BindException: Address already inuse: connect

原因:建立了太多jms連線沒有來得及回收

解決方法:使用jms連線池

原來的配置:

         <bean>

         <property name="environment">

                  <props>

                            <prop key="java.naming.factory.initial">

org.apache.activemq.jndi.ActiveMQInitialContextFactory

</prop>

                            <prop key="java.naming.provider.url">tcp://huzq-linux:61616</prop>

                  </props>

         </property>

</bean>

<bean>

         <property name="jndiName">

<value>ConnectionFactory</value>

</property>

<property name="jndiTemplate">

<ref local="jndiTemplate"></ref>

</property>

</bean>

修改為:

<bean>

         <property name="connectionFactory">

                   <bean>

                            <property name="brokerURL" value="tcp://huzq-linux:61616" />

                   </bean>

         </property>

</bean>

解決activemq多消費者併發處理

遇到一個現象,如果activemq佇列積壓了資料的話,如果在spring中啟動listner,只有一個consumer執行,查閱了很多資料,無果,後來偶爾通過activemq的監控網頁看到消費者列表中,只有一個消費者有等待處理的資料,其他都沒有,如下圖:


由此得知,activemq有一定機制將佇列中的資料交給consumer處理,這個機制就是資料的數量分配,查資料得知,預設是1000,因此,把這個值調小就可以了。

在客戶端的連線url中,修改為tcp://ipaddr:61616?jms.prefetchPolicy.all=2

這樣基本消費者就分配公平了,不會出現一個消費者忙死,另外的消費者閒死了。

為高併發程式部署ActiveMQ

使用ActiveMQ來擴充套件你的應用程式需要一些時間並要花一些精力.本節中我們將介紹三種技術用於擴充套件應用程式.我們將從垂直擴充套件開始,這種擴充套件方式中,單個代理需要處理成千上萬的連線和訊息佇列.

接下來我們將介紹水平擴充套件,這種擴充套件方式需要處理比前一種方式更多的網路連線.最後,我們介紹的傳輸負載分流,可以在擴充套件和效能間得到平衡,但是會增加ActiveMQ程式的複雜性.

1.        垂直擴充套件:

垂直擴充套件是一種用於增加單個ActiveMQ代理連線數(因而也增加了負載能力)的技術.預設情況下,ActiveMQ的被設計成儘可高效的傳輸訊息以確保低延遲和良好的效能.但是,你也可以進行一些配置使的ActiveMQ代理可以同時處理大量併發的連線以及大量的訊息佇列.

預設情況下,ActiveMQ使用阻塞IO來處理傳輸連線,這種方式為每一個連線分配一個執行緒.你可以為ActiveMQ代理使用非阻塞IO(同時客戶端可以使用預設的傳輸)以減少執行緒的使用.可以在ActiveMQ的配置檔案中通過傳輸聯結器配置非阻塞IO.下面的是配置非阻塞IO的示例

程式碼:配置NIO傳輸聯結器

<broker>

<transportConnectors>

<transportConnector name="nio" uri="nio://localhost:61616"/>

</<transportConnectors>

</broker>

除了為每個連線使用一個執行緒的阻塞IO,ActiveMQ還可以為每一個客戶端連線使用一個訊息分發執行緒.你可以通過將系統引數org.apache.activemq.UseDedicatedTaskRunner設定為false來設定ActiveMQ使用一個搞執行緒池.下面是一個示例:

ACTIVEMQ_OPTS="-Dorg.apache.activemq.UseDedicatedTaskRunner=false"

確保ActiveMQ代理用於足夠的記憶體來處理大量的併發連線,需要分兩步進行:

首先,你需要確保執行ActiveMQ的JVM在啟動之前已經配置了足夠的記憶體.可以使用

JVM的-Xmx選項來配置,如下所示:

ACTIVEMQ_OPTS="-Xmx1024M -Dorg.apache.activemq.UseDedicatedTaskRunner=false"

其次,需要確保JVM配置了適量的專門供ActiveMQ代理使用的記憶體.這個配置可用通過<system-Usage> 元素的limit屬性來配置.一個不錯的根據經驗得到的規則時,在連線數為幾百個時配置512MB為最小記憶體.

如果測試發現記憶體不夠用,可以增加記憶體配置.你可以按照下面程式碼示例來配置ActiveMQ使用的記憶體限制:

程式碼:為ActiveMQ代理設定記憶體使用限制

<systemUsage>

<systemUsage>

<memoryUsage>

<memoryUsage limit="512 mb"/>

</memoryUsage>

<storeUsage>

<storeUsage limit="10 gb" name="foo"/>

</storeUsage>

<tempUsage>

<tempUsage limit="1 gb"/>

</tempUsage>

</systemUsage>

</systemUsage>

同樣,簡易減少每個連線的CPU負載.如果你正使用Open-Wire格式的訊息,關閉tight encoding選項,開啟該選項會導致CPU佔有過多.Tight encoding選項可以通過客戶端連線的URI中的引數設定以便關閉該選項.下面是示例程式碼:

String uri = "failover://(tcp://localhost:61616?" + wireFormat.tightEncodingEnabled=false)";

ConnectionFactory cf = new ActiveMQConnectionFactory(uri);

瞭解了一些擴充套件ActiveMQ代理處理大量連線的調優選項之後,我們在瞭解一些讓ActiveMQ處理大量訊息佇列的調優選項.

預設的訊息佇列配置中使用一個獨立的執行緒負責將訊息儲存中的訊息提取到訊息佇列中而後再被分發到對其感興趣的訊息消費者.如果有大量的訊息佇列,建議通過啟用optimizeDispatch這個屬性

改善這個特性,示例程式碼如下所示:

<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">" optimizedDispatch="true"/>

</policyEntries>

</policyMap>

</destinationPolicy>

注意,程式碼清單中使用萬用字元>表示該配置會遞迴的應用到所有的訊息佇列中.

為確保擴充套件配置既可以處理大量連線也可以處理海量訊息佇列,請使用JDBC或更新更快的KahaDB訊息儲存.預設情況下ActiveMQ使用KahaDB訊息儲存.

到目前位置,我們關注了連線數擴充套件,減少執行緒使用以及選擇正確的訊息儲存.下面的示例配置程式碼展示了ActiveMQ配置中為擴充套件進行了調優:

程式碼:為擴充套件進行調優的配置示例程式碼

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="amq-broker" dataDirectory="${activemq.base}/data">

         <persistenceAdapter>

                   <kahaDB directory="${activemq.base}/data" journalMaxFileLength="32mb"/>

         </persistenceAdapter>

         <destinationPolicy>

                   <policyMap>

                            <policyEntries>

                                     <policyEntry queue="&gt;" optimizedDispatch="true"/>

                            </policyEntries>

                   </policyMap>

         </destinationPolicy>

         <systemUsage>

                   <systemUsage>

                            <memoryUsage>

                                     <memoryUsage limit="512 mb"/>

                            </memoryUsage>

                            <storeUsage>

                                     <storeUsage limit="10 gb" name="foo"/>

                            </storeUsage>

                            <tempUsage>

                                     <tempUsage limit="1 gb"/>

                            </tempUsage>

                   </systemUsage>

         </systemUsage>

         <transportConnectors>

                   <transportConnector name="openwire" uri="nio://localhost:61616"/>

         </transportConnectors>

</broker>

注意示例程式碼中所有為調優而建議的配置條目,這些調優條目在預設的配置檔案中並沒有配置,所以請確保給予充分重視.

瞭解過如何擴充套件ActiveMQ後,現在是時候瞭解使用代理網路來進行橫向擴充套件了.

2.      橫向擴充套件

除了擴充套件單獨的代理,你還可以使用代理網路來增加應用程式可用的代理數量.因為網路會自動傳遞訊息給所有互聯的具有對訊息感興趣的訊息消費者的代理,所以你可以配置客戶端連線到一個代理叢集,隨機的選擇叢集中的一個代理來連線.可以通過URI中的引數來配置,如下所示:

failover://(tcp://broker1:61616,tcp://broker2:61616)?randomize=true 

為了確保佇列或持久化主題中的訊息不會卡在某個代理上而不能進行轉發,需要在配置網路連線時,將dynamicOnly配置成true並使用小一點的prefetchSize.下面是一個示例:

<networkConnector uri="static://(tcp://remotehost:61617)"

name="bridge" dynamicOnly="true" prefetchSize="1"

</networkConnector> 

示例代理網路來橫向擴充套件並不會的代理更多的延遲,因為訊息在傳送到訊息消費者在之前會經過多個代理.另外一種可選的部署方案可以提供更多的擴充套件性和更好的效能,但是需要在應用程式中做更多的計劃.這種混合的解決方案被稱為傳輸負載分流(traffic partitioning),這種方案通過在應用程式中分割訊息目的地到不同的代理上以完成垂直擴充套件.

3.      傳輸負載分流

客戶端的傳輸負載分流是一個垂直和水平混合的負載分流方案.通常不使用代理網路,因為客戶端程式會決定將哪個負載傳送到哪個(些)代理上.客戶端程式需要維護多個JMS連線並且決定哪個JMS連線應該用於那個訊息目的地.

沒有直接使用網路連線的好處是降低了代理見過量的訊息轉發.你不需要像在傳統程式中那樣進行額外的負載的均衡處理.到這裡,我們已經瞭解了垂直和水平擴充套件以及傳輸負載分流.你應該能夠深刻了解如何使用ActiveMQ來處理大量的併發連線和海量的訊息目的地的連線了.

Activemq在大流量停出現記憶體耗盡的情況以及解決方案

在大量訊息持續傳送到broker的情況下,當broker到消費者之間的網路滿了以後,broker的訊息無法傳送出去,導致在TransportConnection的dispatchQueue中堆積的訊息越來越多。PendingMessageCursor中的訊息不能被及時消費,導致broker判斷消費者為慢消費者。當broker的記憶體被耗盡後JVM會頻繁的進行full gc,由於訊息不能被回收,所以訊息物件會從年輕代轉移到老年代而不會釋放記憶體,導致broker幾乎停止對外服務。

這個問題的根本原因是ActiveMQ只對接收訊息作了流量控制,但是沒有傳送訊息堵塞的情況。需要根據訊息傳送情況來控制訊息的接收。

解決方案,在TransportConnection中的dispatchAsync對dispatchQueue中的訊息數量做判斷,當超過閾值就暫停dispatch,當前thread sleep,這樣TopicSubscription就會暫停接收訊息,避免記憶體耗盡

歡迎訪問我們的技術交流群425783133