1. 程式人生 > >rabbitmq系列(三)訊息冪等性處理

rabbitmq系列(三)訊息冪等性處理

一、springboot整合rabbitmq

  1. 我們需要新建兩個工程,一個作為生產者,另一個作為消費者。在pom.xml中新增amqp依賴:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在application.yml檔案中新增rabbitmq的相關資訊:
spring:
  rabbitmq:
    # 連線地址
    host: 127.0.0.1
    # 埠
    port: 5672
    # 登入賬號
    username: guest
    # 登入密碼
    password: guest
    # 虛擬主機
    virtual-host: /
  1. 在生產者工程中新建配置項rabbitmqConfig.java,申明名稱為”byte-zb“直連交換機和佇列,使用”byte-zb“的routing-key將佇列和交換機繫結,程式碼如下:
@Configuration
public class RabbitConfig {

    public static final String QUEUE_NAME = "byte-zb";

    public static final String EXCHANGE_NAME = "byte-zb";

    public static final String ROUTING_KEY = "byte-zb";

    // 佇列申明
    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME);
    }

    // 申明交換機
    @Bean
    public DirectExchange directExchange(){

        return new DirectExchange(EXCHANGE_NAME);
    }

    // 資料繫結申明
    @Bean
    public Binding directBinding(){

        return BindingBuilder.bind(queue()).to(directExchange()).with(ROUTING_KEY);
    }
}
  1. 建立生產者傳送一條訊息,程式碼如下:
@RestController
public class Producer {
    public static final String QUEUE_NAME = "byte-zb";

    public static final String EXCHANGE_NAME = "byte-zb";

    @Autowired
    private AmqpTemplate amqpTemplate;

    @RequestMapping("/send")
    public void sendMessage(){

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email","11111111111");
        jsonObject.put("timestamp",System.currentTimeMillis());
        String json = jsonObject.toJSONString();
        System.out.println(json);
        amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,json);
    }

}
  1. 在消費者工程裡建立消費者消費訊息,程式碼如下:
@Component
public class Consumer throws Exception{

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message){

        System.out.println("接收到的訊息為"+message);
    }
}

我們啟動生產者,然後請求send介面,然後開啟rabbitmq控制檯發現多了一個名為”byte-zb“的交換機和佇列,並且佇列中出現了一個未消費的訊息,然後啟動消費者,我們會在控制檯上發現列印了一條訊息,同時rabbitmq控制檯中”byte-zb“的佇列中訊息沒有了。

二、自動補償機制

如果消費者訊息消費不成功的話,會出現什麼情況呢?我們修改一下消費者程式碼,然後看看。

@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) throws Exception {

        System.out.println("接收到的訊息為"+message);
        int i = 1 / 0;
    }
}

我們會看到消費者工程控制檯一直在重新整理報錯,當消費者配出異常,也就是說當訊息消費不成功的話,該訊息會存放在rabbitmq的服務端,一直進行重試,直到不丟擲異常為止。

如果一直拋異常,我們的服務很容易掛掉,那有沒有辦法控制重試幾次不成功就不再重試了呢?答案是有的。我們在消費者application.yml中增加一段配置。

spring:
  rabbitmq:
    # 連線地址
    host: 127.0.0.1
    # 埠
    port: 5672
    # 登入賬號
    username: guest
    # 登入密碼
    password: guest
    # 虛擬主機
    virtual-host: /
    listener:
      simple:
        retry:
          enabled: true # 開啟消費者進行重試
          max-attempts: 5 # 最大重試次數
          initial-interval: 3000 # 重試時間間隔

上面配置的意思是消費異常後,重試五次,每次隔3s。繼續啟動消費者看看效果,我們發現重試五次以後,就不再重試了。

三、結合實際案例來使用訊息補償機制

像上面那種情況出現的異常其實不管怎麼重試都不會成功,實際上用到訊息補償的就是呼叫第三方介面的這種。

案例:生者往佇列中扔一條訊息,包含郵箱和傳送內容。消費者拿到訊息後將呼叫郵件介面傳送郵件。有時候可能郵件介面由於網路等原因不通,這時候就需要去重試了。

在呼叫介面的工具類中,如果出現異常我們直接返回null,工具類具體程式碼就不貼了,如果返回null之後怎麼處理呢?我們只需要丟擲異常,rabbitListener捕獲到異常後就會自動重試。

我們改造一下消費者程式碼:

@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) throws Exception {

        System.out.println("接收到的訊息為"+message);
        JSONObject jsonObject = JSONObject.parseObject(message);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
        // 如果發生異常則返回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
    }
}

當然我們可以自定義異常丟擲。具體怎麼試驗呢,第一步啟動生產者和消費者,這時候我們發現消費者在重試,第二步我們啟動郵件服務,這時候我們會發現郵件傳送成功了,消費者不再重試了。

四、解決訊息冪等性問題

一些剛接觸java的同學可能對冪等性不太清楚。冪等性就是重複消費造成結果不一致。為了保證冪等性,因此消費者消費訊息只能消費一次訊息。我麼可以是用全域性的訊息id來控制冪等性。當訊息被消費了之後我們可以選擇快取儲存這個訊息id,然後當再次消費的時候,我們可以查詢快取,如果存在這個訊息id,我們就不錯處理直接return即可。先改造生產者程式碼,在訊息中新增訊息id:

@RequestMapping("/send")
    public void sendMessage(){

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("email","11111111111");
        jsonObject.put("timestamp",System.currentTimeMillis());
        String json = jsonObject.toJSONString();
        System.out.println(json);

            Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
        amqpTemplate.convertAndSend(EXCHANGE_NAME,QUEUE_NAME,message);
    }

消費者程式碼改造:

@Component
public class Consumer {

    public static final String QUEUE_NAME = "byte-zb";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(Message message) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收導的訊息為:"+msg+"==訊息id為:"+messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
        // 如果發生異常則返回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
        jedis.set("messageId",messageId);
    }
}

我們在消費者端使用redis儲存訊息id,只做演示,具體專案請根據實際情況選擇相應的工具進行儲存。

如果文章對您有幫助,請記得點贊關注喲~
歡迎大家關注我的公眾號:位元組傳說,每日推送技術文章供大家學習參考。