1. 程式人生 > 其它 >演算法09 五大查詢之:雜湊查詢

演算法09 五大查詢之:雜湊查詢

一、引言

模組之間的耦合度多高,導致一個模組宕機後,全部功能都不能用了,並且同步通訊的成本過高,使用者體驗差。

二、RabbitMQ介紹


市面上比較火爆的幾款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 語言的支援:ActiveMQ,RocketMQ只支援Java語言,Kafka可以支援多們語言,RabbitMQ支援多種語言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒級別,RabbitMQ是微秒級別的。

  • 訊息丟失,訊息重複問題: RabbitMQ針對訊息的持久化,和重複問題都有比較成熟的解決方案。

  • 學習成本:RabbitMQ非常簡單。

RabbitMQ是由Rabbit公司去研發和維護的,最終是在Pivotal。

RabbitMQ嚴格的遵循AMQP協議,高階訊息佇列協議,幫助我們在程序之間傳遞非同步訊息。

三、RabbitMQ安裝


version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq

四、RabbitMQ架構【重點


4.1 官方的簡單架構圖

  • Publisher - 生產者:釋出訊息到RabbitMQ中的Exchange

  • Consumer - 消費者:監聽RabbitMQ中的Queue中的訊息

  • Exchange - 交換機:和生產者建立連線並接收生產者的訊息

  • Queue - 佇列:Exchange會將訊息分發到指定的Queue,Queue和消費者進行互動

  • Routes - 路由:交換機以什麼樣的策略將訊息釋出到Queue

4.3 檢視圖形化介面並建立一個Virtual Host

建立一個全新的使用者和全新的Virtual Host,並且將test使用者設定上可以操作/test的許可權

五、RabbitMQ的使用【重點


5.1 RabbitMQ的通訊方式

5.2 Java連線RabbitMQ

5.2.1 建立maven專案

…………

5.2.2 匯入依賴
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.6.0</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
5.2.3 建立工具類連線RabbitMQ
public static Connection getConnection(){
    // 建立Connection工廠
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.168.199.109");
    factory.setPort(5672);
    factory.setUsername("test");
    factory.setPassword("test");
    factory.setVirtualHost("/test");

    // 建立Connection
    Connection conn = null;
    try {
        conn = factory.newConnection();
    } catch (Exception e) {
        e.printStackTrace();
    }
    // 返回
    return conn;
}

5.3 Hello-World

一個生產者,一個預設的交換機,一個佇列,一個消費者

結構圖

建立生產者,建立一個channel,釋出訊息到exchange,指定路由規則。

@Test
public void publish() throws Exception {
    //1. 獲取Connection
    Connection connection = RabbitMQClient.getConnection();

    //2. 建立Channel
    Channel channel = connection.createChannel();

    //3. 釋出訊息到exchange,同時指定路由的規則
    String msg = "Hello-World!";
    // 引數1:指定exchange,使用""。
    // 引數2:指定路由的規則,使用具體的佇列名稱。
    // 引數3:指定傳遞的訊息所攜帶的properties,使用null。
    // 引數4:指定釋出的具體訊息,byte[]型別
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
    // Ps:exchange是不會幫你將訊息持久化到本地的,Queue才會幫你持久化訊息。
    System.out.println("生產者釋出訊息成功!");
    //4. 釋放資源
    channel.close();
    connection.close();
}

建立消費者,建立一個channel,建立一個佇列,並且去消費當前佇列

@Test
public void consume() throws Exception {
    //1. 獲取連線物件
    Connection connection = RabbitMQClient.getConnection();

    //2. 建立channel
    Channel channel = connection.createChannel();

    //3. 宣告佇列-HelloWorld
    //引數1:queue - 指定佇列的名稱
    //引數2:durable - 當前佇列是否需要持久化(true)
    //引數3:exclusive - 是否排外(conn.close() - 當前佇列會被自動刪除,當前佇列只能被一個消費者消費)
    //引數4:autoDelete - 如果這個佇列沒有消費者在消費,佇列自動刪除
    //引數5:arguments - 指定當前佇列的其他資訊
    channel.queueDeclare("HelloWorld",true,false,false,null);

    //4. 開啟監聽Queue
    DefaultConsumer consume = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("接收到訊息:" + new String(body,"UTF-8"));
        }
    };

    //引數1:queue - 指定消費哪個佇列
    //引數2:autoAck - 指定是否自動ACK (true,接收到訊息後,會立即告訴RabbitMQ)
    //引數3:consumer - 指定消費回撥
    channel.basicConsume("HelloWorld",true,consume);

    System.out.println("消費者開始監聽佇列!");
    // System.in.read();
    System.in.read();

    //5. 釋放資源
    channel.close();
    connection.close();
}

5.4 Work

一個生產者,一個預設的交換機,一個佇列,兩個消費者

結構圖

只需要在消費者端,新增Qos能力以及更改為手動ack即可讓消費者,根據自己的能力去消費指定的訊息,而不是預設情況下由RabbitMQ平均分配了,生產者不變,正常釋出訊息到預設的exchange,並指定routing

消費者指定Qoa和手動ack

//1 指定當前消費者,一次消費多少個訊息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("消費者1號接收到訊息:" + new String(body,"UTF-8"));

        //2. 手動ack
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
};
//3. 指定手動ack
channel.basicConsume("Work",false,consumer);

5.5 Publish/Subscribe

一個生產者,一個交換機,兩個佇列,兩個消費者

結構圖

宣告一個Fanout型別的exchange,並且將exchange和queue繫結在一起,繫結的方式就是直接繫結。

讓生產者建立一個exchange並且指定型別,和一個或多個佇列繫結到一起。

//3. 建立exchange - 繫結某一個佇列
//引數1: exchange的名稱
//引數2: 指定exchange的型別  FANOUT - pubsub ,   DIRECT - Routing , TOPIC - Topics
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1","pubsub-exchange","");
channel.queueBind("pubsub-queue2","pubsub-exchange","");

消費者還是正常的監聽某一個佇列即可。

5.6 Routing

一個生產者,一個交換機,兩個佇列,兩個消費者

結構圖

生產者在建立DIRECT型別的exchange後,根據RoutingKey去繫結相應的佇列,並且在傳送訊息時,指定訊息的具體RoutingKey即可。

//3. 建立exchange, routing-queue-error,routing-queue-info,
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");

//4. 釋出訊息到exchange,同時指定路由的規則
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());

消費者沒有變化

5.7 Topic

一個生產者,一個交換機,兩個佇列,兩個消費者

結構圖

生產者建立Topic的exchange並且繫結到佇列中,這次繫結可以通過*和#關鍵字,對指定RoutingKey內容,編寫時注意格式 xxx.xxx.xxx去編寫, * -> 一個xxx,而# -> 代表多個xxx.xxx,在傳送訊息時,指定具體的RoutingKey到底是什麼。

//2. 建立exchange並指定繫結方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
channel.queueBind("topic-queue-1","topic-exchange","*.red.*");
channel.queueBind("topic-queue-2","topic-exchange","fast.#");
channel.queueBind("topic-queue-2","topic-exchange","*.*.rabbit");

//3. 釋出訊息到exchange,同時指定路由的規則
channel.basicPublish("topic-exchange","fast.red.monkey",null,"紅快猴子".getBytes());
channel.basicPublish("topic-exchange","slow.black.dog",null,"黑漫狗".getBytes());
channel.basicPublish("topic-exchange","fast.white.cat",null,"快白貓".getBytes());

消費者只是監聽佇列,沒變化。

註釋:

交換機必須得根據選擇的模式來建立不同的交換機

*號的表示必須得有一級,

#號表示的是可以有,也可以沒有,也可以是多級

RabbitMQ有四種交換機型別,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

Direct Exchange:路由交換機,只能精準匹配

Fanout Exchange:釋出訂閱交換機(速度最快)

Topic Exchange:主題交換機,可以模糊匹配

Headers Exchanges:不處理路由鍵。而是根據傳送的訊息內容中的headers屬性進行匹配。

高階:

ttl:設定佇列的過期時間,訊息超時之後,就會被移除
死信佇列:訊息因為某些條件等原因而沒有被消費,之後被移除了,可以將訊息移到死信佇列中處理。

可以將這些條件加入到引數中,這樣就可以根據這些引數對訊息進行過濾

六、RabbitMQ整合SpringBoot【重點


6.1 SpringBoot整合RabbitMQ

6.1.1 建立SpringBoot工程
6.1.2 匯入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
6.1.3 編寫配置檔案
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
6.1.4 宣告exchange、queue
@Configuration
public class RabbitMQConfig {
    //1. 建立exchange - topic
    @Bean
    public TopicExchange getTopicExchange(){
        return new TopicExchange("boot-topic-exchange",true,false);
    }

    //2. 建立queue
    @Bean
    public Queue getQueue(){
        return new Queue("boot-queue",true,false,false,null);
    }

    //3. 繫結在一起
    @Bean
    public Binding getBinding(TopicExchange topicExchange,Queue queue){
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
}
6.1.5 釋出訊息到RabbitMQ
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!");
}
6.1.6 建立消費者監聽訊息
@Component
public class Consumer {

    @RabbitListener(queues = "boot-queue")
    public void getMessage(Object message){
        System.out.println("接收到訊息:" + message);
    }

}

6.2 手動Ack

6.2.1 新增配置檔案
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
6.2.2 手動ack
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    System.out.println("接收到訊息:" + msg);
    int i = 1 / 0;
    // 手動ack
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}

七、RabbitMQ的其他操作


7.1 訊息的可靠性

RabbitMQ的事務:事務可以保證訊息100%傳遞,可以通過事務的回滾去記錄日誌,後面定時再次傳送當前訊息。事務的操作,效率太低,加了事務操作後,比平時的操作效率至少要慢100倍。

RabbitMQ除了事務,還提供了Confirm的確認機制,這個效率比事務高很多。

7.1.1 普通Confirm方式
//3.1 開啟confirm
channel.confirmSelect();
//3.2 傳送訊息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判斷訊息傳送是否成功
if(channel.waitForConfirms()){
    System.out.println("訊息傳送成功");
}else{
    System.out.println("傳送訊息失敗");
}
7.1.2 批量Confirm方式。
//3.1 開啟confirm
channel.confirmSelect();
//3.2 批量傳送訊息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 確定批量操作是否成功
channel.waitForConfirmsOrDie();     // 當你傳送的全部訊息,有一個失敗的時候,就直接全部失敗 丟擲異常IOException

7.1.3 非同步Confirm方式。
//3.1 開啟confirm
channel.confirmSelect();
//3.2 批量傳送訊息
for (int i = 0; i < 1000; i++) {
    String msg = "Hello-World!" + i;
    channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 開啟非同步回撥
channel.addConfirmListener(new ConfirmListener() {

    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("訊息傳送成功,標識:" + deliveryTag + ",是否是批量" + multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("訊息傳送失敗,標識:" + deliveryTag + ",是否是批量" + multiple);
    }
});
7.1.4 Return機制

Confirm只能保證訊息到達exchange,無法保證訊息可以被exchange分發到指定queue。

而且exchange是不能持久化訊息的,queue是可以持久化訊息。

採用Return機制來監聽訊息是否從exchange送到了指定的queue中

訊息傳遞可靠性

開啟Return機制,並在傳送訊息時,指定mandatory為true

// 開啟return機制
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 當訊息沒有送達到queue時,才會執行。
        System.out.println(new String(body,"UTF-8") + "沒有送達到Queue中!!");
    }
});

// 在傳送訊息時,指定mandatory引數為true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());

7.2 SpringBoot實現

7.2.1 編寫配置檔案
spring:
  rabbitmq:
    publisher-confirm-type: simple
    publisher-returns: true
7.2.2 開啟Confirm和Return
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct  // init-method
    public void initMethod(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("訊息已經送達到Exchange");
        }else{
            System.out.println("訊息沒有送達到Exchange");
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("訊息沒有送達到Queue");
    }
}

7.3 避免訊息重複消費

重複消費訊息,會對非冪等行操作造成問題

重複消費訊息的原因是,消費者沒有給RabbitMQ一個ack

為了解決訊息重複消費的問題,可以採用Redis,在消費者消費訊息之前,現將訊息的id放到Redis中,

id-0(正在執行業務)

id-1(執行業務成功)

如果ack失敗,在RabbitMQ將訊息交給其他的消費者時,先執行setnx,如果key已經存在,獲取他的值,如果是0,當前消費者就什麼都不做,如果是1,直接ack。

極端情況:第一個消費者在執行業務時,出現了死鎖,在setnx的基礎上,再給key設定一個生存時間。

生產者,傳送訊息時,指定messageId

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
    .deliveryMode(1)     //指定訊息書否需要持久化 1 - 需要持久化  2 - 不需要持久化
    .messageId(UUID.randomUUID().toString())
    .build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());

消費者,在消費訊息時,根據具體業務邏輯去操作redis

DefaultConsumer consume = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        Jedis jedis = new Jedis("192.168.199.109",6379);
        String messageId = properties.getMessageId();
        //1. setnx到Redis中,預設指定value-0
        String result = jedis.set(messageId, "0", "NX", "EX", 10);
        if(result != null && result.equalsIgnoreCase("OK")) {
            System.out.println("接收到訊息:" + new String(body, "UTF-8"));
            //2. 消費成功,set messageId 1
            jedis.set(messageId,"1");
            channel.basicAck(envelope.getDeliveryTag(),false);
        }else {
            //3. 如果1中的setnx失敗,獲取key對應的value,如果是0,return,如果是1
            String s = jedis.get(messageId);
            if("1".equalsIgnoreCase(s)){
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        }
    }
};

7.4 SpringBoot如何實現

7.4.1 匯入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
7.4.2 編寫配置檔案
spring:
  redis:
    host: 192.168.199.109
    port: 6379
7.4.3 修改生產者
@Test
void contextLoads() throws IOException {
    CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!",messageId);
    System.in.read();
}
7.4.4 修改消費者
@Autowired
private StringRedisTemplate redisTemplate;


@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
    //0. 獲取MessageId
    String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    //1. 設定key到Redis
    if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
        //2. 消費訊息
        System.out.println("接收到訊息:" + msg);

        //3. 設定key的value為1
        redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
        //4.  手動ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else {
        //5. 獲取Redis中的value即可 如果是1,手動ack
        if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

八、RabbitMQ應用


8.1 客戶模組

8.1.1 匯入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.1.2 編寫配置檔案
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
8.1.3 編寫配置類
@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("openapi-customer-exchange",true,false);
    }

    @Bean
    public Queue queue(){
        return new Queue("openapi-customer-queue");
    }

    @Bean
    public Binding binding(Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }


}
8.1.4 修改Service
//3. 傳送訊息
rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.add",JSON.toJSON(customer));


/*//3. 調用搜索模組,新增資料到ES
        //1. 準備請求引數和請求頭資訊
        String json = JSON.toJSON(customer);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
        HttpEntity<String> entity = new HttpEntity<>(json,headers);
        //2. 使用RestTemplate調用搜索模組
        restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);*/

8.2 客戶模組

8.2.1 匯入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
8.2.2 編寫配置檔案
spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        acknowledge-mode: manual
8.2.3 編寫配置類
@Configuration
public class RabbitMQConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("openapi-customer-exchange",true,false);
    }

    @Bean
    public Queue queue(){
        return new Queue("openapi-customer-queue");
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }
}
8.2.4 編寫消費者
@Component
public class CustomerListener {

    @Autowired
    private CustomerService customerService;

    @RabbitListener(queues = "openapi-customer-queue")
    public void consume(String json, Channel channel, Message message) throws IOException {
        //1. 獲取RoutingKey
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        //2. 使用switch
        switch (receivedRoutingKey){
            case "openapi.customer.add":
                //3. add操作呼叫Service完成新增
                customerService.saveCustomer(JSON.parseJSON(json, Customer.class));
                //4. 手動ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }
    }
}

重點:confirm機制和return機制:

springBoot中應用rabbitmq:

yml檔案配置

server:
  port: 8089
spring:
  rabbitmq:
    host: 47.96.188.147
    port: 5672
    username: test
    password: test
    virtual-host: /
    #新版本使用這個即可,開啟confirm機制
    publisher-confirm-type: correlated
    #老版本的話用下面這個即可 開啟確認模式
    #publisher-confirms: true
    #開啟確認模式 開啟returns模式
    publisher-returns: true
java程式碼部分:
//監聽訊息是否傳送到佇列
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("return 執行了...." + replyText);
    }
});
//監聽訊息是否發到交換機
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b){
            System.out.println("訊息成功傳送");
        }else {
            System.out.println("錯誤原因" + s);
        }
    }
});

也可以使用實現RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnCallback介面,重寫以上兩個方法,完成訊息確認機制