RabbitMQ Base Component Encapsulation in Practice (Part 2)
I. Reliable Message Delivery
1. Create the service in the rabbit-core-producer project
    @Service
    public class MessageStoreService {

        @Autowired
        private BrokerMessageMapper brokerMessageMapper;

        public int insert(BrokerMessage brokerMessage) {
            return this.brokerMessageMapper.insert(brokerMessage);
        }

        public BrokerMessage selectByMessageId(String messageId) {
            return this.brokerMessageMapper.selectByPrimaryKey(messageId);
        }

        public void succuess(String messageId) {
            this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                    BrokerMessageStatus.SEND_OK.getCode(), new Date());
        }

        public void failure(String messageId) {
            this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
                    BrokerMessageStatus.SEND_FAIL.getCode(), new Date());
        }

        public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus) {
            return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
        }

        public int updateTryCount(String brokerMessageId) {
            return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
        }
    }
2. Define the message send status
    public enum BrokerMessageStatus {

        SENDING("0"),
        SEND_OK("1"),
        SEND_FAIL("2"),
        SEND_FAIL_A_MOMENT("3");

        private String code;

        private BrokerMessageStatus(String code) {
            this.code = code;
        }

        public String getCode() {
            return this.code;
        }
    }
Constants
    public interface BrokerMessageConst {
        // Timeout in minutes (1 minute)
        int TIMEOUT = 1;
    }
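The SENDING status and the TIMEOUT constant together decide when a message becomes a retry candidate. The following is a minimal in-memory sketch of that selection, not the project's actual query: RetryCandidate and RetrySweepSketch are hypothetical names, and the rule "status is SENDING and nextRetry has passed" is an assumption about what queryBrokerMessageStatus4Timeout does, since its SQL is not shown.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for one row of the message log table.
class RetryCandidate {
    final String messageId;
    final String status;     // "0" = SENDING, matching BrokerMessageStatus
    final Instant nextRetry; // earliest time at which a retry is allowed

    RetryCandidate(String messageId, String status, Instant nextRetry) {
        this.messageId = messageId;
        this.status = status;
        this.nextRetry = nextRetry;
    }
}

class RetrySweepSketch {
    // Mirrors BrokerMessageConst.TIMEOUT (minutes).
    static final int TIMEOUT_MINUTES = 1;

    // nextRetry is the creation time plus the timeout.
    static Instant nextRetryFor(Instant createTime) {
        return createTime.plus(Duration.ofMinutes(TIMEOUT_MINUTES));
    }

    // Assumed selection rule: still SENDING and nextRetry is not in the future.
    static List<String> timedOut(List<RetryCandidate> rows, Instant now) {
        List<String> ids = new ArrayList<>();
        for (RetryCandidate row : rows) {
            if ("0".equals(row.status) && !row.nextRetry.isAfter(now)) {
                ids.add(row.messageId);
            }
        }
        return ids;
    }
}
```

A message that was confirmed (status "1") or whose timeout has not yet elapsed is left alone; only unconfirmed, overdue messages are returned for a retry.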
Add the reliable-send method to RabbitBrokerImpl
    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if (bm == null) {
            // 1. Record the send log in the database first
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            // tryCount does not need to be set on the first send
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        // 2. Perform the actual send
        sendKernel(message);
    }
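The important property of reliantSend is that the log row is written at most once per message ID while the send itself happens on every call, so a retry can reuse the same method. A minimal in-memory sketch of that idea follows; ReliantSendSketch and its map and list parameters are hypothetical stand-ins for the log table and the broker, not part of the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReliantSendSketch {
    // statusById stands in for the message log table, sent for the broker.
    static void reliantSend(Map<String, String> statusById, List<String> sent, String messageId) {
        // 1. Record the send log only if this message ID has not been recorded yet.
        statusById.putIfAbsent(messageId, "0"); // "0" = SENDING
        // 2. Always perform the send, whether this is the first attempt or a retry.
        sent.add(messageId);
    }

    public static void main(String[] args) {
        Map<String, String> statusById = new HashMap<>();
        List<String> sent = new ArrayList<>();
        reliantSend(statusById, sent, "m1");
        reliantSend(statusById, sent, "m1"); // retry of the same message
        System.out.println(statusById.size() + " log row(s), " + sent.size() + " send(s)");
    }
}
```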
Handling the confirm callback
Modify the confirm method in the RabbitTemplateContainer class as follows:
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // Parse the correlation ID: message ID, send time, message type
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if (ack) {
            // When the broker returns an ACK, update the status of the matching
            // row in the log table to SEND_OK.
            // Only reliant messages are stored in the database, so only they are updated.
            if (MessageType.RELIANT.equals(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("Message sent successfully, confirm messageId={}, sendTime={}", messageId, sendTime);
        } else {
            log.error("Message send failed, confirm messageId={}, sendTime={}, cause={}", messageId, sendTime, cause);
        }
    }
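The confirm callback relies on the correlation ID carrying three fields: message ID, send time, and message type. The sketch below shows one way such an ID could be built and parsed with plain Java. The "#" separator is an assumption (the splitter's configuration is not shown in this part), and ConfirmIdSketch is a hypothetical name.

```java
import java.util.regex.Pattern;

class ConfirmIdSketch {
    // Assumed separator; the real splitter may use a different one.
    static final String SEPARATOR = "#";

    final String messageId;
    final long sendTime;
    final String messageType;

    ConfirmIdSketch(String messageId, long sendTime, String messageType) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.messageType = messageType;
    }

    // Build the correlation ID on the sending side.
    static String format(String messageId, long sendTime, String messageType) {
        return messageId + SEPARATOR + sendTime + SEPARATOR + messageType;
    }

    // Parse it back in the confirm callback; reject malformed IDs explicitly.
    static ConfirmIdSketch parse(String correlationId) {
        String[] parts = correlationId.split(Pattern.quote(SEPARATOR), -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Unexpected correlation ID: " + correlationId);
        }
        return new ConfirmIdSketch(parts[0], Long.parseLong(parts[1]), parts[2]);
    }
}
```

Validating the number of fields up front avoids an IndexOutOfBoundsException inside the callback if an ID in a different format ever arrives.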
II. Distributed Scheduled-Task Component: rabbit-task
1. Add dependencies: bring in elastic-job
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>rabbit-parent</artifactId>
            <groupId>com.example</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <groupId>com.example</groupId>
        <artifactId>rabbit-task</artifactId>

        <properties>
            <elastic-job.version>2.1.4</elastic-job.version>
        </properties>

        <dependencies>
            <!-- spring boot dependency -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <!-- elastic-job dependency -->
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-core</artifactId>
                <version>${elastic-job.version}</version>
            </dependency>
            <dependency>
                <groupId>com.dangdang</groupId>
                <artifactId>elastic-job-lite-spring</artifactId>
                <version>${elastic-job.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
        </dependencies>
    </project>
2. Add auto-configuration
1) Create the class JobParserAutoConfigurartion. It reads the settings Elastic-Job needs to connect to the ZooKeeper (zk) registry center and initializes the registry center with them.
The configuration properties are bound to the JobZookeeperProperties class.
    @Slf4j
    @Configuration
    @ConditionalOnProperty(prefix = "elastic.job.zk", name = {"namespace", "serverLists"}, matchIfMissing = false)
    @EnableConfigurationProperties(JobZookeeperProperties.class)
    public class JobParserAutoConfigurartion {

        @Bean(initMethod = "init")
        public ZookeeperRegistryCenter zookeeperRegistryCenter(JobZookeeperProperties jobZookeeperProperties) {
            ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(
                    jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
            // Copy the optional settings from the bound properties
            // (the original copied zkConfig's own values back onto itself, which had no effect)
            zkConfig.setBaseSleepTimeMilliseconds(jobZookeeperProperties.getBaseSleepTimeMilliseconds());
            zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
            zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
            zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
            zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
            zkConfig.setDigest(jobZookeeperProperties.getDigest());
            log.info("Job registry center configured, zkaddress : {}, namespace : {}",
                    jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
            return new ZookeeperRegistryCenter(zkConfig);
        }

        @Bean
        public ElasticJobConfParser elasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
                                                         ZookeeperRegistryCenter zookeeperRegistryCenter) {
            return new ElasticJobConfParser(jobZookeeperProperties, zookeeperRegistryCenter);
        }
    }
2) Create a META-INF folder under the resources folder
Create spring.factories in the META-INF folder
    # Auto Configure
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.example.task.autoconfigure.JobParserAutoConfigurartion
3) The JobZookeeperProperties class
    @ConfigurationProperties(prefix = "elastic.job.zk")
    @Data
    public class JobZookeeperProperties {

        private String namespace;

        private String serverLists;

        private int maxRetries = 3;

        private int connectionTimeoutMilliseconds = 15000;

        private int sessionTimeoutMilliseconds = 60000;

        private int baseSleepTimeMilliseconds = 1000;

        private int maxSleepTimeMilliseconds = 3000;

        private String digest = "";
    }
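The three retry-related defaults (baseSleepTimeMilliseconds, maxSleepTimeMilliseconds, maxRetries) are easier to reason about with a concrete calculation. The sketch below assumes an exponential backoff in the style of Apache Curator's ExponentialBackoffRetry; whether the registry center applies exactly this policy is an assumption, and BackoffSketch is a hypothetical name used only for illustration.

```java
import java.util.Random;

class BackoffSketch {
    // Sleep time before retry number retryCount (0-based):
    // base * random factor in [1, 2^(retryCount + 1)), capped at maxSleepMs.
    static long sleepMs(int baseSleepMs, int maxSleepMs, int retryCount, Random random) {
        long sleep = (long) baseSleepMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(sleep, maxSleepMs);
    }

    public static void main(String[] args) {
        Random random = new Random();
        // Defaults from JobZookeeperProperties: base 1000 ms, max 3000 ms, 3 retries.
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " sleeps " + sleepMs(1000, 3000, retry, random) + " ms");
        }
    }
}
```

Under this assumption, the defaults mean each wait lies between 1 and 3 seconds, and the client gives up after 3 attempts.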
3. Add @Enable-style module assembly
1) Add the annotation, which imports the JobParserAutoConfigurartion configuration
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(JobParserAutoConfigurartion.class)
    public @interface EnableElasticJob {
    }
2) Add the job configuration annotation ElasticJobConfig
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface ElasticJobConfig {

        String name(); // name of the elastic-job

        String cron() default "";

        int shardingTotalCount() default 1;

        String shardingItemParameters() default "";

        String jobParameter() default "";

        boolean failover() default false;

        boolean misfire() default true;

        String description() default "";

        boolean overwrite() default false;

        boolean streamingProcess() default false;

        String scriptCommandLine() default "";

        boolean monitorExecution() default false;

        int monitorPort() default -1; // must

        int maxTimeDiffSeconds() default -1; // must

        String jobShardingStrategyClass() default ""; // must

        int reconcileIntervalMinutes() default 10; // must

        String eventTraceRdbDataSource() default ""; // must

        String listener() default ""; // must

        boolean disabled() default false; // must

        String distributedListener() default "";

        long startedTimeoutMilliseconds() default Long.MAX_VALUE; // must

        long completedTimeoutMilliseconds() default Long.MAX_VALUE; // must

        String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

        String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
    }
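Because the annotation is retained at runtime, its values (including the defaults) can be read back through reflection, which is what makes annotation-driven job registration possible. A minimal, self-contained sketch of that mechanism follows; JobConfigSketch, DemoJobSketch, and AnnotationReadSketch are hypothetical, cut-down names rather than the project's classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Cut-down stand-in for ElasticJobConfig with only a few attributes.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface JobConfigSketch {
    String name();
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default false;
}

// Only name and cron are set; the other attributes fall back to their defaults.
@JobConfigSketch(name = "demoJob", cron = "0/5 * * * * ?")
class DemoJobSketch {
}

class AnnotationReadSketch {
    // Read the annotation from a class and summarize its values.
    static String describe(Class<?> type) {
        JobConfigSketch conf = type.getAnnotation(JobConfigSketch.class);
        if (conf == null) {
            return "not a job";
        }
        return conf.name() + " cron=" + conf.cron()
                + " shards=" + conf.shardingTotalCount()
                + " overwrite=" + conf.overwrite();
    }
}
```

Without RetentionPolicy.RUNTIME, getAnnotation would return null and no job could be discovered this way.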
3) Add the class ElasticJobConfParser, which parses the ElasticJobConfig annotation
    @Slf4j
    public class ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent> {

        private JobZookeeperProperties jobZookeeperProperties;

        private ZookeeperRegistryCenter zookeeperRegistryCenter;

        public ElasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
                                    ZookeeperRegistryCenter zookeeperRegistryCenter) {
            this.jobZookeeperProperties = jobZookeeperProperties;
            this.zookeeperRegistryCenter = zookeeperRegistryCenter;
        }

        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            try {
                ApplicationContext applicationContext = event.getApplicationContext();
                Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
                for (Object confBean : beanMap.values()) {
                    Class<?> clazz = confBean.getClass();
                    // A generated class name contains "$"; resolve the original class from the prefix
                    if (clazz.getName().indexOf("$") > 0) {
                        String className = clazz.getName();
                        clazz = Class.forName(className.substring(0, className.indexOf("$")));
                    }
                    // The first implemented interface determines the job type
                    String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
                    // Read the ElasticJobConfig settings
                    ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);

                    String jobClass = clazz.getName();
                    String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
                    String cron = conf.cron();
                    String shardingItemParameters = conf.shardingItemParameters();
                    String description = conf.description();
                    String jobParameter = conf.jobParameter();
                    String jobExceptionHandler = conf.jobExceptionHandler();
                    String executorServiceHandler = conf.executorServiceHandler();
                    String jobShardingStrategyClass = conf.jobShardingStrategyClass();
                    String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
                    String scriptCommandLine = conf.scriptCommandLine();

                    boolean failover = conf.failover();
                    boolean misfire = conf.misfire();
                    boolean overwrite = conf.overwrite();
                    boolean disabled = conf.disabled();
                    boolean monitorExecution = conf.monitorExecution();
                    boolean streamingProcess = conf.streamingProcess();

                    int shardingTotalCount = conf.shardingTotalCount();
                    int monitorPort = conf.monitorPort();
                    int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
                    int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();

                    // Build the elastic-job (Dangdang) core configuration
                    JobCoreConfiguration coreConfig = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount)
                            .shardingItemParameters(shardingItemParameters)
                            .description(description)
                            .failover(failover)
                            .jobParameter(jobParameter)
                            .misfire(misfire)
                            .jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
                            .jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
                            .build();

                    // Decide which kind of job to create
                    JobTypeConfiguration typeConfig = null;
                    if (ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
                        typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
                    }
                    if (ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
                        typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
                    }
                    if (ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
                        typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
                    }

                    // LiteJobConfiguration
                    LiteJobConfiguration jobConfig = LiteJobConfiguration
                            .newBuilder(typeConfig)
                            .overwrite(overwrite)
                            .disabled(disabled)
                            .monitorPort(monitorPort)
                            .monitorExecution(monitorExecution)
                            .maxTimeDiffSeconds(maxTimeDiffSeconds)
                            .jobShardingStrategyClass(jobShardingStrategyClass)
                            .reconcileIntervalMinutes(reconcileIntervalMinutes)
                            .build();

                    // Create a Spring bean definition for SpringJobScheduler
                    BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
                    factory.setInitMethodName("init");
                    factory.setScope("prototype");

                    // 1. Constructor argument: the actual job implementation bean
                    if (!ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
                        factory.addConstructorArgValue(confBean);
                    }
                    // 2. Constructor argument: the registry center
                    factory.addConstructorArgValue(this.zookeeperRegistryCenter);
                    // 3. Constructor argument: the LiteJobConfiguration
                    factory.addConstructorArgValue(jobConfig);
                    // 4. If eventTraceRdbDataSource is set, add it as well
                    if (StringUtils.hasText(eventTraceRdbDataSource)) {
                        BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
                        rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
                        factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
                    }
                    // 5. Constructor argument: the listeners
                    List<?> elasticJobListeners = getTargetElasticJobListeners(conf);
                    factory.addConstructorArgValue(elasticJobListeners);

                    // Register the SpringJobScheduler definition in the Spring container
                    DefaultListableBeanFactory defaultListableBeanFactory =
                            (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
                    String registerBeanName = conf.name() + "SpringJobScheduler";
                    defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
                    SpringJobScheduler scheduler = (SpringJobScheduler) applicationContext.getBean(registerBeanName);
                    scheduler.init();
                    log.info("Started elastic-job job: " + jobName);
                }
                log.info("Total number of elastic-job jobs started: {}", beanMap.values().size());
            } catch (Exception e) {
                log.error("elastic-job startup failed, forcing system exit", e);
                System.exit(1);
            }
        }

        private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConfig conf) {
            List<BeanDefinition> result = new ManagedList<BeanDefinition>(2);
            String listeners = conf.listener();
            if (StringUtils.hasText(listeners)) {
                BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
                factory.setScope("prototype");
                result.add(factory.getBeanDefinition());
            }

            String distributedListeners = conf.distributedListener();
            long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
            long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();

            if (StringUtils.hasText(distributedListeners)) {
                BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
                factory.setScope("prototype");
                factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
                factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
                result.add(factory.getBeanDefinition());
            }
            return result;
        }
    }
The class implements the ApplicationListener<ApplicationReadyEvent> interface, so onApplicationEvent runs once all beans have been initialized and the application is up.
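Two small steps in the parser are easy to miss: it strips everything from the first "$" in the runtime class name to get back to the original job class, and it takes the simple name of the first implemented interface as the job type. The sketch below isolates both steps with plain Java; JobTypeResolverSketch and DemoRunnableSketch are hypothetical names, and the CGLIB-style class name in the usage note is only an illustration of what a generated name can look like.

```java
class JobTypeResolverSketch {
    // Drop a generated suffix (everything from the first '$') from a class name.
    static String targetClassName(String runtimeClassName) {
        int idx = runtimeClassName.indexOf('$');
        return idx > 0 ? runtimeClassName.substring(0, idx) : runtimeClassName;
    }

    // Simple name of the first directly implemented interface, or null if there is none.
    static String jobTypeName(Class<?> clazz) {
        Class<?>[] interfaces = clazz.getInterfaces();
        return interfaces.length == 0 ? null : interfaces[0].getSimpleName();
    }
}

// Stand-in for a job class; Runnable plays the role of SimpleJob here.
class DemoRunnableSketch implements Runnable {
    @Override
    public void run() {
        // no-op
    }
}
```

For example, targetClassName("com.example.TestJob$$EnhancerBySpringCGLIB$$abc") yields "com.example.TestJob". Returning null for a class without interfaces makes the unsupported case explicit, whereas indexing getInterfaces()[0] directly throws ArrayIndexOutOfBoundsException for such a class.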
4) Add the enum ElasticJobTypeEnum
    public enum ElasticJobTypeEnum {

        SIMPLE("SimpleJob", "simple job"),
        DATAFLOW("DataflowJob", "dataflow (streaming) job"),
        SCRIPT("ScriptJob", "script job");

        private String type;

        private String desc;

        private ElasticJobTypeEnum(String type, String desc) {
            this.type = type;
            this.desc = desc;
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }
    }
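The parser compares the interface name against each enum constant in turn, which leaves the type configuration null when the name matches none of them. One possible alternative, shown here only as a sketch, is a lookup method that returns an Optional so that the unsupported case has to be handled explicitly. JobTypeSketch and fromType are hypothetical names, not part of the project.

```java
import java.util.Optional;

enum JobTypeSketch {
    SIMPLE("SimpleJob"),
    DATAFLOW("DataflowJob"),
    SCRIPT("ScriptJob");

    private final String type;

    JobTypeSketch(String type) {
        this.type = type;
    }

    String getType() {
        return type;
    }

    // Find the constant whose type matches the interface's simple name.
    static Optional<JobTypeSketch> fromType(String interfaceSimpleName) {
        for (JobTypeSketch candidate : values()) {
            if (candidate.type.equals(interfaceSimpleName)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

A caller could then use fromType(name).orElseThrow(...) to fail with a clear message when a bean annotated as a job implements none of the supported interfaces.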
III. Testing the Distributed Scheduled Tasks
1. Create the rabbit-task-test project
[Figure: project structure of rabbit-task-test]
2. Configure application.properties as follows
    server.port=8881
    elastic.job.zk.namespace=elastic-job
    elastic.job.zk.serverLists=47.xx.xx.120:2181
3. Enable ElasticJob
Add the @EnableElasticJob annotation to the application class:
    @EnableElasticJob
    @SpringBootApplication
    @ServletComponentScan(basePackages = {"com.example.rabbittasktest.esjob"})
    public class RabbitTaskTestApplication {

        public static void main(String[] args) {
            SpringApplication.run(RabbitTaskTestApplication.class, args);
        }
    }
4. Create the scheduled tasks
1) Create scheduled task 1, which runs every 5 seconds
    @Component
    @ElasticJobConfig(
            name = "com.example.rabbittasktest.esjob.TestJob",
            cron = "0/5 * * * * ?", // every 5 seconds
            description = "test scheduled task",
            overwrite = true,
            shardingTotalCount = 5
    )
    @Slf4j
    public class TestJob implements SimpleJob {

        @Override
        public void execute(ShardingContext shardingContext) {
            log.info("Running test job");
        }
    }
2) Create scheduled task 2, which runs every 10 seconds
    @Component
    @ElasticJobConfig(
            name = "com.example.rabbittasktest.esjob.TestJob2",
            cron = "0/10 * * * * ?", // every 10 seconds
            description = "test scheduled task 2",
            overwrite = true,
            shardingTotalCount = 2
    )
    @Slf4j
    public class TestJob2 implements SimpleJob {

        @Override
        public void execute(ShardingContext shardingContext) {
            log.info("Running test job 2");
        }
    }
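Both jobs use a "start/step" value in the seconds field of the cron expression: "0/5" fires at second 0 and every 5 seconds after it, "0/10" every 10 seconds. The sketch below expands such a field into the seconds of each minute at which it fires. It handles only the "start/step" form and is not a cron parser; CronStepSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class CronStepSketch {
    // Expand a "start/step" seconds field into the seconds (0-59) at which it fires.
    static List<Integer> fireSeconds(String field) {
        String[] parts = field.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected start/step, got: " + field);
        }
        int start = Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        if (start < 0 || start > 59 || step <= 0) {
            throw new IllegalArgumentException("Invalid seconds field: " + field);
        }
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }

    public static void main(String[] args) {
        System.out.println(fireSeconds("0/5"));
        System.out.println(fireSeconds("0/10"));
    }
}
```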
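Start the project. If both jobs write their log messages on schedule (every 5 and every 10 seconds), the component is working.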