1. 程式人生 > >第4篇 rabbitmq可靠確認模式的java封裝及示例

第4篇 rabbitmq可靠確認模式的java封裝及示例

對rabbitmq的封裝,有幾個目標:
1 提供send介面
2 提供consume介面
3 保證訊息的事務性處理

所謂事務性處理,是指對一個訊息的處理必須嚴格可控,必須滿足原子性,只有兩種可能的處理結果:
(1) 處理成功,從佇列中刪除訊息
(2) 處理失敗(網路問題,程式問題,服務掛了),將訊息重新放回佇列
為了做到這點,我們使用rabbitmq的手動ack模式,這個後面細說。

1 send介面

public interface MessageSender {    
    DetailRes send(Object message);
}

send介面相對簡單,我們使用spring的RabbitTemplate來實現,程式碼如下:

//1 構造template, exchange, routingkey等
//2 設定message序列化方法
//3 設定傳送確認
//4 構造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
    Connection connection = connectionFactory.createConnection();
    //1
    buildQueue(exchange, routingKey, queue, connection);
    final
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setExchange(exchange); rabbitTemplate.setRoutingKey(routingKey); //2 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //3 rabbitTemplate.setConfirmCallback(new
RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.info("send message failed: " + cause); //+ correlationData.toString()); throw new RuntimeException("send error " + cause); } } }); //4 return new MessageSender() { @Override public DetailRes send(Object message) { try { rabbitTemplate.convertAndSend(message); } catch (RuntimeException e) { e.printStackTrace(); log.info("send failed " + e); try { //retry rabbitTemplate.convertAndSend(message); } catch (RuntimeException error) { error.printStackTrace(); log.info("send failed again " + error); return new DetailRes(false, error.toString()); } } return new DetailRes(true, ""); } }; }

2 consume介面

public interface MessageConsumer {    
    DetailRes consume();
}

在consume介面中,會呼叫使用者自己的MessageProcess,介面定義如下:

public interface MessageProcess<T> {    
    DetailRes process(T message);
}

consume的實現相對來說複雜一點,程式碼如下:

//1 建立連線和channel
//2 設定message序列化方法
//3 構造consumer
public <T> MessageConsumer buildMessageConsumer(String exchange, String routingKey,
                                                final String queue, final MessageProcess<T> messageProcess) throws IOException {
    final Connection connection = connectionFactory.createConnection();

    //1
    buildQueue(exchange, routingKey, queue, connection);

    //2
    final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    final MessageConverter messageConverter = new Jackson2JsonMessageConverter();

    //3
    return new MessageConsumer() {
        QueueingConsumer consumer;

        {
            consumer = buildQueueConsumer(connection, queue);
        }

        @Override
        //1 通過delivery獲取原始資料
        //2 將原始資料轉換為特定型別的包
        //3 處理資料
        //4 手動傳送ack確認
        public DetailRes consume() {
            QueueingConsumer.Delivery delivery = null;
            Channel channel = consumer.getChannel();

            try {
                //1
                delivery = consumer.nextDelivery();
                Message message = new Message(delivery.getBody(),
                        messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));

                //2
                @SuppressWarnings("unchecked")
                T messageBean = (T) messageConverter.fromMessage(message);

                //3
                DetailRes detailRes = messageProcess.process(messageBean);

                //4
                if (detailRes.isSuccess()) {
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } else {
                    log.info("send message failed: " + detailRes.getErrMsg());
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }

                return detailRes;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return new DetailRes(false, "interrupted exception " + e.toString());
            } catch (IOException e) {
                e.printStackTrace();
                retry(delivery, channel);
                log.info("io exception : " + e);

                return new DetailRes(false, "io exception " + e.toString());
            } catch (ShutdownSignalException e) {
                e.printStackTrace();

                try {
                    channel.close();
                } catch (IOException io) {
                    io.printStackTrace();
                } catch (TimeoutException timeout) {
                    timeout.printStackTrace();
                }

                consumer = buildQueueConsumer(connection, queue);

                return new DetailRes(false, "shutdown exception " + e.toString());
            } catch (Exception e) {
                e.printStackTrace();
                log.info("exception : " + e);
                retry(delivery, channel);

                return new DetailRes(false, "exception " + e.toString());
            }
        }
    };
}

3 保證訊息的事務性處理
rabbitmq預設的處理方式為auto ack,這意味著當你從訊息佇列取出一個訊息時,ack自動傳送,mq就會將訊息刪除。而為了保證訊息的正確處理,我們需要將訊息處理修改為手動確認的方式。
(1) sender的手工確認模式
首先將ConnectionFactory的模式設定為publisherConfirms,如下

connectionFactory.setPublisherConfirms(true);

之後設定rabbitTemplate的confirmCallback,如下:

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
       if (!ack) {
           log.info("send message failed: " + cause); //+ correlationData.toString());
           throw new RuntimeException("send error " + cause);
       }
    }
});

(2) consume的手工確認模式
首先在queue建立中指定模式

channel.exchangeDeclare(exchange, "direct", true, false, null);
/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
channel.queueDeclare(queue, true, false, false, null);

只有在訊息處理成功後傳送ack確認,或失敗後傳送nack使資訊重新投遞

if (detailRes.isSuccess()) {
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
    log.info("send message failed: " + detailRes.getErrMsg());
    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}

4 自動重連機制
為了保證rabbitmq的高可用性,我們使用rabbitmq Cluster模式,並配合haproxy。這樣,在一臺機器down掉時或者網路發生抖動時,就會發生當前連線失敗的情況,如果不對這種情況做處理,就會造成當前的服務不可用。
在spring-rabbitmq中,已實現了connection的自動重連,但是connection重連後,channel的狀態並不正確。因此我們需要自己捕捉ShutdownSignalException異常,並重新生成channel。如下:

catch (ShutdownSignalException e) {
    e.printStackTrace();
    channel.close();
    //recreate channel
    consumer = buildQueueConsumer(connection, queue);
}

5 consumer執行緒池
在對訊息處理的過程中,我們期望多執行緒並行執行來增加效率,因此對consumer做了一個執行緒池的封裝。
執行緒池通過builder模式構造,需要準備如下引數:

//執行緒數量
int threadCount;
//處理間隔(每個執行緒處理完成後休息的時間)
long intervalMils;
//exchange及queue資訊
String exchange;
String routingKey;
String queue;
//使用者自定義處理介面
MessageProcess<T> messageProcess;

核心迴圈也較為簡單,程式碼如下:

public void run() {
    while (!stop) {
        try {
            //2
            DetailRes detailRes = messageConsumer.consume();

            if (infoHolder.intervalMils > 0) {
                try {
                    Thread.sleep(infoHolder.intervalMils);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    log.info("interrupt " + e);
                }
            }

            if (!detailRes.isSuccess()) {
                log.info("run error " + detailRes.getErrMsg());
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.info("run exception " + e);
        }
    }
}

6 使用示例
最後,我們還是用一個例子做結。
(1) 定義model

//參考lombok
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
    int id;
    String name;
}

(2) rabbitmq配置
配置我們使用@Configuration實現,如下:

@Configuration
public class RabbitMQConf {
    @Bean
    ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);

        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setPublisherConfirms(true); // enable confirm mode

        return connectionFactory;
    }
}

(3) sender示例

@Service
public class SenderExample {
    private static final String EXCHANGE = "example";
    private static final String ROUTING = "user-example";
    private static final String QUEUE = "user-example";

    @Autowired
    ConnectionFactory connectionFactory;

    private MessageSender messageSender;

    @PostConstruct
    public void init() throws IOException, TimeoutException {
        MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
        messageSender = mqAccessBuilder.buildMessageSender(EXCHANGE, ROUTING, QUEUE);
    }

    public DetailRes send(UserMessage userMessage) {
        return messageSender.send(userMessage);
    }
}

(4) MessageProcess(使用者自定義處理介面)示例,本例中我們只是簡單的將資訊打印出來

public class UserMessageProcess implements MessageProcess<UserMessage> {
    @Override
    public DetailRes process(UserMessage userMessage) {
        System.out.println(userMessage);

        return new DetailRes(true, "");
    }
}

(5) consumer示例

@Service
public class ConsumerExample {
    private static final String EXCHANGE = "example";
    private static final String ROUTING = "user-example";
    private static final String QUEUE = "user-example";

    @Autowired
    ConnectionFactory connectionFactory;

    private MessageConsumer messageConsumer;

    @PostConstruct
    public void init() throws IOException, TimeoutException {
        MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
        messageConsumer = mqAccessBuilder.buildMessageConsumer(EXCHANGE, ROUTING, QUEUE, new UserMessageProcess());
    }

    public DetailRes consume() {
        return messageConsumer.consume();
    }
}

(6) 執行緒池consumer示例
在main函式中,我們使用一個獨立執行緒傳送資料,並使用執行緒池接收資料。

@Service
public class PoolExample {
    private static final String EXCHANGE = "example";
    private static final String ROUTING = "user-example";
    private static final String QUEUE = "user-example";

    @Autowired
    ConnectionFactory connectionFactory;

    private ThreadPoolConsumer<UserMessage> threadPoolConsumer;

    @PostConstruct
    public void init() {
        MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
        MessageProcess<UserMessage> messageProcess = new UserMessageProcess();

        threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder<UserMessage>()
                .setThreadCount(Constants.THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
                .setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE)
                .setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
                .build();
    }

    public void start() throws IOException {
        threadPoolConsumer.start();
    }

    public void stop() {
        threadPoolConsumer.stop();
    }

    public static void main(String[] args) throws IOException {
        ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
        PoolExample poolExample = ac.getBean(PoolExample.class);
        final SenderExample senderExample = ac.getBean(SenderExample.class);

        poolExample.start();

        new Thread(new Runnable() {
            int id = 0;

            @Override
            public void run() {
                while (true) {
                    senderExample.send(new UserMessage(id++, "" + System.nanoTime()));

                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

7 github地址