ActiveMQ——訊息中介軟體深入學習
訊息中間件概述
訊息中介軟體產生的背景
在客戶端與伺服器進行通訊時.客戶端呼叫後,必須等待服務物件完成處理返回結果才能繼續執行。
客戶與伺服器物件的生命週期緊密耦合,客戶程序和服務物件程序都都必須正常執行;如果由於服務物件崩潰或者網路故障導致使用者的請求不可達,客戶會受到異常
點對點通訊: 客戶的一次呼叫只發送給某個單獨的目標物件。
什麼是訊息中介軟體
面向訊息的中介軟體(MessageOrlented MiddlewareMOM)較好的解決了以上問
題。傳送者將訊息傳送給訊息伺服器,訊息伺服器將消感存放在若千佇列中,在合適
的時候再將訊息轉發給接收者。這種模式下,傳送和接收是非同步的,傳送者無需等
待; 二者的生命週期未必相同: 傳送訊息的時候接收者不一定執行,接收訊息的時候
傳送者也不一定執行;一對多通訊: 對於一個訊息可以有多個接收者。
JMS介紹
什麼是JMS?
JMS是java的訊息服務,JMS的客戶端之間可以通過JMS服務進行非同步的訊息傳輸。
什麼是訊息模型
○ Point-to-Point(P2P) --- 點對點 ○ Publish/Subscribe(Pub/Sub)--- 釋出訂閱 |
即點對點和釋出訂閱模型
P2P (點對點)
P2P
- P2P模式圖
- 涉及到的概念
- 訊息佇列(Queue)
- 傳送者(Sender)
- 接收者(Receiver)
- 每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
- P2P的特點
- 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
- 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
- 接收者在成功接收訊息之後需向佇列應答成功
如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模式。
應用場景
A使用者與B使用者傳送訊息
Pub/Sub (釋出與訂閱)
Pub/Sub模式圖
涉及到的概念
主題(Topic)
釋出者(Publisher)
訂閱者(Subscriber)
客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
Pub/Sub的特點
每個訊息可以有多個消費者
釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息,而且為了消費訊息,訂閱者必須保持執行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果你希望傳送的訊息可以不被做任何處理、或者被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型
訊息的消費
在JMS中,訊息的產生和訊息是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。
○ 同步
訂閱者或接收者呼叫receive方法來接收訊息,receive方法在能夠接收到訊息之前(或超時之前)將一直阻塞
○ 非同步
訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。
應用場景:
使用者註冊、訂單修改庫存、日誌儲存
畫圖演示
MQ產品的分類
RabbitMQ
是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味著訊息在傳送給客戶端時先在中心佇列排隊。對路由(Routing),負載均衡(Load balance)或者資料持久化都有很好的支援。
Redis
是一個Key-Value的NoSQL資料庫,開發維護很活躍,雖然它是一個Key-Value資料庫儲存系統,但它本身支援MQ功能,所以完全可以當做一個輕量級的佇列服務來使用。對於RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料。實驗表明:入隊時,當資料比較小時Redis的效能要高於RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的效能,而RabbitMQ的出隊效能則遠低於Redis。
|
入隊 |
出隊 |
||||||
|
128B |
512B |
1K |
10K |
128B |
512B |
1K |
10K |
Redis |
16088 |
15961 |
17094 |
25 |
15955 |
20449 |
18098 |
9355 |
RabbitMQ |
10627 |
9916 |
9370 |
2366 |
3219 |
3174 |
2982 |
1588 |
ZeroMQ
號稱最快的訊息佇列系統,尤其針對大吞吐量的需求場景。ZMQ能夠實現RabbitMQ不擅長的高階/複雜的佇列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中介軟體的模式,你不需要安裝和執行一個訊息伺服器或中介軟體,因為你的應用程式將扮演了這個服務角色。你只需要簡單的引用ZeroMQ程式庫,可以使用NuGet安裝,然後你就可以愉快的在應用程式之間傳送訊息了。但是ZeroMQ僅提供非永續性的佇列,也就是說如果down機,資料將會丟失。其中,Twitter的Storm中使用ZeroMQ作為資料流的傳輸。
ActiveMQ
是Apache下的一個子專案。 類似於ZeroMQ,它能夠以代理人和點對點的技術實現佇列。同時類似於RabbitMQ,它少量程式碼就可以高效地實現高階應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支援常用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。
Jafka/Kafka
Kafka是Apache下的一個子專案,是一個高效能跨語言分散式Publish/Subscribe訊息佇列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行訊息持久化;高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;完全的分散式系統,Broker、Producer、Consumer都原生自動支援分散式,自動實現複雜均衡;支援Hadoop資料並行載入,對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行載入機制來統一了線上和離線的訊息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個非常輕量級的訊息系統,除了效能非常好之外,還是一個工作良好的分散式系統。
其他一些佇列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。
ActiveMQ使用
window下 ActiveMQ安裝
ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程式就必須先安裝JDK並配置好環境變數,這個很簡單。
然後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓後的目錄結構如下圖:
進入bin目錄,發現有win32和win64兩個資料夾,這2個資料夾分別對應windows32位和windows64位作業系統的啟動指令碼。
我的實驗環境是windowsXP,就進入win32目錄,會看到如下目錄結構。
其中activemq.bat便是啟動指令碼,雙擊啟動。
ActiveMQ預設啟動到8161埠,啟動完了後在瀏覽器位址列輸入:http://localhost:8161/admin要求輸入使用者名稱密碼,預設使用者名稱密碼為admin、admin,這個使用者名稱密碼是在conf/users.properties中配置的。輸入使用者名稱密碼後便可看到如下圖的ActiveMQ控制檯介面了。
4.1.1控制臺介紹
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量
這個要分兩種情況理解
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。
簡單的理解上面的意思就是
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1.
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2.
沒有消費者時 Pending Messages 和 入佇列數量一樣
有消費者消費的時候 Pedding會減少 出佇列會增加
到最後 就是 入佇列和出佇列的數量一樣多
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。
實現點對點通訊模式
使用ActiveMQ完成點對點(p2p)通訊模式
引入pom檔案依賴
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
生產者
public class Producter {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.falst, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
// 獲取session注意引數值my-queue是Query的名字
Destination destination = session.createQueue("my-queue");
// MessageProducer:訊息生產者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 傳送一條訊息
for (int i = 1; i <= 5; i++) {
sendMsg(session, producer, i);
}
connection.close();
}
/**
* 在指定的會話上,通過指定的訊息生產者發出一條訊息
*
* @param session
* 訊息會話
* @param producer
* 訊息生產者
*/
public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
// 建立一條文字訊息
TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
// 通過訊息生產者發出訊息
producer.send(message);
}
}
消費者
public class JmsReceiver {
public static void main(String[] args) throws JMSException {
// ConnectionFactory :連線工廠,JMS 用它建立連線
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
// JMS 客戶端到JMS Provider 的連線
Connection connection = connectionFactory.createConnection();
connection.start();
// Session: 一個傳送或接收訊息的執行緒
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// Destination :訊息的目的地;訊息傳送給誰.
// 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
Destination destination = session.createQueue("my-queue");
// 消費者,訊息接收者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.println("收到訊息:" + message.getText());
} else
break;
}
session.close();
connection.close();
}
}
JMS訊息可靠機制
ActiveMQ訊息簽收機制:
客戶端成功接收一條訊息的標誌是一條訊息被簽收,成功應答。
訊息的簽收情形分兩種:
1、帶事務的session
如果session帶有事務,並且事務成功提交,則訊息被自動簽收。如果事務回滾,則訊息會被再次傳送。
2、不帶事務的session
不帶事務的session的簽收方式,取決於session的配置。
Activemq支援以下三種模式:
Session.AUTO_ACKNOWLEDGE 訊息自動簽收
Session.CLIENT_ACKNOWLEDGE 客戶端呼叫acknowledge方法手動簽收
textMessage.acknowledge();//手動簽收
Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息
只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段:客戶接收訊息、客戶處理訊息和訊息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式(acknowledgement mode)。該引數有以下三個可選值:
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量
場景1
生產者不開啟session,客戶端必須有手動簽收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
消費者不開啟session,客戶端必須有手動簽收模式
TextMessage textMessage = (TextMessage) createConsumer.receive();
//接受訊息
textMessage.acknowledge();
場景2
生產者不開啟session,客戶端自動簽收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
消費者不開啟session,自動簽收訊息
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
場景4
事物訊息 生產者以事物形式,必須要將訊息提交事物,才可以提交到佇列中。
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
消費者
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
ActiveMQ 持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
釋出訂閱
生產者:
public class TOPSend {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic";
public static void main(String[] args) throws JMSException {
start();
}
static public void start() throws JMSException {
System.out.println("生產者已經啟動....");
// 建立ActiveMQConnectionFactory 會話工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
// 啟動JMS 連線
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
send(producer, session);
System.out.println("傳送成功!");
connection.close();
}
static public void send(MessageProducer producer, Session session) throws JMSException {
for (int i = 1; i <= 5; i++) {
System.out.println("我是訊息" + i);
TextMessage textMessage = session.createTextMessage("我是訊息" + i);
Destination destination = session.createTopic(TOPIC);
producer.send(destination, textMessage);
}
}
}
消費者:
public class TopReceiver {
private static String BROKERURL = "tcp://127.0.0.1:61616";
private static String TOPIC = "my-topic";
public static void main(String[] args) throws JMSException {
start();
}
static public void start() throws JMSException {
System.out.println("消費點啟動...");
// 建立ActiveMQConnectionFactory 會話工廠
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);
Connection connection = activeMQConnectionFactory.createConnection();
// 啟動JMS 連線
connection.start();
// 不開訊息啟事物,訊息主要傳送消費者,則表示訊息已經簽收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立一個佇列
Topic topic = session.createTopic(TOPIC);
MessageConsumer consumer = session.createConsumer(topic);
// consumer.setMessageListener(new MsgListener());
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();
if (textMessage != null) {
System.out.println("接受到訊息:" + textMessage.getText());
// textMessage.acknowledge();// 手動簽收
// session.commit();
} else {
break;
}
}
connection.close();
}
}
SpringBoot整合ActiveMQ
生產者:
引入 maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring boot web支援:mvc,aop... -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
引入 application.yml配置
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8080
建立QueueConfig
@Configuration
public class QueueConfig {
@Value("${queue}")
private String queue;
@Bean
public Queue logQueue() {
return new ActiveMQQueue(queue);
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDeliveryMode(2);// 進行持久化配置 1表示非持久化,2表示持久化</span>
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setDefaultDestination(queue); // 此處可不設定預設,在傳送訊息時也可設定佇列
jmsTemplate.setSessionAcknowledgeMode(4);// 客戶端簽收模式</span>
return jmsTemplate;
}
// 定義一個訊息監聽器連線工廠,這裡定義的是點對點模式的監聽器連線工廠
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
ActiveMQConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
// 設定連線數
factory.setConcurrency("1-10");
// 重連間隔時間
factory.setRecoveryInterval(1000L);
factory.setSessionAcknowledgeMode(4);
return factory;
}
}
建立Producer
@Component
@EnableScheduling
public class Producer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Scheduled(fixedDelay = 5000)
public void send() {
jmsMessagingTemplate.convertAndSend(queue, "測試訊息佇列" + System.currentTimeMillis());
}
}
啟動
@SpringBootApplication
@EnableScheduling
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
消費者:
引入 maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- spring boot web支援:mvc,aop... -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
引入 YML配置
application.yml
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
queue: springboot-queue
server:
port: 8081
建立Consumer
@JmsListener(destination = "${queue}")
public void receive(TextMessage text, Session session) throws JMSException {
try {
System.out.println("生產者第" + (++count) + "次向消費者傳送訊息..");
// int id = 1 / 0;
String value = text.getText();
System.out.println("消費者收到訊息:" + value);
//手動簽收
text.acknowledge();
} catch (Exception e) {
// 如果程式碼發生異常,需要釋出版本才可以解決的問題,不要使用重試機制,採用日誌記錄方式,定時Job進行補償。
// 如果不需要釋出版本解決的問題,可以採用重試機制進行補償。
// session.recover();// 繼續重試
e.printStackTrace();
}
}
public static void main(String[] args) {
SpringApplication.run(Consumer.class, args);
}
啟動
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
使用訊息中間注意事項
- 消費者程式碼不要丟擲異常,否則activqmq預設有重試機制。
- 如果程式碼發生異常,需要釋出版本才可以解決的問題,不要使用重試機制,採用日誌記錄方式,定時Job進行補償。
- 如果不需要釋出版本解決的問題,可以採用重試機制進行補償。
消費者如果保證訊息冪等性,不被重複消費。
產生原因:網路延遲傳輸中,會造成進行MQ重試中,在重試過程中,可能會造成重複消費。
解決辦法:
1.使用全域性MessageID 判斷消費方使用同一個,解決冪等性。
2.使用JMS可靠訊息機制