1. 程式人生 > 實用技巧 >使用RabbitMQ實現分散式事務

使用RabbitMQ實現分散式事務

  基於SOA理念的微服務越來越流行,甚至一些區域網部署的專案也採用微服務架構。微服務的好處很多,但同時也帶來了很多新的問題,分散式事務便是其中一個,出現問題後自然也會出現解決辦法,比如兩段提交、三段提交等。

  用RabbitMQ實現分散式事務主要是利用訊息確認機制,以及後期補償措施。訊息確認有3個部分:訊息傳送到交換機的確認、交換機路由訊息到佇列的確認、消費者消費完訊息的確認。訊息確認機制可以保證訊息沒有丟失,針對於事務的一致性可以通過補償的措施完成。

一、安裝RabbitMQ並啟動

訪問http://127.0.0.1:15672,對mq進行各種配置或操作

二、搭建訊息釋出端

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

  以話題模式建立mq,建立topic.ab topic.abc兩個話題,建立topic.exchange交換機,路由鍵支援固定的某個話題,或者*匹配一個單詞,或者#匹配多個單詞,這裡展示了第一種、第三種兩種方式。通過設定mq的PublisherConfirms與PublisherReturns屬性監聽訊息,可以在回撥事件裡實現自己的處理邏輯,比如根據資料標識把訊息重發或者異常處理。

@Configuration
public
class TopicRabbitConfig { //話題 public final static String TOPIC_AB = "topic.ab"; public final static String TOPIC_ABC = "topic.abc"; public final static String TOPIC_EXCHANGE = "topic.exchange"; /** * 建立佇列 */ @Bean public Queue firstQueue() { return new Queue(TOPIC_AB); } @Bean
public Queue secondQueue() { return new Queue(TOPIC_ABC); } /** * 建立交換機 */ @Bean TopicExchange exchange() { return new TopicExchange(TOPIC_EXCHANGE); } /** * 將firstQueue和topicExchange繫結,路由鍵值為topic.ab * 路由鍵是topic.ab的訊息才會分發到該佇列 */ @Bean Binding bindingFirstQueue() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(TOPIC_AB); } /** * 將secondQueue和topicExchange繫結,通配路由鍵規則topic.# * 路由鍵是topic.開頭的訊息都會分發到該佇列 */ @Bean Binding bindingSecondQueue() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } @Bean public RabbitTemplate createRabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); //增加connection數 https://www.jianshu.com/p/6579e48d18ae connectionFactory.setChannelCacheSize(100); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMandatory(true); //訊息是否能投遞到交換機反饋 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println(String.format("ConfirmCallback 資料標識:%s,是否成功:%s,失敗原因:%s", correlationData.getId(), ack, cause)); } } }); //訊息是否能投遞到佇列反饋 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println(String.format("ReturnCallback 迴應碼:%s,迴應資訊:%s,交換機:%s,路由鍵:%s,資料標識:%s", replyCode, replyText, exchange, routingKey, message.getMessageProperties().getMessageId())); } }); return rabbitTemplate; } }

  實現一個傳送訊息的介面,分別模擬傳送異常訊息,測試訊息反饋

  傳送到交換機異常:ConfirmCallback 資料標識:8f276e00-0a0f-4302-87c2-818fe827712e,是否成功:false,失敗原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'eee' in vhost '/', class-id=60, method-id=40)

  傳送到佇列異常:ReturnCallback 迴應碼:312,迴應資訊:NO_ROUTE,交換機:topic.exchange,路由鍵:ccc,資料標識:dd022151-1029-418e-a75d-4db642b98aaa

@RestController
public class RabbitmqController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendTopicMessage")
    public String sendTopicMessage1() {
        try {
            Employee employee = new Employee(1, "join", 12);
            String msgId = UUID.randomUUID().toString();

            ObjectMapper mapper = new ObjectMapper();
            String messaged = mapper.writeValueAsString(employee);
            Message message = MessageBuilder.withBody(messaged.getBytes()).setMessageId(msgId).build();
            CorrelationData correlationData = new CorrelationData(msgId);
       //模擬傳送到交換機異常
// rabbitTemplate.send("eee", TopicRabbitConfig.TOPIC_AB, message, correlationData);
       //模擬傳送到佇列異常
// rabbitTemplate.send(TopicRabbitConfig.TOPIC_EXCHANGE, "ccc", message, correlationData); rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE, TopicRabbitConfig.TOPIC_AB, message, correlationData); } catch (Exception e) { e.printStackTrace(); } return "ok"; } }
@Data
@AllArgsConstructor
public class Employee implements Serializable {
    private Integer id;
    private String name;
    private Integer age;
}

三、搭建訊息消費端

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

  設定監聽容器

@Configuration
public class TopicRabbitConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    @Autowired
    private MyAckReceiver myAckReceiver;

    //話題
    public final static String TOPIC_AB = "topic.ab";
    public final static String TOPIC_ABC = "topic.abc";

    public final static String TOPIC_EXCHANGE = "topic.exchange";

    /**
     * 建立佇列
     */
    @Bean
    public Queue firstQueue() {
        return new Queue(TOPIC_AB);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TOPIC_ABC);
    }

    /**
     * 建立交換機
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * 將firstQueue和topicExchange繫結,路由鍵值為topic.ab
     * 路由鍵是topic.ab的訊息才會分發到該佇列
     */
    @Bean
    Binding bindingFirstQueue() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with(TOPIC_AB);
    }

    /**
     * 將secondQueue和topicExchange繫結,通配路由鍵規則topic.#
     * 路由鍵是topic.開頭的訊息都會分發到該佇列
     */
    @Bean
    Binding bindingSecondQueue() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // RabbitMQ預設是自動確認,這裡改為手動確認訊息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
     //需要監聽的佇列 container.setQueueNames(TOPIC_AB, TOPIC_ABC); container.setMessageListener(myAckReceiver);
return container; } }

  建立監聽事件

  basicAck:對消費的訊息進行確認,確認後訊息消費成功,如果不確認的話會一直阻塞住,直到殺死程式訊息恢復到傳送佇列。

  basicReject:對於正在處理的訊息,但是消費異常或流程異常不能繼續執行,可以設定true重新把訊息新增到佇列或設定false把訊息丟掉。

  對於這兩種模式可以靈活運用,結合業務對資料進行補償或回滾。

@Component
public class MyAckReceiver implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (message.getMessageProperties().getConsumerQueue().equals(TopicRabbitConfig.TOPIC_AB)) {
                ObjectMapper mapper = new ObjectMapper();
                String messaged = new String(message.getBody());
                Employee student = mapper.readValue(messaged.getBytes("utf-8"), Employee.class);
                System.out.println("MyAckReceiver: TOPIC_AB " + JSON.toJSONString(student));
            }
            if (message.getMessageProperties().getConsumerQueue().equals(TopicRabbitConfig.TOPIC_ABC)) {
                ObjectMapper mapper = new ObjectMapper();
                String messaged = new String(message.getBody());
                Employee student = mapper.readValue(messaged.getBytes("utf-8"), Employee.class);
                System.out.println("MyAckReceiver: TOPIC_ABC " + JSON.toJSONString(student));
            }
            //沒有確認時候 關閉程式後 訊息自動恢復到佇列
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            channel.basicReject(deliveryTag, false);//收到訊息了 沒處理 但不放回佇列
        }
    }
}