1. 程式人生 > >阿里雲訊息佇列mq(消費者)如何整合springboot,並能使用services

阿里雲訊息佇列mq(消費者)如何整合springboot,並能使用services

建立一個訊息佇列的資料庫維護表:

-- One row per message-queue consumer; the application loads a row by `id`
-- and uses its columns to configure the Aliyun ONS consumer.
CREATE TABLE `consumer_local` (
  `id`                  bigint(20)   NOT NULL AUTO_INCREMENT,
  `version`             bigint(20)   NOT NULL,
  `topic`               varchar(30)  NOT NULL COMMENT '消費者所屬的主題',
  `consumer_id`         varchar(50)  NOT NULL COMMENT '消費者Consumer_id',
  `access_key`          varchar(60)  NOT NULL COMMENT '阿里雲身份驗證',
  `secre_key`           varchar(300) NOT NULL COMMENT '阿里雲身份驗證,在阿里雲伺服器管理控制檯建立',
  `ons_address`         varchar(300) NOT NULL COMMENT '設定的TCP接入域名',
  `suspend_time_millis` varchar(10)  NOT NULL COMMENT '順序訊息消費失敗進行重試前的等待時間(毫秒)',
  `max_reconsume_times` varchar(10)  NOT NULL COMMENT '訊息消費失敗時的最大重試次數',
  `sharding_key`        varchar(30)  NOT NULL COMMENT '順序訊息的區間',
  `tag`                 varchar(30)  NOT NULL COMMENT '處理訊息的型別',
  `investor`            varchar(50)  NOT NULL COMMENT '消費監控的類名',
  `status`              varchar(30)  NOT NULL COMMENT '消費者的狀態(stop,running,init)',
  `state`               int(1)       DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

然後在 springboot 的配置檔案中,填入上表中該消費者配置資料的 id(此例為 1):

# Bound to AliMQConfig via the "aliyun" prefix; consumerId is the id of the
# consumer configuration record that AliMQConfig looks up at startup.
aliyun:
  consumerId: 1

然後建立使用springboot配置的配置檔案類:

package com.ryhui.properties;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.OrderConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.ryhui.model.ConsumerLocal;
import com.ryhui.service.modelService.ConsumerLocalService;
import com.ryhui.utils.MessageListenerUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Created by 10351 on 2018/7/4.
 */
@Component
@ConfigurationProperties(prefix="aliyun")
@Slf4j
public class AliMQConfig {
    @Autowired
    private MessageListenerUtil messageListenerUtil;

    @Autowired
    private ConsumerLocalService consumerLocalService;

    private String consumerId;

    public String getConsumerId() {
        return consumerId;
    }

    public void setConsumerId(String consumerId) {
        this.consumerId = consumerId;
    }

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderConsumerBean getConsumer() {
        ConsumerLocal consumerLocal = consumerLocalService.selectById(Long.valueOf(consumerId));
        if(consumerLocal!=null&&consumerLocal.getStatus().equals("stop")){

            OrderConsumerBean consumerBean = new OrderConsumerBean();
            Properties properties = new Properties();
            properties.put(PropertyKeyConst.ConsumerId, consumerLocal.getConsumerId());
            // AccessKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
            properties.put(PropertyKeyConst.AccessKey, consumerLocal.getAccessKey());
            // SecretKey 阿里雲身份驗證,在阿里雲伺服器管理控制檯建立
            properties.put(PropertyKeyConst.SecretKey, consumerLocal.getSecreKey());
            //訊息處理失敗後多久重新發送訊息
            properties.put(PropertyKeyConst.SuspendTimeMillis, consumerLocal.getSuspendTimeMillis());
            //重發的次數
            properties.put(PropertyKeyConst.MaxReconsumeTimes, consumerLocal.getMaxReconsumeTimes());
            //消費者的執行緒數
            properties.put(PropertyKeyConst.ConsumeThreadNums,"1");
            //消費者的介入地址
            properties.put(PropertyKeyConst.ONSAddr, consumerLocal.getOnsAddress());
            consumerBean.setProperties(properties);
            Subscription subscription = new Subscription();
            subscription.setTopic(consumerLocal.getTopic());
            subscription.setExpression(consumerLocal.getTag());
            Map<Subscription, MessageOrderListener> map = new HashMap();
            map.put(subscription, new MessageOrderListener(){
                @Override
                public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
                    log.info("【{}】有訊息進入,開始處理",message.getMsgID());
                    messageListenerUtil.sendMessage(message);
                    log.info("【{}】訊息處理結束",message.getMsgID());
                    return OrderAction.Success;
                }
            });
            consumerBean.setSubscriptionTable(map);
            consumerLocal.setStatus("running");
            consumerLocalService.update(consumerLocal);
            return consumerBean;
        }else{
            return new OrderConsumerBean();
        }
    }
}

監聽器使用匿名內部類的寫法,因此能夠直接使用在配置類中注入的 spring 管理類。該 spring 管理類如下:

package com.ryhui.utils;

import com.aliyun.openservices.ons.api.Message;
import com.ryhui.model.ConsumerLocal;
import com.ryhui.model.ConsumerMessage;
import com.ryhui.service.modelService.ConsumerLocalService;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Spring-managed helper that records an incoming ONS message through
 * {@link ConsumerLocalService}.
 *
 * Created by 10351 on 2018/7/4.
 */
@Component
public class MessageListenerUtil {
    @Resource
    private ConsumerLocalService consumerLocalService;

    /**
     * Prints the message, and when a consumer record matches its topic and tag,
     * stores the message (unless one with the same message id already exists) and
     * marks it as {@code "success"}.
     *
     * @param message the received ONS message
     * @return always the literal {@code "hello"}
     */
    public String sendMessage(Message message){
        System.out.println(message.toString());

        final ConsumerLocal owner =
                consumerLocalService.selectByTopicAndTag(message.getTopic(), message.getTag());
        if (owner != null) {
            recordAsSuccess(message, owner);
        }

        System.out.println(message.toString()+"end");
        return "hello";
    }

    /** Looks up the stored message by id, creates it if absent, then flags it as successful. */
    private void recordAsSuccess(Message message, ConsumerLocal owner) {
        ConsumerMessage stored = consumerLocalService.selectByMessageUid(message.getMsgID());
        if (stored == null) {
            stored = consumerLocalService.handlerMessage(message, owner.getId());
        }
        consumerLocalService.updateMessage(stored.getId(), "success");
    }
}

專案啟動後會自動開啟消費者。如果需要多個消費者,就需要在資料庫多配置一條資料,把該資料的 id 寫入配置檔案,並在配置類中引用,然後依照這個配置再寫一個消費者邏輯,模式同上。