SpringBoot的RabbitMQ訊息佇列: 三、第二模式"Work queues"
上一節的兩個工程,一個負責傳送,一個負責接收,也就是一一對於的關係。
只要訊息發出了,接收者就處理;當接收效率較低時,就會出現接收者處理不過來,我們就可能會處理不過來,於是我們就可能多配置接受者。這個模式就是"Work queues",它的結構如下
多個接收者,它們會出現什麼情況呢?是否像大鍋飯,有的人撐死,有的人餓死。這個通過例子驗證。
一、再建一個接收者工程 HelloReceiving2
1、把HelloReceiver工程中的HelloRabbitConfig、HelloReceiver、logback.xml依次拷貝過去
2、修改application.properties為
#伺服器配置
spring.application.name=rabbitmq-hello-receiving
server.port=9092
#rabbitmq連線引數
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
二、執行工程
1、在工程HelloSending所在資料夾開啟cmd,執行mvn spring-boot:run
2、在工程HelloReceiving所在資料夾開啟cmd,執行mvn spring-boot:run
3、在工程HelloReceiving2所在資料夾開啟cmd,執行mvn spring-boot:run
4、在瀏覽器中輸入http://localhost:9080/send/上帝1,http://localhost:9080/send/上帝2,http://localhost:9080/send/上帝3
觀察兩個Receiving的日誌.
查看出不均衡吧,為了突出這個不公平,我們修改傳送程式碼如下
package com.example; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class HelloSender { protected static Logger logger=LoggerFactory.getLogger(HelloSender.class); @Autowired private AmqpTemplate rabbitTemplate; public String send(String name) { String context = "hello "+name+" --" + new Date(); String sendStr; for(int i=1;i<=100;i++){ sendStr="第["+i+"]個 hello "+name+" --" + new Date(); logger.debug("HelloSender: " + sendStr); this.rabbitTemplate.convertAndSend("hello", sendStr); } return context; } }
再次http://localhost:9080/send/上帝,會發現更多的不公平。
三、Message acknowledgment 訊息確認
1、預設情況下,RabbitMQ 會順序的分發每個Message。當分發後,會將該Message刪除,然後將下一個Message分發到下一個Consumer。這種分發方式叫做round-robin
2、每個Consumer可能需要一段時間才能處理完收到的資料。如果在這個過程中,Consumer出錯了,異常退出了,而資料還沒有處理完成,那麼非常不幸,這段資料就丟失了。因為我們採用no-ack的方式進行確認,也就是說,每次Consumer接到資料後,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然後從queue中刪除了。
3、如果一個Consumer異常退出了,它處理的資料能夠被另外的Consumer處理,這樣資料在這種情況下就不會丟失了(注意是這種情況下)。
4、為了保證資料不被丟失,RabbitMQ支援訊息確認機制,即acknowledgments。為了保證資料能被正確處理而不僅僅是被Consumer收到,那麼我們不能採用no-ack。而應該是在處理完資料後傳送ack。
5、在處理資料後傳送的ack,就是告訴RabbitMQ資料已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。
6、如果Consumer退出了但是沒有傳送ack,那麼RabbitMQ就會把這個Message傳送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下資料也不會丟失。
7、這裡並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連線中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做資料處理。
訊息確認,對於spring-boot來說,就是一個開關,它就是spring.rabbitmq.listener.acknowledge-mode
acknowledgeMode有三值:
A、NONE = no acks will be sent (incompatible with channelTransacted=true).
RabbitMQ calls this "autoack" because the broker assumes all messages are acked without any action from the consumer.
B、MANUAL = the listener must acknowledge all messages by calling Channel.basicAck().
C、AUTO = the container will acknowledge the message automatically, unless the MessageListener throws an exception.
Note that acknowledgeMode is complementary to channelTransacted - if the channel is transacted then the broker requires a commit notification in addition to the ack. This is the default mode. See also txSize.
非常簡單,在application.properties中增加spring.rabbitmq.listener.acknowledge-mode=AUTO
為了更好的演示異常,我們把生產者、消費者都做了sleep.程式碼如下:
sending
package com.example;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class HelloSender {
protected static Logger logger=LoggerFactory.getLogger(HelloSender.class);
@Autowired
private AmqpTemplate rabbitTemplate;
public String send(String name) throws InterruptedException {
String context = "hello "+name+" --" + new Date();
String sendStr;
for(int i=1;i<=100;i++){
sendStr="第["+i+"]個 hello "+name+" --" + new Date();
logger.debug("HelloSender: " + sendStr);
this.rabbitTemplate.convertAndSend("hello", sendStr);
Thread.sleep(1000);
}
return context;
}
}
Receiving
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);
@RabbitHandler
public void process(String hello) {
logger.debug("HelloReceiver : " + hello);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Receiving2
package com.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {
protected static Logger logger = LoggerFactory.getLogger(HelloReceiver.class);
@RabbitHandler
public void process(String hello) {
logger.debug("HelloReceiver : " + hello);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
你會注意到兩個消費者的sleep時間不一樣,這是為了方便異常退出一個之後,檢視另一個是否接收並處理。四、訊息持久化
在上一節中我們知道了即使Consumer異常退出,Message也不會丟失。但是如果RabbitMQ Server退出呢?軟體都有bug,即使RabbitMQ Server是完美毫無bug的(當然這是不可能的,是軟體就有bug,沒有bug的那不叫軟體),它還是有可能退出的:被其它軟體影響,或者系統重啟了,系統panic了。。。
為了保證在RabbitMQ退出或者crash了資料仍沒有丟失,需要將queue和Message都要持久化。
queue持久化,就是在例項時呼叫具有引數durable的建構函式.
package com.example;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloRabbitConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello",true);
}
}
五、單消費者,排他佇列
訊息只能一個消費者接收處理,其它消費者只能看著,這也是佇列例項時呼叫具有引數exclusive 的建構函式。
/**
* Construct a new queue, given a name, durability, exclusive and auto-delete flags.
* @param name the name of the queue.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
this(name, durable, exclusive, autoDelete, null);
}
六、引數
-
spring.rabbitmq.addresses指定client連線到的server的地址,多個以逗號分隔.
-
spring.rabbitmq.dynamic是否建立AmqpAdmin bean. 預設為: true)
-
spring.rabbitmq.host指定RabbitMQ host.預設為: localhost)
-
spring.rabbitmq.listener.acknowledge-mode指定Acknowledge的模式.
-
spring.rabbitmq.listener.auto-startup是否在啟動時就啟動mq,預設: true)
-
spring.rabbitmq.listener.concurrency指定最小的消費者數量.
-
spring.rabbitmq.listener.max-concurrency指定最大的消費者數量.
-
spring.rabbitmq.listener.prefetch指定一個請求能處理多少個訊息,如果有事務的話,必須大於等於transaction數量.
-
spring.rabbitmq.listener.transaction-size指定一個事務處理的訊息數量,最好是小於等於prefetch的數量.
-
spring.rabbitmq.password指定broker的密碼.
-
spring.rabbitmq.port指定RabbitMQ 的埠,預設: 5672)
-
spring.rabbitmq.requested-heartbeat指定心跳超時,0為不指定.
-
spring.rabbitmq.ssl.enabled是否開始SSL,預設: false)
-
spring.rabbitmq.ssl.key-store指定持有SSL certificate的key store的路徑
-
spring.rabbitmq.ssl.key-store-password指定訪問key store的密碼.
-
spring.rabbitmq.ssl.trust-store指定持有SSL certificates的Trust store.
-
spring.rabbitmq.ssl.trust-store-password指定訪問trust store的密碼.
-
spring.rabbitmq.username指定登陸broker的使用者名稱.
-
spring.rabbitmq.virtual-host指定連線到broker的Virtual host.
相關推薦
SpringBoot的RabbitMQ訊息佇列: 三、第二模式"Work queues"
上一節的兩個工程,一個負責傳送,一個負責接收,也就是一一對於的關係。 只要訊息發出了,接收者就處理;當接收效率較低時,就會出現接收者處理不過來,我們就可能會處理不過來,於是我們就可能多配置接受者。這個模式就是"Work queues",它的結構如下
redis 訊息佇列 釋出、訂閱模式
向佇列中放入元素命令 lpush key value1 value2 value3,rpush key value1 value2 value3; 從佇列中取元素命令 lpop key;rpo
RabbitMQ (訊息佇列)專題學習03 Work Queues(工作佇列)
一、概述 工作佇列(Work queues) (使用Java客戶端) 在前面的專題學習中,我們使用Java語言實現了一個簡單的從名為"hello"的佇列中傳送和接收訊息的程式,在這部內容中我們將建立一個工作佇列被用來分配定時訊息任務,而且通過多個接收者(工作者)實現。 工作
Kafka訊息佇列介紹、環境搭建及應用:C#實現消費者-生產者訂閱
一:kafka介紹 kafka(官網地址:http://kafka.apache.org)是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,具有高效能和高吞吐率。 1.1 術語介紹 Broker Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker
(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息
前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過r
三、建立模式----抽象工廠方法模式
一、抽象工廠方法模式 工廠方法模式有一個問題就是,類的建立依賴工廠類,也就是說,如果想要拓展程式,必須對工廠類進行修改,這違背了閉包原則,所以,從設計角度考慮,有一定的問題,如何解決?就用到抽象工廠模式,建立多個工廠類,這樣一旦需要增加新的功能,直接增加新的工廠類就可以了,不需要修改之前的程式碼
三、修正模式
1.懶惰匹配與貪婪匹配 貪婪匹配:匹配結果存在奇異時取其長 //表示式的含義:匹配imooc, //.點表示匹配除換行符之外的任意字元 //+ 匹配至少一次到無窮次原子,即{1,} //並且以123結尾 $pattern = '/imooc.+123/'; $subject = 'I
訊息佇列的兩種模式
Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。 點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題
RocketMQ批量消費、訊息重試、消費模式、刷盤方式
一、Consumer 批量消費可以通過consumer.setConsumeMessageBatchMaxSize(10);//每次拉取10條這裡需要分為2種情況1、Consumer端先啟動 2、Consumer端後啟動. 正常情況下:應該是Consumer需要先啟動1
常用訊息佇列對比、選擇參考和訊息佇列認知
目錄: 1、訊息佇列之常用協議 1.1、AMQP 1.2、MQTT協議 1.3、STOMP協議 1.4、XMPP協議 2、訊息佇列之模型 3、訊息佇列的組成模組 4、常用訊息佇列介紹 4.1、RabbitMQ 4.2、ActiveMQ 4.3、Rocket
訊息佇列的兩種模式及實現
轉載:http://blog.csdn.net/heyutao007/article/details/50131089 訊息佇列的兩種模式 Java訊息服務(Java Message Service,JMS)應用程式介面是一個Java平臺中關於面向訊息中介軟體(MO
Message Queue學習筆記 --- 訊息佇列的兩種模式
wechat:812716131 ------------------------------------------------------ 技術交流群請聯絡上面wechat ----------------------------------------------
程序間通訊機制(管道、訊號、共享記憶體/訊號量/訊息佇列)、執行緒間通訊機制(互斥鎖、條件變數、posix匿名訊號量)
(1)系統中每個訊號量的資料結構(sem)struct sem { int semval; /* 訊號量的當前值 */ unsigned short semzcnt; /* # waiting for zero */ unsigned short semncnt; /* # w
SpringBoot的RabbitMQ訊息佇列: 一、訊息傳送接收第一印象
編制RabbitMQ配置、傳送、接受的程式碼 1、編寫配置檔案類 在com.example包中增加類,名稱為HelloRabbitConfig,並修改程式碼為 package com.example; import org.springframework.amqp.core.Queue;
訊息佇列及釋出/訂閱模式
摘自:《大型網站技術架構》 李智慧1 訊息驅動架構 訊息驅動架構(Event Driven Architecture) :通過在底耦合的模組之間傳輸事件訊息,以保持模組的鬆散耦合,並藉助事件訊息的通訊完成模組間合作,典型的EDA架構就是作業系統中常見的生產者消費者
訊息佇列-zmq常用通訊模式
zmq是一個訊息佇列。可以在程序內、程序間、TCP、多播中,以訊息為單位傳輸資料,而不是socket的位元組流。官方主頁上有下載、使用、文件,蠻全的。 常用模式有:Request-Reply,Publish-Subscribe,Parallel Pipeline。 Requ
MQ訊息佇列三(SpringBoot 整合rocketMq)
一. JMS規範 在瞭解rocketMq之前先了解一下jms規範,rocketmq雖然不完全基於jms規範,但是他參考了jms規範和 CORBA Notification 規範等,可以說是青出於藍而勝於藍。 JMS即Java訊息服務(Java Message Servic
剖析nsq訊息佇列(三) 訊息傳輸的可靠性和持久化[二]diskqueue
上一篇主要說了一下nsq是如何保證訊息被消費端成功消費,大概提了一下訊息的持久化,--mem-queue-size 設定為 0,所有的訊息將會儲存到磁碟。 總有人說nsq的持久化問題,消除疑慮的方法就是閱讀原碼做benchmark測試,個人感覺nsq還是很靠譜的。 nsq自己實現了一個先進先出的訊息檔案佇列g
java23種設計模式——三、工廠模式
原始碼在我的[github](https://github.com/witmy/JavaDesignPattern)和[gitee](https://gitee.com/witmy/JavaDesignPattern)中獲取 # 目錄 [java23種設計模式—— 一、設計模式介紹](https://www.
RabbitMQ知識盤點【壹】_訊息佇列介紹及三種訊息路由模式
最近在看訊息佇列的書籍,把一些收穫總結一下。 首先說說什麼是訊息佇列。這裡就不說那種教科書的定義了,以我的理解,訊息佇列就是通過接收和傳送訊息,使不同的應用系統連線起來。實現了業務系統的解耦,也跨越