演算法09 五大查詢之:雜湊查詢
一、引言
模組之間的耦合度多高,導致一個模組宕機後,全部功能都不能用了,並且同步通訊的成本過高,使用者體驗差。
二、RabbitMQ介紹
市面上比較火爆的幾款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
語言的支援:ActiveMQ,RocketMQ只支援Java語言,Kafka可以支援多們語言,RabbitMQ支援多種語言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒級別,RabbitMQ是微秒級別的。
訊息丟失,訊息重複問題: RabbitMQ針對訊息的持久化,和重複問題都有比較成熟的解決方案。
學習成本:RabbitMQ非常簡單。
RabbitMQ是由Rabbit公司去研發和維護的,最終是在Pivotal。
RabbitMQ嚴格的遵循AMQP協議,高階訊息佇列協議,幫助我們在程序之間傳遞非同步訊息。
三、RabbitMQ安裝
version: "3.1" services: rabbitmq: image: daocloud.io/library/rabbitmq:management restart: always container_name: rabbitmq ports: - 5672:5672 - 15672:15672 volumes: - ./data:/var/lib/rabbitmq
四、RabbitMQ架構【重點
】
4.1 官方的簡單架構圖
Publisher - 生產者:釋出訊息到RabbitMQ中的Exchange
Consumer - 消費者:監聽RabbitMQ中的Queue中的訊息
Exchange - 交換機:和生產者建立連線並接收生產者的訊息
Queue - 佇列:Exchange會將訊息分發到指定的Queue,Queue和消費者進行互動
Routes - 路由:交換機以什麼樣的策略將訊息釋出到Queue
4.3 檢視圖形化介面並建立一個Virtual Host
建立一個全新的使用者和全新的Virtual Host,並且將test使用者設定上可以操作/test的許可權
五、RabbitMQ的使用【重點
】
5.1 RabbitMQ的通訊方式
5.2 Java連線RabbitMQ
5.2.1 建立maven專案
…………
5.2.2 匯入依賴
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
5.2.3 建立工具類連線RabbitMQ
public static Connection getConnection(){
// 建立Connection工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.199.109");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
// 建立Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
5.3 Hello-World
一個生產者,一個預設的交換機,一個佇列,一個消費者
結構圖 |
---|
建立生產者,建立一個channel,釋出訊息到exchange,指定路由規則。
@Test
public void publish() throws Exception {
//1. 獲取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 建立Channel
Channel channel = connection.createChannel();
//3. 釋出訊息到exchange,同時指定路由的規則
String msg = "Hello-World!";
// 引數1:指定exchange,使用""。
// 引數2:指定路由的規則,使用具體的佇列名稱。
// 引數3:指定傳遞的訊息所攜帶的properties,使用null。
// 引數4:指定釋出的具體訊息,byte[]型別
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不會幫你將訊息持久化到本地的,Queue才會幫你持久化訊息。
System.out.println("生產者釋出訊息成功!");
//4. 釋放資源
channel.close();
connection.close();
}
建立消費者,建立一個channel,建立一個佇列,並且去消費當前佇列
@Test
public void consume() throws Exception {
//1. 獲取連線物件
Connection connection = RabbitMQClient.getConnection();
//2. 建立channel
Channel channel = connection.createChannel();
//3. 宣告佇列-HelloWorld
//引數1:queue - 指定佇列的名稱
//引數2:durable - 當前佇列是否需要持久化(true)
//引數3:exclusive - 是否排外(conn.close() - 當前佇列會被自動刪除,當前佇列只能被一個消費者消費)
//引數4:autoDelete - 如果這個佇列沒有消費者在消費,佇列自動刪除
//引數5:arguments - 指定當前佇列的其他資訊
channel.queueDeclare("HelloWorld",true,false,false,null);
//4. 開啟監聽Queue
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到訊息:" + new String(body,"UTF-8"));
}
};
//引數1:queue - 指定消費哪個佇列
//引數2:autoAck - 指定是否自動ACK (true,接收到訊息後,會立即告訴RabbitMQ)
//引數3:consumer - 指定消費回撥
channel.basicConsume("HelloWorld",true,consume);
System.out.println("消費者開始監聽佇列!");
// System.in.read();
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
5.4 Work
一個生產者,一個預設的交換機,一個佇列,兩個消費者
結構圖 |
---|
只需要在消費者端,新增Qos能力以及更改為手動ack即可讓消費者,根據自己的能力去消費指定的訊息,而不是預設情況下由RabbitMQ平均分配了,生產者不變,正常釋出訊息到預設的exchange,並指定routing
消費者指定Qoa和手動ack
//1 指定當前消費者,一次消費多少個訊息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者1號接收到訊息:" + new String(body,"UTF-8"));
//2. 手動ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//3. 指定手動ack
channel.basicConsume("Work",false,consumer);
5.5 Publish/Subscribe
一個生產者,一個交換機,兩個佇列,兩個消費者
結構圖 |
---|
宣告一個Fanout型別的exchange,並且將exchange和queue繫結在一起,繫結的方式就是直接繫結。
讓生產者建立一個exchange並且指定型別,和一個或多個佇列繫結到一起。
//3. 建立exchange - 繫結某一個佇列
//引數1: exchange的名稱
//引數2: 指定exchange的型別 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1","pubsub-exchange","");
channel.queueBind("pubsub-queue2","pubsub-exchange","");
消費者還是正常的監聽某一個佇列即可。
5.6 Routing
一個生產者,一個交換機,兩個佇列,兩個消費者
結構圖 |
---|
生產者在建立DIRECT型別的exchange後,根據RoutingKey去繫結相應的佇列,並且在傳送訊息時,指定訊息的具體RoutingKey即可。
//3. 建立exchange, routing-queue-error,routing-queue-info,
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");
//4. 釋出訊息到exchange,同時指定路由的規則
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
消費者沒有變化
5.7 Topic
一個生產者,一個交換機,兩個佇列,兩個消費者
結構圖 |
---|
生產者建立Topic的exchange並且繫結到佇列中,這次繫結可以通過*和#關鍵字,對指定RoutingKey內容,編寫時注意格式 xxx.xxx.xxx去編寫, * -> 一個xxx,而# -> 代表多個xxx.xxx,在傳送訊息時,指定具體的RoutingKey到底是什麼。
//2. 建立exchange並指定繫結方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");
//3. 釋出訊息到exchange,同時指定路由的規則
channel.basicPublish("topic-exchange","fast.red.monkey",null,"紅快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白貓".getBytes());
消費者只是監聽佇列,沒變化。
註釋:
交換機必須得根據選擇的模式來建立不同的交換機
*號的表示必須得有一級,
#號表示的是可以有,也可以沒有,也可以是多級
RabbitMQ有四種交換機型別,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
Direct Exchange:路由交換機,只能精準匹配
Fanout Exchange:釋出訂閱交換機(速度最快)
Topic Exchange:主題交換機,可以模糊匹配
Headers Exchanges:不處理路由鍵。而是根據傳送的訊息內容中的headers屬性進行匹配。
高階:
ttl:設定佇列的過期時間,訊息超時之後,就會被移除
死信佇列:訊息因為某些條件等原因而沒有被消費,之後被移除了,可以將訊息移到死信佇列中處理。
可以將這些條件加入到引數中,這樣就可以根據這些引數對訊息進行過濾
六、RabbitMQ整合SpringBoot【重點
】
6.1 SpringBoot整合RabbitMQ
6.1.1 建立SpringBoot工程
6.1.2 匯入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.1.3 編寫配置檔案
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
6.1.4 宣告exchange、queue
@Configuration
public class RabbitMQConfig {
//1. 建立exchange - topic
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 建立queue
@Bean
public Queue getQueue(){
return new Queue("boot-queue",true,false,false,null);
}
//3. 繫結在一起
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
6.1.5 釋出訊息到RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!");
}
6.1.6 建立消費者監聽訊息
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Object message){
System.out.println("接收到訊息:" + message);
}
}
6.2 手動Ack
6.2.1 新增配置檔案
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
6.2.2 手動ack
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到訊息:" + msg);
int i = 1 / 0;
// 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
七、RabbitMQ的其他操作
7.1 訊息的可靠性
RabbitMQ的事務:事務可以保證訊息100%傳遞,可以通過事務的回滾去記錄日誌,後面定時再次傳送當前訊息。事務的操作,效率太低,加了事務操作後,比平時的操作效率至少要慢100倍。
RabbitMQ除了事務,還提供了Confirm的確認機制,這個效率比事務高很多。
7.1.1 普通Confirm方式
//3.1 開啟confirm
channel.confirmSelect();
//3.2 傳送訊息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判斷訊息傳送是否成功
if(channel.waitForConfirms()){
System.out.println("訊息傳送成功");
}else{
System.out.println("傳送訊息失敗");
}
7.1.2 批量Confirm方式。
//3.1 開啟confirm
channel.confirmSelect();
//3.2 批量傳送訊息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 確定批量操作是否成功
channel.waitForConfirmsOrDie(); // 當你傳送的全部訊息,有一個失敗的時候,就直接全部失敗 丟擲異常IOException
7.1.3 非同步Confirm方式。
//3.1 開啟confirm
channel.confirmSelect();
//3.2 批量傳送訊息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 開啟非同步回撥
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("訊息傳送成功,標識:" + deliveryTag + ",是否是批量" + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("訊息傳送失敗,標識:" + deliveryTag + ",是否是批量" + multiple);
}
});
7.1.4 Return機制
Confirm只能保證訊息到達exchange,無法保證訊息可以被exchange分發到指定queue。
而且exchange是不能持久化訊息的,queue是可以持久化訊息。
採用Return機制來監聽訊息是否從exchange送到了指定的queue中
訊息傳遞可靠性 |
---|
開啟Return機制,並在傳送訊息時,指定mandatory為true
// 開啟return機制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 當訊息沒有送達到queue時,才會執行。
System.out.println(new String(body,"UTF-8") + "沒有送達到Queue中!!");
}
});
// 在傳送訊息時,指定mandatory引數為true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
7.2 SpringBoot實現
7.2.1 編寫配置檔案
spring:
rabbitmq:
publisher-confirm-type: simple
publisher-returns: true
7.2.2 開啟Confirm和Return
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // init-method
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("訊息已經送達到Exchange");
}else{
System.out.println("訊息沒有送達到Exchange");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("訊息沒有送達到Queue");
}
}
7.3 避免訊息重複消費
重複消費訊息,會對非冪等行操作造成問題
重複消費訊息的原因是,消費者沒有給RabbitMQ一個ack
為了解決訊息重複消費的問題,可以採用Redis,在消費者消費訊息之前,現將訊息的id放到Redis中,
id-0(正在執行業務)
id-1(執行業務成功)
如果ack失敗,在RabbitMQ將訊息交給其他的消費者時,先執行setnx,如果key已經存在,獲取他的值,如果是0,當前消費者就什麼都不做,如果是1,直接ack。
極端情況:第一個消費者在執行業務時,出現了死鎖,在setnx的基礎上,再給key設定一個生存時間。
生產者,傳送訊息時,指定messageId
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定訊息書否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
消費者,在消費訊息時,根據具體業務邏輯去操作redis
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("192.168.199.109",6379);
String messageId = properties.getMessageId();
//1. setnx到Redis中,預設指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("接收到訊息:" + new String(body, "UTF-8"));
//2. 消費成功,set messageId 1
jedis.set(messageId,"1");
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失敗,獲取key對應的value,如果是0,return,如果是1
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
7.4 SpringBoot如何實現
7.4.1 匯入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
7.4.2 編寫配置檔案
spring:
redis:
host: 192.168.199.109
port: 6379
7.4.3 修改生產者
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!",messageId);
System.in.read();
}
7.4.4 修改消費者
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 獲取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 設定key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消費訊息
System.out.println("接收到訊息:" + msg);
//3. 設定key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 獲取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
八、RabbitMQ應用
8.1 客戶模組
8.1.1 匯入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.1.2 編寫配置檔案
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
8.1.3 編寫配置類
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("openapi-customer-exchange",true,false);
}
@Bean
public Queue queue(){
return new Queue("openapi-customer-queue");
}
@Bean
public Binding binding(Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
8.1.4 修改Service
//3. 傳送訊息
rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.add",JSON.toJSON(customer));
/*//3. 調用搜索模組,新增資料到ES
//1. 準備請求引數和請求頭資訊
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json,headers);
//2. 使用RestTemplate調用搜索模組
restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);*/
8.2 客戶模組
8.2.1 匯入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.2.2 編寫配置檔案
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
8.2.3 編寫配置類
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("openapi-customer-exchange",true,false);
}
@Bean
public Queue queue(){
return new Queue("openapi-customer-queue");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
8.2.4 編寫消費者
@Component
public class CustomerListener {
@Autowired
private CustomerService customerService;
@RabbitListener(queues = "openapi-customer-queue")
public void consume(String json, Channel channel, Message message) throws IOException {
//1. 獲取RoutingKey
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
//2. 使用switch
switch (receivedRoutingKey){
case "openapi.customer.add":
//3. add操作呼叫Service完成新增
customerService.saveCustomer(JSON.parseJSON(json, Customer.class));
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
重點:confirm機制和return機制:
springBoot中應用rabbitmq:
yml檔案配置
server:
port: 8089
spring:
rabbitmq:
host: 47.96.188.147
port: 5672
username: test
password: test
virtual-host: /
#新版本使用這個即可,開啟confirm機制
publisher-confirm-type: correlated
#老版本的話用下面這個即可 開啟確認模式
#publisher-confirms: true
#開啟確認模式 開啟returns模式
publisher-returns: true
java程式碼部分:
//監聽訊息是否傳送到佇列
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 執行了...." + replyText);
}
});
//監聽訊息是否發到交換機
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b){
System.out.println("訊息成功傳送");
}else {
System.out.println("錯誤原因" + s);
}
}
});
也可以使用實現RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback介面,重寫以上兩個方法,完成訊息確認機制