Storm error: java.nio.channels.ClosedChannelException: null

Storm cluster + single-node Kafka performance test


Source: http://www.cnphp6.com/archives/51272

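A brief account of a Storm cluster's data-processing performance, the bottleneck that appeared when integrating it with Kafka, and how each problem was resolved.

    Integrating Storm with Kafka on a single machine went smoothly, but problems appeared once the work moved to a Storm cluster and data-processing performance was measured. The test process and the problems are recorded briefly below:

    Performance target: process at least 1 million messages per minute (CSV format, about 100 bytes each), parsing each message and persisting it to a database.

    Architecture: Flume reads the files and buffers them in a Kafka queue, from which Storm consumes.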
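To put the performance target in per-second terms, here is a small arithmetic sketch. It assumes exactly 1,000,000 messages per minute at an average of 100 bytes each; the class and method names are illustrative only. Under those assumptions the target works out to about 16,667 messages per second, or roughly 1.67 MB/s of raw payload.

```java
public class ThroughputTarget {
    /** Converts a per-minute message rate into a per-second rate. */
    public static double messagesPerSecond(long messagesPerMinute) {
        return messagesPerMinute / 60.0;
    }

    /** Raw payload rate in bytes per second for a given average message size. */
    public static double bytesPerSecond(long messagesPerMinute, int avgMessageBytes) {
        return messagesPerSecond(messagesPerMinute) * avgMessageBytes;
    }

    public static void main(String[] args) {
        long perMinute = 1_000_000L; // target stated in the article
        int avgBytes = 100;          // approximate CSV message size stated in the article
        System.out.printf("messages/s: %.0f%n", messagesPerSecond(perMinute));
        System.out.printf("payload MB/s: %.2f%n", bytesPerSecond(perMinute, avgBytes) / 1_000_000.0);
    }
}
```

The payload rate is modest, so the per-message cost (parsing and the database write) is more likely to dominate than raw bandwidth.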

    Problems:

    1. When the Storm cluster scheduled tasks, the following problem appeared. The worker log is shown below:

2014-09-24 16:47:38 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-computer7-62/ip:6706... [8]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-computer7-62/ip:6706, [id: 0x0b596170, /ip:34836 => computer7-62/ip:6706]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-computer7-60/ip:6706
2014-09-24 16:47:38 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-computer7-60/ip:6706
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) [netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) [netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] failed to send requests to computer7-60/ip:6706: 
java.nio.channels.ClosedChannelException: null
        at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:649) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:370) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:117) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:632) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:611) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.Channels.write(Channels.java:578) ~[netty-3.2.2.Final.jar:na]
        at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:259) ~[netty-3.2.2.Final.jar:na]
        at backtype.storm.messaging.netty.Client.flushRequest(Client.java:328) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.messaging.netty.Client.close(Client.java:272) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.6.0_24]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.6.0_24]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.6.0_24]
        at java.lang.reflect.Method.invoke(Method.java:616) ~[na:1.6.0_24]
        at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) [clojure-1.5.1.jar:na]
        at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:298) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:284) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_refresh_connections$this__5887.invoke(worker.clj:250) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$schedule_recurring$this__1134.invoke(timer.clj:99) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117$fn__1118.invoke(timer.clj:50) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.timer$mk_timer$fn__1117.invoke(timer.clj:42) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
2014-09-24 16:47:38 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-computer7-60/ip:6706..., timeout: 600000ms, pendings: 0
2014-09-24 16:47:38 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$consume_loop_STAR_$fn__758.invoke(disruptor.clj:94) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:679) [na:1.6.0_24]
Caused by: java.lang.RuntimeException: Client is being closed, and does not take requests any more
        at backtype.storm.messaging.netty.Client.send(Client.java:194) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927$fn__5928.invoke(worker.clj:322) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__5927.invoke(worker.clj:320) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
        ... 6 common frames omitted
2014-09-24 16:47:38 b.s.m.n.Client [INFO] New Netty Client, connect to computer7-60, 6706, config: , buffer_size: 52

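    Resolution: the cause was embarrassingly basic, and tracking it down took a roundabout route. It turned out that the hosts file was misconfigured (a lowercase L in a machine name had been typed as the digit 1), which broke data communication between worker nodes and disrupted task scheduling.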
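A typo like this can be caught before deployment by comparing the names the cluster is expected to use against the names actually defined in each node's hosts file. The following is a minimal sketch of such a check, not something from the original setup: the class name, the host names (`storm-l1`, `storm-l2`) and the addresses are hypothetical, and the parser only handles the common `IP name [alias...]` line format with `#` comments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class HostsCheck {
    /** Collects every hostname and alias defined in hosts-file text. */
    public static Set<String> definedNames(String hostsContent) {
        Set<String> names = new HashSet<>();
        for (String line : hostsContent.split("\\r?\\n")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash); // drop trailing comment
            }
            String[] fields = line.trim().split("\\s+");
            // fields[0] is the IP address; everything after it is a name or alias
            for (int i = 1; i < fields.length; i++) {
                names.add(fields[i]);
            }
        }
        return names;
    }

    /** Returns the expected hostnames that the hosts-file text does not define. */
    public static List<String> missingHosts(String hostsContent, List<String> expected) {
        Set<String> defined = definedNames(hostsContent);
        List<String> missing = new ArrayList<>();
        for (String host : expected) {
            if (!defined.contains(host)) {
                missing.add(host);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical hosts file in which "storm-l1" was mistyped as "storm-11".
        String hosts = "192.0.2.10 storm-11\n192.0.2.11 storm-l2 # worker\n";
        List<String> expected = Arrays.asList("storm-l1", "storm-l2");
        System.out.println("Missing from hosts file: " + missingHosts(hosts, expected));
    }
}
```

In practice the hosts content would be read from `/etc/hosts` on each node and the expected list taken from the cluster's supervisor host names; the comparison here is case-sensitive, which is stricter than real hostname resolution.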

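2. Kafka performance bottleneck

    When Kafka was integrated with Storm, data-processing throughput was poor and fell short of the target. At first I suspected the KafkaSpout code, but since it has been included in Storm's external modules, the problem seemed unlikely to lie there. After reading the KafkaSpout implementation, I found a likely bottleneck. To improve concurrent access and processing performance, Kafka gives each topic a partitions setting, which spreads the data across partitions. Because the queue offset is maintained on the client side, KafkaSpout avoids contention by mapping tasks to partitions one-to-one, so that no two tasks read the same partition. When building a topology, the KafkaSpout parallelism can therefore be set according to the topic's partition count. By increasing the topic's partition count and the spout parallelism (8), the target processing performance was reached.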
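The relationship between spout parallelism and partition count can be illustrated with a small stand-alone model. This is a simplified sketch that assumes partitions are dealt out to spout tasks round-robin by task index; it is not the KafkaSpout source, and the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    /**
     * Returns the partition ids owned by one spout task, assuming partitions
     * are assigned round-robin: task i gets partitions i, i + totalTasks, ...
     */
    public static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            owned.add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        int numPartitions = 8; // partition count used in the article
        for (int totalTasks : new int[] {1, 8, 10}) {
            System.out.println("parallelism = " + totalTasks);
            for (int task = 0; task < totalTasks; task++) {
                System.out.println("  task " + task + " -> "
                        + partitionsForTask(numPartitions, totalTasks, task));
            }
        }
    }
}
```

Under this model a single task has to read all 8 partitions by itself, a parallelism of 8 gives each task exactly one partition, and any tasks beyond the partition count own nothing and sit idle. That is why raising the partition count and the spout parallelism together is what lifts throughput.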

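    By extension, the earlier problem with Flume buffering into the Kafka queue may also have been caused by how partitions were configured; this will be tested and verified later.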
