1. 程式人生 > >ActiveMQ——訊息中介軟體深入學習

ActiveMQ——訊息中介軟體深入學習

 

訊息中間件概述

訊息中介軟體產生的背景

在客戶端與伺服器進行通訊時.客戶端呼叫,必須等待服務物件完成處理返回結果才能繼續執行。

 客戶與伺服器物件的生命週期緊密耦合,客戶程序和服務物件程序都都必須正常執行;如果由於服務物件崩潰或者網路故障導致使用者的請求不可達,客戶受到異常

點對點通訊: 客戶的一次呼叫只發送給某個單獨的目標物件。

 

什麼訊息中介軟體


面向訊息的中介軟體(MessageOrlented MiddlewareMOM)較好的解決了以上問
題。傳送者將訊息傳送給訊息伺服器,訊息伺服器將消感存放在若千佇列中,在合適
的時候再將訊息轉發給接收者。

這種模式下,傳送和接收是非同步的,傳送者無需等
待; 二者的生命週期未必相同: 傳送訊息的時候接收者不一定執行,接收訊息的時候
傳送者也不一定執行;一對多通訊: 對於一個訊息可以有多個接收者。

JMS介紹

什麼JMS?

JMS是java的訊息服務,JMS的客戶端之間可以通過JMS服務進行非同步的訊息傳輸。

什麼訊息模型

○ Point-to-Point(P2P) --- 對點

○ Publish/Subscribe(Pub/Sub)---  釋出訂閱

即點對點和釋出訂閱模型

 

 P2P (對點)

P2P

  1. P2P模式圖 
  2. 涉及到的概念 
    1. 訊息佇列(Queue)
    2. 傳送者(Sender)
    3. 接收者(Receiver)
    4. 每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
  3. P2P的特點
    1. 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
    2. 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
    3. 接收者在成功接收訊息之後需向佇列應答成功

如果你希望傳送的每個訊息都應該被成功處理的話,那麼你需要P2P模式。

應用場景

A使用者與B使用者傳送訊息

 

Pub/Sub (釋出與訂閱)

Pub/Sub模式圖 

涉及到的概念 

主題(Topic)

釋出者(Publisher)

訂閱者(Subscriber) 
客戶端將訊息傳送到主題。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。

Pub/Sub的特點

每個訊息可以有多個消費者

釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息,而且為了消費訊息,訂閱者必須保持執行的狀態。

為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。

如果你希望傳送的訊息可以不被做任何處理、或者被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型

訊息的消費 
在JMS中,訊息的產生和訊息是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。 
○ 同步 
訂閱者或接收者呼叫receive方法來接收訊息,receive方法在能夠接收到訊息之前(或超時之前)將一直阻塞 
○ 非同步 
訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。

  應用場景:

   使用者註冊、訂單修改庫存、日誌儲存

   畫圖演示

 

MQ產品的分類

 

RabbitMQ

是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味著訊息在傳送給客戶端時先在中心佇列排隊。對路由(Routing),負載均衡(Load balance)或者資料持久化都有很好的支援。

Redis

是一個Key-Value的NoSQL資料庫,開發維護很活躍,雖然它是一個Key-Value資料庫儲存系統,但它本身支援MQ功能,所以完全可以當做一個輕量級的佇列服務來使用。對於RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料。實驗表明:入隊時,當資料比較小時Redis的效能要高於RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的效能,而RabbitMQ的出隊效能則遠低於Redis。

 

入隊

出隊

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

ZeroMQ

號稱最快的訊息佇列系統,尤其針對大吞吐量的需求場景。ZMQ能夠實現RabbitMQ不擅長的高階/複雜的佇列,但是開發人員需要自己組合多種技術框架,技術上的複雜度是對這MQ能夠應用成功的挑戰。ZeroMQ具有一個獨特的非中介軟體的模式,你不需要安裝和執行一個訊息伺服器或中介軟體,因為你的應用程式將扮演了這個服務角色。你只需要簡單的引用ZeroMQ程式庫,可以使用NuGet安裝,然後你就可以愉快的在應用程式之間傳送訊息了。但是ZeroMQ僅提供非永續性的佇列,也就是說如果down機,資料將會丟失。其中,Twitter的Storm中使用ZeroMQ作為資料流的傳輸。

ActiveMQ

是Apache下的一個子專案。 類似於ZeroMQ,它能夠以代理人和點對點的技術實現佇列。同時類似於RabbitMQ,它少量程式碼就可以高效地實現高階應用場景。RabbitMQ、ZeroMQ、ActiveMQ均支援常用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。

Jafka/Kafka

Kafka是Apache下的一個子專案,是一個高效能跨語言分散式Publish/Subscribe訊息佇列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個升級版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行訊息持久化;高吞吐,在一臺普通的伺服器上既可以達到10W/s的吞吐速率;完全的分散式系統,Broker、Producer、Consumer都原生自動支援分散式,自動實現複雜均衡;支援Hadoop資料並行載入,對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的並行載入機制來統一了線上和離線的訊息處理,這一點也是本課題所研究系統所看重的。Apache Kafka相對於ActiveMQ是一個非常輕量級的訊息系統,除了效能非常好之外,還是一個工作良好的分散式系統。

其他一些佇列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

 

ActiveMQ使用

window下 ActiveMQ安裝

ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程式就必須先安裝JDK並配置好環境變數,這個很簡單。

然後解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓後的目錄結構如下圖:

進入bin目錄,發現有win32和win64兩個資料夾,這2個資料夾分別對應windows32位和windows64位作業系統的啟動指令碼。

我的實驗環境是windowsXP,就進入win32目錄,會看到如下目錄結構。

其中activemq.bat便是啟動指令碼,雙擊啟動。

ActiveMQ預設啟動到8161埠,啟動完了後在瀏覽器位址列輸入:http://localhost:8161/admin要求輸入使用者名稱密碼,預設使用者名稱密碼為admin、admin,這個使用者名稱密碼是在conf/users.properties中配置的。輸入使用者名稱密碼後便可看到如下圖的ActiveMQ控制檯介面了。

4.1.1控制臺介紹

Number Of Consumers  消費者 這個是消費者端的消費者數量 
Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數 
Messages Enqueued 進入佇列的訊息  進入佇列的總數量,包括出佇列的。 這個數量只增不減 
Messages Dequeued 出了佇列的訊息  可以理解為是消費這消費掉的數量 
這個要分兩種情況理解 
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。 
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。 
簡單的理解上面的意思就是 
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。 
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1. 
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2. 


沒有消費者時  Pending Messages   和 入佇列數量一樣 
有消費者消費的時候 Pedding會減少 出佇列會增加 
到最後 就是 入佇列和出佇列的數量一樣多 
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。 

實現點對點通訊模式

 使用ActiveMQ完成點對點(p2p通訊模式

引入pom檔案依賴

  <dependencies>

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-core</artifactId>

<version>5.7.0</version>

</dependency>

</dependencies>

 

生產者

public class Producter {

public static void main(String[] args) throws JMSException {

// ConnectionFactory :連線工廠,JMS 用它建立連線

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

// JMS 客戶端到JMS Provider 的連線

Connection connection = connectionFactory.createConnection();

connection.start();

// Session: 一個傳送或接收訊息的執行緒

Session session = connection.createSession(Boolean.falst, Session.AUTO_ACKNOWLEDGE);

// Destination :訊息的目的地;訊息傳送給誰.

// 獲取session注意引數值my-queue是Query的名字

Destination destination = session.createQueue("my-queue");

// MessageProducer:訊息生產者

MessageProducer producer = session.createProducer(destination);

// 設定不持久化

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

// 傳送一條訊息

for (int i = 1; i <= 5; i++) {

sendMsg(session, producer, i);

}

connection.close();

}

/**

 * 在指定的會話上,通過指定的訊息生產者發出一條訊息

 *

 * @param session

 *            訊息會話

 * @param producer

 *            訊息生產者

 */

public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {

// 建立一條文字訊息

TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);

// 通過訊息生產者發出訊息

producer.send(message);

}

}

消費者

 

public class JmsReceiver {

public static void main(String[] args) throws JMSException {

// ConnectionFactory :連線工廠,JMS 用它建立連線

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,

ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");

// JMS 客戶端到JMS Provider 的連線

Connection connection = connectionFactory.createConnection();

connection.start();

// Session: 一個傳送或接收訊息的執行緒

Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

// Destination :訊息的目的地;訊息傳送給誰.

// 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置

Destination destination = session.createQueue("my-queue");

// 消費者,訊息接收者

MessageConsumer consumer = session.createConsumer(destination);

while (true) {

TextMessage message = (TextMessage) consumer.receive();

if (null != message) {

System.out.println("收到訊息:" + message.getText());

} else

break;

}

session.close();

connection.close();

}

}

 

 

JMS訊息可靠機制

ActiveMQ訊息簽收機制:

客戶端成功接收一條訊息的標誌是一條訊息被簽收,成功應答。

訊息的簽收情形分兩種:

1、帶事務的session

   如果session帶有事務,並且事務成功提交,則訊息被自動簽收。如果事務回滾,則訊息會被再次傳送。

2、不帶事務的session

   不帶事務的session的簽收方式,取決於session的配置。

   Activemq支援以下三種模式:

   Session.AUTO_ACKNOWLEDGE  訊息自動簽收

 

   Session.CLIENT_ACKNOWLEDGE  客戶端呼叫acknowledge方法手動簽收

textMessage.acknowledge();//手動簽收

   Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,訊息可能會重複傳送。在第二次重新傳送訊息的時候,訊息

只有在被確認之後,才認為已經被成功地消費了。訊息的成功消費通常包含三個階段:客戶接收訊息、客戶處理訊息和訊息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,訊息何時被確認取決於建立會話時的應答模式(acknowledgement mode)。該引數有以下三個可選值:

 

Number Of Consumers  消費者 這個是消費者端的消費者數量 

Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數 
Messages Enqueued 進入佇列的訊息  進入佇列的總數量,包括出佇列的。 這個數量只增不減 
Messages Dequeued 出了佇列的訊息  可以理解為是消費這消費掉的數量 

 

場景1  

生產者不開啟session,客戶端必須有手動簽收模式

Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

消費不開啟session,客戶端必須有手動簽收模式

TextMessage textMessage = (TextMessage) createConsumer.receive();

//接受訊息

textMessage.acknowledge();

場景2

生產者不開啟session,客戶端自動簽收模式

Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

消費不開啟session,自動簽收訊息

Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

場景4

事物訊息 生產事物形式,必須要訊息提交事物,才可以提交到佇列中

Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

session.commit();

消費

Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

session.commit();

ActiveMQ 持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

釋出訂閱 

生產者:

 

public class TOPSend {



private static String BROKERURL = "tcp://127.0.0.1:61616";

private static String TOPIC = "my-topic";



public static void main(String[] args) throws JMSException {

start();

}



static public void start() throws JMSException {

System.out.println("生產者已經啟動....");

// 建立ActiveMQConnectionFactory 會話工廠

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);

Connection connection = activeMQConnectionFactory.createConnection();

// 啟動JMS 連線

connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer(null);

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

send(producer, session);

System.out.println("傳送成功!");

connection.close();

}



static public void send(MessageProducer producer, Session session) throws JMSException {

for (int i = 1; i <= 5; i++) {

System.out.println("我是訊息" + i);

TextMessage textMessage = session.createTextMessage("我是訊息" + i);

Destination destination = session.createTopic(TOPIC);

producer.send(destination, textMessage);

}

}



}

 

消費者:

 

public class TopReceiver {

private static String BROKERURL = "tcp://127.0.0.1:61616";

private static String TOPIC = "my-topic";



public static void main(String[] args) throws JMSException {

start();

}



static public void start() throws JMSException {

System.out.println("消費點啟動...");

// 建立ActiveMQConnectionFactory 會話工廠

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(

ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKERURL);

Connection connection = activeMQConnectionFactory.createConnection();

// 啟動JMS 連線

connection.start();

// 不開訊息啟事物,訊息主要傳送消費者,則表示訊息已經簽收

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 建立一個佇列

Topic topic = session.createTopic(TOPIC);

MessageConsumer consumer = session.createConsumer(topic);

// consumer.setMessageListener(new MsgListener());

while (true) {

TextMessage textMessage = (TextMessage) consumer.receive();

if (textMessage != null) {

System.out.println("接受到訊息:" + textMessage.getText());

// textMessage.acknowledge();// 手動簽收

// session.commit();

} else {

break;

}

}

connection.close();

}



}

 

SpringBoot整合ActiveMQ

生產者:

引入 maven依賴

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.4.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter</artifactId>

</dependency>

<!-- spring boot web支援:mvc,aop... -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

 

引入 application.yml配置

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616

    user: admin

    password: admin

queue: springboot-queue

server:

  port: 8080

 

建立QueueConfig

@Configuration

public class QueueConfig {

@Value("${queue}")

private String queue;



@Bean

public Queue logQueue() {

return new ActiveMQQueue(queue);

}



@Bean

public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {

JmsTemplate jmsTemplate = new JmsTemplate();

jmsTemplate.setDeliveryMode(2);// 進行持久化配置 1表示非持久化,2表示持久化</span>

jmsTemplate.setConnectionFactory(activeMQConnectionFactory);

jmsTemplate.setDefaultDestination(queue); // 此處可不設定預設,在傳送訊息時也可設定佇列

jmsTemplate.setSessionAcknowledgeMode(4);// 客戶端簽收模式</span>

return jmsTemplate;

}



// 定義一個訊息監聽器連線工廠,這裡定義的是點對點模式的監聽器連線工廠

@Bean(name = "jmsQueueListener")

public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(

ActiveMQConnectionFactory activeMQConnectionFactory) {

DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

factory.setConnectionFactory(activeMQConnectionFactory);

// 設定連線數

factory.setConcurrency("1-10");

// 重連間隔時間

factory.setRecoveryInterval(1000L);

factory.setSessionAcknowledgeMode(4);

return factory;

}



}

 

建立Producer

@Component

@EnableScheduling

public class Producer {

@Autowired

private JmsMessagingTemplate jmsMessagingTemplate;

@Autowired

private Queue queue;



@Scheduled(fixedDelay = 5000)

public void send() {

jmsMessagingTemplate.convertAndSend(queue, "測試訊息佇列" + System.currentTimeMillis());

}

}

 

啟動

@SpringBootApplication

@EnableScheduling

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

}

 

 

 

消費:

引入 maven依賴

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>1.5.4.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<properties>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>

</properties>

<dependencies>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter</artifactId>

</dependency>

<!-- spring boot web支援:mvc,aop... -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-test</artifactId>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-activemq</artifactId>

</dependency>

</dependencies>

<build>

<plugins>

<plugin>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-maven-plugin</artifactId>

</plugin>

</plugins>

</build>

 

引入 YML配置

application.yml

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616

    user: admin

    password: admin

queue: springboot-queue

server:

  port: 8081

 

建立Consumer

@JmsListener(destination = "${queue}")

public void receive(TextMessage text, Session session) throws JMSException {

try {

System.out.println("生產者第" + (++count) + "次向消費者傳送訊息..");

// int id = 1 / 0;

String value = text.getText();

System.out.println("消費者收到訊息:" + value);

//手動簽收

text.acknowledge();

} catch (Exception e) {

// 如果程式碼發生異常,需要釋出版本才可以解決的問題,不要使用重試機制,採用日誌記錄方式,定時Job進行補償。

// 如果不需要釋出版本解決的問題,可以採用重試機制進行補償。

// session.recover();// 繼續重試

e.printStackTrace();

}

}



public static void main(String[] args) {

SpringApplication.run(Consumer.class, args);

}

 

啟動

@SpringBootApplication

public class App {

public static void main(String[] args) {

SpringApplication.run(App.class, args);

}

}

 

 

使用訊息中間注意事項

  1. 消費者程式碼不要丟擲異常,否則activqmq預設有重試機制。
  2. 如果程式碼發生異常,需要釋出版本才可以解決的問題,不要使用重試機制,採用日誌記錄方式,定時Job進行補償。
  3. 如果不需要釋出版本解決的問題,可以採用重試機制進行補償。

 

消費者如果保證訊息冪等性,不被重複消費

產生原因:網路延遲傳輸中,會造成進行MQ重試中,在重試過程中,可能會造成重複消費。

解決辦法: 

1.使用全域性MessageID 判斷消費方使用同一個,解決冪等性。

2.使用JMS可靠訊息機制