Storm框架:Storm整合springboot
Storm主要的三個Component:Topology、Spout、Bolt。Topology作為主進程控制著spout、bolt線程的運行,他們相當於獨立運行的容器分布於storm集群中的各個機器節點。
SpringApplication:是配置Spring應用上下文的起點。通過調用SpringApplication.run()方法它將創建ApplicationContext實例,這是我們能夠使用Ioc容器的主要BeanFactory。之後Spring將會加載所有單例模式的beans,並啟動後臺運行的CommandLineRunner beans等。
實現原理
Storm框架中的每個Spout和Bolt都相當於獨立的應用,Strom在啟動spout和bolt時提供了一個open方法(spout)和prepare方法(bolt)。我們可以把初始化Spring應用的操作放在這裏,這樣可以保證每個spout/bolt應用在後續執行過程中都能獲取到Spring的ApplicationContext,有了ApplicationContext實例對象,Spring的所有功能就都能用上了。
Spout.open方法實現@Override
//啟動Springboot應用
SpringStormApplication.run();
this.map = map;
this.topologyContext = topologyContext;
this.spoutOutputCollector = spoutOutputCollector;
}Bolt.prepare方法實現
@Override
br/>Bolt.prepare方法實現
@Override
//啟動Springboot應用
SpringStormApplication.run();
this.map = map; this.topologyContext = topologyContext; this.outputCollector = outputCollector;
}SpringStormApplication啟動類
@SpringBootApplication
br/>SpringStormApplication啟動類
@SpringBootApplication
public class SpringStormApplication {
/**
- 非工程啟動入口,所以不用main方法
- 加上synchronized的作用是由於storm在啟動多個bolt線程實例時,如果Springboot用到Apollo分布式配置,會報ConcurrentModificationException錯誤
- 詳見:https://github.com/ctripcorp/apollo/issues/1658
- @param args
*/
public synchronized static void run(String ...args) {
SpringApplication app = new SpringApplication(SpringStormApplication.class);
//我們並不需要web servlet功能,所以設置為WebApplicationType.NONE
app.setWebApplicationType(WebApplicationType.NONE);
//忽略掉banner輸出
app.setBannerMode(Banner.Mode.OFF);
//忽略Spring啟動信息日誌
app.setLogStartupInfo(false);
app.run(args);
}
}
與我們傳統的Springboot應用啟動入口稍微有點區別,主要禁用了web功能,看下正常的啟動方式:
@SpringBootApplication
@ComponentScan(value = "com.xxx.web")
public class PlatformApplication {
public static void main(String[] args) {
SpringApplication.run(PlatformApplication.class, args);}
}
在spout/bolt中調用了SpringStormApplication.run方法後,我們還需要能夠拿到ApplicationContext容器對象,這時候我們還需要實現ApplicationContextAware接口,寫個工具類BeanUtils:
@Component
br/>}
}
在spout/bolt中調用了SpringStormApplication.run方法後,我們還需要能夠拿到ApplicationContext容器對象,這時候我們還需要實現ApplicationContextAware接口,寫個工具類BeanUtils:
@Component
private static ApplicationContext applicationContext = null;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (BeanUtils.applicationContext == null) {
BeanUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
通過@Component註解使得Spring在啟動時能夠掃描到該bean,因為BeanUtils實現了ApplicationContextAware接口,Spring會在啟動成功時自動調用BeanUtils.setApplicationContext方法,將ApplicationContext對象保存到工具類的靜態變量中,之後我們就可以使用BeanUtils.getBean()去獲取Spring容器中的bean了。
寫個簡單例子
在FilterBolt的execute方法中獲取Spring bean@Override
br/>@Override
FilterService filterService = (FilterService) BeanUtils.getBean("filterService");
filterService.deleteAll();
}
定義FilterService類,這時候我們就可以使用Spring的相關註解,自動註入,Spring Jpa等功能了。@Service("filterService")
br/>@Service("filterService")
br/>@Autowired
public void deleteAll() {
userRepository.deleteAll();
}
}
將storm應用作為Springboot工程的一個子模塊
工程主目錄的pom文件還是springboot相關的依賴,在storm子模塊中引入storm依賴,這時候啟動Strom的topology應用會有一個日誌包依賴沖突。
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Applications/IntelliJ%20IDEA.app/Contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
我們需要在storm子模塊的pom文件中重寫org.springframework.boot:spring-boot-starter包依賴,將Springboot的相關日誌包排除掉,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j2</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic2</artifactId>
</exclusion>
</exclusions>
</dependency>
OK,完美整合!
Storm框架:Storm整合springboot