定時任務管理中心(dubbo+spring)-我們到底能走多遠系列48
我們到底能走多遠系列47
扯淡:
又是一年新年時,不知道上一年你付出了多少,收穫了多少呢?也許你正想著老闆會發多少獎金,也許你正想著明年去哪家公司投靠。
這個時間點好好整理一下,思考總結一下,的確是個非常好的機會。
年終的時候各個公司總會評一下績效,拉出各位的成績單,你是不是想說:去你媽的成績單,我不是你的學生,老子努力工作不是為了看你臉色!當然啦,你想說這話的前提是:你很牛b,如果不是也可以想想,然後默默去變牛b。
我大多數的朋友同事都是漂在城市裡的人,我們努力的活得更好,想過自己想過的生活,打心裡佩服我們自己,選擇這個行業,正嘗試改變著世界。
所以,加油,各位!
另外,程式設計師過什麼新年?寫bug的時間都不夠呢!
最後還是祝看到這個文字的朋友:身體健康,闔家歡樂,雞年大吉,公司上市。
主題:
一般,開一個定時任務很簡單,spring寫個註解就能跑了,或者單應用的定時任務還有很多其他豐富jar支援。
常規的一個場景: 一個系統一般都會有很多業務模組組成,這些業務模組被封裝成一個個獨立部署的應用拆分出去,獨立維護。各個業務模組都會有自己定製的定時任務要跑,一般都會依賴自己業務資料和邏輯,很自然的寫在鴿子應用中。 那麼定時任務管理中心要做的是統一管理這些散落在各個業務模組中的定時任務。 統一管理的好處是: 1,全系統定時任務一目瞭然,便於排查 2,任務執行相關資訊統一到一起,比如日誌,而任務業務程式碼開發和任務配置解耦 3,針對任務功能的開發升級集中到一個應用中了 這裡粗略設計一個定時任務管理系統拋磚引玉。 大致劃分以下三個部分:任務管理系統用於配置任務的一些資訊,任務排程使用這些資訊實現對業務系統進行排程,實現定時任務。 拆解後各個元件的關係如下: 當然,為了跟好的描述這個系統,以上圖是一個簡化的設計圖。 如何實現呢? 這裡提供一個程式碼的方案,實際開發中結合實際場景和當時技術遺產還有很多的技術方案可以設計,還可以深度挖掘。 首先我們給每個應用提供任務管理中心的jar包,在業務應用啟動的時候我們要把任務service收集起來,放入一個map,然後統一提供出一個dubbo介面,用dubbo的group區分各個應用。當任務排程需要呼叫到這個應用的某個任務service時,再從map中拿出spring bean執行任務方法。 以上功能的jar的核心程式碼如下:
publicclass TaskSupport implements BeanPostProcessor, ApplicationListener<ApplicationContextEvent>, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(TaskSupport.class); private ApplicationContext applicationContext; private RegistryConfig registryConfig; private ApplicationConfig applicationConfig; private ProtocolConfig protocolConfig; // 儲存任務bean private Map<String, Object> taskBeanMap = new HashMap<String, Object>(); // dubbo config private ServiceConfig<Object> serviceConfig; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { collectTaskBean(bean, beanName); return bean; } private Object getTarget(Object bean) { Object target = bean; while (target instanceof Advised) { try { target = ((Advised) bean).getTargetSource().getTarget(); } catch (Exception e) { target = null; break; } } return target; } private void collectTaskBean(Object bean, String beanName) { Object target = getTarget(bean); if (target != null) { Class<?> clazz = target.getClass(); if (!clazz.isAnnotationPresent(Service.class) || !clazz.isAnnotationPresent(Task.class)) { return; } if (!taskBeanMap.containsKey(beanName)) { logger.info("add task bean {}", beanName); taskBeanMap.put(beanName, bean); } } } @Override public void onApplicationEvent(ApplicationContextEvent event) { if (isCurrentApplicationContextRefresh(event)) { exportTaskDispatcher(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } /** * 向Task暴露任務分發器服務 */ protected void exportTaskDispatcher() { if (serviceConfig != null && serviceConfig.isExported()) { return; } applicationConfig = applicationContext.getBean(ApplicationConfig.class); registryConfig = applicationContext.getBean("soaRegistryConfig", RegistryConfig.class); protocolConfig = applicationContext.getBean(ProtocolConfig.class); TaskDispatcherImpl taskServiceProxyImpl = wireTaskServiceProxy(); exportServiceConfig(taskServiceProxyImpl); } protected void unexportTaskDispatcher() { if (serviceConfig != null && serviceConfig.isExported()) { serviceConfig.unexport(); } } private TaskDispatcherImpl wireTaskServiceProxy() { AutowireCapableBeanFactory beanFactory = applicationContext.getAutowireCapableBeanFactory(); DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory; TaskDispatcherImpl taskServiceProxy = new TaskDispatcherImpl(); // ITaskDispatcher的實現bean載入到spring bean容器中,等待dubbo介面暴露 taskServiceProxy = (TaskDispatcherImpl) beanFactory.initializeBean(taskServiceProxy, "taskServiceProxy"); // 這裡把篩選出的taskBeanMap注入到TaskDispatcherImpl,直接可以使用 taskServiceProxy.setTaskBeanMap(taskBeanMap); taskServiceProxy.setApplicationConfig(applicationConfig); taskServiceProxy.setRegistryConfig(registryConfig); defaultListableBeanFactory.registerSingleton("taskServiceProxy", taskServiceProxy); return taskServiceProxy; } /** * dubbo介面暴露給任務排程系統 * */ private void exportServiceConfig(Object proxy) { serviceConfig = new ServiceConfig<Object>(); serviceConfig.setApplication(applicationConfig); serviceConfig.setRegistry(registryConfig); serviceConfig.setProtocol(protocolConfig); // 把這個介面暴露出去 serviceConfig.setInterface(ITaskDispatcher.class); serviceConfig.setRef(proxy); serviceConfig.setRetries(0); // 各個業務系統的group不同,這裡充分利用了dubbo的屬性 serviceConfig.setGroup(applicationConfig.getName()); serviceConfig.export(); } /** * 是否是當前上下文,防止重複載入和過早載入 * * @param event * @return */ private boolean isCurrentApplicationContextRefresh(ApplicationEvent event) { return event instanceof ContextRefreshedEvent && ((ContextRefreshedEvent) event).getApplicationContext() == applicationContext; } }
這樣一來,所有應用都會暴露一個ITaskDispatcher 類的方法出去,但是各個group不一樣。ITaskDispatcher定義的方法:
public interface ITaskDispatcher { public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto); }
dispatch方法是排程中心排程觸發啟動任務的方法,根據TaskInvokeInfoDto這個引數裡的定義,需要定位到哪一個應用的哪一個類的那一個方法,這個方法的引數是什麼,定位到後執行它,這就是dispatch要實現的功能。
先看一下TaskInvokeInfoDto的定義:
private String appName;//定位到哪個應用,dubbo的group區分 private String beanName;//定位到哪個類 private String methodName;// 定位到哪個方法 private String[] parameterTypes;//方法的引數型別,有過載的情況 private String[] args;//引數值
那麼dispatch的核心程式碼:
public void dispatch(TaskInvokeInfoDto taskInvokeInfoDto) { try { Method method = findMethod(taskInvokeInfoDto); Class<?>[] parameterClazzs = method.getParameterTypes(); if (parameterClazzs.length == 0) { ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName())); } else { Object[] parameterObjs = new Object[parameterClazzs.length]; for (int i = 0; i < parameterClazzs.length; i++) { parameterObjs[i] = Jackson.base().readValue(taskInvokeInfoDto.getArgs()[i], parameterClazzs[i]); } ReflectionUtils.invokeMethod(method, taskBeanMap.get(taskInvokeInfoDto.getBeanName()), parameterObjs); } } catch (Exception e) { logger.error("execute error...", e); } } // 上面將的定位邏輯 private Method findMethod(TaskInvokeInfoDto taskInvokeInfoDto) { Object bean = taskBeanMap.get(taskInvokeInfoDto.getBeanName()); Method method = null; if (ArrayUtils.isEmpty(taskInvokeInfoDto.getParameterTypes())) { method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName()); } else { final int paramCount = taskInvokeInfoDto.getParameterTypes().length; Class<?>[] clazzArray = new Class<?>[paramCount]; for (int i = 0; i < paramCount; i++) { try { clazzArray[i] = ClassUtils.getClass(taskInvokeInfoDto.getParameterTypes()[i]); } catch (ClassNotFoundException e) { logger.info("根據引數型別的字串建立class物件時失敗", e); return null; } } method = ReflectionUtils.findMethod(bean.getClass(), taskInvokeInfoDto.getMethodName(), clazzArray); } return method; }
以上只要在排程中心處呼叫dubbo來控制任務執行就可以實現整個任務中心的核心功能。
當然,這裡只是簡單的嘗試性的實現,還有很多優化和擴充套件可以做,比如任務日誌列印收集,任務應用存活狀態心跳監控,等等。
以前看到過一篇去哪網的吹b文章,吹了半天,仔細看了他提到的功能和沒實現的功能,搞過的人都會覺得做一個其實不難,只是人家分享的時候感覺很厲害,其實他自己心裡清楚自己這個系統也是處處是坑。雖然吹b,不過也會給我們各種啟發。
總結:
1,程式碼中利用spring的BeanPostProcessor,篩選出自己需要的bean的方式又是一種新的技巧,我在《請求路由到業務方法設計(2)》中需要篩選bean map用了另一種方式。不知道網友還有其他的想法嗎?
2,反射相關的api還可以繼續深入學習。
讓我們繼續前行
----------------------------------------------------------------------
努力不一定成功,但不努力肯定不會成功。