看看吧!月薪20K以上的程式設計師才能全部掌握RabbitMq知識,你掌握了多少
一、RabbitMq基礎知識
0、概述
訊息佇列的作用就是接收訊息生產者的訊息,然後將訊息傳送到消費者
1、通道channel
我的理解是生產者/消費者和rabbitmq互動的一個通道,負責交換機、佇列管理;訊息釋出和消費管理;事務管理等
2、交換機
四種交換機:
direct:可以用一個或者多個key繫結到一個或者多個佇列上
topic:支援路由的適配符 # *
Fanout廣播:將訊息傳送給所有的佇列
Header頭交換機:自定義通過頭訊息屬性來定義路由的匹配
3、佇列:儲存訊息的佇列
4、消費者:訊息的接收者
5、生產者:訊息的傳送者
二、 使用com.rabbitmq.client.*
操作mq
2.1、基本操作
0、環境和依賴
<!-- 環境
* jdk 1.8
* idea
* springboot 2.2.6
-->
<!-- 依賴 這裡只匯入這個包,其中包含了Rabbit client的包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1、建立連線和通道
//獲取連線 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");//mq主機地址 factory.setPort(5672);//埠,預設時5672 factory.setUsername("leyou"); factory.setPassword("leyou"); factory.setVirtualHost("/leyou"); Connection connection = factory.newConnection(); //獲取通道 Channel channel = connection..createChannel();
2、申明交換機 / 佇列 / 繫結交換機和佇列
//交換機名,交換機型別 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); /** * 第一個引數是queue:要建立的佇列名 * 第二個引數是durable:是否持久化。如果為true,可以在RabbitMQ崩潰後恢復訊息 * 第三個引數是exclusive:true表示一個佇列只能被一個消費者佔有並消費 * 第四個引數是autoDelete:true表示伺服器不在使用這個佇列是會自動刪除它 * 第五個引數是arguments:包括死信佇列,佇列的ttl */ channel.queueDeclare(QUEUE_ONE,true,false,false,null); //繫結交換機和佇列 佇列名,交換機名,routekey channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
3、釋出訊息
//1、交換機名 2、routekey 3、mandatory強制(需要return回撥時必須設定為true) 4、釋出訊息引數 5、訊息
channel.basicPublish(EXCHANGE,GIRL,true,null,"xxx降價了".getBytes());
4、接收訊息
//接收訊息前也需要獲取連線和channel,申明佇列
//接收訊息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//拿到訊息
System.out.println(new String(body,"utf-8"));
}
};
/**
* 引數說明
* 1:佇列名字
* 2:是否自動應答 autoACk,為false時需要手動ack
* 3:消費者,當接收到消費者時會呼叫給物件中的 handleDelivery 方法
*/
channel.basicConsume(QUEUE_ONE,true,consumer);
2.2、基本應用
1、功能:
有兩個人小明和小華,小明對美女感興趣,小華對股票和沒事感興趣,使用訊息佇列將他們感興趣的訊息傳送給他們兩個
2、實現:
(1)寫一個類來提供建立連線和通道;
(2)生產者(傳送訊息方)類傳送訊息
(3)消費者(接收訊息)類接收訊息
- 連線類
public class ConnectionUtil {
/**
* 使用原始的rabbitmq client api 操作mq
*/
private static ConnectionFactory factory = new ConnectionFactory();
private static Connection connection;
/*
獲取連線
注意導包:需要導client下面的包
*/
public static Connection getConnection() throws IOException, TimeoutException {
// factory.setHost("localhost");
// factory.setPort(5672);
factory.setUsername("leyou");
factory.setPassword("leyou");
factory.setVirtualHost("/leyou");
connection = factory.newConnection();
return connection;
}
public static void close() throws IOException {
connection.close();
}
/*
建立通道
*/
public static Channel getChannel() throws IOException, TimeoutException {
return getConnection().createChannel();
}
}
- 生產者
//生產者
public class provice{
public void producerMsg() throws IOException, TimeoutException, InterruptedException {
Channel channel = ConnectionUtil.getChannel();
String EXCHANGE = "direct_exchange";
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
//定義兩個佇列名
String QUEUE_ONE = "beauty_queue";
String QUEUE_TWO = "food_queue";
channel.queueDeclare(QUEUE_ONE,true,false,false,null);
channel.queueDeclare(QUEUE_TWO,true,false,false,null);
//定義三個key
String GIRL = "girl";
String SHARE = "share";
String FOOD = "food";
//繫結
channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE);
channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD);
//傳送訊息
/**
* 引數:1交換機,2routekey 3 mandatory:強制;(需要return回撥時必須設定為true)
* 3引數,4訊息位元組資料
*/
channel.basicPublish(EXCHANGE,GIRL,true,null,"快看,是她".getBytes());
channel.basicPublish(EXCHANGE,SHARE,true,null,"股票漲了".getBytes());
channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基降價了".getBytes());
//關閉連線
channel.close();
ConnectionUtil.close();
}
}
- 消費者
public class ConsumerMq { // 消費訊息
/**
* 使用原始的rabbitmq client api 操作mq
*/
String EXCHANGE = "direct_exchange";
String QUEUE_ONE = "beauty_queue";
String QUEUE_TWO = "food_queue";
//key
String GIRL = "girl";
String SHARE = "share";
String FOOD = "food";
public void consumer() throws IOException, TimeoutException {
Channel channel = ConnectionUtil.getChannel();
/**
* 第一個引數是queue:要建立的佇列名
* 第二個引數是durable:是否持久化。如果為true,可以在RabbitMQ崩潰後恢復訊息
* 第三個引數是exclusive:true表示一個佇列只能被一個消費者佔有並消費
* 第四個引數是autoDelete:true表示伺服器不在使用這個佇列是會自動刪除它
* 第五個引數是arguments:包括死信佇列,佇列的ttl,
*/
channel.queueDeclare(QUEUE_ONE,true,false,false,null);
channel.queueDeclare(QUEUE_TWO,true,false,false,null);
//在生產者綁定了交換機和佇列,在這裡就不需要繫結
//channel.queueBind(QUEUE_ONE,EXCHANGE,GIRL);
//channel.queueBind(QUEUE_TWO,EXCHANGE,SHARE);
//channel.queueBind(QUEUE_TWO,EXCHANGE,FOOD);
//接收訊息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
//手動應答ack可以在該方法中進行;引數:1.訊息tag,2.是否批量ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 引數說明
* 1:佇列名字
* 2:是否自動應答 autoACk 為false時需要手動ack
* 3:消費者,當接收到消費者時會呼叫給物件中的 handleDelivery 方法
*/
channel.basicConsume(QUEUE_ONE,false,consumer);
channel.basicConsume(QUEUE_TWO,false,consumer);
}
}
2.3、mq事務,傳送方確認,和訊息回撥
概述
訊息的傳送鏈路 生產者 -> exchange --> queue --> 消費者;為確保訊息傳送到rabbitmq,amqp協議提供了三個機制來保證:事務,傳送方確認(ack),訊息回撥(returncallback);事務的方式和資料庫的事務類似,這裡不做詳細介紹;傳送方確認是當訊息傳送到交換機時, broker(實現amqp協議的服務端,這裡指rabbitmq)會回調發送者的一個固定方法來確認訊息成功傳送;訊息回撥是發生在交換機通過路由key轉發到佇列的過程中,如果訊息不能通過key找到對應的queue則回撥一個固定方法將訊息返回給生產者,確保訊息不丟失
1、mq事務
- rabbitMq是支援事務的,但是使用事務的效率很低,在訊息數量很大的情況下影響效能
2、傳送方確認
對於固定訊息體大小和執行緒數,如果訊息持久化,生產者confirm(或者採用事務機制),消費者ack那麼對效能有很大的影響.
訊息持久化的優化沒有太好方法,用更好的物理儲存(SAS, SSD, RAID卡)總會帶來改善。生產者confirm這一環節的優化則主要在於客戶端程式的優化之上。歸納起來,客戶端實現生產者confirm有三種程式設計方式:
- 普通confirm模式:每傳送一條訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。實際上是一種序列confirm了。
- 批量confirm模式:每傳送一批訊息後,呼叫waitForConfirms()方法,等待伺服器端confirm。
- 非同步confirm模式:提供一個回撥方法,服務端confirm了一條或者多條訊息後Client端會回撥這個方法。
- [ ] 普通confirm模式
//要點
//第1種
//普通confirm模式最簡單,publish一條訊息後,等待伺服器端confirm,如果服務端返回false或者超時時間內未返回,客戶端進行訊息重傳。
//1.發訊息前
channel.confirmSelect();
//2.發訊息後
//判斷訊息傳送是否成功
if(channel.waitForConfirms()){
System.out.println("訊息傳送成功");
}
- [ ] 批量confirm模式
批量confirm模式稍微複雜一點,客戶端程式需要定期(每隔多少秒)或者定量(達到多少條)或者兩則結合起來publish訊息,然後等待伺服器端confirm, 相比普通confirm模式,批量極大提升confirm效率,但是問題在於一旦出現confirm返回false或者超時的情況時,客戶端需要將這一批次的訊息全部重發,這會帶來明顯的重複訊息數量,並且,當訊息經常丟失時,批量confirm效能應該是不升反降的。
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
非同步confirm模式
非同步confirm模式的程式設計實現最複雜,Channel物件提供的ConfirmListener()回撥方法只包含deliveryTag(當前Chanel發出的訊息序號),我們需要自己為每一個Channel維護一個unconfirm的訊息序號集合,每publish一條資料,集合中元素加1,每回調一次handleAck方法,unconfirm集合刪掉相應的一條(multiple=false)或多條(multiple=true)記錄。從程式執行效率上看,這個unconfirm集合最好採用有序集合SortedSet儲存結構。實際上,SDK中的waitForConfirms()方法也是通過SortedSet維護訊息序號的。
關鍵程式碼:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
//別忘這行程式碼
channel.confirmSelect();
//新增監聽器
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
3、訊息回撥
//要點
//1.傳送訊息是將第三個引數mandatory設定為true
channel.basicPublish(EXCHANGE,FOOD,true,null,"肯德基降價了".getBytes());
//2.新增訊息回撥監聽器
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("訊息不可路由"+new String(bytes,"utf-8"));
}
});
//注意:開啟回撥不能關閉連線和通道,
2.4、接收方確認
1、概述
接收方ack分為手動和自動,在接收訊息時設定
//第二個引數就是指定是否手動ack false時為手動
channel.basicConsume(QUEUE_ONE,false,consumer);
手動ack有三種
- 單個確認
- 單個拒絕
- 批量拒絕
2、程式碼實現
單個確認ack
//接收訊息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
//手動應答ack可以在該方法中進行;引數:1.訊息tag,2.是否批量ack
channel.basicAck(envelope.getDeliveryTag(),false);
//拒絕訊息;引數:1.訊息tag;2.訊息是否重新入隊,當只有一個消費者時,會引起重複消費
channel.basicReject(envelope.getDeliveryTag(),false);
//批量ack訊息;引數:1.訊息tag;2.是否批量ack訊息,3.是否重回佇列
channel.basicNack(envelope.getDeliveryTag(),true,false);
}
};
//這裡只需要條應答的語句,我這裡知識都列出來
channel.basicConsume(QUEUE_ONE,false,consumer);
//注意上面第二個引數要為false才能手動ack
2.5、訊息TTL和佇列TTL、死信佇列、延遲佇列
這一塊暫時不使用原始RabbitMq Client API實現,後面再研究,但是會使用下面的org.springframework.amqp
來實現
三、使用org.springframework.amqp
操作mq
3.1、前言:
Spring
對RabbitMp
進行了抽象,將交換機,佇列,訊息,繫結,連線等抽象出實體類,方便操作,還提供了RabbitAdmit
和RabbitTemplate
來方便交換機佇列的管理以及訊息的傳送接收等
3.2、基本例項
0、環境和依賴
<!-- 環境
* jdk 1.8
* idea
* springboot 2.2.6
-->
<!-- 依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1、例項
傳送訊息告訴消費者超時打折了快來購物
- 配置類
@Configuration
public class RabbitConfig {
private final static Logger log = LoggerFactory.getLogger(RabbitConfig.class);
private final static String EXCHANGE_NAME = "verification_code_exchange";
private final static String VERIFICATION_CODE_QUEUE = "verification_code_queue";
private final static String VERIFICATION_CODE_ROUTE_KEY = "verification_code_key";
//死信交換機和佇列和key
private final static String DLX_EXCHANGE_NAME = "dlx-exchange";
private final static String DLX_KEY = "verification_code_key";
@Bean
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory conn = new CachingConnectionFactory();
conn.setUsername("leyou");
conn.setPassword("leyou");
conn.setVirtualHost("/leyou");
//訊息傳送到mq傳送確認訊息給生產者
conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
//訊息傳送到mq,通過繫結的key找不到queue,則傳送訊息給生產者
conn.setPublisherReturns(true);
return conn;
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//設定訊息序列化
rabbitTemplate.setMessageConverter(converter());
//訊息的確認回撥
// rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
// @Override
// public void confirm(CorrelationData correlationData, boolean b, String s) {
//
// }
// });
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
//ack為確認訊息是否成功傳送到mq
if(ack){
//成功傳送
log.info("訊息傳送成功");
}
});
//改標誌位設定位true時,當交換機根據自身型別和routeKey無法找到對應的佇列時,
// 則mq會將訊息返還給生產者
//當為false時則mq會將訊息直接刪除
rabbitTemplate.setMandatory(true);
//訊息,返回碼,返回內容,交換機,路由key
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{
//訊息
log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey);
});
return rabbitTemplate;
}
/**
* 注入rabbitadmin 用來申明交換機和佇列,主要作用是代替原始的使用channl申明的做法,全部交給這個物件來完成
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmit(CachingConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
return rabbitAdmin;
}
/**
* 訊息序化物件
* 預設使用的是JDK的序列化,這裡配置了後就可以將訊息序列化為json格式
*/
@Bean
public MessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
/**
* 申明一個交換機
*/
@Bean
public DirectExchange verificationCodeExchange(RabbitAdmin rabbitAdmin){
DirectExchange exchange = new DirectExchange(EXCHANGE_NAME);
rabbitAdmin.declareExchange(exchange);
return exchange;
}
/**
* 申明一個佇列
* @param rabbitAdmin
* @return
*/
@Bean
public Queue getQueue(RabbitAdmin rabbitAdmin){
Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,null);
rabbitAdmin.declareQueue(queue);
return queue;
}
/**
* 申明一個繫結
* @param rabbitAdmin
* @param verificationCodeExchange
* @return
*/
@Bean
public Binding bindingQueue(RabbitAdmin rabbitAdmin,DirectExchange verificationCodeExchange){
Binding with = BindingBuilder.bind(getQueue(rabbitAdmin)).to(verificationCodeExchange).with(VERIFICATION_CODE_ROUTE_KEY);
rabbitAdmin.declareBinding(with);
return with;
}
}
說明:上面用到了生產者confirm和訊息回撥機制
1、生產者confirm關鍵程式碼://1、建立連線時 conn.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); //2、建立rabbitTemplate時 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { //ack為確認訊息是否成功傳送到mq if(ack){ //成功傳送 log.info("訊息傳送成功"); } });
2、訊息回撥機制關鍵程式碼:
//1、建立連線時 conn.setPublisherReturns(true); //2、建立rabbitTemplate時 //改標誌位設定位true時,當交換機根據自身型別和routeKey無法找到對應的佇列時, // 則mq會將訊息返還給生產者 //當為false時則mq會將訊息直接刪除 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)->{ //訊息 log.info("message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",message,replyCode,replyText,exchange,routingKey); });
生產者:
@Component
public class RabbitSender {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(){
//構建訊息
Message message = MessageBuilder.withBody(
JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快來搶購!").build()).getBytes()).build();
//訊息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//訊息的媒體型別
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
//訊息的自定義關聯id
CorrelationData correlationData = new CorrelationData(String.valueOf(msgId));
rabbitTemplate.convertAndSend(exchange,routingKey,message,new MessagePostProcessor(){
//訊息後置處理器,可以在下面這個方法中對訊息進行相關屬性的設定
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//比如可以設定上面 這些屬性等
//message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//訊息持久化問題 //message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);//訊息的媒體型別
return message;
}
},correlationData);
}
}
消費者
@Component
public class RabbitReceive {
@RabbitListener(bindings = @QueueBinding(exchange = @Exchange(value = EXCHANGE_NAME, type = ExchangeTypes.DIRECT),
key = VERIFICATION_CODE_ROUTE_KEY,
value = @Queue(value = VERIFICATION_CODE_QUEUE, autoDelete = "false"),
ignoreDeclarationExceptions = "true"),
concurrency = "1", // 指定監聽該佇列的消費者個數
ackMode = "MANUAL"// 手動ack
)
public void receiveCode(Channel channel, Message msg, @Headers Map<String, Object> headers) throws IOException, InterruptedException {
String msgId = (String) headers.get("spring_listener_return_correlation");
long tag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(tag, false);
}
}
其中:傳送方確認(生產者confirm)、訊息回撥上面程式碼都包含了;消費者ack則和原始方法是一樣的
下面介紹訊息TTL,佇列TTL,死信佇列,延遲佇列
- 訊息和佇列的TTL
//訊息ttl
//在構建訊息時設定訊息的過期時間
Message message = MessageBuilder.withBody(
JSONObject.toJSONString(MessageModel.builder().id(msgId).context("超市打折,快來搶購!").build()).getBytes()).build();
//訊息的過期時間
message.getMessageProperties().setExpiration("5000");
//佇列的ttl
//在建立佇列時通過引數設定
Map<String, Object> args = new HashMap<>();
//指定死信交換機
args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
//指定死信佇列的key
args.put("x-dead-letter-routing-key", DLX_KEY);
//設定佇列中訊息的過期時間 ms
args.put("x-message-ttl",10000);
//整個佇列的過期時間,過期後整個佇列會被刪除
//args.put("x-expires",10000);
Queue queue = new Queue(VERIFICATION_CODE_QUEUE, true,false,false,args);
上面還包括死信佇列的屬性設定,和死信佇列key,關於死信佇列的配置,還需要配置一個死信交換機和一個死信佇列;當有訊息或佇列的ttl過期,訊息超過佇列最大長度,訊息被拒絕且設定不重新回佇列,則訊息會被轉發到死信交換機,再轉發到死信佇列。
- 關於延遲佇列的實現方法有兩種
- 使用死信佇列,用一個設定了ttl的佇列來存放訊息,該佇列不需要消費者監聽,然後給該佇列配置死信交換機和佇列,消費者監聽死信佇列,這樣就能達到時間達到延遲收到訊息的目的
- 使用rabbitmq外掛的方式實現,這裡先不寫,放到下一篇筆記中
最後
感謝你看到這裡,看完有什麼的不懂的可以在評論區問我,覺得文章對你有幫助的話記得給我點個贊,每天都會分享java相關技術文章或行業資訊,歡迎大家關注和轉發文章!