springboot 整合 activemq:傳送自定義物件以及失敗訊息重試
阿新 • • 發佈:2018-12-31
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.0.0.BUILD-SNAPSHOT</version>
</dependency>
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; @Configuration public class MessageConfig { @Bean public ActiveMQQueue queue(){ return new ActiveMQQueue("m2m_queue"); } @Bean public ActiveMQTopic topic(){ return new ActiveMQTopic("topic_queue"); } /** * 訊息重試配置項 * @return */ @Bean public RedeliveryPolicy redeliveryPolicy(){ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setUseExponentialBackOff(true);//是否在每次失敗重發是,增長等待時間 redeliveryPolicy.setMaximumRedeliveryDelay(-1);//設定重發最大拖延時間,-1 表示沒有拖延,只有setUseExponentialBackOff(true)時生效 redeliveryPolicy.setMaximumRedeliveries(10);//重發次數 redeliveryPolicy.setInitialRedeliveryDelay(1);//重發時間間隔 redeliveryPolicy.setBackOffMultiplier(2);//第一次失敗後重發前等待500毫秒,第二次500*2,依次遞增 redeliveryPolicy.setUseCollisionAvoidance(false);//是否避免訊息碰撞 return redeliveryPolicy; } @Bean public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url); factory.setRedeliveryPolicy(redeliveryPolicy); return factory; } @Bean public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory){ JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setDeliveryMode(2);//設定持久化,1 非持久, 2 持久化 jmsTemplate.setConnectionFactory(factory); /** SESSION_TRANSACTED = 0 事物提交併確認 AUTO_ACKNOWLEDGE = 1 自動確認
CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
INDIVIDUAL_ACKNOWLEDGE = 4 單條訊息確認 activemq 獨有 */ jmsTemplate.setSessionAcknowledgeMode(4);//訊息確認模式 return jmsTemplate; } @Bean("jmsListener") public DefaultJmsListenerContainerFactory listener(ActiveMQConnectionFactory factory){ DefaultJmsListenerContainerFactory listener = new DefaultJmsListenerContainerFactory(); listener.setConnectionFactory(factory); listener.setConcurrency("1-10");//設定連線數 listener.setRecoveryInterval(1000L);//重連間隔時間 listener.setSessionAcknowledgeMode(4); return listener; } }
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
import java.util.Date;
import java.util.List;

/**
 * Message producer: wraps a {@code TbItem} in an ObjectMessage and sends it to
 * the injected queue through the {@code JmsTemplate}.
 */
@Service
public class Proudcer {

    private static final Logger log = LoggerFactory.getLogger(Proudcer.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    /**
     * Builds a sample item and sends it to the queue as an ObjectMessage.
     * If the queue name cannot be resolved the failure is logged and nothing
     * is sent.
     */
    private void sendMessage() {
        final String name;
        try {
            // getQueueName() is the only call here that declares JMSException.
            name = queue.getQueueName();
        } catch (JMSException e) {
            log.error("Could not resolve the queue name; message was not sent", e);
            return;
        }

        TbItem tbItem = new TbItem();
        tbItem.setId(1L);
        tbItem.setImage("image");
        // The lambda is a MessageCreator: it wraps the item in an ObjectMessage.
        jmsTemplate.send(name, session -> session.createObjectMessage(tbItem));
    }
}
先看 jmsTemplate.send(name, session -> session.createObjectMessage(tbItem)); 這一行(這裡使用了 lambda 表示式),它等價於下面的匿名內部類寫法:
// Expanded (anonymous-class) form of the lambda passed to jmsTemplate.send.
jmsTemplate.send(name, new MessageCreator() {
@Override
// The Session is the context in which messages are produced and consumed
// (in short, it carries the producer/consumer related information).
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(tbItem);// wrap the custom object in an ObjectMessage (ActiveMQObjectMessage on ActiveMQ)
}
});
原始碼: @Override
public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {
....//省略
}
ActiveMQ 中的 ActiveMQObjectMessage 類別提供了 setObject 方法,作用就是把自定義的物件封裝進訊息裡。
public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage {
....
// Stores the given serializable object as the payload of this message.
public void setObject(Serializable newObject) throws JMSException {
// Presumably rejects the call when the message body is read-only.
checkReadOnlyBody();
this.object = newObject;
// Drop any previously stored content; it no longer matches the new object.
setContent(null);
ActiveMQConnection connection = getConnection();
// Store the content right away unless a connection exists and it defers
// object-message serialization.
if (connection == null || !connection.isObjectMessageSerializationDefered()) {
storeContent();
}
}
}
實體bean(必須實現Serializable)
/**
 * Item payload carried as the body of a JMS ObjectMessage.
 *
 * <p>Must be {@link Serializable} so it can be wrapped in an ObjectMessage.
 * The accessors are the ones the producer and the listener call.
 */
public class TbItem implements Serializable {

    /** Explicit version id so that recompiling does not change the serialized form's identity. */
    private static final long serialVersionUID = 1L;

    private Long id;
    private String image;

    /**
     * @return the item id, or {@code null} if not set
     */
    public Long getId() {
        return id;
    }

    /**
     * @param id the item id
     */
    public void setId(Long id) {
        this.id = id;
    }

    /**
     * @return the image value, or {@code null} if not set
     */
    public String getImage() {
        return image;
    }

    /**
     * @param image the image value
     */
    public void setImage(String image) {
        this.image = image;
    }
}
消費者(監聽者)
import com.cn.common.pojo.TbItem;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.Message;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class ItemESMessageListener {
private final static Logger log= LoggerFactory.getLogger(ItemESMessageListener.class);
@JmsListener(destination="m2m_queue",containerFactory = "jmsListener")
public void onMessage(Message message,Session session) throws JMSException {
try {
log.info("=======接收到訊息=={}",message);
if (message instanceof ActiveMQObjectMessage){
ActiveMQObjectMessage objectMessage=(ActiveMQObjectMessage)message;
TbItem tbItem = (TbItem)objectMessage.getObject();
String image = tbItem.getImage();
Long id = tbItem.getId();
log.info("===========image={}",image);
//如果執行了這個,那麼即使後面的程式碼丟擲了異常在catch塊中執行session.recover()方法也不會重試,所以
//這個方法要放在最後執行
objectMessage.acknowledge();} .....//其它程式碼
} catch (Exception e) { e.printStackTrace();
session.recover();
}}}
啟動專案發現報錯 This class is not trusted to be serialized as ObjectMessage payload
原因:為了避免反序列化惡意物件,ActiveMQ 在原始碼中引入了安全機制,只允許受信任的包(package)裡的類別作為 ObjectMessage 的內容傳輸。自定義物件所在的包不在預設的受信任清單裡,所以被拒絕。
解決辦法:在上面配置 ActiveMQConnectionFactory 時,加入 factory.setTrustAllPackages(true)。注意:這會信任所有的包,等於關閉了上述安全檢查;正式環境建議改用 factory.setTrustedPackages(...),只列出實際需要的包。
/**
 * Connection factory for the broker configured in
 * {@code spring.activemq.broker-url}, allowing {@code TbItem} payloads to be
 * deserialized from ObjectMessages.
 *
 * @param url              broker URL taken from the application properties
 * @param redeliveryPolicy retry settings to apply to consumers
 * @return the configured connection factory
 */
@Bean
public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url,
        RedeliveryPolicy redeliveryPolicy) {
    // NOTE(review): broker credentials are hard-coded; consider externalising them.
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", url);
    factory.setRedeliveryPolicy(redeliveryPolicy);
    // Fixes "This class is not trusted to be serialized as ObjectMessage payload".
    // factory.setTrustAllPackages(true) would also clear the error, but it switches
    // the deserialization check off entirely. Whitelist only what the payload needs:
    // the package of TbItem plus the JDK packages of its field types.
    // Extend the list if the payload gains fields from other packages.
    factory.setTrustedPackages(
            java.util.Arrays.asList("java.lang", "java.util", "com.cn.common.pojo"));
    return factory;
}