1. 程式人生 > >SpringBoot的RabbitMQ訊息佇列: 三、第二模式"Work queues"

SpringBoot的RabbitMQ訊息佇列: 三、第二模式"Work queues"

    上一節的兩個工程,一個負責傳送,一個負責接收,也就是一一對於的關係。

     只要訊息發出了,接收者就處理;當接收效率較低時,就會出現接收者處理不過來,我們就可能會處理不過來,於是我們就可能多配置接受者。這個模式就是"Work queues",它的結構如下


    多個接收者,它們會出現什麼情況呢?是否像大鍋飯,有的人撐死,有的人餓死。這個通過例子驗證。

一、再建一個接收者工程 HelloReceiving2


1、把HelloReceiver工程中的HelloRabbitConfig、HelloReceiver、logback.xml依次拷貝過去

2、修改application.properties為

#伺服器配置
spring.application.name=rabbitmq-hello-receiving
server.port=9092 
#rabbitmq連線引數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456

二、執行工程

1、在工程HelloSending所在資料夾開啟cmd,執行mvn spring-boot:run

2、在工程HelloReceiving所在資料夾開啟cmd,執行mvn spring-boot:run

3、在工程HelloReceiving2所在資料夾開啟cmd,執行mvn spring-boot:run
4、在瀏覽器中輸入http://localhost:9080/send/上帝1,http://localhost:9080/send/上帝2,http://localhost:9080/send/上帝3
觀察兩個Receiving的日誌.

查看出不均衡吧,為了突出這個不公平,我們修改傳送程式碼如下

package com.example;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender {

	protected static Logger logger=LoggerFactory.getLogger(HelloSender.class); 
	
	@Autowired
    private AmqpTemplate rabbitTemplate;

    public String send(String name) {
    	String context = "hello "+name+" --" + new Date();
    	String sendStr;
    	for(int i=1;i<=100;i++){
    		sendStr="第["+i+"]個 hello "+name+" --" + new Date(); 
    		logger.debug("HelloSender: " + sendStr);
    		this.rabbitTemplate.convertAndSend("hello", sendStr);
    	}
		return context;
    }
}

再次http://localhost:9080/send/上帝,會發現更多的不公平。

三、Message acknowledgment 訊息確認

1、預設情況下,RabbitMQ 會順序的分發每個Message。當分發後,會將該Message刪除,然後將下一個Message分發到下一個Consumer。這種分發方式叫做round-robin

2、每個Consumer可能需要一段時間才能處理完收到的資料。如果在這個過程中,Consumer出錯了,異常退出了,而資料還沒有處理完成,那麼非常不幸,這段資料就丟失了。因為我們採用no-ack的方式進行確認,也就是說,每次Consumer接到資料後,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然後從queue中刪除了。

3、如果一個Consumer異常退出了,它處理的資料能夠被另外的Consumer處理,這樣資料在這種情況下就不會丟失了(注意是這種情況下)。

4、為了保證資料不被丟失,RabbitMQ支援訊息確認機制,即acknowledgments。為了保證資料能被正確處理而不僅僅是被Consumer收到,那麼我們不能採用no-ack。而應該是在處理完資料後傳送ack。

5、在處理資料後傳送的ack,就是告訴RabbitMQ資料已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。

6、如果Consumer退出了但是沒有傳送ack,那麼RabbitMQ就會把這個Message傳送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下資料也不會丟失。

7、這裡並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連線中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做資料處理。

    訊息確認,對於spring-boot來說,就是一個開關,它就是spring.rabbitmq.listener.acknowledge-mode

    acknowledgeMode有三值:

    A、NONE = no acks will be sent (incompatible with channelTransacted=true).

          RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.

    B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().

    C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.

Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.

    非常簡單,在application.properties中增加spring.rabbitmq.listener.acknowledge-mode=AUTO

為了更好的演示異常,我們把生產者、消費者都做了sleep.程式碼如下:

sending

package com.example;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HelloSender {

	protected static Logger logger=LoggerFactory.getLogger(HelloSender.class); 
	
	@Autowired
    private AmqpTemplate rabbitTemplate;

    public String send(String name) throws InterruptedException {
    	String context = "hello "+name+" --" + new Date();
    	String sendStr;
    	for(int i=1;i<=100;i++){
    		sendStr="第["+i+"]個 hello "+name+" --" + new Date(); 
    		logger.debug("HelloSender: " + sendStr);
    		this.rabbitTemplate.convertAndSend("hello", sendStr);
    		Thread.sleep(1000);
    	}
		return context;
    }
}
Receiving
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
	protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);

	@RabbitHandler
	public void process(String hello) {
		logger.debug("HelloReceiver : " + hello);
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

Receiving2
package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
	protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);

	@RabbitHandler
	public void process(String hello) {
		logger.debug("HelloReceiver : " + hello);
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
你會注意到兩個消費者的sleep時間不一樣,這是為了方便異常退出一個之後,檢視另一個是否接收並處理。

四、訊息持久化

在上一節中我們知道了即使Consumer異常退出,Message也不會丟失。但是如果RabbitMQ Server退出呢?軟體都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟體就有bug,沒有bug的那不叫軟體),它還是有可能退出的:被其它軟體影響,或者系統重啟了,系統panic了。。。

為了保證在RabbitMQ退出或者crash了資料仍沒有丟失,需要將queue和Message都要持久化。

queue持久化,就是在例項時呼叫具有引數durable的建構函式.

package com.example;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HelloRabbitConfig {

    @Bean
    public Queue helloQueue() {
    	return new Queue("hello",true);
    }
}

五、單消費者,排他佇列

訊息只能一個消費者接收處理,其它消費者只能看著,這也是佇列例項時呼叫具有引數exclusive 的建構函式。

    /**
     * Construct a new queue, given a name, durability, exclusive and auto-delete flags.
     * @param name the name of the queue.
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
     * connection)
     * @param autoDelete true if the server should delete the queue when it is no longer in use
     */
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, null);
    }

六、引數

  • spring.rabbitmq.addresses指定client連線到的server的地址,多個以逗號分隔.

  • spring.rabbitmq.dynamic是否建立AmqpAdmin bean. 預設為: true)

  • spring.rabbitmq.host指定RabbitMQ host.預設為: localhost)

  • spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式.

  • spring.rabbitmq.listener.auto-startup是否在啟動時就啟動mq,預設: true)

  • spring.rabbitmq.listener.concurrency指定最小的消費者數量.

  • spring.rabbitmq.listener.max-concurrency指定最大的消費者數量.

  • spring.rabbitmq.listener.prefetch指定一個請求能處理多少個訊息,如果有事務的話,必須大於等於transaction數量.

  • spring.rabbitmq.listener.transaction-size指定一個事務處理的訊息數量,最好是小於等於prefetch的數量.

  • spring.rabbitmq.password指定broker的密碼.

  • spring.rabbitmq.port指定RabbitMQ 的埠,預設: 5672)

  • spring.rabbitmq.requested-heartbeat指定心跳超時,0為不指定.

  • spring.rabbitmq.ssl.enabled是否開始SSL,預設: false)

  • spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路徑

  • spring.rabbitmq.ssl.key-store-password指定訪問key store的密碼.

  • spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store.

  • spring.rabbitmq.ssl.trust-store-password指定訪問trust store的密碼.

  • spring.rabbitmq.username指定登陸broker的使用者名稱.

  • spring.rabbitmq.virtual-host指定連線到broker的Virtual host.


相關推薦

SpringBoot的RabbitMQ訊息佇列: 第二模式"Work queues"

    上一節的兩個工程,一個負責傳送,一個負責接收,也就是一一對於的關係。      只要訊息發出了,接收者就處理;當接收效率較低時,就會出現接收者處理不過來,我們就可能會處理不過來,於是我們就可能多配置接受者。這個模式就是"Work queues",它的結構如下   

redis 訊息佇列 釋出訂閱模式

向佇列中放入元素命令  lpush key value1 value2 value3,rpush  key value1 value2 value3; 從佇列中取元素命令  lpop  key;rpo

RabbitMQ (訊息佇列)專題學習03 Work Queues(工作佇列)

一、概述 工作佇列(Work queues) (使用Java客戶端) 在前面的專題學習中,我們使用Java語言實現了一個簡單的從名為"hello"的佇列中傳送和接收訊息的程式,在這部內容中我們將建立一個工作佇列被用來分配定時訊息任務,而且通過多個接收者(工作者)實現。 工作

Kafka訊息佇列介紹環境搭建及應用:C#實現消費者-生產者訂閱

一:kafka介紹 kafka(官網地址:http://kafka.apache.org)是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,具有高效能和高吞吐率。 1.1 術語介紹 Broker Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker

(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息

前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過r

建立模式----抽象工廠方法模式

一、抽象工廠方法模式 工廠方法模式有一個問題就是,類的建立依賴工廠類,也就是說,如果想要拓展程式,必須對工廠類進行修改,這違背了閉包原則,所以,從設計角度考慮,有一定的問題,如何解決?就用到抽象工廠模式,建立多個工廠類,這樣一旦需要增加新的功能,直接增加新的工廠類就可以了,不需要修改之前的程式碼

修正模式

1.懶惰匹配與貪婪匹配 貪婪匹配:匹配結果存在奇異時取其長 //表示式的含義:匹配imooc, //.點表示匹配除換行符之外的任意字元 //+ 匹配至少一次到無窮次原子,即{1,} //並且以123結尾 $pattern = '/imooc.+123/'; $subject = 'I

訊息佇列的兩種模式

Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。 點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題

RocketMQ批量消費訊息重試消費模式刷盤方式

一、Consumer 批量消費可以通過consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條這裡需要分為2種情況1、Consumer端先啟動  2、Consumer端後啟動.   正常情況下:應該是Consumer需要先啟動1

常用訊息佇列對比選擇參考和訊息佇列認知

目錄: 1、訊息佇列之常用協議 1.1、AMQP 1.2、MQTT協議 1.3、STOMP協議 1.4、XMPP協議 2、訊息佇列之模型 3、訊息佇列的組成模組 4、常用訊息佇列介紹 4.1、RabbitMQ 4.2、ActiveMQ 4.3、Rocket

訊息佇列的兩種模式及實現

轉載:http://blog.csdn.net/heyutao007/article/details/50131089 訊息佇列的兩種模式 Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MO

Message Queue學習筆記 --- 訊息佇列的兩種模式

wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------

程序間通訊機制(管道訊號共享記憶體/訊號量/訊息佇列執行緒間通訊機制(互斥鎖條件變數posix匿名訊號量)

(1)系統中每個訊號量的資料結構(sem)struct sem {     int semval; /* 訊號量的當前值 */     unsigned short  semzcnt;  /* # waiting for zero */     unsigned short  semncnt;  /* # w

SpringBoot的RabbitMQ訊息佇列: 一訊息傳送接收第一印象

編制RabbitMQ配置、傳送、接受的程式碼 1、編寫配置檔案類 在com.example包中增加類,名稱為HelloRabbitConfig,並修改程式碼為 package com.example;   import org.springframework.amqp.core.Queue; 

訊息佇列及釋出/訂閱模式

摘自:《大型網站技術架構》 李智慧1 訊息驅動架構     訊息驅動架構(Event Driven Architecture) :通過在底耦合的模組之間傳輸事件訊息,以保持模組的鬆散耦合,並藉助事件訊息的通訊完成模組間合作,典型的EDA架構就是作業系統中常見的生產者消費者

訊息佇列-zmq常用通訊模式

zmq是一個訊息佇列。可以在程序內、程序間、TCP、多播中,以訊息為單位傳輸資料,而不是socket的位元組流。官方主頁上有下載、使用、文件,蠻全的。 常用模式有:Request-Reply,Publish-Subscribe,Parallel Pipeline。 Requ

MQ訊息佇列(SpringBoot 整合rocketMq)

一. JMS規範 在瞭解rocketMq之前先了解一下jms規範,rocketmq雖然不完全基於jms規範,但是他參考了jms規範和 CORBA Notification 規範等,可以說是青出於藍而勝於藍。 JMS即Java訊息服務(Java Message Servic

剖析nsq訊息佇列() 訊息傳輸的可靠性和持久化[二]diskqueue

上一篇主要說了一下nsq是如何保證訊息被消費端成功消費,大概提了一下訊息的持久化,--mem-queue-size 設定為 0,所有的訊息將會儲存到磁碟。 總有人說nsq的持久化問題,消除疑慮的方法就是閱讀原碼做benchmark測試,個人感覺nsq還是很靠譜的。 nsq自己實現了一個先進先出的訊息檔案佇列g

java23種設計模式——工廠模式

原始碼在我的[github](https://github.com/witmy/JavaDesignPattern)和[gitee](https://gitee.com/witmy/JavaDesignPattern)中獲取 # 目錄 [java23種設計模式—— 一、設計模式介紹](https://www.

RabbitMQ知識盤點【壹】_訊息佇列介紹及訊息路由模式

最近在看訊息佇列的書籍,把一些收穫總結一下。 首先說說什麼是訊息佇列。這裡就不說那種教科書的定義了,以我的理解,訊息佇列就是通過接收和傳送訊息,使不同的應用系統連線起來。實現了業務系統的解耦,也跨越