Storm-HDFS Integration: Problems and Fixes

An earlier post covered deploying the Hadoop cluster. We need HDFS here because the data flowing out of Storm is stored offline in HDFS, and Hadoop then pulls it back out of HDFS for analysis.

That means integrating Storm with HDFS, and the integration threw up plenty of problems. Some of them turn up in web searches, but the suggested fixes are not always usable, so I am writing them up here, both as notes for myself and as a way out for anyone stuck on the same issues.

First, integrating Storm with HDFS means writing a topology and submitting it to Storm. For the source code I followed http://shiyanjun.cn/archives/934.html
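For orientation, the HDFS side of such a topology revolves around storm-hdfs's HdfsBolt. Below is a minimal sketch modeled on the storm-hdfs README; the NameNode URL, output path, delimiter, and rotation thresholds are placeholder assumptions, not values from the post I followed.

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class HdfsBoltFactory {
    public static HdfsBolt build() {
        // Write each tuple as one "field1|field2|..." line
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
        // Flush to HDFS after every 1000 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // Rotate the output file once it reaches 5 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
        // Output directory on HDFS (placeholder path)
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/");
        return new HdfsBolt()
                .withFsUrl("hdfs://namenode:8020") // placeholder NameNode URL
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }
}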

I packaged it and deployed to Storm. The deployment itself went through, but the Storm UI reported errors, and the logs on the slave node showed the following:

2015-11-13T15:58:13.119+0800 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:109) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.HdfsBolt.doPrepare(HdfsBolt.java:86) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:105) ~[stormjar.jar:na]
        ... 4 common frames omitted
2015-11-13T15:58:13.120+0800 b.s.d.executor [ERROR] 
java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:109) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.io.IOException: No FileSystem for scheme: hdfs
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2421) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2428) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449) ~[stormjar.jar:na]
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.HdfsBolt.doPrepare(HdfsBolt.java:86) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:105) ~[stormjar.jar:na]
        ... 4 common frames omitted
2015-11-13T15:58:13.194+0800 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]

The error roughly means no file system is registered for the hdfs scheme. Seeing it, I first assumed my Hadoop cluster deployment was at fault and HDFS had failed to come up properly, and I spent ages checking it. Once the cluster itself was ruled out, I searched for a long time and finally found the solution here: https://github.com/ptgoetz/storm-hdfs

What that page says, in essence, is this:

When packaging the topology we were using the maven-assembly-plugin, since all the dependencies have to be bundled into the jar. That plugin resolves identically named files under META-INF by letting one overwrite the other. Both hadoop-common and hadoop-hdfs ship a service file called META-INF/services/org.apache.hadoop.fs.FileSystem, so after assembly only one survives, and the entry mapping the hdfs scheme to its implementation can be lost, which is exactly the "No FileSystem for scheme: hdfs" failure. The recommended fix is to package with maven-shade-plugin instead, whose ServicesResourceTransformer merges (appends) those service files rather than overwriting them:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
I added this packaging snippet to my pom.xml. Note that <mainClass></mainClass> must contain the fully qualified name of the class holding your main method.
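Some background on why the transformer matters: Hadoop discovers FileSystem implementations through Java's ServiceLoader, reading META-INF/services/org.apache.hadoop.fs.FileSystem from every jar on the classpath, which is precisely the file the assembly plugin clobbers. If you ever obtain the FileSystem yourself, a workaround some people use is to pin the implementation classes explicitly in the Configuration; HdfsBolt creates its own FileSystem, so this does not replace the shade fix for a topology. A sketch, with a placeholder NameNode URL:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ExplicitFsDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Map the schemes to implementation classes directly, instead of relying
        // on the (possibly clobbered) META-INF/services registration.
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf); // placeholder URL
        System.out.println(fs.getUri());
    }
}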

With the new package built, I thought the problem was settled for good, but fate decreed otherwise: this time the errors came during deployment itself:

Exception in thread "main" java.lang.ExceptionInInitializerError
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:191)
        at backtype.storm.config__init.__init5(Unknown Source)
        at backtype.storm.config__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at clojure.lang.RT.loadClassForName(RT.java:2098)
        at clojure.lang.RT.load(RT.java:430)
        at clojure.lang.RT.load(RT.java:411)
        at clojure.core$load$fn__5018.invoke(core.clj:5530)
        at clojure.core$load.doInvoke(core.clj:5529)
        at clojure.lang.RestFn.invoke(RestFn.java:408)
        at clojure.core$load_one.invoke(core.clj:5336)
        at clojure.core$load_lib$fn__4967.invoke(core.clj:5375)
        at clojure.core$load_lib.doInvoke(core.clj:5374)
        at clojure.lang.RestFn.applyTo(RestFn.java:142)
        at clojure.core$apply.invoke(core.clj:619)
        at clojure.core$load_libs.doInvoke(core.clj:5417)
        at clojure.lang.RestFn.applyTo(RestFn.java:137)
        at clojure.core$apply.invoke(core.clj:621)
        at clojure.core$use.doInvoke(core.clj:5507)
        at clojure.lang.RestFn.invoke(RestFn.java:408)
        at backtype.storm.command.config_value$loading__4910__auto__.invoke(config_value.clj:16)
        at backtype.storm.command.config_value__init.load(Unknown Source)
        at backtype.storm.command.config_value__init.<clinit>(Unknown Source)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:274)
        at clojure.lang.RT.loadClassForName(RT.java:2098)
        at clojure.lang.RT.load(RT.java:430)
        at clojure.lang.RT.load(RT.java:411)
        at clojure.core$load$fn__5018.invoke(core.clj:5530)
        at clojure.core$load.doInvoke(core.clj:5529)
        at clojure.lang.RestFn.invoke(RestFn.java:408)
        at clojure.lang.Var.invoke(Var.java:415)
        at backtype.storm.command.config_value.<clinit>(Unknown Source)
Caused by: java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
        at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
        at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
        at java.util.jar.JarVerifier.processEntry(JarVerifier.java:317)
        at java.util.jar.JarVerifier.update(JarVerifier.java:228)
        at java.util.jar.JarFile.initializeVerifier(JarFile.java:348)
        at java.util.jar.JarFile.getInputStream(JarFile.java:415)
        at sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
        at sun.misc.Resource.cachedInputStream(Resource.java:77)
        at sun.misc.Resource.getByteBuffer(Resource.java:160)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at backtype.storm.utils.LocalState.<clinit>(LocalState.java:35)
        ... 35 more

This one had me stuck for a while. Without maven-shade-plugin the topology deployed fine; with it, submission failed outright. After a lot of head-scratching, it turned out to be a packaging problem again.

Because we are now packaging with maven-shade-plugin, the dependency jars get merged into one uber jar, and the signature files from any signed dependencies (the *.SF, *.DSA, and *.RSA files under META-INF) are copied along. Their digests no longer match the repackaged contents, so the JVM refuses to load the jar and reports: java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

The fix is to add a filter inside the shade plugin's <configuration> that strips the signature files from every dependency:

<filters>
    <filter>
        <artifact>*:*</artifact>
        <excludes>
            <exclude>META-INF/*.SF</exclude>
            <exclude>META-INF/*.DSA</exclude>
            <exclude>META-INF/*.RSA</exclude>
        </excludes>
    </filter>
</filters>

After adding this, the signature problem was solved.
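To double-check that the filter took effect, it is easy to scan the shaded jar for surviving signature entries. A small diagnostic sketch; the jar path is a placeholder:

import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class SignatureCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder path to the shaded topology jar
        JarFile jar = new JarFile("target/topology-shaded.jar");
        try {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                // Any hit here means a signature file survived the filter
                if (name.matches("META-INF/.*\\.(SF|DSA|RSA)")) {
                    System.out.println(name);
                }
            }
        } finally {
            jar.close();
        }
    }
}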

This package deployed successfully, but fate kept toying with me: the Storm UI was reporting errors yet again, so into the slave logs for the details:

java.lang.NoSuchFieldError: IBM_JAVA
        at org.apache.hadoop.security.UserGroupInformation.getOSLoginModuleName(UserGroupInformation.java:303) ~[stormjar.jar:na]
        at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:348) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.common.security.HdfsSecurityUtil.login(HdfsSecurityUtil.java:36) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:104) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-11-13T16:55:45.732+0800 b.s.d.executor [ERROR] 
java.lang.NoSuchFieldError: IBM_JAVA
        at org.apache.hadoop.security.UserGroupInformation.getOSLoginModuleName(UserGroupInformation.java:303) ~[stormjar.jar:na]
        at org.apache.hadoop.security.UserGroupInformation.<clinit>(UserGroupInformation.java:348) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.common.security.HdfsSecurityUtil.login(HdfsSecurityUtil.java:36) ~[stormjar.jar:na]
        at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:104) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4722$fn__4734.invoke(executor.clj:692) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-11-13T16:55:45.823+0800 b.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]

At first I had no idea what this error even was. Searching online, several answers said it comes from a missing hadoop-auth jar; for details see http://stackoverflow.com/questions/22278620/the-ibm-java-error-for-running-jobs-in-hadoop-2-2-0

So I added the dependency to pom.xml. Only the major version line matters here, 1.x versus 2.x: if the Hadoop artifacts bundled with the topology are 2.x, hadoop-auth must be 2.x as well. I used 2.7.1:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.7.1</version>
</dependency>

I expected that to settle it, but instead there were more errors: the previous one was still there, and a new one had joined it:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.Log4jLoggerFactory
        at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
        at kafka.utils.Logging$class.logger(Logging.scala:24) ~[stormjar.jar:na]
        at kafka.network.BlockingChannel.logger$lzycompute(BlockingChannel.scala:35) ~[stormjar.jar:na]
        at kafka.network.BlockingChannel.logger(BlockingChannel.scala:35) ~[stormjar.jar:na]
        at kafka.utils.Logging$class.debug(Logging.scala:51) ~[stormjar.jar:na]
        at kafka.network.BlockingChannel.debug(BlockingChannel.scala:35) ~[stormjar.jar:na]
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:64) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124) ~[stormjar.jar:na]
        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[stormjar.jar:na]
        at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[stormjar.jar:na]
        at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[stormjar.jar:na]
        at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
2015-11-13T17:06:50.012+0800 b.s.d.executor [ERROR] 

Back to hunting for causes. After more searching, this turned out to be the classic conflict between log4j and slf4j-log4j12: the worker's classpath already carries log4j-over-slf4j (visible in the stack trace), and putting slf4j-log4j12 next to it breaks logger initialization. Details: http://erichua.iteye.com/blog/1182090

So I edited the pom to exclude slf4j-log4j12 from the hadoop-auth dependency I had just added:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.7.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

After repackaging and deploying, the slf4j-log4j12 conflict was indeed gone, but the earlier java.lang.NoSuchFieldError: IBM_JAVA remained. Digging further: adding hadoop-auth was not the mistake. hadoop-auth's org.apache.hadoop.util.PlatformName defines the IBM_JAVA field that UserGroupInformation reads during initialization. The trouble is that the old hadoop-core jar also contains a class named org.apache.hadoop.util.PlatformName, which lacks that field; when the classloader happens to resolve the class from hadoop-core, the IBM_JAVA lookup fails with exactly this error. So hadoop-core must be excluded from hadoop-auth, and from anywhere else that might drag it in (mvn dependency:tree is handy for finding the culprits, and the diagnostic sketch after the pom snippet below shows how to confirm which jar a duplicated class is loaded from). The updated pom.xml entry:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.7.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
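When two jars ship the same fully qualified class, the quickest sanity check is to ask the JVM which copy actually won. A tiny diagnostic sketch; run it with the same classpath as the topology jar:

public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        // The class duplicated across hadoop-auth and hadoop-core in this story;
        // substitute any other class name to check a different suspect.
        Class<?> c = Class.forName("org.apache.hadoop.util.PlatformName");
        // Prints the jar (or directory) this copy of the class was loaded from.
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}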
This time it packaged cleanly and deployed without a hitch. At long last every problem was solved. A tearful slog, but heaven rewards the persistent: the Storm-HDFS integration is complete.