JAVA訊息佇列之ActiveMQ入門
訊息中介軟體
前言
本文只適合初級入門,闡述了訊息中介軟體的基礎概念,和部分實踐,理論性比較強。
為什麼要使用訊息中介軟體
有一家專業做保險的大公司,他有一套保險系統,記錄著保險銷售的所有資訊,他在全國有38家分公司,每一家分公司都有自己的保險系統,總公司與分公司之間有著保單資料共享的關係,總公司每銷售一份保險都要向分公司提供該保單的基本資訊,而分公司每銷售一份保險也需要向總公司和其他分公司提供該保單的基本資訊。那麼總公司與分公司們的系統是如何實現資料共享的呢?
假如有一箇中介,總公司和分公司都對他進行了訂閱,總公司賣出一份保險後,交給中介一份保單的基本資訊,然後中介向他的訂閱者們下發這份保單資訊。分公司賣出保險也同樣交給中介一份保單資訊,讓中介來下發。
那麼訊息中介軟體中的訂閱模式就是這個中介的實現。
什麼是中介軟體
非底層作業系統軟體,非業務應用軟體,不是直接給終端使用者使用的,不能直接給客戶帶來價值的軟體統稱為中介軟體。
什麼是訊息中介軟體
關注於資料的傳送和接受,利用高效可靠的非同步訊息傳遞機制整合分散式系統。
什麼是JSM
Java訊息服務,及JMS是一個java平臺中關於面向訊息中介軟體的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。
什麼是AMQP
AMQP是一個提供統一訊息服務的應用層標準協議,基於此協議的客戶端與訊息中介軟體可傳遞訊息,並不受客戶/中介軟體不同產品,不同開發語言等條件限制。
ActiveMQ
是Apache出品,能力強勁的開源訊息匯流排。
RabbitMQ
是一個開源的AMQP試下,服務端用Erlang語言編寫,用於分散式系統中儲存轉發訊息,在易用性、擴充套件性。高可用性表現不俗。
高可用HA(High Availability)是分散式系統架構設計中必須考慮的因素之一,它通常是指,通過設計減少系統不能提供服務的時間
JMS規範
相關概念
提供者:實現JMS規範的訊息中介軟體伺服器
客戶端:傳送或接受訊息的應用程式
生產者/釋出者:建立併發送訊息的客戶端
消費者/訂閱者:接受並處理訊息的客戶端
訊息:在應用程式之間傳遞的資料內容
訊息模式:在客戶端之間傳遞訊息的方式,JMS中定義了主題和佇列兩種模式。
訊息模式
佇列模式
客戶端包括生產者和消費者
佇列中的訊息只能被一個消費者消費
消費者可以隨時消費佇列中的訊息
訂閱模式
客戶端包括髮布者和訂閱者
主題中的訊息被所有訂閱者消費
消費者不能消費訂閱之前就傳送到主題中的訊息
JMS編碼介面
ConnectionFactory用於建立連線到訊息中介軟體的連線工廠
Connection代表了應用程式和訊息伺服器之間的通訊鏈路
Destination 指訊息釋出接收的地點,包括佇列或主題
Session表示一個單執行緒的上下文,用於傳送和接收訊息
MessageConsumer 由會話建立,用於接收發送到目標的訊息
MessageProducer 由會話建立,用於傳送訊息到目標
Message 是在消費者和生產者之間傳送的物件,訊息頭,一組訊息屬性,一個訊息體
JMS編碼介面之間的關係
預設埠:8161是後臺管理系統,61616是給java用的tcp埠
佇列模式程式碼演示
生產者:
public class Producer {public static final String URL = "tcp://localhost:61616"; public static final String queueName = "queue-test"; public static void main(String args[]) throws JMSException {//1.建立ConnectionFactoryActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);//2.建立ConnectionConnection connection = connectionFactory.createConnection();//3.啟動連線connection.start();//4.創建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.建立一個目標Destination destination = session.createQueue(queueName);//6.建立一個生產者MessageProducer producer = session.createProducer(destination); for (int i = 0 ;i < 10; i++){//7.建立訊息TextMessage message = session.createTextMessage("hello point:"+i);//8.傳送訊息producer.send(message);System.out.println("yes"+i);}//9.關閉連線connection.close();}}
消費者
public class Consumer {public static void main(String args[]) throws JMSException { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.URL);Connection connection = connectionFactory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();Queue destination = session.createQueue(Producer.queueName);MessageConsumer consumer = session.createConsumer(destination);//建立監聽器consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) { TextMessage textMessage = (TextMessage)message; try { System.out.println(textMessage.getText());} catch (JMSException e) { e.printStackTrace();} } });}}
訂閱模式程式碼演示
只需在建立目標的使用,建立訂閱模式就行。 session.createTopic(Producer.topicName);
createSession(paramA,paramB)方法詳解
paramA為boolean型別,標識是否支援事務
paramB為int型別,可以設定acknowledgment mode (稱之為建立會話時的應答模式)取值有:
1) Session.AUTO_ACKNOWLEDGE
a. 為自動確認,客戶端傳送和接收訊息不需要做額外的工作。
2) Session.CLIENT_ACKNOWLEDGE
a. 為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會刪除訊息。
3) Session.DUPS_OK_ACKNOWLEDGE
a. 允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。在需要考慮資源使用時,這種模式非常有效。
4) Session.SESSION_TRANSACTED
paramA設定為false時代表不支援事務:paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
paramA設定為true時:paramB的值忽略, acknowledgment mode被jms伺服器設定為SESSION_TRANSACTED 。
使用Spring整合JMS連線ActiveMQ
Spring提供了一些JSM封裝的介面:
ConnectionFactory用於管理連線的連線工廠,Spring提供的
JmsTemplate 用於傳送和接受訊息的模板類
Messagelistener 訊息監聽器
ConnectionFactory
ConnectionFactory是Spring為我們提供的連線池
JmsTemplate每次傳送訊息都會建立連線,會話和producter,因為很耗效能,所以spring推出了連線池
Spring提供了兩種連線池,SingleConnectionFactory和CachingConnectionFactory
SingleConnectionFactory:對於建立的JMS請求只會返回同一個連線,並且會忽略Connection的close方法呼叫。
CachingConnectionFactory:繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了快取功能,它可以快取Session、MessageProducer和MessageConsumer
JmsTemplate
是Spring提供的,只需要向Spring容器內註冊這個類就可以使用JMSTemplate方便的操作jms
是執行緒安全的,可以在整個應用範圍內使用,可以建立多個JmsTemplate
MessageListerner
實現一個onMessage方法,該方法只接受一個Message引數。
訂閱模式程式碼演示:
1.新建spring配置檔案common.xml,配置公共bean
在這個配置檔案中,配置公共的bean,ActiveMQ連線工廠,spring提供的jms連線池,目的地等。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/><!--建立ActiveMQ為我們提供的連線工廠--><bean id = "targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616"></property> </bean><!--建立spring提供的JMS連線池--><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"></property> </bean><!--建立目的地,佇列模式--><bean id = "queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue-test-spring"></constructor-arg> </bean><!--建立目的地,訂閱模式--><bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic-text-spring"/> </bean></beans>
2. 新建spring配置檔案,produce.xml,配置釋出者
在這個配置檔案中,引入公共的配置檔案common.xml,配置jms模板
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <import resource="common.xml"></import> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> </bean> <bean class="com.jsm.springdemo.producer.ProducerServiceImpl"></bean></beans>
3. 建立ProducerService介面
提供一個傳送資訊的方法
public interface ProducerService {public void sendMessage(String message);}
4.建立ProducerService實現類
在這個實現類中,我們需要注入JmsTemplate,用來發送資訊。注入定義好的訂閱模式的目的地和實現ProducerService介面。
public class ProducerServiceImpl implements ProducerService{@AutowiredJmsTemplate jmsTemplate;@Resource(name = "topicDestination") Destination destination;@Overridepublic void sendMessage(final String message) {jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage;} });System.out.println("Success:"+message);}}
5. 建立AppProducer類
啟動Producer
public class AppProducer {public static void main(String agrs[]){ ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("producer.xml");ProducerService producerService = (ProducerService) applicationContext.getBean(ProducerService.class); for (int i = 0; i < 100; i++) { producerService.sendMessage("訊息編號:"+i);} applicationContext.close();}}
6. 建立消費者ConsumerListener監聽器
此類用來接受釋出者傳送的資訊,需要實現MessageListener介面
public class ConsumerListener implements MessageListener{@Overridepublic void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText());} catch (JMSException e) { e.printStackTrace();} }}
7. 建立consumer.xml配置檔案
在這個配置檔案中,引入公共的配置檔案common.xml,配置監聽容器,注入連線池,目的地,自定義的監聽器等。
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <import resource="common.xml"/> <bean id="consumerListener" class="com.jsm.springdemo.consumer.ConsumerListener"> </bean> <bean id = "jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="topicDestination"/> <property name="messageListener" ref="consumerListener"/> </bean></beans>
8. 新建AppConsumer類
我們在spring容器中配置了監聽容器,因此專案啟動的時候,監聽器也隨之啟動
public class AppConcumer {public static void main(String args[]){ ApplicationContext applicationContext = new ClassPathXmlApplicationContext("consumer.xml");}}
佇列模式程式碼演示
佇列模式,在common.xml配置檔案中配置佇列目的地,在釋出者ProducerService實現類中,將queueDestination注入到Destination中。
在消費者配置配件consumer.xml的監聽容器中,將queueDestination注入到destination
Common.xml:
<!--建立目的地,佇列模式--><bean id = "queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue-test-spring"></constructor-arg> </bean>
ProducerServiceImpl :
public class ProducerServiceImpl implements ProducerService{@AutowiredJmsTemplate jmsTemplate;@Resource(name = "queueDestination") Destination destination;@Overridepublic void sendMessage(final String message) {jmsTemplate.send(destination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage;} });System.out.println("Success:"+message);}}
consumer.xml:
<bean id = "jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="consumerListener"/></bean>
ActiveMQ叢集配置
為什麼要對訊息中介軟體叢集?
實現高可用,以排除單點故障引起的服務終端
實現負載均衡,以提高效率為更多客戶提供服務
ActiveMQ叢集基礎知識
叢集方式
客戶端叢集:讓多個消費者消費同一個佇列
Broker Cluster:多個Broker之間同步訊息
Master Slave:實現高可用,當主伺服器宕機時,備用伺服器會立即補充
客戶端配置
ActiveMQ失效轉移(failover)
允許當其中一臺訊息伺服器宕機時,客戶端在傳輸層上重新連線到其它訊息伺服器
語法:failover:(uri1,...,uriN)?transportOptions
transportOptions引數說明
Randomize預設true,標識URI列表中選擇URI連線時是否採用隨機策略
initialReconnectDelay 預設10,單位毫秒,表示第一次嘗試重新連線之間等待的時間
maxReconnectDelay 預設30000,單位毫秒,最長重連的時間間隔
Broker Cluster叢集配置
當我們有兩個節點:A、B,節點A可以把訊息同步到節點B,節點B也可以把訊息同步到節點A,通過這個機制,節點A的訊息可以被B消費,節點B的訊息也可以被A消費。
實現方式是採用NetworkConnector(網路聯結器)網路聯結器實現的。
NetworkConnector(網路聯結器)主要用於配置ActiveMQ伺服器與伺服器之間的網路通訊方式,用於伺服器透傳訊息。
網路聯結器分為靜態聯結器和動態聯結器
靜態聯結器,是指在伺服器的IP地址上面去指定具體的IP地址
<networkConnectors>
<networkConnector name="local_network" uri="static:(tcp:/127.0.0.1:6167,tcp://127.0.0.1:61618" />
</networkConnectors>
如果伺服器比較多,並且需要通過動態擴充套件,這時候可以使用動態
動態聯結器,他是使用多播的方式通知其他伺服器
定義一個網路聯結器和傳輸聯結器,傳輸聯結器會給出一個發現的uri地址,即組播地址,就可以達到動態效果了
Master/Slave叢集配置
Share nothing storage master/slave(過時,5.8移除)
Shared storage master/slave 共享儲存
節點獲取的訊息儲存排他鎖,就可以成為master,而沒有獲取資源鎖的節點稱之為slave,當master待機的時候回釋放資源鎖,而獲取資源鎖的slave會成為新的master。
Replicated LevelDB Store 基於複製的LevelDB Store
共享儲存叢集的原理
有節點A、B兩臺伺服器,有一個共享的儲存地址,稱之為持久化,它可以是資料庫或者san檔案系統,把節點A、B的持久化配置到同一個地方之後,先啟動節點A,這個時候節點A就獲取了資源排他鎖稱為master,在啟動節點B,B獲取不到排他鎖所以稱為slave,稱為master的伺服器有對外開放的能力,外部的客戶端可以提交資訊給A,不能給B
如果A掛掉了,B會立即過去資源的排他鎖,成為Master,而外部客戶端就可以提交資訊給B。
基於LevelDB Store的原理
基於Zookeeper,有三個節點A、B、C,他們都有自己的儲存方式,三個節點配置同一個Zookeeper節點,ZooKeeper選舉一臺成為Master,比如A,此時A具有服務能力,A獲取訊息後本地儲存,然後通過ZooKeeper同步到B、C,B、C分別對訊息儲存。
如果節點A故障,ZooKeeper會立即從新選舉一臺作為Master
兩種叢集方式對比
Master/Slave可以做到高可用,但不能做到負載均衡,因為Slave不具備服務能力,而Broker/Cluster不具備高可用,當這臺伺服器掛掉後,它處理的資料會同步丟失,能做負載均衡
三臺伺服器的完美叢集方案
ActiveMQ叢集實戰
因為是在一臺電腦上實踐,通過修改埠來模擬不同主機。因為A節點是做Broker,所以不需要單獨配置儲存,B、C是做master/slave,所以配置共享資料夾就可以了
將下載的activemq檔案解壓,複製三份,分別起名:activemq_a/activemq_b/activemq_c
配置A節點
1. 找到activemq.xml配置檔案,由於A預設使用61616,所以不需要修改,其他的埠我們不需要,註釋就可以了
2. 配置網路聯結器:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<!--
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
<!-- 配置網路聯結器 -->
<networkConnectors>
<n