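Integrating KAFKA + Flink: An Example (Part 1, Notes on the Pitfalls)
Since 2017 there has been a wave of online hype about how awesome and sophisticated stream processing is. Sorry, I was fully loaded at work and had no time to play with it;
This year, stuck at home under pandemic quarantine and bored, I started learning KAFKA + Flink. The current plan is to scrape web page data with a crawler, send it to Kafka, and then do the computation in Flink.
By temperament I don't like to get too absorbed in paper or electronic tutorials, and I'm not particularly fond of the outdated so-called training videos from certain online training outfits either.
I prefer to get my hands dirty and write code directly. So I skimmed a PDF tutorial and watched two episodes of a "training video", neither of which explained how to use the Kafka and Flink components together, lost patience, and just started coding (a programmer's rough habit);
My earlier posts already cover installing Kafka and Flink on Windows, and I had also written an introductory piece of Kafka code,
so I do have one simple, working Kafka example. Starting yesterday, I began writing the code afresh;
Let me ramble a bit first and record the pitfalls along the way. Later I will tidy up the code and the overall approach and publish them together.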
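- 1. Rambling
About 90% of the KAFKA + Flink examples online either read a text file or run a loop 10,000 times sampling memory data, and use these two kinds of cases to explain Flink's processing mechanism;
For example: https://www.cnblogs.com/huxi2b/p/7219792.html https://blog.csdn.net/weixin_44575542/article/details/88594773
After a quick skim of the code I couldn't help laughing. Why loop 10,000 times? Does it simply stop running after 10,000 iterations? What real application scenario is that?
Some will say: just use a while(true) loop. Sure, that works, but it is still code written for the sake of having an example. If you use an infinite loop and one day the user says, "I've decided to pause for a while and resume later", what do you do? Shut down the whole application? Others will say: just hook it up to a signal from the front end.
The application scenario determines the architecture, the features, and the direction of development, so I don't want to argue about it. Let's leave this topic here.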
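For what it's worth, the "hook it up to a signal" answer can be sketched in a few lines. This is only an illustration of the idea, not code from either linked example: the class name PausableLoop, its methods, and the counter that stands in for producer.send(...) are all made up here, and a real application would wire pause()/resume() to whatever control channel it has.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a send loop that can be paused and resumed without
// shutting down the application. The counter stands in for producer.send(...).
final class PausableLoop implements Runnable {
    private final Object lock = new Object();
    private volatile boolean paused = false;
    private volatile boolean stopped = false;
    private final AtomicLong sent = new AtomicLong();

    @Override
    public void run() {
        while (!stopped) {
            if (paused) {
                awaitResume();
                continue;
            }
            sent.incrementAndGet(); // stand-in for sending one record
        }
    }

    // Blocks the worker thread until resume() or stop() is called.
    private void awaitResume() {
        synchronized (lock) {
            while (paused && !stopped) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    stopped = true;
                }
            }
        }
    }

    void pause()  { paused = true; }
    void resume() { synchronized (lock) { paused = false; lock.notifyAll(); } }
    void stop()   { synchronized (lock) { stopped = true; lock.notifyAll(); } }
    long sentCount() { return sent.get(); }

    // Runs pause -> resume -> stop once and returns the three counts observed:
    // {count after pausing, count a bit later while still paused, count after resuming}.
    static long[] demo() {
        PausableLoop loop = new PausableLoop();
        Thread worker = new Thread(loop, "producer-loop");
        worker.start();
        try {
            Thread.sleep(50);
            loop.pause();
            Thread.sleep(50);   // let any in-flight iteration finish
            long pausedAt = loop.sentCount();
            Thread.sleep(100);
            long stillPausedAt = loop.sentCount();
            loop.resume();
            Thread.sleep(100);
            long afterResume = loop.sentCount();
            loop.stop();
            worker.join();
            return new long[] {pausedAt, stillPausedAt, afterResume};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            loop.stop();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] counts = demo();
        System.out.println("while paused: " + counts[0] + " -> " + counts[1]
                + ", after resume: " + counts[2]);
    }
}
```

The flags are volatile so the hot path takes no lock; the lock is only used to park the worker while it is paused. Whether this is the right shape depends, again, on the application scenario.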
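- 2. More rambling
When passing objects around, most people serialize them by hand, or even just use a string with the fields separated by commas. For example: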
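import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Fragment from inside the producer loop; `producer` and currentMemSize() are defined elsewhere in the example.
// The three fields are joined into one comma-separated string.
String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
System.out.println("Sending data --> " + value);
producer.send(new ProducerRecord<Object, String>("demo", value), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // exception is non-null only when the send failed
        if (exception != null) {
            System.out.println("Failed to send message with exception " + exception);
        }
    }
});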
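Looking at this, I just want to ask: your object is pretty simple, isn't it? What if one of the string values itself contains a comma?
Besides, some objects have complex property types: a String here, a BigDecimal there, maybe even an embedded ArrayList. How do you handle that?
Skip serialization altogether? Or call toString on everything and mash it all together??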
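To make the complaint concrete, here is a minimal sketch. It is not what I will necessarily end up using: the MachineReading class and its fields are invented for illustration, and Java's built-in serialization is chosen only because it needs no extra dependency (JSON or Avro would be common alternatives). The resulting byte[] could then be sent as the record value, for example with Kafka's ByteArraySerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical payload: the class and its fields are made up for this sketch.
final class MachineReading implements Serializable {
    private static final long serialVersionUID = 1L;

    final String machine;          // may legitimately contain commas
    final BigDecimal memoryMb;
    final ArrayList<String> tags;

    MachineReading(String machine, BigDecimal memoryMb, List<String> tags) {
        this.machine = machine;
        this.memoryMb = memoryMb;
        this.tags = new ArrayList<>(tags);
    }
}

final class SerializationSketch {

    // The naive approach: join two fields with a comma.
    static String toCsv(MachineReading r) {
        return String.format("%s,%s", r.machine, r.memoryMb);
    }

    // Serializes the whole object graph, nested list included, to bytes.
    static byte[] toBytes(MachineReading r) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(r);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Restores the object from the bytes produced by toBytes.
    static MachineReading fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (MachineReading) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        MachineReading original = new MachineReading(
                "rack-1,slot-2", new BigDecimal("1024.50"), Arrays.asList("prod", "eu"));

        // Two fields were written, but the comma inside the name yields 3 parts.
        System.out.println(toCsv(original).split(",").length);

        // The binary round trip keeps every field intact.
        MachineReading copy = fromBytes(toBytes(original));
        System.out.println(copy.machine + " | " + copy.memoryMb + " | " + copy.tags);
    }
}
```

The point is only that a real serialization format keeps field boundaries and types; which format to pick is a separate decision.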
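- 3. Reflection
Most people use FlinkKafkaConsumer011 to receive and process the data in Flink. Honestly, I was rather dismissive: it felt like everyone was just copying each other's code. Boring;
The first time I tried this, I used FlinkKafkaConsumer082, mistakenly assuming that 082 was a higher version than 011. Only when, at one stage, every error I got was about FlinkKafkaConsumer082 did I start to realize my mistake;
The way Apache versions these class names really confuses me. Isn't 82 bigger than 11? Isn't a bigger version number a newer one? In the end the facts told me: no, really, it is not! The suffix is the Kafka version the connector targets: 082 means Kafka 0.8.2 and 011 means Kafka 0.11, so 011 is the newer one.
So before copying someone else's code, go to the official site and confirm the versions yourself. You can't even trust Alibaba's Maven mirror: when you search by artifactId and sort the results, be sure to scroll well down the list, because Alibaba's ordering does not put the highest version first; often it is neither at the top nor at the bottom;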
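- 4. Handling the exception
So far I have written the code that reads data from Kafka and writes it out to a local text file, using a SingleOutputStreamOperator, but
an exception occurred: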
17:48:59.084 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 17:48:59.084 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 17:48:59.558 [main] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set. 17:48:59.558 [main] WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'. 17:48:59.646 [main] DEBUG org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Failed to load web based job submission extension. org.apache.flink.util.FlinkException: The module flink-runtime-web could not be found in the class path. Please add this jar in order to enable web based job submission. 
at org.apache.flink.runtime.webmonitor.WebMonitorUtils.loadWebSubmissionExtension(WebMonitorUtils.java:192) at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.initializeHandlers(DispatcherRestEndpoint.java:98) at org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:141) at org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:161) at org.apache.flink.runtime.minicluster.MiniCluster.createDispatcherResourceManagerComponents(MiniCluster.java:378) at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:313) at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:114) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) at com.kafkastudy.kafka01.FlinkDealWithKafka.main(FlinkDealWithKafka.java:93) 17:48:59.808 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - Platform: Windows 17:48:59.814 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false 17:48:59.815 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - Java version: 8 17:48:59.822 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 17:48:59.826 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 17:48:59.830 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 17:48:59.834 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - direct buffer constructor: available 17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - 
java.nio.Bits.unaligned: available, true 17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior to Java9 17:48:59.838 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): available 17:48:59.839 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available 17:48:59.840 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: C:\Users\ADMINI~1\AppData\Local\Temp (java.io.tmpdir) 17:48:59.841 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model) 17:48:59.848 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: 1888485376 bytes 17:48:59.849 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1 17:48:59.855 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.CleanerJava6 - java.nio.ByteBuffer.cleaner(): available 17:48:59.856 [main] DEBUG org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
.... (100,000 characters omitted here) ....
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.797 [Map -> Sink: Unnamed (3/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b). 17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (3/4) network resources (state: FAILED). 17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@18cbdfe5. 17:49:02.797 [Map -> Sink: Unnamed (3/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 3 pending consumptions]: Received consumed notification for subpartition 2. 17:49:02.805 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80) switched from RUNNING to FAILED. 
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.806 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80). 17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (4/4) network resources (state: FAILED). 17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@4e47cea0. 17:49:02.806 [Map -> Sink: Unnamed (4/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 2 pending consumptions]: Received consumed notification for subpartition 3. 17:49:02.806 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7) switched from RUNNING to FAILED. 
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.807 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7). 17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (1/4) network resources (state: FAILED). 17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@d04e29b. 17:49:02.807 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 1 pending consumptions]: Received consumed notification for subpartition 0. 
17:49:02.811 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Map -> Sink: Unnamed (2/4) 17:49:02.811 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(2/4) with empty state. 17:49:02.811 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.812 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2). 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (2/4) network resources (state: FAILED). 
17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2): Releasing org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@54ec612c. 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions]: Received consumed notification for subpartition 1. 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Received consume notification from ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions]. 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Releasing ReleaseOnConsumptionResultPartition 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef [PIPELINED_BOUNDED, 4 subpartitions, 0 pending consumptions]. 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#0 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false]. 17:49:02.812 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#1 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false]. 
17:49:02.813 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#2 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false]. 17:49:02.813 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef): Released PipelinedSubpartition#3 [number of buffers: 0 (0 bytes), number of buffers in backlog: 0, finished? false, read view? false]. 17:49:02.814 [Map -> Sink: Unnamed (2/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartitionManager - Released partition 6bf24c587ed797fcbedd1d758500d61c produced by fda750c24ba9ee2e3576ba73b3fe76ef. 17:49:02.828 [Map -> Sink: Unnamed (3/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (3/4) (98cf4b7fc2b591cf937bc7a97aab620b) [FAILED] 17:49:02.845 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef) switched from RUNNING to FAILED. 
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.845 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef). 17:49:02.845 [Source: Custom Source (1/1)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Source: Custom Source (1/1) network resources (state: FAILED). 
17:49:02.845 [Source: Custom Source (1/1)] DEBUG org.apache.flink.runtime.io.network.TaskEventDispatcher - unregistering 6bf24c587ed797fcbedd1d758500d61c@fda750c24ba9ee2e3576ba73b3fe76ef 17:49:02.854 [Map -> Sink: Unnamed (2/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) [FAILED] 17:49:02.864 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (1/4) (617855a53333899f9ea43b4a5cbf89d7) [FAILED] 17:49:02.872 [Map -> Sink: Unnamed (4/4)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Map -> Sink: Unnamed (4/4) (6aac35ea1e046a7f786795f9f10aca80) [FAILED] 17:49:02.875 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed c54ff28542a5211674c06383230451d2. 17:49:02.882 [Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source (1/1) (fda750c24ba9ee2e3576ba73b3fe76ef) [FAILED] 17:49:02.916 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 617855a53333899f9ea43b4a5cbf89d7. 17:49:02.922 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 6aac35ea1e046a7f786795f9f10aca80. 
17:49:02.925 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 98cf4b7fc2b591cf937bc7a97aab620b. 17:49:02.929 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source fda750c24ba9ee2e3576ba73b3fe76ef. 17:49:02.943 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (c54ff28542a5211674c06383230451d2) switched from RUNNING to FAILED. java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 17:49:02.944 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha (cf694450927f961839bafbb133deb26a) switched from state RUNNING to FAILING. 
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:259) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:901) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:415) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) .... (100,000 characters omitted here) .... 17:49:03.393 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) ... 
26 common frames omitted 17:49:03.397 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) ... 26 common frames omitted 17:49:03.399 [FileCache shutdown hook] INFO org.apache.flink.runtime.filecache.FileCache - removed file cache directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-dist-cache-bb7b3455-c09d-4f72-b976-700dbe04fa8b 17:49:03.400 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) ... 
26 common frames omitted 17:49:03.402 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at 
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) ... 26 common frames omitted 17:49:03.404 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) ... 26 common frames omitted 17:49:03.405 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution ca30abbed6fe422e447a2317fd6acebd. 17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 55c3c6e5f98f094b513124cfa9d2e535. 17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution 55c3c6e5f98f094b513124cfa9d2e535. 
17:49:03.406 [flink-akka.actor.default-dispatcher-2] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Cannot find task to stop for execution ca30abbed6fe422e447a2317fd6acebd. 17:49:03.412 [BlobServer shutdown hook] INFO org.apache.flink.runtime.blob.BlobServer - Stopped BLOB server at 0.0.0.0:60925 17:49:03.413 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (b80fb3c7568cd61fb19014367644963a) switched from DEPLOYING to FAILED. java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at 
akka.actor.ActorRef.tell(ActorRef.scala:126) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. 
at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) ... 21 common frames omitted 17:49:03.414 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha (cf694450927f961839bafbb133deb26a) switched from state RUNNING to FAILING. java.util.concurrent.CompletionException: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. 
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871) at akka.dispatch.OnComplete.internal(Future.scala:263) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.actor.ActorRef.tell(ActorRef.scala:126) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:285) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.IllegalStateException: TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore. at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.localStateStoreForSubtask(TaskExecutorLocalStateStoresManager.java:110) at org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(TaskExecutor.java:547) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) ... 21 common frames omitted 17:49:03.421 [IOManagerAsync shutdown hook] INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl - FileChannelManager removed spill file directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-io-d51c6c45-5dc5-4838-885c-57cba32545a8 Process finished with exit code 130
Out of all that output, only two problems really matter; the rest can wait for now.
Note this one:
java.lang.NoSuchMethodError: org.apache.commons.io.IOUtils.closeQuietly(Ljava/io/Closeable;)V
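Seeing it gave me a warm, familiar feeling; it reminded me of Apache's IOUtils, which I used years ago and found rock solid, especially its read/write speed.
How I handled it:
This program started life as a web crawler, so it used Apache's IO package, and my pom.xml contained the following dependency: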
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.3.2</version>
</dependency>
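My guess is that this commons-io version is too old and simply lacks the method Flink calls, so I removed the dependency altogether and let Maven pull in a newer commons-io on its own (through the transitive dependencies);
I ran it again and that exception was gone;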
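A quick way to confirm this kind of version clash is to ask the JVM where a class was actually loaded from and whether the method in question exists. The sketch below uses only JDK reflection; the class name `ClasspathCheck` and its two helpers are made up for illustration. As far as I know, `IOUtils.closeQuietly(Closeable)` only appeared in commons-io 2.0, which would fit the error above, but treat that as an assumption and check it against your own classpath.

```java
import java.io.Closeable;
import java.security.CodeSource;

// Hypothetical helper, not part of Flink or commons-io.
class ClasspathCheck {

    /** Returns where a class was loaded from, or a note if it is missing or has no code source. */
    static String whereIs(String className) {
        try {
            Class<?> c = Class.forName(className);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "(no code source, probably a JDK class)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    /** True if the class has a public method with exactly this name and parameter list. */
    static boolean hasMethod(String className, String method, Class<?>... params) {
        try {
            Class.forName(className).getMethod(method, params);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String ioUtils = "org.apache.commons.io.IOUtils";
        System.out.println("IOUtils loaded from: " + whereIs(ioUtils));
        System.out.println("closeQuietly(Closeable) present: "
                + hasMethod(ioUtils, "closeQuietly", Closeable.class));
    }
}
```

Run it with the same classpath as the Flink job. If the printed jar is an old commons-io and the second line says `false`, the `NoSuchMethodError` is explained. Instead of deleting the dependency, pinning a 2.x release of the `commons-io:commons-io` artifact would probably work as well.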
In addition, I added
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.11</artifactId>
    <version>1.9.2</version>
    <scope>provided</scope>
</dependency>
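In theory this should let me see Flink's web management page; I haven't got it working yet, so that is something to play with later;
Then the following problems showed up:
- 5 The exceptions strike again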
6) switched from CANCELING to CANCELED. 18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Map -> Sink: Unnamed (3/4) - execution #64 to FAILED while being CANCELED. 18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Releasing slot [SlotRequestId{42b886c8702b236d3f8170ef7539f93a}] because: Release multi task slot because all children have been released. 18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Adding returned slot [e7657f78097e6f28e444f35fdc894b1f] to available slots 18:44:04.775 [flink-akka.actor.default-dispatcher-6] DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job haha haha haha (b2bddb8ca16ac234d435cdecc01659fa) if no longer possible. org.apache.kafka.common.config.ConfigException: Invalid value true for configuration auto.offset.reset: Expected value to be a string, but it was a java.lang.Boolean at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:664) at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:473) at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:466) at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108) at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129) at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:544) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664) at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:644) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 18:44:04.775 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha (b2bddb8ca16ac234d435cdecc01659fa) switched from state FAILING to RESTARTING. 18:44:04.775 [flink-akka.actor.default-dispatcher-6] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Restarting the job haha haha haha (b2bddb8ca16ac234d435cdecc01659fa).
------ ... (about 100,000 characters omitted here) ... ----------
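The `ConfigException` in the log above says that `auto.offset.reset` arrived as a `java.lang.Boolean` while Kafka expected a string. Presumably the consumer `Properties` were filled with something like `props.put("auto.offset.reset", true)`. Here is a minimal, JDK-only sketch of the safer pattern. The class name `ConsumerProps`, the broker address `localhost:9092` and the group id `demo-group` are placeholders I made up. For the modern Kafka consumer the accepted values are the strings `earliest`, `latest` and `none`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only.
class ConsumerProps {

    /** Builds consumer settings via setProperty, which only accepts String values. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // A string such as "earliest", "latest" or "none", never a boolean.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    /** Lists the keys whose values are not Strings, i.e. candidates for a type complaint. */
    static List<String> nonStringKeys(Properties props) {
        List<String> bad = new ArrayList<>();
        for (Map.Entry<Object, Object> e : props.entrySet()) {
            if (!(e.getValue() instanceof String)) {
                bad.add(String.valueOf(e.getKey()));
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        Properties wrong = new Properties();
        wrong.put("auto.offset.reset", true); // compiles, but stores a java.lang.Boolean
        System.out.println("suspicious keys: " + nonStringKeys(wrong));
        System.out.println("suspicious keys after fix: "
                + nonStringKeys(build("localhost:9092", "demo-group")));
    }
}
```

Using `setProperty` instead of `put` makes the compiler reject non-string values, so this class of mistake cannot reach the consumer at runtime.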
18:44:04.787 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.io.network.partition.ResultPartition - Created PipelinedSubpartitionView(index: 0) of ResultPartition cc50ab4ffff5ddce300f27d054234a49@00d802a0655bc54031f3d5224b3cecde 18:44:04.787 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from DEPLOYING to RUNNING. 18:44:04.787 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing Map -> Sink: Unnamed (1/4). 18:44:04.787 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 18:44:04.786 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Found existing local state store for b2bddb8ca16ac234d435cdecc01659fa - 20ba6b65f97481d5570070de90e4e791 - 2 under allocation id e7657f78097e6f28e444f35fdc894b1f: org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@655e9c77 18:44:04.789 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - Map -> Sink: Unnamed (3/4) (6c2a4ed7a7a5f3340c223b8ff8abde09): Created 1 input channels (local: 1, remote: 0, unknown: 0). 18:44:04.790 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Map -> Sink: Unnamed (3/4). 18:44:04.786 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing Source: Custom Source (1/1). 
18:44:04.790 [Source: Custom Source (1/1)] INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) 18:44:04.786 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (00d802a0655bc54031f3d5224b3cecde) switched from DEPLOYING to RUNNING. 18:44:04.790 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from DEPLOYING to RUNNING. 18:44:04.791 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager - Found existing local state store for b2bddb8ca16ac234d435cdecc01659fa - 20ba6b65f97481d5570070de90e4e791 - 3 under allocation id 9a9df885ea71c266458b3e8f0b91680b: org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@e529e4 18:44:04.791 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory - Map -> Sink: Unnamed (4/4) (7bced7a815a0d1395dd4ae86fbeb2d4d): Created 1 input channels (local: 1, remote: 0, unknown: 0). 18:44:04.791 [flink-akka.actor.default-dispatcher-4] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task Map -> Sink: Unnamed (4/4). 18:44:04.792 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Source: Custom Source (1/1) 18:44:04.792 [Source: Custom Source (1/1)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) with empty state. 
18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking Map -> Sink: Unnamed (1/4) 18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/4) with empty state. 18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating operator state backend for StreamMap_20ba6b65f97481d5570070de90e4e791_(1/4) with empty state. 18:44:04.792 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.api.common.io.FileOutputFormat - Opening stream for output (1/4). WriteMode=NO_OVERWRITE, OutputDirectoryMode=PARONLY 18:44:04.792 [Source: Custom Source (1/1)] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic 18:44:04.792 [Source: Custom Source (1/1)] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance. 18:44:04.793 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from RUNNING to FAILED. 
java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1 at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 18:44:04.794 [Map -> Sink: Unnamed (1/4)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d). 18:44:04.794 [Map -> Sink: Unnamed (1/4)] DEBUG org.apache.flink.runtime.taskmanager.Task - Release task Map -> Sink: Unnamed (1/4) network resources (state: FAILED).
...................... (about 100,000 characters omitted here) ...........................
18:44:04.795 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (2/4) (eb60a3819acf31d006784b95bb003336) switched from DEPLOYING to RUNNING. 18:44:04.795 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and sending final execution state FAILED to JobManager for task Map -> Sink: Unnamed 4451b752c6d7962b0e1bc728e050935d. 18:44:04.796 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Map -> Sink: Unnamed (1/4) (4451b752c6d7962b0e1bc728e050935d) switched from RUNNING to FAILED. java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1 at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745) 18:44:04.796 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job haha haha haha 
(b2bddb8ca16ac234d435cdecc01659fa) switched from state RUNNING to FAILING. java.nio.file.FileAlreadyExistsException: File already exists: D:/temp/flink.txt/1 at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:264) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:65) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:745)
...................... (about 100,000 characters omitted here) ...........................
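The `FileAlreadyExistsException` goes together with the `WriteMode=NO_OVERWRITE` line logged by `FileOutputFormat`. The sink refuses to touch `D:/temp/flink.txt/1` because the file is already there, most likely left over from an earlier run or restart attempt. The same behaviour can be reproduced with plain `java.nio`, which throws the very same exception class. The sketch below is a JDK-only analogy, not Flink code, and `WriteModeDemo` is a made-up name. On the Flink side the corresponding knob should be the `FileSystem.WriteMode.OVERWRITE` argument of `writeAsText(path, writeMode)`, assuming the job writes its output that way.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class: mimics no-overwrite vs. overwrite semantics with the JDK only.
class WriteModeDemo {

    /** Writes text to target; with overwrite=false an existing file triggers FileAlreadyExistsException. */
    static void write(Path target, String text, boolean overwrite) throws IOException {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        if (overwrite) {
            Files.write(target, bytes, StandardOpenOption.CREATE,
                    StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);
        } else {
            Files.write(target, bytes, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("write-mode-demo");
        Path part = dir.resolve("1"); // stands in for a per-subtask output file
        write(part, "first run\n", false);
        try {
            write(part, "second run\n", false); // same path again, like a restarted task
        } catch (FileAlreadyExistsException e) {
            System.out.println("no-overwrite failed: " + e.getMessage());
        }
        write(part, "second run\n", true); // overwrite mode replaces the old content
        System.out.print(new String(Files.readAllBytes(part), StandardCharsets.UTF_8));
    }
}
```

For a throwaway local test, either deleting the output directory before each run or switching to overwrite mode should avoid the endless restart loop visible in the log.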
18:44:06.042 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying Map -> Sink: Unnamed (4/4) (attempt #86) to 7bd8e6bd-ed0d-4db4-89a3-13dbdfe20ea3 @ 127.0.0.1 (dataPort=-1) 18:44:06.045 [flink-akka.actor.default-dispatcher-4] DEBUG org.apache.flink.runtime.rpc.akka.AkkaRpcActor - Reporting back error thrown in remote procedure public java.util.concurrent.CompletableFuture org.apache.flink.runtime.taskexecutor.TaskExecutor.submitTask(org.apache.flink.runtime.deployment.TaskDeploymentDescriptor,org.apache.flink.runtime.jobmaster.JobMasterId,org.apache.flink.api.common.time.Time) java.lang.reflect.InvocationTargetException: null at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517) at akka.actor.Actor$class.aroundReceive(Actor.scala) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225)