ActiveMQ 事務、叢集、持久訂閱者、ActiveMQ監控
JMS介紹
JMS是什麼?
JMS的全稱Java Message Service,既Java訊息服務。
JMS是SUN提供的旨在統一各種MOM(Message-Oriented Middleware)系統介面的規範,它包含點對點(Point to Point,PTP)和釋出/訂閱(Publish/Subscribe,pub/sub)兩種訊息模型,提供可靠訊息傳輸、事務和訊息過濾等機制。
ActiveMQ是Apache出品的開源專案,他是JMS規範的一個實現。
MOM是什麼?
MOM(Message-Oriented Middleware):面向訊息的中介軟體,使用訊息中介軟體來協調訊息傳輸操作。
MOM需要提供API和管理工具
- 客戶端呼叫API,把訊息傳送到訊息中介軟體指定的目的地。在訊息傳送之後,客戶端會繼續執行其他的工作。
- 接收方收到這個訊息確認之前,訊息中介軟體一直保留該訊息。
JMS的作用是什麼?
在不同應用之間進行通訊或者從一個系統傳輸資料到另外一個系統。兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊,程式或應用之間解耦。
它主要用於在生產者和消費者之間進行訊息傳遞,生產者負責產生訊息,而消費者負責接收訊息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成-訊息,並進行傳送,對應的消費者在接收到對應的訊息後去完成對應的業務邏輯。
JMS的應用場景
主要可以應用於規模和複雜度較高的分散式系統:
- 非同步通訊:客戶發出呼叫後,不用等待服務物件完成處理並返回結果後就能繼續執行;
- 客戶和服務物件的生命週期解耦合:客戶進行和服務物件進行不需要都正常執行;如果由於服務物件崩潰或網路故障導致客戶的請求不可達,不會影響客戶端正常響應;
- 一對一或一對多通訊:客戶的一次呼叫可以傳送給一個或多個目標物件;
JMS中的角色
三種角色:生產者(Java應用程式)、消費者(Java應用程式)、訊息中介軟體(MQ)
JMS訊息模型
點對點模型(基於佇列)
- 訊息的生產者和消費者之間沒有時間上的相關性。
- 生產者把訊息傳送到佇列中(Queue),可以有多個傳送者,但只能被一個消費者消費。一個訊息只能被一個消費者消費一次。
- 消費者無需訂閱,當消費者未消費到訊息時就會處於阻塞狀態
釋出者/訂閱者模型(基於主題的)
- 生產者和消費者之間有時間上的相關性,訂閱一個主題的消費者只能消費自它訂閱之後釋出的訊息
- 生產者將訊息傳送到主題上(Topic)
- 消費者必須先訂閱,JMS規範允許提供客戶端建立持久訂閱
JMS訊息組成
訊息頭
訊息正文
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收一些不同形式的資料,提供現有訊息格式的一些級別的相容性。
- StreamMessage --Java原始值得資料流
- MapMessage --一套名稱-值對
- TextMessage --一個字串物件
- ObjectMessage --一個序列化的Java物件
- BytesMessage --一個位元組的資料流
訊息屬性
總結
1、JMS是什麼?是指定訊息傳送和接收的一套標準
2、JMS的角色:生產者、消費者、MOM訊息中介軟體
3、JMS訊息模型:點對點、釋出訂閱模型
4、JMS訊息正文:Stream、Map、Text、Byte、Object
ActiveMQ介紹
什麼是ActiveMQ
MQ,既Message Queue,就是訊息佇列的意思。
ActiveMQ是Apache出品,最流行,能力強勁的開源訊息匯流排。ActiveMQ是一個完全支援JMS1.1和J2EE 1.4規範的JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊地位。
ActiveMQ主要特點
- 多種語言和協議編寫客戶端,語言:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby
- 支援Java訊息服務(JMS) 1.1 版本
- 對Srping的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
- 協議支援包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
- 叢集
ActiveMQ下載安裝
下載
http://activemq.apache.org/components/classic/download/
下載版本(我使用的版本最新):5.15.12
安裝jdk(必須要安裝)
1、先解除安裝系統自帶的jdk
1、檢視安裝的jdk rpm -qa | grep java 2、解除安裝系統自帶jdk rpm -e --nodeps 包名
2、安裝JDK,ActiveMQ是使用Java開發的
當前最新版本下載地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html 歷史版本下載地址: http://www.oracle.com/technetwork/java/javase/archive-139210.html
我下載的是1.8,點我直達
連結: https://pan.baidu.com/s/1DZGsJuLUrhpEQm7jaSKTwg 密碼: baa5
3、解壓到指定位置
tar -zxvf jdk-8u202-linux-x64.tar.gz
4、修改/etc/profile檔案
在最下面新增兩行程式碼
export JAVA_HOME=/cyb/soft/jdk1.8.0_202 export PATH=$JAVA_HOME/bin:$PATH
5、執行source操作
source /etc/profile
6、檢查是否安裝成功
java -version
安裝ActiveMQ
1、解壓縮
tar -zxvf apache-activemq-5.15.12-bin.tar.gz
2、啟動ActiveMQ
cd apache-activemq-5.15.12/bin/ ./activemq start
3、訪問ActiveMQ後臺
地址:http://192.168.191.132:8161/admin/ 賬戶:admin 密碼:admin
4、訪問測試
注:為什麼埠是8161,因為ActiveMQ用的內嵌web伺服器jetty,埠可以修改,配置檔案在/conf/jetty.xml
補充
ActiveMQ與jdk是有版本對應關係的!!!!!
ActiveMQ使用
建立Demo工程
- 訊息生產者:activemq-producer-demo工程(jar)
- 訊息消費者:activemq-consumer-demo工程(jar)
新增Maven依賴
生產者和消費者都要加入以下依賴
<dependencies> <!--activemq依賴--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.12</version> </dependency> <!--junit依賴--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
點對點模式演示
提供者(activemq-producer-demo)
package com.cyb.activemq.producer; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTextMessage; import org.junit.Test; import javax.jms.*; public class Producer { @Test public void testQueueProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.1.106:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為同步傳送 ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); //設定 ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); ((ActiveMQConnection) connection).setUseAsyncSend(true); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Queue queue = session.createQueue("cyb-queue"); //第六步:通過session建立MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } } }
消費者(activemq-consumer-demo)
package com.cyb.activemq.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class Consumer { @Test public void testQueueConsumer() throws Exception{ //第一步:建立ConnectionFactory String brokerURL="tcp://192.168.1.106:61616"; ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(brokerURL); //第二步:通過工廠,建立Connection Connection connection=connectionFactory.createConnection(); //第三步:開啟連結 connection.start(); //第四步:通過Connection建立session Session session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立Consumer Queue queue=session.createQueue("cyb-queue"); MessageConsumer consumer=session.createConsumer(queue); //第六步:通過consumer接收資訊(兩種方式:1、receive方法接收(同步);2、通過監聽器接收(非同步)) //方式1、receive方法接收資訊 Message message=consumer.receive(100000); //第七步:處理資訊 if (message!=null&&message instanceof TextMessage){ TextMessage tm=(TextMessage)message; System.out.println(tm.getText()); } //方式2:監聽器接收資訊 // consumer.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message message) { // //第七步:處理資訊 // if (message instanceof TextMessage){ // TextMessage tm=(TextMessage)message; // try{ // System.out.println(tm.getText()); // } // catch (Exception e){ // e.printStackTrace(); // } // } // } // }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); } }
測試
釋出訂閱模式演示
提供者(activemq-producer-demo)
@Test public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.1.106:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為同步傳送 ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); //設定 ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); ((ActiveMQConnection) connection).setUseAsyncSend(true); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Topic topic=session.createTopic("cyb-topic"); //第六步:通過session建立MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
消費者(activemq-consumer-demo)
@Test public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.1.106:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為同步傳送 ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); //設定 ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); ((ActiveMQConnection) connection).setUseAsyncSend(true); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Topic topic=session.createTopic("cyb-topic"); //第六步:通過session建立MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
測試
先啟動消費者,在啟動提供者
自定義BrokerServer
package com.cyb.activemq; import org.apache.activemq.broker.BrokerService; public class MyBrokerServer { public static void main(String[] args) { BrokerService brokerService=new BrokerService(); String bindAddress="tcp://localhost:61616"; try { brokerService.setUseJmx(true); brokerService.addConnector(bindAddress); brokerService.start(); } catch (Exception e){ e.printStackTrace(); } } }
JMS事務
建立事務
建立事務的方法:createSession(paramA,paramB);
- paramA是設定事務的,paramB設定acknowledgment mode(應答模式)
- paramA設定為true時,paramB的值忽略,acknowledgment mode被jms伺服器設定Session.SESSION_TRANSACTED。
- paramA設定為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLTENT_ACKNOWLEDGE,Session.DUPS_OK_ACKNOWLEDGE其中一個。
事務的應答模式
- JMS訊息被應答確認後,才會認為是被成功消費,broker才會將訊息清除掉
- 訊息的消費包含三個階段:客戶端接收訊息、客戶端處理訊息、訊息被確認
SESSION_TRANSACTED(開啟事務,預設):
當一個事務被commit的時候,訊息確認就會自動發生。如果開啟了事務,最後沒有執行commit方法,那麼消費者會重複消費該訊息。
AUTO_ACKNOWLEDGE:
自動確認,當客戶成功的從receive方法返回的時候,或者從MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。
CLIENT_ACKNOWLEDGE(針對消費者):
客戶端確認。客戶端接收到訊息後,必須呼叫 javax.jmx.Message的acknowledge方法,broker才會刪除訊息。(預設是批量確認)
Message.acknowledge();
DUPS_OK_ACKNOWLEDGE:
允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收,而且允許重複確認。如果是重複的訊息,那麼JMS provider必須將訊息頭的JMSRedelivered欄位設定為true。
事務的作用
在一個JMS客戶端,可以使用本地事務來組合訊息的傳送和接收,JMS Session介面提供了commit和rollback方法。
開啟事務之後,JMS Provider會快取每個生產者當前生產的所有訊息,直到commit或rollback。在事務未提交之前,訊息時不會被持久化儲存的,也不會被消費者消費。
- commit:操作將會導致生產者事務中所有的訊息被持久儲存,消費者的所有訊息都被確認。
- rollback:操作將會導致生產者事務中所有的訊息被清除,消費者的所有訊息不被確認。
訊息生產者處理
訊息的持久化和非持久化
ActiveMQ支援兩種傳輸模式:持久傳輸和非持久傳輸,預設情況下使用的是持久傳輸。
兩者差異
- 採用持久傳輸時,傳輸的訊息會儲存到磁碟中,既“儲存轉發”模式,先把訊息儲存到磁碟中,然後再將訊息“轉發”給訂閱者。當Borker宕機恢復後,訊息還在。
- 採用非持久傳輸時,傳送的訊息不會儲存到磁碟中。當Borker宕機重啟後,訊息丟失。
通過MessageProducer類的setDeliveryMode設定傳輸模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
訊息同步傳送和非同步傳送
producer 傳送訊息有同步和非同步兩種模式,可以通過以下方式設定
1、設定ConnectionFactory時指定使用非同步
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616?jms.useAsyncSend=true");
2、不在建構函式中指定,而是修改ConnectionFactory配置
//第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.1.109:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為非同步傳送 ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3、在例項化後的ActiveMQConnection物件中設定非同步傳送
String brokerURL = "tcp://192.168.1.109:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); Connection connection = connectionFactory.createConnection(); ((ActiveMQConnection) connection).setUseAsyncSend(true);
在不考慮事務的情況下:
- producer傳送持久化訊息是同步傳送,傳送是阻塞的,直到收到確認。
- producer傳送非持久化訊息時非同步傳送,非同步傳送不會等待broker的確認,不阻塞。
訊息生產者使用持久傳遞模式傳送訊息的時候,producer.send(message)方法會被阻塞,直到broker傳送一個確認訊息給生產者,這個確認訊息暗示broker已經成功接收訊息並把訊息儲存到二級儲存中。這個過程通常稱為同步傳送。
如果應用程式能容忍一些訊息的丟失,那麼可以使用非同步傳送。非同步傳送不會受到broker的確認之前一直阻塞Producer.send方法。
生產者流量控制
ProducerWindowSize
在ActiveMQ5.0版本中,我們可以分別一個共享連線上的各個生產者進行流量控制,而不需要掛起整個連線。“流量控制”意味著當代理(broker)檢測目標(destination)的記憶體,或臨時檔案空間或檔案儲存空間超過了限制,訊息的流量可以被減慢。生產者將會被阻塞直至資源可用,或者受到一個JMSException異常
- 同步傳送的訊息將會自動對每一個生產者使用流量控制;除非你使用了 useAsynSend標誌,否則這將對同步傳送的永續性訊息都適用。
- 適用非同步傳送的生產者不需要等待來自代理的任何確認訊息;所以,如果記憶體限制被超過了,你不會被通知。如果你真的想知道什麼時候代理的限制被超過了,你需要配置ProducerWindowSize這一連線選項,這樣就算是非同步訊息也會對每一個生產者進行流量控制。
3種方式設定ProducerWindowSize
方式一、
String brokerURL = "tcp://192.168.1.109:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
方式二、
在brokerURL中設定:"tcp://192.168.1.109:61616?jms.produceWindowSize=1000",這種設定將會對所有的produce生效。
方式三、
在destinationUrl中設定"cyb-queue?producer.windowSize=1000",此引數只會對使用此Destination例項的producer生效,將會覆蓋brokerUrl中的producerWindowSize值
配置說明:
ProducerWindowSize是一個生產者在等到確認訊息之前,可以傳送給代理的資料最大byte,這個確認訊息用來告訴生產者,代理已經收到之前傳送的訊息了。
它主要用來約束在非同步傳送時producer端允許非同步傳送的(尚未ACK)的訊息尺寸,且只對非同步傳送有意義。
值越大,意味著消耗Broker伺服器的記憶體就越大。
alwaysSyncSend
如果你要傳送非持久化的訊息(訊息預設是非同步傳送的),並且想要每次都得到佇列或者主題的記憶體限制是否達到,你只需將連線工廠配置為“alwaysSyncSend”,雖然這樣會變得稍微慢一點,但是這將保證當出現記憶體問題時,你的訊息生產者能夠及時得到通知。
((ActiveMQConnection) connection).setAlwaysSyncSend(true);
如何提升訊息傳送效率?(背)
- 在某些場景下,我們的Producer的個數非常有限的,可能只有幾個,比如基於Queue的“訂單接入網管”(生成訂單原始資訊並負責傳遞),但是響應的Consumer的個數相對較多,在整體上Producer效能小於Consumer。
- 還有一些場景,Producer的數量非常多,訊息量也很大,但是Consumer的個數或者效能相對較低,比如“使用者點選流”、“使用者訊息Push系統”等
訊息持久化
1、持久化型別的訊息,對broker端效能消耗遠遠大於非持久化型別
2、這歸結於ActiveMQ本身對持久化訊息確保“最終一致性”,持久化意味著“訊息不丟失”,即當broker接收到訊息後需要一次強制性磁碟同步
3、對於Consumer在消費訊息後,也會觸發磁碟寫入
4、通常broker端還會開啟相關的“過期訊息檢測”執行緒,將儲存器中的資料載入記憶體並檢測,這個過程也是記憶體,磁碟IO消耗的。由此可見,持久化型別的訊息從始至終,都在“拖累”系統的效能和吞吐能力。
訊息屬性
1、通過Producer傳送訊息(Message)中,除了訊息本身的負荷體之外(Consumer),還有大量的JMS屬性和Properties可以設定,因為JMS中,支援對JMS屬性和properties使用selector,那麼這些內容將會加大和複雜化message header,我們儘可能的在properties中攜帶更少
非同步傳送
1、如果訊息是非永續性的,或者Session是基於事務的,建議開發者不要關閉非同步傳送;這是提升Producer傳送效率的重要的策略。
2、設定合適的windowSize,開啟Broker端“Flow Control”等
事務
對於Producer而言,使用事務並不會消耗Broker太多的效能,主要會佔用記憶體,所有未提交的事務訊息,都會儲存在記憶體中,有些基於日誌的儲存器,事務型別的持久化訊息暫存在額外的檔案中,直到日誌提交或回滾後清除。所以,Producer端不要在事務中,積壓太多的訊息,儘可能早的提交事務。
提升Consumer消費速率
無論是Queue還是Topic,快速的Consumer,無疑是提升整體效能的最好手段。
選擇合適的儲存器
activeMQ目前支援JDBC、kahadb、LevelDB三種儲存器。
JDBC主要面向基於RDBMS方向,通常如果訊息不僅面向ActiveMQ,還可能被用於第三方平臺的操作,JDBC的特點就是透明度高,可擴充套件方案較多(擴充套件成本高)。
kahadb和LevelDB,同屬於日誌儲存+BTree索引,效能很高,對於訊息較多(單位尺寸較小),消費速度較快的應用,是最好的選擇,這兩種儲存器也最常用,推薦LevelDB
Broker Server處理
導讀
以下內容都是修改:vim /cyb/soft/apache-activemq-5.15.12/conf/activemq.xml
流量控制
設定指定佇列和主題失效
可以通過在代理配置中,將適當的目的地(destination)的策略(policy)中的producerFlowControl標誌設定為false,使代理商特定的JMS佇列和主題不適用流量控制
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/> </policyEntries> </policyMap> </destinationPolicy>
生存記憶體限制
注意,在ActiveMQ 5.x中引入了新的file cursor,非持久化訊息會被刷到臨時檔案儲存中來減少記憶體使用量。所以,你會發現queue的memoryLimit永遠達不到,因為file cursor花不了多少記憶體,如果你真的要把所有非持久化訊息儲存在記憶體中,並且當memoryLimit達到時停止producer,你應該配置<vmQueueCursor>。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy> </policyEntry>
上面的片段能保證,所有的訊息儲存在記憶體中,並且每一個佇列只有1Mb的限制。
配置生產者客戶端的異常
應對Broker代理空間不足,而導致不確定的阻塞send()操作的一種替代方案,就是將其配置成客戶端丟擲一個異常。通過將sendFailIfNoSpace屬性設定為true,代理將會引起send()方法失敗,並丟擲javax.jmx.ResourceAllocationException異常,傳播到客戶端
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
這個屬性的好處是,客戶端可以捕獲javax.jms.ResourceAllocationException異常,稍等一下,並重試send()操作,而不是無限期地傻等下去。
從5.3.1版本之後,sendFailIfNoSpaceAfterTimeout 屬性被加了進來。這個屬性同樣導致send()方法失敗,並在客戶端丟擲異常,但僅當等待了指定時間之後才觸發。如果在配置的等待時間過去之後,代理上的空間仍然沒有被釋放,僅當這個時候send()方法才會失敗,並且在客戶端丟擲異常。下面是一個示例:
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="3000"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
定義超時的單位是毫秒,所以上面的例子將會在使send()方法失敗並對客戶端丟擲異常之前,等待三秒。這個屬性的優點是,它僅僅阻塞配置指定的時間,而不是立即另傳送失敗,或者無限期阻塞。這個屬性不僅在代理端提供了一個改進,還對客戶端提供了一個改進,使得客戶端能捕獲異常,等待一下並重試send()操作。
使用流量控制無效
一個普通的需求是使流量控制無效,使得訊息分佈能夠持續,直到所有可用的磁碟被掛起的訊息耗盡。要這樣做,你可以使用訊息遊標。
ActiveMQ的訊息遊標分為三種類型
- Store-based
- VM
- File-based
系統佔用(重要)
你還可以通過<systemUsage>元素的一些屬性來減慢生產者。來看一眼下面的例子:
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb" /> </storeUsage> <tempUsage> <tempUsage limit="10 gb" /> </tempUsage> </systemUsage> </systemUsage>
你可以為非持久化的訊息(NON_PERSISTENT messages)設定記憶體限制,為持久化訊息(PERSISTENT messages)設定磁碟空間,以及為臨時訊息設定總的空間,代理將在減慢生產者之前使用這些空間。使用上述的預設設定,代理將會一直阻塞sen()方法的呼叫,直至一些訊息被消費,並且代理有了可用空間。預設值如上例所述,你可能需要根據你的環境增加這些值。
解決消費緩慢及無法消費的問題(重要)
其中broker中還以單獨設定生產者使用的 producerSystemUsage和消費者使用 consumerSystemUsage,格式跟systemUsage一樣。
預設情況下,沒有配置 producerSystemUsage 和consumerSystemUsage,則生產者和消費者都使用 systemUsage。
問題:
可能會因為生產者執行緒把記憶體用完,導致消費者執行緒處理緩慢甚至無法消費的問題。這種情況下,新增消費端的機器和消費者數量可能都無法增加消費的速度。
解決辦法:
在broker上設定 splitSystemUsageForProducersConsumers="true",使得生產者執行緒和消費者執行緒各使用各的記憶體。
預設是 生產者執行緒記憶體:消費者執行緒記憶體 => 6:4
也可以通過如下兩個引數設定生產者執行緒記憶體和消費者記憶體各一半:
producerSystemUsagePortion = "50" consumerSystemUsagePortion = "50"
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" splitSystemUsageForProducersConsumers="true" producerSystemUsagePortion = "50" consumerSystemUsagePortion = "50">
訊息定時刪除(重要)
<broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> </policyEntries> </policyMap> </destinationPolicy> </broker>
實現定時自動清除無效的Topic和Queue需要設定三個屬性
- schedulePeriodForDestinationPurge:執行清理任務的週期,單位是毫秒
- gclnactiveDestinations="true":啟動清理功能
- inactiveTiomoutBeforeGC="3000":Topic或Queue超時時間,在規定的時間內,無有效訂閱,沒有入隊記錄,超時就會被清理。
持久化儲存方式
KahaDB基於檔案的儲存(預設)
KahaDB是從ActiveMQ 5.4開始 預設的持久化外掛,也是我們專案現在使用的持久化方式。KahaDB恢復時間遠遠小於其前身AMQ並且使用更少的資料檔案,所以可以完全替代AMQ。KahaDB的持久化機制同樣是基於日誌檔案,索引和快取。
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter>
directory:指定持久化訊息的儲存目錄
journalMaxFileLength:指定儲存訊息的日誌檔案大小,具體根據你的實際應用配置
KahaDB主要特性
- 日誌形式儲存訊息
- 訊息索引以B-Tree結構儲存,可以快速更新;
- 完全支援JMS事務
- 支援多種恢復機制
AMQ 基於檔案的儲存
效能高於JDBC,寫入訊息時,會將訊息寫入日誌檔案,由於很高。為了提升效能,建立訊息主鍵索引,並且提供快取機制,進一步提升效能。每個日誌檔案的大小都是有限制的(預設32m,可配置)。
當超過這個大小,系統會重新建立一個檔案。當所有的訊息都消費完成,系統會刪除這個檔案或者歸檔
主要的缺點是:
- AMQ Message會為每一個Destination建立一個索引,如果使用了大量的Queue,索引檔案的大小會佔用很多磁碟空間
- 由於索引巨大,一旦Broker崩潰,重建索引的速度會非常慢
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/> </persistenceAdapter>
JDBC基於資料庫的儲存
1、首先將以下驅動放到lib目錄下,驅動包和ActiveMQ我已上傳至百度雲,下面有連線供下載
驅動包,百度雲盤地址:https://pan.baidu.com/s/1veqFD2k49x5m97FA6CAwJA 密碼: gea6
2、修改配置檔案:conf/activemq.xml
<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <jdbcPersistenceAdapter dataSource="#activemq-db" createTablesOnStartup="true" /> </persistenceAdapter>
dataSource指定持久化資料庫的bean,createTablesOnStartup是否在啟動的時候建立資料表,預設使用true,這樣每次啟動都會去建立資料表了,一般第一次啟動的時候設定為true,之後改成false
3、在配置檔案中的broker節點外增加以下內容
<bean id="activemq-db" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://192.168.31.206:3306/activemq"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
4、從配置中可以看出資料庫的名稱是activemq,需要手動在mysql中增加這個庫,然後重啟訊息佇列,你會發現多了三張表
- activeme_acks ->儲存持久訂閱的資訊
- activemq_lock ->鎖表(用來做叢集的時候,實現master選舉的表)
- activemq_msgs ->訊息表
補充:
mysql必須支援遠端連線!!!!
控制檯: 1、mysql -uroot -proot 2、GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
Memory基於記憶體
基於記憶體的訊息儲存,就是訊息儲存在記憶體中。persistent="false":表示不設定持久化儲存,直接儲存到記憶體中
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">
訊息消費者處理
prefetch機制
prefetch即在activemq中消費者預獲取訊息數量,重要的調優引數之一。當消費者存活時,broker將會批量push prefetchSize條訊息給消費者,消費者也可以配合optimizeAcknowledge來批量確認它們。由於broker批量push訊息給消費者,提高了網路傳輸效率,預設為1000。
broker端將會根據consumer指定的prefetchSize來決定pendingBuffer的大小,prefetchSize越大,broker批量傳送的訊息就會越多,如果消費者消費速度較快,再配合optimizeAck,這將是相對完美的訊息傳送方案。
不過,prefetchSize也會帶來一定的問題,在Queue中(Topic中沒有效果),broker將使用“輪詢”方式來平衡多個消費者之間的訊息傳送數量,如果消費者消費速度較慢,而prefetchSize較大,這將不利於訊息量在多個消費者之間平衡。通常情況下,如果consumer數量較多,或者消費速度較慢,或者訊息量較少時,我們應該設定prefetchSize為較小的值。
設定prefetchSize的方式如下:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("cyb-queue?customer.prefetchSize=100");
prefetch值建議在destinationUrl中指定,因為在brokerUrl中指定比較繁瑣;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都需要單獨設定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等逐個指定。
optimizeACK機制
optimizeACK,可優化的訊息ACK策略,關係到是否批量確認訊息的策略,這個是Consumer端最重要的調優引數之一。optimizeAcknowledge表示是否開啟“優化ACK選項”,當開啟optimizeACK策略後,只有當optimizeACK為true,也只會當session的ACK_MODE為AUTO_ACKNOWLEDGE時才會生效。
該引數的具體含義和消費端的處理如下:
- 當consumer.optimizeACK有效時,如果客戶端已經消費但尚未確認的訊息(deliveredMessage)達到prefetch*0.65,從consumer端將會自動進行ACK。
- 同事如果離上一次ACK的時間間隔,已經超過“optimizeAcknowledgeTimeout”毫秒,也會導致自動進行ACK。
String brokerURL = "tcp://192.168.31.215:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=30000"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
ACK模型和型別介紹
ACK模型
ACK模型是確定應答的時機
- AUTO_ACKNOWLEDGE = 1 ->自動確認
- CLIENT_ACKNOWLEDGE = 2 ->客戶端手動確認
- DUPS_OK_ACKNOWLEDGE = 3 ->自動批量確認
- SESSION_TRANSACTED = 0 ->事務提交併確認
ACK型別
ACK型別是確定應答的型別,客戶端根據ACK型別的不同,需要不同的處理,比如訊息重發。
Client端指定了ACK模式,但是在Client與broker在交換ACK指令的時候,還需要告知ACK_TYPE,ACK_TYPE表示此確認指定的型別,不同的ACK_TYPE將傳遞著訊息的狀態,broker可以根據不同的ACK_TYPE對訊息進行不同的操作。
比如Consumer消費訊息時出現異常,就需要向broker傳送ACK指定,ACK_TYPE為“REDELIVERED_ACK_TYPE”
,那麼broker就會重新發送此訊息。在JMS API中並沒有定義ACK_TYPE,因為它通常是一種內部機制,並不會面向開發者。ActiveMQ中定義瞭如下幾種ACK_TYPE
- DELIVERED_ACK_TYPE = 0 訊息“已接收”,但尚未處理結束
- STANDARD_ACK_TYPE = 2 “標準”型別,通常表示為訊息“處理成功”,broker端可以刪除訊息了
- POSION_ACK_TYPE = 1 訊息“錯誤”,通常表示“拋棄”此訊息,比如訊息重發多次後,都無法正常處理時,訊息將會被刪除或DLQ(死信佇列)
- REDELIVERED_ACK_TYPE = 3 訊息需“重發”,比如consumer處理訊息時丟擲異常,broker稍後會重新發送此訊息
- INDIVIDUAL_ACK_TYPE = 4 表示只確認“單條訊息”,無論在任何ACK_MODE下
- UNMATCHED_ACK_TYPE = 5 在Topic中,如果一條訊息在轉發給“訂閱者”時,發現此訊息不合符Selector過濾條件,那麼此訊息將不會轉發給訂閱者,訊息將會被儲存引擎刪除
重發機制
可以在brokerUrl中配置“redelivery”策略,比如當一條訊息處理異常時,broker端還可以重發的最大次數。當訊息需要broker端重發時,consumer會首先在本地的“deliveredMessage佇列”(Consumer已經接收但未確認的訊息佇列)刪除它,善後向broker傳送“REDELIVERED_ACK_TYPE”型別的確認指令,broker將會把指令中指定的訊息重新新增到pendingQueue中,直到合適的時機,再次push給client。
持久化訂閱和非持久化訂閱
注意事項:
- 持久化訂閱和非持久化訂閱針對的訊息模型是Pub/Sub,而不是P2P
- 持久化訂閱需要消費者先執行訂閱,然後生產者再發送訊息
- 如果消費者宕機,而又不想丟失它宕機期間的訊息,就需要開啟持久訂閱。如果對於同一個訊息有多個消費者需要開啟持久訂閱的情況,則設定的clientID不能相同
消費者
public void testTopicConsumer2() throws Exception { //第一步:建立ConnectionFactory String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //第二步:通過工廠,建立Connection Connection connection = connectionFactory.createConnection(); //設定持久訂閱的客戶端ID String clientId = "10086"; connection.setClientID(clientId); //第三步:開啟連結 connection.start(); //第四步:通過Connection建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立Consumer Topic topic = session.createTopic("cyb-topic"); //建立持久訂閱的消費者客戶端 //第一個引數是指定Topic //第二個引數是自定義的ClientId MessageConsumer consumer = session.createDurableSubscriber(topic, "client1-sub"); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //第七步:處理資訊 if (message instanceof TextMessage){ TextMessage tm=(TextMessage)message; try{ System.out.println(tm.getText()); } catch (Exception e){ e.printStackTrace(); } } } }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); }
提供者
public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為同步傳送 ((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(false); //設定 //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); ((ActiveMQConnection) connection).setUseAsyncSend(false); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Topic topic = session.createTopic("cyb-topic"); //第六步:通過session建立MessageProducer producer = session.createProducer(topic); //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1,DeliveryMode.PERSISTENT,1,1000*60*5); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
測試
此處就不測試了,因為當初測試時候,踩了一次坑,測試結果,已經記錄到另一篇部落格:ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決
ActiveMQ叢集
ActiveMQ叢集配置
刪除一些不用的埠
修改activemq.xml配置檔案
方式一:
在任意一臺Linux機器上,activemq.xml的broker 標籤下,新增以下內容,然後重啟即可
方式二
還在修改activemq.xml,在broker標籤下,加入以下內容,去掉duplex="true",配置對方的ip地址,若有多個逗號隔開即可“,” 然後重啟
測試
提供者端程式碼,brokerUrl中加入容錯機制,若果第一個沒連上,就連線第一個,預設先連線第一個
failover:(tcp://192.168.1.108:61616,tcp://192.168.1.109:61616)
public void testQueueProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "failover:(tcp://192.168.1.108:61616,tcp://192.168.1.109:61616)"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定訊息傳送為同步傳送 //((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true); //設定 ((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); //((ActiveMQConnection) connection).setUseAsyncSend(true); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Queue queue = session.createQueue("cyb-queue"); //第六步:通過session建立MessageProducer producer = session.createProducer(queue); //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
消費者端程式碼
public void testQueueConsumer() throws Exception { //第一步:建立ConnectionFactory String brokerURL = "tcp://192.168.1.109:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //第二步:通過工廠,建立Connection Connection connection = connectionFactory.createConnection(); //第三步:開啟連結 connection.start(); //第四步:通過Connection建立session Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); //第五步:通過session建立Consumer Queue queue = session.createQueue("cyb-queue"); MessageConsumer consumer = session.createConsumer(queue); //第六步:通過consumer接收資訊(兩種方式:1、receive方法接收(同步);2、通過監聽器接收(非同步)) //方式1、receive方法接收資訊 Message message = consumer.receive(100000); //第七步:處理資訊 if (message != null && message instanceof TextMessage) { TextMessage tm = (TextMessage) message; System.out.println(tm.getText()); message.acknowledge(); } //方式2:監聽器接收資訊 // consumer.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message message) { // //第七步:處理資訊 // if (message instanceof TextMessage){ // TextMessage tm=(TextMessage)message; // try{ // System.out.println(tm.getText()); // } // catch (Exception e){ // e.printStackTrace(); // } // } // } // }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); }
演示
這裡我們可以看到,提供者先連線192.168.1.108這臺機器,消費者去消費192.168.1.109,照樣可以消費成功,監控平臺上,也可以看到響應資訊