1. 程式人生 > >Java 小記 — RabbitMQ 的實踐與思考

Java 小記 — RabbitMQ 的實踐與思考

oot body 可能 技術 業務場景 www. 一個 ava sync

前言

本篇隨筆將匯總一些我對消息隊列 RabbitMQ 的認識,順便談談其在高並發和秒殺系統中的具體應用。
技術分享圖片

1. 預備示例

想了下,還是先拋出一個簡單示例,隨後再根據其具體應用場景進行擴展,我覺得這樣表述條理更清晰些。

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }
}

Client:

@Component
public class Client {

    @Autowired
private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 0; i < 10000; i++) { String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend
(MQConstant.CALL, message); } } }

Server:

@Component
public class Server {

    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(
\"%s\") Yes, I just saw your message!", message)); } }

Result:

Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!

以上示例會在 rabbitmq 中創建一條隊列 CALL, 消息在其中等待消費:
技術分享圖片

在此基礎上的簡單擴展我就不再寫案例了,比如領域模塊完成了其核心業務規則之後可能需要更新緩存、寫個郵件、記個復雜日誌、做個統計報表等等,這些不需要及時反饋或者耗時的附屬業務都可以通過異步隊列分發,以此來提升核心業務的響應速度,同時如此處理能讓領域邊界更加清晰,代碼的可維護性和持續拓展的能力也會有所提升。

2. 削峰

上個示例中我提到的應用場景是解耦和通知,再接著擴展,因其具備良好的緩沖性質,所以還有一個非常適合的應用場景那就是削峰。對於突如其來的極高並發請求,我們可以先瞬速地將其加入隊列並回復用戶一個友好提示,然後服務器可在其能承受的範圍內慢慢處理,以此來防止突發的 CPU 和內存 “爆表”。

改造之後對於發送方來說當然是比較爽的,他只是將請求加入消息隊列而已,處理壓力都歸到了消費端。接著思考,這樣處理有沒有副作用?如果這個請求剛好是線程阻塞的,那還要加入隊列慢慢排隊處理,那不是完蛋了,用戶要猴年馬月才能得到反饋?所以針對此,我覺得應該將消費端的方法改為異步調用(即多線程)以提升吞吐量,在 Spring Boot 中的寫法也非常簡單:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

參照示例一的方法,我發布了 10000 條消息加入隊列,且消費端的調用每次阻塞一秒,那可有意思了,什麽時候能處理完?但如果開幾百個線程同時處理的話,那幾十秒就夠了,當然具體多少合適還應根據具體的業務場景和服務器配置酌情考慮。另外,別忘了配線程池:

@Configuration
public class AsyncConfig {

    @Bean
    public Executor asyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(500);
        executor.setQueueCapacity(10);

        executor.setThreadNamePrefix("MyExecutor-");

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

3. Exchange

RabbitMQ 可能為 N 個應用同時提供服務,要是你和你的藍顏知己突然心有靈犀,在不同的業務上使用了同一個 routingKey,想想就刺激。因此,隊列多了自然要進行分組管理,限定好 Exchange 的規則,接下來就可以獨自玩耍了。

MQConstant:

public class MQConstant {

    public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";

    public static final String CALL = MQConstant.EXCHANGE + ".CALL";

    public static final String ALL = MQConstant.EXCHANGE + ".#";
}

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConstant.EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
    }
}

此時我們再去查隊列 CALL,可以看到已經綁定了Exchange:
技術分享圖片

當然 Exchange 的作用遠不止如此,以上示例為 Topic 模式,除此之外還有 Direct、Headers 和 Fanout 模式,寫法都差不多,感興趣的童鞋可以去查看 “官方文檔” 進行更深入了解。

4. 延時隊列

延時任務的場景相信小夥伴們都接觸過,特別是搶購的時候,在規定時間內未付款訂單就被回收了。微信支付的 API 裏面也有一個支付完成後的延時再確認消息推送,實現原理應該都差不多。

利用 RabbitMQ 實現該功能首先要了解他的兩個特性,分別是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解個大概,一個是生存時間,一個是死信。整個過程也很容易理解,TTL 相當於一個緩沖隊列,等待其過期之後消息會由 DLX 轉發到實際消費隊列,如此便實現了他的延時過程。

MQConstant:

public class MQConstant {

    public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";

    public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";

    public static final String CALL = "CALL";

}

ExpirationMessagePostProcessor:

public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl;

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
                .setExpiration(ttl.toString());
        return message;
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 5000;
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));

        }
    }
}

Server:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
    }

}

Result:

Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there!
Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22

結果一目了然,分別在隊列中延遲了 5秒,10秒,15秒,當然,以上只是我的簡單示例,童鞋們可翻閱官方文檔(“ ttl ” && “ dlx ”)進一步深入學習。

結語

本篇隨筆不該就這麽結束,但晚上心情不好,百感交集,無法繼續寫作,無奈至此。近期正在尋覓新的工作機會,我的微信:youclk,無論有沒有推薦的,給我點鼓勵,謝謝!

Java 小記 — RabbitMQ 的實踐與思考