
RabbitMQ Basic Component Encapsulation in Practice (Part 2)

I. Reliable Message Delivery

1. Create the service in the rabbit-core-producer project

@Service
public class MessageStoreService {

	@Autowired
	private BrokerMessageMapper brokerMessageMapper;
	
	public int insert(BrokerMessage brokerMessage) {
		return this.brokerMessageMapper.insert(brokerMessage);
	}
	
	public BrokerMessage selectByMessageId(String messageId) {
		return this.brokerMessageMapper.selectByPrimaryKey(messageId);
	}

	public void succuess(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_OK.getCode(),
				new Date());
	}
	
	public void failure(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_FAIL.getCode(),
				new Date());
	}
	
	public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
		return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
	}
	
	public int updateTryCount(String brokerMessageId) {
		return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
	}
	
	
	
	
}

  

2. Define the message sending statuses

public enum BrokerMessageStatus {

	SENDING("0"),
	SEND_OK("1"),
	SEND_FAIL("2"),
	SEND_FAIL_A_MOMENT("3");
	
	private String code;
	
	private BrokerMessageStatus(String code) {
		this.code = code;
	}
	
	public String getCode() {
		return this.code;
	}
	
}

  

Constants

public interface BrokerMessageConst {
	
	//Timeout in minutes (1 minute)
	int TIMEOUT = 1;
}

  

Add a method for sending reliable messages to RabbitBrokerImpl

    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if(bm == null) {
            //1. Record the message send log in the database first
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            //tryCount does not need to be set on the first send
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        //2. Run the actual message sending logic
        sendKernel(message);
    }
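The flow above (write the log row first, then send, then mark the row on confirmation) can be sketched without a database or broker. This is only an illustration under stated assumptions: the in-memory map, the `LogEntry` class, and the `sendCount` counter are hypothetical stand-ins for the log table, `BrokerMessage`, and `sendKernel`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class ReliantSendSketch {

    enum Status { SENDING, SEND_OK, SEND_FAIL }

    /** Hypothetical stand-in for one row of the message log table. */
    static final class LogEntry {
        Status status;
        final Instant nextRetry;

        LogEntry(Status status, Instant nextRetry) {
            this.status = status;
            this.nextRetry = nextRetry;
        }
    }

    // Mirrors BrokerMessageConst.TIMEOUT (1 minute).
    static final Duration TIMEOUT = Duration.ofMinutes(1);

    // In-memory replacement for the database table, keyed by message id.
    final Map<String, LogEntry> store = new HashMap<>();

    // Counts send attempts instead of talking to a real broker.
    int sendCount = 0;

    void reliantSend(String messageId, Instant now) {
        // 1. Record the log entry only if this message id has not been stored yet.
        store.computeIfAbsent(messageId, id -> new LogEntry(Status.SENDING, now.plus(TIMEOUT)));
        // 2. Hand the message to the broker (here we only count the attempt).
        sendCount++;
    }

    // Called when the broker confirms the message.
    void confirm(String messageId, boolean ack) {
        LogEntry entry = store.get(messageId);
        if (entry != null && ack) {
            entry.status = Status.SEND_OK;
        }
    }

    public static void main(String[] args) {
        ReliantSendSketch sketch = new ReliantSendSketch();
        Instant now = Instant.parse("2024-01-01T00:00:00Z");
        sketch.reliantSend("m-1", now);
        sketch.reliantSend("m-1", now.plusSeconds(90)); // a resend reuses the existing row
        sketch.confirm("m-1", true);
        System.out.println(sketch.store.size() + " row(s), status=" + sketch.store.get("m-1").status);
    }
}
```

Because the row is only inserted when the id is absent, sending the same message again does not create a second log entry or reset its retry deadline.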

  

Handling the confirm callback

Change the confirm method in the RabbitTemplateContainer class as follows:

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Parse the correlation id into messageId, sendTime and messageType
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if(ack) {
            // When the broker returns an ACK, update the status of the message in the log table to SEND_OK.
            // Only messages of type RELIANT are recorded in the database, so only those are updated.
            if(MessageType.RELIANT.equals(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("Message sent successfully, confirm messageId={}, sendTime={}", messageId, sendTime);
        } else {
            log.warn("Message send failed, confirm messageId={}, sendTime={}, cause={}", messageId, sendTime, cause);
        }
    }
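The confirm method relies on the correlation id carrying three fields: message id, send time, and message type. A minimal sketch of packing and unpacking such an id follows. The `#` delimiter and the helper names `pack` and `unpack` are assumptions for illustration, since the article does not show how the splitter is configured.

```java
import java.util.regex.Pattern;

public class CorrelationIdSketch {

    // Assumed delimiter; the article does not show which character the splitter uses.
    static final String DELIMITER = "#";

    /** Joins the three fields into one correlation id string. */
    static String pack(String messageId, long sendTime, String messageType) {
        return String.join(DELIMITER, messageId, Long.toString(sendTime), messageType);
    }

    /** Returns {messageId, sendTime, messageType}; rejects ids with the wrong number of fields. */
    static String[] unpack(String correlationId) {
        String[] parts = correlationId.split(Pattern.quote(DELIMITER), -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected correlation id: " + correlationId);
        }
        return parts;
    }

    public static void main(String[] args) {
        String id = pack("m-1", 1700000000000L, "2");
        String[] parts = unpack(id);
        System.out.println(parts[0] + " " + Long.parseLong(parts[1]) + " " + parts[2]);
    }
}
```

Validating the field count up front gives a clear error for a malformed id instead of an IndexOutOfBoundsException inside the callback. This only works if message ids never contain the delimiter.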

  

II. Wrapping a Distributed Scheduled-Task Component: rabbit-task

1. Add dependencies: bring in elastic-job

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-task</artifactId>

    <properties>
        <elastic-job.version>2.1.4</elastic-job.version>
    </properties>
    <dependencies>
        <!-- spring boot dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--  elastic-job dependency -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

</project>

  

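2. Add auto-configuration

1) Create the class JobParserAutoConfigurartion, which reads the settings Elastic-Job needs to connect to the ZooKeeper registry center and initializes the registry center with them.

The configuration properties are bound to the JobZookeeperProperties class.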

@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk", name = {"namespace", "serverLists"}, matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
public class JobParserAutoConfigurartion {

	@Bean(initMethod = "init")
	public ZookeeperRegistryCenter zookeeperRegistryCenter(JobZookeeperProperties jobZookeeperProperties) {
		ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(),
				jobZookeeperProperties.getNamespace());
		// Copy the optional settings from the bound properties (the original copied zkConfig onto itself, which had no effect)
		zkConfig.setBaseSleepTimeMilliseconds(jobZookeeperProperties.getBaseSleepTimeMilliseconds());
		zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
		zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
		zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
		zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
		zkConfig.setDigest(jobZookeeperProperties.getDigest());
		log.info("Job registry center configuration initialized, zkaddress : {}, namespace : {}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
		return new ZookeeperRegistryCenter(zkConfig);
	}
	
	@Bean
	public ElasticJobConfParser elasticJobConfParser(JobZookeeperProperties jobZookeeperProperties, ZookeeperRegistryCenter zookeeperRegistryCenter) {
		return new ElasticJobConfParser(jobZookeeperProperties, zookeeperRegistryCenter);
	}
	
}

 

2) Create a META-INF folder under the resources folder

Create a spring.factories file in the META-INF folder

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.task.autoconfigure.JobParserAutoConfigurartion

  

3) The JobZookeeperProperties class

@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {

	private String namespace;
	
	private String serverLists;
	
	private int maxRetries = 3;

	private int connectionTimeoutMilliseconds = 15000;
	
	private int sessionTimeoutMilliseconds = 60000;
	
	private int baseSleepTimeMilliseconds = 1000;
	
	private int maxSleepTimeMilliseconds = 3000;
	
	private String digest = "";
	
}

  

3. Add module assembly

1) Add an annotation that imports the JobParserAutoConfigurartion configuration

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(JobParserAutoConfigurartion.class)
public @interface EnableElasticJob {

}

  

2) Add the job configuration annotation ElasticJobConfig

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {

	String name();	//name of the elastic-job job
	
	String cron() default "";
	
	int shardingTotalCount() default 1;
	
	String shardingItemParameters() default "";
	
	String jobParameter() default "";
	
	boolean failover() default false;
	
	boolean misfire() default true;
	
	String description() default "";
	
	boolean overwrite() default false;
	
	boolean streamingProcess() default false;
	
	String scriptCommandLine() default "";
	
	boolean monitorExecution() default false;
	
	public int monitorPort() default -1;	//must

	public int maxTimeDiffSeconds() default -1;	//must

	public String jobShardingStrategyClass() default "";	//must

	public int reconcileIntervalMinutes() default 10;	//must

	public String eventTraceRdbDataSource() default "";	//must

	public String listener() default "";	//must

	public boolean disabled() default false;	//must

	public String distributedListener() default "";

	public long startedTimeoutMilliseconds() default Long.MAX_VALUE;	//must

	public long completedTimeoutMilliseconds() default Long.MAX_VALUE;		//must

	public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

	public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
	
}
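Because the annotation is retained at runtime, its attributes, including the defaults, can be read through reflection. The sketch below shows the mechanism with a reduced, hypothetical annotation named `JobConfig` that has only three attributes; it is not the full ElasticJobConfig.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationDefaultsSketch {

    // Reduced, hypothetical version of the annotation with only three attributes.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface JobConfig {
        String name();
        String cron() default "";
        int shardingTotalCount() default 1;
    }

    // shardingTotalCount is not set here, so its default value applies.
    @JobConfig(name = "demoJob", cron = "0/5 * * * * ?")
    static class DemoJob { }

    /** Reads the annotation from a class and formats its attributes. */
    static String describe(Class<?> jobClass) {
        JobConfig conf = jobClass.getAnnotation(JobConfig.class);
        if (conf == null) {
            return "not a job";
        }
        return conf.name() + " [" + conf.cron() + "] shards=" + conf.shardingTotalCount();
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoJob.class));
        System.out.println(describe(String.class));
    }
}
```

Attributes with defaults can be left out at the usage site, which is why a job class usually only needs to set a name, a cron expression, and a shard count.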

  

3) Add the ElasticJobConfParser class, which parses the ElasticJobConfig annotation

@Slf4j
public class ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent> {

	private JobZookeeperProperties jobZookeeperProperties;
	
	private ZookeeperRegistryCenter zookeeperRegistryCenter;
	
	public ElasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
			ZookeeperRegistryCenter zookeeperRegistryCenter) {
		this.jobZookeeperProperties = jobZookeeperProperties;
		this.zookeeperRegistryCenter = zookeeperRegistryCenter;
	}

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		try {
			ApplicationContext applicationContext = event.getApplicationContext();
			Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
			for(Iterator<?> it = beanMap.values().iterator(); it.hasNext();) {
				Object confBean = it.next();
				Class<?> clazz = confBean.getClass();
				if(clazz.getName().indexOf("$") > 0) {
					String className = clazz.getName();
					clazz = Class.forName(className.substring(0, className.indexOf("$")));
				}
				// 	Get the interface type, used to determine what kind of job this is
				String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
				//	Get the ElasticJobConfig annotation
				ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);
				
				String jobClass = clazz.getName();
				String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
				String cron = conf.cron();
				String shardingItemParameters = conf.shardingItemParameters();
				String description = conf.description();
				String jobParameter = conf.jobParameter();
				String jobExceptionHandler = conf.jobExceptionHandler();
				String executorServiceHandler = conf.executorServiceHandler();

				String jobShardingStrategyClass = conf.jobShardingStrategyClass();
				String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
				String scriptCommandLine = conf.scriptCommandLine();

				boolean failover = conf.failover();
				boolean misfire = conf.misfire();
				boolean overwrite = conf.overwrite();
				boolean disabled = conf.disabled();
				boolean monitorExecution = conf.monitorExecution();
				boolean streamingProcess = conf.streamingProcess();

				int shardingTotalCount = conf.shardingTotalCount();
				int monitorPort = conf.monitorPort();
				int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
				int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();				
				
				//	Build the core configuration of Dangdang's elastic-job
				JobCoreConfiguration coreConfig = JobCoreConfiguration
						.newBuilder(jobName, cron, shardingTotalCount)
						.shardingItemParameters(shardingItemParameters)
						.description(description)
						.failover(failover)
						.jobParameter(jobParameter)
						.misfire(misfire)
						.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
						.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
						.build();
				
				//	Decide which type of job to create
				JobTypeConfiguration typeConfig = null;
				if(ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
					typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
				}
				
				if(ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
					typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
				}
				
				if(ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
					typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
				}
				
				// LiteJobConfiguration
				LiteJobConfiguration jobConfig = LiteJobConfiguration
						.newBuilder(typeConfig)
						.overwrite(overwrite)
						.disabled(disabled)
						.monitorPort(monitorPort)
						.monitorExecution(monitorExecution)
						.maxTimeDiffSeconds(maxTimeDiffSeconds)
						.jobShardingStrategyClass(jobShardingStrategyClass)
						.reconcileIntervalMinutes(reconcileIntervalMinutes)
						.build();
				
				// 	Create a Spring bean definition
				BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
				factory.setInitMethodName("init");
				factory.setScope("prototype");
				
				//	1. Add the constructor argument for the bean, i.e. the actual job implementation
				if (!ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
					factory.addConstructorArgValue(confBean);
				}
				//	2. Add the registry center
				factory.addConstructorArgValue(this.zookeeperRegistryCenter);
				//	3. Add the LiteJobConfiguration
				factory.addConstructorArgValue(jobConfig);

				//	4. If eventTraceRdbDataSource is set, add it as well
				if (StringUtils.hasText(eventTraceRdbDataSource)) {
					BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
					rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
					factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
				}
				
				//  5. Add the listeners
				List<?> elasticJobListeners = getTargetElasticJobListeners(conf);
				factory.addConstructorArgValue(elasticJobListeners);
				
				// 	Next, register the factory (i.e. the SpringJobScheduler) in the Spring container
				DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

				String registerBeanName = conf.name() + "SpringJobScheduler";
				defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
				SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
				scheduler.init();
				log.info("Started elastic-job job: {}", jobName);
			}
			log.info("Total number of elastic-job jobs started: {}", beanMap.values().size());
			
		} catch (Exception e) {
			log.error("elastic-job failed to start, forcing the system to exit", e);
			System.exit(1);
		}
	}
	
	private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConfig conf) {
		List<BeanDefinition> result = new ManagedList<BeanDefinition>(2);
		String listeners = conf.listener();
		if (StringUtils.hasText(listeners)) {
			BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
			factory.setScope("prototype");
			result.add(factory.getBeanDefinition());
		}

		String distributedListeners = conf.distributedListener();
		long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
		long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();

		if (StringUtils.hasText(distributedListeners)) {
			BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
			factory.setScope("prototype");
			factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
			factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
			result.add(factory.getBeanDefinition());
		}
		return result;
	}

}  

The class implements the ApplicationListener<ApplicationReadyEvent> interface, so onApplicationEvent runs once all beans have been initialized and the application is up.
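The parser determines the job type from the simple name of the first interface a job class declares, and cuts the class name at the first `$` to get from a proxy class back to the original class. The sketch below reproduces both lookups in plain Java. The `SimpleJob` interface and the job classes here are hypothetical stand-ins, and `isSimpleJob` is an alternative check that the original parser does not use.

```java
import java.io.Serializable;

public class JobTypeSketch {

    // Hypothetical stand-in for the Elastic-Job SimpleJob interface.
    interface SimpleJob { }

    static class ReportJob implements SimpleJob { }

    // SimpleJob is declared second here, so it is not the first interface.
    static class AuditedJob implements Serializable, SimpleJob { }

    /** Same lookup as the parser: only the first declared interface is inspected. */
    static String firstInterfaceName(Class<?> clazz) {
        Class<?>[] interfaces = clazz.getInterfaces();
        return interfaces.length == 0 ? "" : interfaces[0].getSimpleName();
    }

    /** Order-independent alternative based on assignability. */
    static boolean isSimpleJob(Class<?> clazz) {
        return SimpleJob.class.isAssignableFrom(clazz);
    }

    /** Cuts a class name at the first '$', as the parser does for proxy classes. */
    static String stripProxySuffix(String className) {
        int idx = className.indexOf('$');
        return idx > 0 ? className.substring(0, idx) : className;
    }

    public static void main(String[] args) {
        System.out.println(firstInterfaceName(ReportJob.class));
        System.out.println(firstInterfaceName(AuditedJob.class));
        System.out.println(isSimpleJob(AuditedJob.class));
        System.out.println(stripProxySuffix("com.example.TestJob$$EnhancerBySpringCGLIB$$1a2b"));
    }
}
```

The first-interface lookup depends on declaration order, so a job class that lists another interface before its job interface would not match any job type. Cutting at `$` would also truncate the name of a nested class, so job classes are best kept top-level with the job interface declared first.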

4) Add the enum ElasticJobTypeEnum

public enum ElasticJobTypeEnum {

	SIMPLE("SimpleJob", "simple job"),
	DATAFLOW("DataflowJob", "dataflow job"),
	SCRIPT("ScriptJob", "script job");
	
	private String type;
	
	private String desc;
	
	private ElasticJobTypeEnum(String type, String desc) {
		this.type = type;
		this.desc = desc;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getDesc() {
		return desc;
	}

	public void setDesc(String desc) {
		this.desc = desc;
	}

}

  

III. Testing the Distributed Scheduled Tasks

1. Create the rabbit-task-test project

The project structure is as follows:

1. Configure application.properties as follows

server.port=8881
elastic.job.zk.namespace=elastic-job
elastic.job.zk.serverLists=47.xx.xx.120:2181

  

2. Enable ElasticJob.

Add the @EnableElasticJob annotation to the application class

@EnableElasticJob
@SpringBootApplication
@ServletComponentScan(basePackages = {"com.example.rabbittasktest.esjob"})
public class RabbitTaskTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitTaskTestApplication.class, args);
    }

}

  

3. Create the scheduled tasks

1) Create scheduled task 1, which runs every 5 seconds

@Component
@ElasticJobConfig(
        name = "com.example.rabbittasktest.esjob.TestJob",
        cron = "0/5 * * * * ?", //every 5 seconds
        description = "test scheduled task",
        overwrite = true,
        shardingTotalCount = 5
)
@Slf4j
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("running test job");
    }
}
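With shardingTotalCount = 5, the job is split into five shards and each shard invocation only logs a line. In a real job, each shard would typically pick its own slice of the data. One common approach, sketched below under stated assumptions, is to select records by id modulo the shard count; the `idsForShard` helper is hypothetical, and in a job the two parameters would come from the ShardingContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ShardingSketch {

    /** Returns the ids that the given shard should process, using id % shardingTotalCount. */
    static List<Long> idsForShard(List<Long> allIds, int shardingItem, int shardingTotalCount) {
        List<Long> mine = new ArrayList<>();
        for (long id : allIds) {
            // floorMod keeps the result non-negative even for negative ids.
            if (Math.floorMod(id, (long) shardingTotalCount) == shardingItem) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        int total = 5; // same shard count as the test job
        for (int item = 0; item < total; item++) {
            System.out.println("shard " + item + " -> " + idsForShard(ids, item, total));
        }
    }
}
```

Each id lands in exactly one shard, so the shards can run on different nodes without processing the same record twice.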

  

  

2) Create scheduled task 2

It runs once every 10 seconds

@Component
@ElasticJobConfig(
        name = "com.example.rabbittasktest.esjob.TestJob2",
        cron = "0/10 * * * * ?", //every 10 seconds
        description = "test scheduled task 2",
        overwrite = true,
        shardingTotalCount = 2
)
@Slf4j
public class TestJob2 implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("running test job2");
    }
}

  

Start the project. If the log lines from both jobs appear at the configured intervals, the setup is working.