1. 程式人生 > 其它 >spring專案在啟動的時候執行方法初始化

spring專案在啟動的時候執行方法初始化

說明:老專案,使用的是spring 3專案,需要對接RocketMQ,配置完之後,在消費者監聽方法中,發現業務處理service注入不進來,最後檢查發現是因為消費者監聽工具類沒有被正確的初始化,所以它裡邊的業務service注入之後是個null,於是各種折騰,特此記錄一下

方式一:

解決:對需要初始化的類實現InitializingBean介面,重寫afterPropertiesSet()方法,在afterPropertiesSet方法中呼叫需要被初始化的方法

程式碼如下:

import xx.xxx.component.BaseServiceMqConsumer;
import xx.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;


@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil implements InitializingBean
{ private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class); @Autowired private VideoConsumerService videoConsumerService; /** * 接收訊息 */ public void listener(){ // 獲取訊息生產者 DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer(); // 訂閱主體 try { consumer.subscribe(RocketMqUtil.topic, "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * * 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批量接收訊息 */ public ConsumeConcurrentlyStatus consumeMessage( List
<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); String msg = null; try { msg = new String(messageExt.getBody(),"utf-8"); } catch (UnsupportedEncodingException e) { log.error("訊息編碼失敗,MsgBody:{}",new String(messageExt.getBody())); e.printStackTrace(); } log.info("消費開始-MsgBody:{}",msg); // String msg = new String(messageExt.getBody()); // log.info("MsgBody:{}",new String(messageExt.getBody())); if (messageExt.getTopic().equals(RocketMqUtil.topic)) { // topic的消費邏輯 if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) { // 根據Tag消費訊息,具體消費訊息的業務方法 videoConsumerService.dealVideoMsg(msg); } } else if (messageExt.getTopic().equals("TopicTest2")) { // 執行TopicTest2的消費邏輯 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可
<br> */ consumer.start(); log.info("rocketmq-consumer 啟動成功---------------------------------------"); } catch (MQClientException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void afterPropertiesSet() throws Exception { listener();//呼叫需要被初始化的方法 } }

方式二:

使用註解 @PostConstruct 指定需要被初始化執行的方法

package net.greatsoft.xxx.utils;

import xxx.xxx.component.BaseServiceMqConsumer;
import net.greatsoft.xxx.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;


@DependsOn("RocketMqConfig")
@Component
public class RocketMqConsumerUtil  {

    private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * 接收訊息8
     */
    @PostConstruct
    public void listener(){

        // 獲取訊息生產者
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(RocketMqUtil.topic, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批量接收訊息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    String msg = null;
                    try {
                        msg = new String(messageExt.getBody(),"utf-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("訊息編碼失敗,MsgBody:{}",new String(messageExt.getBody()));
                        e.printStackTrace();
                    }
                    log.info("消費開始-MsgBody:{}",msg);
                    if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
                        // topic的消費邏輯
                        if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
                            // 根據Tag消費訊息,具體消費訊息的業務方法
                            videoConsumerService.dealVideoMsg(msg);
                        }
                    } else if (messageExt.getTopic().equals("TopicTest2")) {
                        // 執行TopicTest2的消費邏輯
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>
             */
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


}

方式三:

在spring的xml配置檔案中使用 <bean> 的 init-method 屬性,指定 Bean 初始化時要執行的方法

    <!-- Declares RocketMqConsumerUtil as a singleton bean; init-method names the method
         (listener) that Spring invokes on the bean during initialization. -->
    <bean id="rocketMqConsumerUtil" class="xx.xxx.utils.RocketMqConsumerUtil"
    scope="singleton" init-method="listener"/>
package net.greatsoft.jinNanHealth.utils;

import net.greatsoft.jinNanHealth.component.BaseServiceMqConsumer;
import net.greatsoft.jinNanHealth.service.VideoConsumerService;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author xc
 * @date 2020-07-23
 */
@DependsOn("RocketMqUtil")
@Component
public class RocketMqConsumerUtil  {

    private static Logger log = LoggerFactory.getLogger(RocketMqConsumerUtil.class);

    @Autowired
    private VideoConsumerService videoConsumerService;

    /**
     * 接收訊息8
     */
    public void listener(){

        // 獲取訊息生產者
        DefaultMQPushConsumer consumer = BaseServiceMqConsumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(RocketMqUtil.topic, "*");

            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批量接收訊息
                 */
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt messageExt = msgs.get(0);
                    String msg = null;
                    try {
                        msg = new String(messageExt.getBody(),"utf-8");
                    } catch (UnsupportedEncodingException e) {
                        log.error("訊息編碼失敗,MsgBody:{}",new String(messageExt.getBody()));
                        e.printStackTrace();
                    }
             log.info("消費開始-MsgBody:{}",msg);
                    if (messageExt.getTopic().equals(RocketMqUtil.topic)) {
                        // topic的消費邏輯
                        if (messageExt.getTags() != null && messageExt.getTags().equals(RocketMqUtil.tag)) {
                            // 根據Tag消費訊息,具體消費訊息的業務方法
                            videoConsumerService.dealVideoMsg(msg);
                        }
                    } else if (messageExt.getTopic().equals("TopicTest2")) {
                        // 執行TopicTest2的消費邏輯
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>
             */
            consumer.start();
            log.info("rocketmq-consumer 啟動成功---------------------------------------");
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }


}