RabbiMQ基礎以及spring-boot-starter-amqp使用
? RabbitMQ是一種基於amq協議的消息隊列,本文主要記錄一下rabbitmq的基礎內容以及使用spring-boot-starter-amqp
操作rabbitmq。
1,rabbitmq中的幾個重要概念
a) 虛擬主機(vhost)
? 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。虛擬主機的作用在於進行權限管控,rabbitmq默認有一個虛擬主機"/"。可以使用rabbitmqctl add_vhost
命令添加虛擬主機,然後使用rabbitmqctl set_permissions
命令設置指定用戶在指定虛擬主機下的權限,以此達到權限管控的目的。
b) 消息通道(channel)
消息通道: 在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。
c) 交換機(exchange)
? 交換機: exchange的功能是用於消息分發,它負責接收消息並轉發到與之綁定的隊列,exchange不存儲消息,如果一個exchange沒有binding任何Queue,那麽當它會丟棄生產者發送過來的消息,在啟用ACK機制後,如果exchange找不到隊列,則會返回錯誤。一個exchange可以和多個Queue進行綁定。
交換機有四種類型:
路由模式(Direct):
?direct 類型的行為是"先匹配, 再投送"。即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 才會被交換器投送到綁定的隊列中去。direct是rabbitmq的默認交換機類型。
通配符模式(Topic):
?類似路由模式,但是routing_key支持模糊匹配,按規則轉發消息(最靈活)。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。
發布訂閱模式(Fanout):
?轉發消息到所有綁定隊列,忽略routing_key。
Headers:
? 設置header attribute參數類型的交換機。相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的類型,忽略routing_key。在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或全部匹配時, 消息被投送到對應隊列。
? 在綁定Queue與Exchange時指定一組鍵值對,當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配。如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列。headers屬性是一個鍵值對,可以是Hashtable,鍵值對的值可以是任何類型。
匹配規則x-match有下列兩種類型:
x-match = all :表示所有的鍵值對都匹配才能接受到消息
x-match = any :表示只要有鍵值對匹配就能接受到消息
2,使用spring-boot-starter-amqp
操作rabbitmq
首先添加相關依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
在application.properties
中配置rabbitmq相關配置:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=cord
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
定義rabbitmq配置類:
RabbitMQConfig.java
@Configuration
public class RabbitMQConfig {
private static final String topicExchangeName = "topic-exchange";
private static final String fanoutExchange = "fanout-exchange";
private static final String headersExchange = "headers-exchange";
private static final String queueName = "cord";
//聲明隊列
@Bean
public Queue queue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
return new Queue("cord", false, true, true);
}
//聲明Topic交換機
@Bean
TopicExchange topicExchange() {
return new TopicExchange(topicExchangeName);
}
//將隊列與Topic交換機進行綁定,並指定路由鍵
@Bean
Binding topicBinding(Queue queue, TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with("org.cord.#");
}
//聲明fanout交換機
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(fanoutExchange);
}
//將隊列與fanout交換機進行綁定
@Bean
Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
//聲明Headers交換機
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(headersExchange);
}
//將隊列與headers交換機進行綁定
@Bean
Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
Map<String, Object> map = new HashMap<>();
map.put("First","A");
map.put("Fourth","D");
//whereAny表示部分匹配,whereAll表示全部匹配
// return BindingBuilder.bind(queue).to(headersExchange).whereAll(map).match();
return BindingBuilder.bind(queue).to(headersExchange).whereAny(map).match();
}
}
定義生產者:
Producer.java
@Component
public class Producer {
@Autowired
private AmqpTemplate template;
@Autowired
private AmqpAdmin admin;
/**
* @param routingKey 路由關鍵字
* @param msg 消息體
*/
public void sendDirectMsg(String routingKey, String msg) {
template.convertAndSend(routingKey, msg);
}
/**
* @param routingKey 路由關鍵字
* @param msg 消息體
* @param exchange 交換機
*/
public void sendExchangeMsg(String exchange, String routingKey, String msg) {
template.convertAndSend(exchange, routingKey, msg);
}
/**
* @param map 消息headers屬性
* @param exchange 交換機
* @param msg 消息體
*/
public void sendHeadersMsg(String exchange, String msg, Map<String, Object> map) {
template.convertAndSend(exchange, null, msg, message -> {
message.getMessageProperties().getHeaders().putAll(map);
return message;
});
}
}
定義消費者:
Consumer.class
@Component
public class Consumer {
@RabbitListener(queues = "cord")
//@RabbitListener(queues = "cord", containerFactory="myFactory")
public void processMessage(String msg) {
System.out.format("Receiving Message: -----[%s]----- \n.", msg);
}
}
測試用例:
RabbitmqTest.java
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = CordApplication.class)
public class RabbitmqTest {
@Autowired
private Producer producer;
//Direct
@Test
public void sendDirectMsg() {
producer.sendDirectMsg("cord", String.valueOf(System.currentTimeMillis()));
}
//Topic
@Test
public void sendtopicMsg() {
producer.sendExchangeMsg("topic-exchange","org.cord.test", "hello world");
}
//Fanout
@Test
public void sendFanoutMsg() {
producer.sendExchangeMsg("fanout-exchange", "abcdefg", String.valueOf(System.currentTimeMillis()));
}
//Headers
@Test
public void sendHeadersMsg() {
Map<String, Object> map = new HashMap<>();
map.put("First","A");
producer.sendHeadersMsg("headers-exchange", "hello word", map);
}
}
https://spring.io/guides/gs/messaging-rabbitmq/
https://blog.csdn.net/qq1052441272/article/details/53940754
https://stackoverflow.com/questions/19240290/how-do-i-implement-headers-exchange-in-rabbitmq-using-java
https://blog.csdn.net/ztx114/article/details/78410727
https://www.cnblogs.com/jfl-xx/p/7324285.html
RabbiMQ基礎以及spring-boot-starter-amqp使用