
【Spark】Reading from a high-version Elasticsearch

The following exception stack trace appears:

Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
SortAggregate(key=[template_id#166], functions=[last(template_content#167, false), last(highlight_index_template#169, false), last(template_pattern#170, false)], output=[template_id#166, template_content#281, highlight_index#283, template_pattern#285])
+- *(2) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(template_id#166, 200) +- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481])
+- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170] +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, 
StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174] +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 6 more fields), obj#158: org.apache.spark.sql.Row +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19, AS template_content#27, AS highlight_index#36, AS highlight_index_template#46, AS template_pattern#57, AS @@oId#69, AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96, AS @@extractedVars#111] +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta... 
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.aggregate.SortAggregateExec.doExecute(SortAggregateExec.scala:75) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3037) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:3035) at org.elasticsearch.spark.sql.EsSparkSQL$.saveToEs(EsSparkSQL.scala:101) at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:620) at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:107) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) at 
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:26) at com.eoi.jax.job.spark.sink.ElasticsearchSinkDFJob.build(ElasticsearchSinkDFJob.scala:18) at com.eoi.jax.core.SparkJobDAGBuilder.buildResultsOfSink(SparkJobDAGBuilder.java:284) ... 13 more Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: Exchange hashpartitioning(template_id#166, 200) +- SortAggregate(key=[template_id#166], functions=[partial_last(template_content#167, false), partial_last(highlight_index_template#169, false), partial_last(template_pattern#170, false)], output=[template_id#166, last#476, valueSet#477, last#478, valueSet#479, last#480, valueSet#481]) +- *(1) Sort [template_id#166 ASC NULLS FIRST], false, 0 +- InMemoryTableScan [template_id#166, template_content#167, highlight_index_template#169, template_pattern#170] +- InMemoryRelation [@hostname#160, @message#161, @path#162, @rownumber#163L, @timestamp#164, _metadata#165, template_id#166, template_content#167, highlight_index#168, highlight_index_template#169, template_pattern#170, @@oId#171, @@oIndex#172, @@oEs#173, @@extractedVars#174], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, @hostname), StringType), true, false) AS @hostname#160, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, @message), StringType), true, false) AS @message#161, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, @path), StringType), true, false) AS @path#162, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, @rownumber), LongType) AS @rownumber#163L, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, @timestamp), TimestampType), true, false) AS @timestamp#164, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData) AS _metadata#165, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, template_id), StringType), true, false) AS template_id#166, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 7, template_content), StringType), true, false) AS template_content#167, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, highlight_index), StringType), true, false) AS highlight_index#168, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, highlight_index_template), StringType), true, false) AS highlight_index_template#169, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 10, template_pattern), StringType), true, false) AS template_pattern#170, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 11, @@oId), StringType), true, false) AS @@oId#171, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 12, @@oIndex), StringType), true, false) AS @@oIndex#172, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 13, @@oEs), StringType), true, false) AS @@oEs#173, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 14, @@extractedVars), StringType), true, false) AS @@extractedVars#174] +- *(1) MapElements <function1>, obj#159: org.apache.spark.sql.Row +- *(1) DeserializeToObject createexternalrow(@hostname#0.toString, @message#127.toString, @path#2.toString, @rownumber#3L, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, @timestamp#12, true, false), staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface scala.collection.Map), toScalaMap, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, lambdavariable(MapObjects_loopValue46, MapObjects_loopIsNull46, StringType, true).toString, _metadata#5.keyArray, None).array, true, false), staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, lambdavariable(MapObjects_loopValue47, MapObjects_loopIsNull47, StringType, true).toString, _metadata#5.valueArray, None).array, true, false), true, false), template_id#19.toString, template_content#27.toString, highlight_index#36.toString, highlight_index_template#46.toString, template_pattern#57.toString, @@oId#69.toString, @@oIndex#82.toString, @@oEs#96.toString, @@extractedVars#111.toString, StructField(@hostname,StringType,true), StructField(@message,StringType,true), StructField(@path,StringType,true), StructField(@rownumber,LongType,true), StructField(@timestamp,TimestampType,true), StructField(_metadata,MapType(StringType,StringType,true),true), StructField(template_id,StringType,false), StructField(template_content,StringType,false), StructField(highlight_index,StringType,false), ... 
6 more fields), obj#158: org.apache.spark.sql.Row +- *(1) Project [@hostname#0, substring(@message#1, 0, 1000) AS @message#127, @path#2, @rownumber#3L, UDF(@timestamp#4, yyyy-MM-dd'T'HH:mm:ss.SSSZ) AS @timestamp#12, _metadata#5, -3 AS template_id#19, AS template_content#27, AS highlight_index#36, AS highlight_index_template#46, AS template_pattern#57, AS @@oId#69, AS @@oIndex#82, 192.168.101.65:9200 AS @@oEs#96, AS @@extractedVars#111] +- *(1) Scan ElasticsearchRelation(Map(es.query -> {"query":{"bool":{"must":[{"range":{"@timestamp":{"gte":"2020-04-01T00:00:00.000+0800"}}},{"range":{"@timestamp":{"lte":"2021-05-31T00:00:00.000+0800"}}}]}}}, es.resource.read -> jiankong.data_eoi_2021_04/, es.read.field.include -> @timestamp,@message,@@oId,@@oIndex,@@oEs,@rownumber,@path,@hostname, es.resource -> jiankong.data_eoi_2021_04/, es.read.metadata -> true, es.nodes -> http://192.168.101.65:9200, es.scroll.size -> 5000),org.apache.spark.sql.SQLContext@62ded874,None) [@rownumber#3L,@message#1,@timestamp#4,@hostname#0,@path#2,_metadata#5] PushedFilters: [], ReadSchema: struct<@rownumber:bigint,@message:string,@timestamp:timestamp,@hostname:string,@path:string,_meta... at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391) at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121) at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:77) at org.apache.spark.sql.execution.aggregate.SortAggregateExec$$anonfun$doExecute$1.apply(SortAggregateExec.scala:75) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 
57 more Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:159) at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:224) at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:79) at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:78) at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:48) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:251) at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94) at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:321) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128) at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 76 more

Cause:

In lower Elasticsearch versions, a data node simply reports the role "data"; in higher versions, nodes can instead carry tiered roles such as "data_hot" and "data_cold". Lower versions of elasticsearch-hadoop decide whether a node is a data node with an exact match, essentially "data".equals(role), so nodes that only carry the tiered roles are all filtered out and the read fails with "No data nodes with HTTP-enabled available" (the root cause at the bottom of the stack above). Higher connector versions use a check along the lines of role.contains("data"), which accepts those nodes. The difference is sketched below.
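An illustrative Scala approximation of that node-filtering decision (not the actual elasticsearch-hadoop source; the role set is a made-up example of a node in a newer cluster):

// Illustrative sketch only -- not the real elasticsearch-hadoop code.
// A node reports a set of role strings; the question is whether it counts as a data node.
val nodeRoles = Set("data_hot", "ingest")

// Old-style check: only an exact "data" role counts, so this node is filtered out.
// If every node in the cluster looks like this, the connector ends up with an empty
// data-node list and fails with "No data nodes with HTTP-enabled available".
val isDataOld = nodeRoles.contains("data")            // false

// Newer-style check: any role containing "data" (data_hot, data_cold, ...) counts.
val isDataNew = nodeRoles.exists(_.contains("data"))  // true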

Solution:

In the Spark-Elasticsearch (elasticsearch-hadoop) configuration, set es.nodes.data.only to false. With this option disabled, the connector no longer restricts itself to nodes whose role matches "data", so the tiered-role nodes of a high-version cluster can be read normally.
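A minimal sketch of passing the option on the Spark SQL reader; es.nodes.data.only is the key setting, while the node address and index name are just the values taken from the stack trace above:

// Read a high-version Elasticsearch index from Spark with node filtering disabled.
import org.apache.spark.sql.SparkSession

object EsReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-high-version-es")
      .getOrCreate()

    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "http://192.168.101.65:9200")   // coordinating node(s)
      .option("es.nodes.data.only", "false")              // do not restrict to "data"-role nodes
      .option("es.read.metadata", "true")
      .option("es.scroll.size", "5000")
      .load("jiankong.data_eoi_2021_04")                  // index used in the example above

    df.show(10)
    spark.stop()
  }
}

The same key/value can also be set globally via spark.conf or spark-submit --conf instead of per reader.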