Troubleshooting duplicate records when bulk importing data from Spark into Elasticsearch
阿新 · Published: 2018-05-15
Tags: spark, elasticsearch, timeout retry

A look at the es-hadoop plugin's source code shows that retries during an ES bulk import happen in two places. First, with the es.batch.write.retry.policy parameter enabled by default, the plugin retries a bulk write request up to 3 times when the ES cluster responds with a 503 status code. Second, the HTTP request execution itself also retries (hadoop/rest/NetworkClient.java):
public Response execute(Request request) {
    Response response = null;
    boolean newNode;
    do {
        SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());

        newNode = false;
        try {
            response = currentTransport.execute(routedRequest);
            ByteSequence body = routedRequest.body();
            if (body != null) {
                stats.bytesSent += body.length();
            }
        } catch (Exception ex) {
            // configuration error - including SSL/PKI - bail out
            if (ex instanceof EsHadoopIllegalStateException) {
                throw (EsHadoopException) ex;
            }
            // issues with the SSL handshake, bail out instead of retry, for security reasons
            if (ex instanceof javax.net.ssl.SSLException) {
                throw new EsHadoopTransportException(ex);
            }
            // check for fatal, non-recoverable network exceptions
            if (ex instanceof BindException) {
                throw new EsHadoopTransportException(ex);
            }
            if (log.isTraceEnabled()) {
                log.trace(
                        String.format("Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                                currentNode, request.path()),
                        ex);
            }

            String failed = currentNode;
            failedNodes.put(failed, ex);

            newNode = selectNextNode();

            log.error(String.format("Node [%s] failed (%s); "
                    + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
                    failed, ex.getMessage()));

            if (!newNode) {
                throw new EsHadoopNoNodesLeftException(failedNodes);
            }
        }
    } while (newNode);
    return response;
}
When a request times out, the es-hadoop plugin sends the write request again to another ES node. In other words, once the plugin decides the current node has timed out (the default timeout is one minute), it treats that node as unavailable and fails over to the next one; in reality, ES simply had not finished processing the bulk insert within that minute. Because the original node eventually completes the same bulk request as well, the documents end up indexed twice, which is where the duplicates come from.
After raising the es.http.timeout parameter to give ES enough time to finish the writes, the problem no longer occurred.
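For reference, a minimal sketch of how these settings can be passed to a Spark job that writes through es-hadoop. The class name, node address, index name, and the 5m timeout value are illustrative assumptions, not values from the original troubleshooting:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Collections;
import java.util.Map;

public class EsBulkWrite {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("es-bulk-write")
                // illustrative ES cluster address
                .set("es.nodes", "es-node1:9200")
                // allow ES more time per HTTP request before the plugin
                // fails over to the next node (default is 1m)
                .set("es.http.timeout", "5m")
                // bulk-level retry on 503 responses (defaults shown)
                .set("es.batch.write.retry.count", "3")
                .set("es.batch.write.retry.wait", "10s");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // a trivial document for illustration
        Map<String, ?> doc = Collections.singletonMap("message", "hello");
        JavaRDD<Map<String, ?>> rdd = sc.parallelize(Collections.singletonList(doc));

        // hypothetical target index/type
        JavaEsSpark.saveToEs(rdd, "my-index/doc");

        sc.stop();
    }
}

The key setting is es.http.timeout: it should comfortably exceed the worst bulk processing time the cluster exhibits under load, so the plugin does not fail over to another node while the first node is still working on the request.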