1. 程式人生 > >springboot 整合 activemq:傳送自定義物件以及失敗訊息重試

springboot 整合 activemq:傳送自定義物件以及失敗訊息重試

                <dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-activemq</artifactId>
			<version>2.0.0.BUILD-SNAPSHOT</version>
		</dependency>
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessageConfig {
    @Bean
    public ActiveMQQueue queue(){
        return new ActiveMQQueue("m2m_queue");
    }
    @Bean
    public ActiveMQTopic topic(){
        return new ActiveMQTopic("topic_queue");
    }

    /**
     * 訊息重試配置項
     * @return
     */
    @Bean
    public RedeliveryPolicy redeliveryPolicy(){
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setUseExponentialBackOff(true);//是否在每次失敗重發是,增長等待時間
        redeliveryPolicy.setMaximumRedeliveryDelay(-1);//設定重發最大拖延時間,-1 表示沒有拖延,只有setUseExponentialBackOff(true)時生效
        redeliveryPolicy.setMaximumRedeliveries(10);//重發次數
        redeliveryPolicy.setInitialRedeliveryDelay(1);//重發時間間隔
        redeliveryPolicy.setBackOffMultiplier(2);//第一次失敗後重發前等待500毫秒,第二次500*2,依次遞增
        redeliveryPolicy.setUseCollisionAvoidance(false);//是否避免訊息碰撞
        return redeliveryPolicy;
    }

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url);
        factory.setRedeliveryPolicy(redeliveryPolicy);
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory){
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setDeliveryMode(2);//設定持久化,1 非持久, 2 持久化
        jmsTemplate.setConnectionFactory(factory);
        /**
          SESSION_TRANSACTED = 0  事物提交併確認
          AUTO_ACKNOWLEDGE = 1    自動確認

                  CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
                  DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
                 INDIVIDUAL_ACKNOWLEDGE = 4    單條訊息確認 activemq 獨有                 */
        jmsTemplate.setSessionAcknowledgeMode(4);//訊息確認模式 return jmsTemplate; } @Bean("jmsListener") public DefaultJmsListenerContainerFactory listener(ActiveMQConnectionFactory factory){ DefaultJmsListenerContainerFactory listener = new DefaultJmsListenerContainerFactory(); listener.setConnectionFactory(factory); listener.setConcurrency("1-10");//設定連線數 listener.setRecoveryInterval(1000L);//重連間隔時間 listener.setSessionAcknowledgeMode(4); return listener; } }

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
import java.util.Date;
import java.util.List;


@Service
public class Proudcer {
    private final static Logger log= LoggerFactory.getLogger(Proudcer .class);
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;
 
    private void sendMessage() {
        String name="";
        try {
            name=queue.getQueueName();
            TbItem tbItem = new TbItem();
            tbItem.setId(1L);
            tbItem.setImage("image");
            jmsTemplate.send(name, session -> session.createObjectMessage(tbItem));
        } catch (JMSException e) {
            e.printStackTrace();
        }      
    }
  
}

先看jmsTemplate.send(name ,session ->session.createObjectMessage(tbItem));的原始碼(這裡使用了lambda表示式)等價於

            jmsTemplate.send(name, new MessageCreator() {
                @Override
                //Session 物件是一個包含訊息生產和消費的上下文應用(簡而言之就是包含了生產者和消費者各種資訊)
                public Message createMessage(Session session) throws JMSException {
                    return session.createObjectMessage(tbItem);//將自定義物件包裹進ActiveMQObjectMessage物件中
                }
            });
原始碼:  
        @Override
	public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
		....//省略
	}

在activemq中有一個實體類ActiveMQObjectMessage的setObject 方法,就是將自定義的物件封裝起來。

public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {

     ....

     public void setObject(Serializable newObject) throws JMSException {
        checkReadOnlyBody();
        this.object = newObject;
        setContent(null);
        ActiveMQConnection connection = getConnection();
        if (connection == null || !connection.isObjectMessageSerializationDefered()) {
            storeContent();
        }
    }
}

實體bean(必須實現Serializable)

public class TbItem implements Serializable{

    private Long id;

    private String image;

} 

消費者(監聽者)


import com.cn.common.pojo.TbItem;
import org.apache.activemq.Message;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.Session;

@Component
public class ItemESMessageListener {

	private final static Logger log= LoggerFactory.getLogger(ItemESMessageListener.class);
	
	@JmsListener(destination="m2m_queue",containerFactory = "jmsListener")
	public void onMessage(Message message,Session session) throws JMSException {
		try {
			log.info("=======接收到訊息=={}",message);
			if (message instanceof ActiveMQObjectMessage){
				ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
				TbItem tbItem = (TbItem)objectMessage.getObject();
				String image = tbItem.getImage();
				Long id = tbItem.getId();
				log.info("===========image={}",image);
                              //如果執行了這個,那麼即使後面的程式碼丟擲了異常在catch塊中執行session.recover()方法也不會重試,所以
                              //這個方法要放在最後執行
                                objectMessage.acknowledge();
                         }                         .....//其它程式碼       
                 } catch (Exception e) {        e.printStackTrace();
                        session.recover();
                 }}}

啟動專案發現報錯 This class is not trusted to be serialized as ObjectMessage payload


原因:在原始碼中為了避免收到惡意程式碼,引入了安全機制,只允許指定的包裡的物件能夠被傳輸。所以也就是說自定義的物件不在預設的指定包裡。

解決辦法:在上面配置 ActiveMQConnectionFactory時,加入factory.setTrustAllPackages(true)

    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url);
        factory.setRedeliveryPolicy(redeliveryPolicy);
        factory.setTrustAllPackages(true);
        return factory;
    }