Solr SearchHandler 擴展:解決 httpclient 連線問題以及對連線異常的容錯處理
阿新 • • 發佈:2019-01-30
Solr 1.4 在分散式搜尋時,如果有一臺機器的請求失敗,預設會重連 3 次;如果仍然失敗,整個搜尋請求就會丟擲異常。
異常如下:
2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: I/O exception (java.net.ConnectException) caught when processing request: Connection refused 2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: Retrying request 2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: I/O exception (java.net.ConnectException) caught when processing request: Connection refused 2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: Retrying request 2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: I/O exception (java.net.ConnectException) caught when processing request: Connection refused 2012-8-28 11:46:04 org.apache.commons.httpclient.HttpMethodDirector executeWithRetry 資訊: Retrying request 2012-8-28 11:46:04 org.apache.solr.common.SolrException log 嚴重: org.apache.solr.common.SolrException: org.apache.solr.client.solrj.SolrServerException: java.net.ConnectException: Connection refused at org.apache.solr.handler.component.SearchHandlerExt.handleRequestBody(SearchHandlerExt.java:320) at org.apache.solr.handler.RequestHandlerBase.handleRequest(RequestHandlerBase.java:131) at org.apache.solr.core.SolrCore.execute(SolrCore.java:1316) at org.apache.solr.servlet.SolrDispatchFilter.execute(SolrDispatchFilter.java:338) at org.apache.solr.servlet.SolrDispatchFilter.doFilter(SolrDispatchFilter.java:241) at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1089) at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:365) at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:181) at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:712) at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:139) at org.mortbay.jetty.Server.handle(Server.java:285) at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:502) 
at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:821) at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:513) at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:208) at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:378) at org.mortbay.jetty.bio.SocketConnector$Connection.run(SocketConnector.java:226) at org.mortbay.thread.BoundedThreadPool$PoolThread.run(BoundedThreadPool.java:442) Caused by: org.apache.solr.client.solrj.SolrServerException: java.net.ConnectException: Connection refused at org.apache.solr.client.solrj.impl.CommonsHttpSolrServer.request(CommonsHttpSolrServer.java:486) at org.apache.solr.client.solrj.impl.CommonsHttpSolrServer.request(CommonsHttpSolrServer.java:244) at org.apache.solr.handler.component.HttpCommComponentExt$1.call(SearchHandlerExt.java:471) at org.apache.solr.handler.component.HttpCommComponentExt$1.call(SearchHandlerExt.java:1) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:680) Caused by: java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:351) at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:213) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:200) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:432) at java.net.Socket.connect(Socket.java:529) at java.net.Socket.connect(Socket.java:478) at java.net.Socket.<init>(Socket.java:375) at 
java.net.Socket.<init>(Socket.java:249) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80) at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122) at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707) at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.open(MultiThreadedHttpConnectionManager.java:1361) at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387) at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323) at org.apache.solr.client.solrj.impl.CommonsHttpSolrServer.request(CommonsHttpSolrServer.java:430) ... 11 more 2012-8-28 11:46:04 org.apache.solr.core.SolrCore execute 資訊: [] webapp=/solr path=/select/ params={qt=standard2&tie=0&shards.tolerant=false&q=美女} status=500 QTime=486 2012-8-28 11:46:04 org.apache.solr.core.SolrCore execute 資訊: [] webapp=/solr path=/select params={tie=0&shards.qt=single&qf=title^2+tag^1&wt=javabin&shards.tolerant=false&rows=10&defType=dismax&version=1&debugQuery=false&fl=id,score&start=0&q=美女&bf=10&q.op=AND&timeAllowed=2000&qt=single&isShard=true&fsv=true} hits=9 status=0 QTime=50
所以後來我改成用配置方式來設定重連次數(預設為 1 次)。如果重連後仍然丟擲異常,只要請求參數中設定了 shards.tolerant=true,照樣會返回結果,只是丟棄了出錯的那部分分片的資料。當然結果會少一些,但有結果總比沒有結果好。
繼上一篇文章
所寫的擴展,廢話少說,先把擴展的程式碼貼出來:
package org.apache.solr.handler.component; import java.text.ParseException; import java.util.ArrayList; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.commons.httpclient.params.HttpMethodParams; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.client.solrj.SolrServer; import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.RTimer; import org.apache.solr.common.util.SimpleOrderedMap; import org.apache.solr.core.SolrConfig; import org.apache.solr.core.SolrCore; import org.apache.solr.handler.RequestHandlerBase; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryResponse; import org.apache.solr.util.plugin.SolrCoreAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author yzygenuine * */ public class SearchHandlerExt extends RequestHandlerBase implements SolrCoreAware { static final String INIT_COMPONENTS = "components"; static final String 
INIT_FIRST_COMPONENTS = "first-components"; static final String INIT_LAST_COMPONENTS = "last-components"; // socket timeout measured in ms, closes a socket if read // takes longer than x ms to complete. throws // java.net.SocketTimeoutException: Read timed out exception static final String INIT_SO_TIMEOUT = "shard-socket-timeout"; static final String INIT_MAX_CONNECTIONS_PERHOST = "maxConnectionsPerHost"; static final String INIT_MAX_TOTAL_CONNECTIONS = "maxTotalConnections"; // connection timeout measures in ms, closes a socket if connection // cannot be established within x ms. with a // java.net.SocketTimeoutException: Connection timed out static final String INIT_CONNECTION_TIMEOUT = "shard-connection-timeout"; static final String RETRY_COUNT = "retryCount"; static int soTimeout = 0; // current default values static int connectionTimeout = 0; // current default values protected static Logger log = LoggerFactory.getLogger(SearchHandlerExt.class); protected List<SearchComponent> components = null; /** * 新增加的引數 預設值為50 */ static Integer maxConnectionsPerHost = 50; static Integer maxTotalConnections = 10000; static Integer retryCount = 1; protected List<String> getDefaultComponents() { ArrayList<String> names = new ArrayList<String>(6); names.add(QueryComponent.COMPONENT_NAME); names.add(FacetComponent.COMPONENT_NAME); names.add(MoreLikeThisComponent.COMPONENT_NAME); names.add(HighlightComponent.COMPONENT_NAME); names.add(StatsComponent.COMPONENT_NAME); names.add(DebugComponent.COMPONENT_NAME); return names; } /** * Initialize the components based on name. Note, if using * {@link #INIT_FIRST_COMPONENTS} or {@link #INIT_LAST_COMPONENTS}, then the * {@link DebugComponent} will always occur last. If this is not desired, * then one must explicitly declare all components using the * {@link #INIT_COMPONENTS} syntax. 
*/ @SuppressWarnings("unchecked") public void inform(SolrCore core) { Object declaredComponents = initArgs.get(INIT_COMPONENTS); List<String> first = (List<String>) initArgs.get(INIT_FIRST_COMPONENTS); List<String> last = (List<String>) initArgs.get(INIT_LAST_COMPONENTS); List<String> list = null; boolean makeDebugLast = true; if (declaredComponents == null) { // Use the default component list list = getDefaultComponents(); if (first != null) { List<String> clist = first; clist.addAll(list); list = clist; } if (last != null) { list.addAll(last); } } else { list = (List<String>) declaredComponents; if (first != null || last != null) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "First/Last components only valid if you do not declare 'components'"); } makeDebugLast = false; } // Build the component list components = new ArrayList<SearchComponent>(list.size()); DebugComponent dbgCmp = null; for (String c : list) { SearchComponent comp = core.getSearchComponent(c); if (comp instanceof DebugComponent && makeDebugLast == true) { dbgCmp = (DebugComponent) comp; } else { components.add(comp); log.info("Adding component:" + comp); } } if (makeDebugLast == true && dbgCmp != null) { components.add(dbgCmp); log.info("Adding debug component:" + dbgCmp); } SolrConfig config = core.getSolrConfig(); maxConnectionsPerHost = config.getInt(INIT_MAX_CONNECTIONS_PERHOST, 100); maxTotalConnections = config.getInt(INIT_MAX_TOTAL_CONNECTIONS, 10000); connectionTimeout = config.getInt(INIT_CONNECTION_TIMEOUT, 2000); soTimeout = config.getInt(INIT_SO_TIMEOUT, 2000); retryCount = config.getInt(RETRY_COUNT, 1); } public List<SearchComponent> getComponents() { return components; } @Override public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException { // int sleep = req.getParams().getInt("sleep",0); // if (sleep > 0) {log.error("SLEEPING for " + sleep); // Thread.sleep(sleep);} 
ResponseBuilder rb = new ResponseBuilder(); rb.req = req; rb.rsp = rsp; rb.components = components; rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false)); final RTimer timer = rb.isDebug() ? new RTimer() : null; if (timer == null) { // non-debugging prepare phase for (SearchComponent c : components) { c.prepare(rb); } } else { // debugging prepare phase RTimer subt = timer.sub("prepare"); for (SearchComponent c : components) { rb.setTimer(subt.sub(c.getName())); c.prepare(rb); rb.getTimer().stop(); } subt.stop(); } if (rb.shards == null) { // a normal non-distributed request // The semantics of debugging vs not debugging are different enough // that // it makes sense to have two control loops if (!rb.isDebug()) { // Process for (SearchComponent c : components) { c.process(rb); } } else { // Process RTimer subt = timer.sub("process"); for (SearchComponent c : components) { rb.setTimer(subt.sub(c.getName())); c.process(rb); rb.getTimer().stop(); } subt.stop(); timer.stop(); // add the timing info if (rb.getDebugInfo() == null) { rb.setDebugInfo(new SimpleOrderedMap<Object>()); } rb.getDebugInfo().add("timing", timer.asNamedList()); } } else { // a distributed request HttpCommComponentExt comm = new HttpCommComponentExt(); if (rb.outgoing == null) { rb.outgoing = new LinkedList<ShardRequest>(); } rb.finished = new ArrayList<ShardRequest>(); int nextStage = 0; do { rb.stage = nextStage; nextStage = ResponseBuilder.STAGE_DONE; // call all components for (SearchComponent c : components) { // the next stage is the minimum of what all components // report nextStage = Math.min(nextStage, c.distributedProcess(rb)); } // check the outgoing queue and send requests while (rb.outgoing.size() > 0) { // submit all current request tasks at once while (rb.outgoing.size() > 0) { ShardRequest sreq = rb.outgoing.remove(0); sreq.actualShards = sreq.shards; if (sreq.actualShards == ShardRequest.ALL_SHARDS) { sreq.actualShards = rb.shards; } sreq.responses = new 
ArrayList<ShardResponse>(); // TODO: map from shard to address[] for (String shard : sreq.actualShards) { ModifiableSolrParams params = new ModifiableSolrParams(sreq.params); params.remove(ShardParams.SHARDS); // not a // top-level // request params.remove("indent"); params.remove(CommonParams.HEADER_ECHO_PARAMS); params.set(ShardParams.IS_SHARD, true); // a sub // (shard) // request String shardHandler = req.getParams().get(ShardParams.SHARDS_QT); if (shardHandler == null) { params.remove(CommonParams.QT); } else { params.set(CommonParams.QT, shardHandler); } comm.submit(sreq, shard, params); } } // now wait for replies, but if anyone puts more requests on // the outgoing queue, send them out immediately (by exiting // this loop) boolean tolerant = rb.req.getParams().getBool("shards.tolerant", false); while (rb.outgoing.size() == 0) { ShardResponse srsp = comm.takeCompletedOrError(!tolerant); if (srsp == null) break; // no more requests to wait for // Was there an exception? If so, abort everything and // rethrow if (srsp.getException() != null) { if (!tolerant) { comm.cancelAll(); if (srsp.getException() instanceof SolrException) { throw (SolrException) srsp.getException(); } else { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException()); } } } rb.finished.add(srsp.getShardRequest()); // let the components see the responses to the request for (SearchComponent c : components) { c.handleResponses(rb, srsp.getShardRequest()); } } } for (SearchComponent c : components) { c.finishStage(rb); } // we are done when the next stage is MAX_VALUE } while (nextStage != Integer.MAX_VALUE); } } // ////////////////////// SolrInfoMBeans methods ////////////////////// @Override public String getDescription() { StringBuilder sb = new StringBuilder(); sb.append("Search using components: "); if (components != null) { for (SearchComponent c : components) { sb.append(c.getName()); sb.append(","); } } return sb.toString(); } @Override public String 
getVersion() { return "$Revision: 766412 $"; } @Override public String getSourceId() { return "$Id: SearchHandler.java 766412 2009-04-19 01:31:02Z koji $"; } @Override public String getSource() { return "$URL: https://svn.apache.org/repos/asf/lucene/solr/branches/branch-1.4/src/java/org/apache/solr/handler/component/SearchHandler.java $"; } } // TODO: generalize how a comm component can fit into search component framework // TODO: statics should be per-core singletons class HttpCommComponentExt { // We want an executor that doesn't take up any resources if // it's not used, so it could be created statically for // the distributed search component if desired. // // Consider CallerRuns policy and a lower max threads to throttle // requests at some point (or should we simply return failure?) static Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, // terminate // idle // threads // after // 5 // sec new SynchronousQueue<Runnable>() // directly hand off tasks ); static HttpClient client; static { MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager(); mgr.getParams().setDefaultMaxConnectionsPerHost(SearchHandlerExt.maxConnectionsPerHost); mgr.getParams().setMaxTotalConnections(SearchHandlerExt.maxTotalConnections); mgr.getParams().setConnectionTimeout(SearchHandlerExt.connectionTimeout); mgr.getParams().setSoTimeout(SearchHandlerExt.soTimeout); client = new HttpClient(mgr); client.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(SearchHandlerExt.retryCount, true)); SearchHandlerExt.log.info("###################################httpclient.maxConnectionsPerHost:" + SearchHandlerExt.maxConnectionsPerHost + ",httpclient.maxTotalConnections:" + SearchHandlerExt.maxTotalConnections + ",httpclient.retrycount:" + SearchHandlerExt.retryCount); } CompletionService<ShardResponse> completionService = new ExecutorCompletionService<ShardResponse>(commExecutor); 
Set<Future<ShardResponse>> pending = new HashSet<Future<ShardResponse>>(); HttpCommComponentExt() { } private static class SimpleSolrResponse extends SolrResponse { long elapsedTime; NamedList<Object> nl; @Override public long getElapsedTime() { return elapsedTime; } @Override public NamedList<Object> getResponse() { return nl; } @Override public void setResponse(NamedList<Object> rsp) { nl = rsp; } } void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) { Callable<ShardResponse> task = new Callable<ShardResponse>() { public ShardResponse call() throws Exception { ShardResponse srsp = new ShardResponse(); srsp.setShardRequest(sreq); srsp.setShard(shard); SimpleSolrResponse ssr = new SimpleSolrResponse(); srsp.setSolrResponse(ssr); long startTime = System.currentTimeMillis(); try { // String url = "http://" + shard + "/select"; String url = "http://" + shard; params.remove(CommonParams.WT); // use default (currently // javabin) params.remove(CommonParams.VERSION); SolrServer server = new CommonsHttpSolrServer(url, client); // SolrRequest req = new // QueryRequest(SolrRequest.METHOD.POST, "/select"); // use generic request to avoid extra processing of queries QueryRequest req = new QueryRequest(params); req.setMethod(SolrRequest.METHOD.POST); // no need to set the response parser as binary is the // default // req.setResponseParser(new BinaryResponseParser()); // srsp.rsp = server.request(req); // srsp.rsp = server.query(sreq.params); ssr.nl = server.request(req); } catch (Throwable th) { srsp.setException(th); if (th instanceof SolrException) { srsp.setResponseCode(((SolrException) th).code()); } else { srsp.setResponseCode(-1); } } ssr.elapsedTime = System.currentTimeMillis() - startTime; return srsp; } }; pending.add(completionService.submit(task)); } /** * returns a ShardResponse of the last response correlated with a * ShardRequest */ ShardResponse take() { while (pending.size() > 0) { try { Future<ShardResponse> future = 
completionService.take(); pending.remove(future); ShardResponse rsp = future.get(); rsp.getShardRequest().responses.add(rsp); if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) { return rsp; } } catch (InterruptedException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } catch (ExecutionException e) { // should be impossible... the problem with catching the // exception // at this level is we don't know what ShardRequest it applied // to throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e); } } return null; } /** * returns a ShardResponse of the last response correlated with a * ShardRequest, or immediately returns a ShardResponse if there was an * error detected */ ShardResponse takeCompletedOrError(boolean bailOnError) { int i = 0; while (pending.size() > 0) { try { Future<ShardResponse> future = completionService.take(); pending.remove(future); ++i; ShardResponse rsp = future.get(); if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately // add response to the response list... we do this after the // take() and // not after the completion of "call" so we know when the last // response // for a request was received. Otherwise we might return the // same // request more than once. if (rsp.getException() == null) { rsp.getShardRequest().responses.add(rsp); } if (i == rsp.getShardRequest().actualShards.length) { return rsp; } // if (rsp.getShardRequest().responses.size() == // rsp.getShardRequest().actualShards.length) { // return rsp; // } } catch (InterruptedException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } catch (ExecutionException e) { // should be impossible... 
the problem with catching the // exception // at this level is we don't know what ShardRequest it applied // to throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e); } } return null; } void cancelAll() { for (Future<ShardResponse> future : pending) { // TODO: any issues with interrupting? shouldn't be if // there are finally blocks to release connections. future.cancel(true); } } }
然後在 solrconfig.xml 中加入 httpclient 的相關配置:
<retryCount>0</retryCount>
<maxConnectionsPerHost>10</maxConnectionsPerHost>
<maxTotalConnections>100</maxTotalConnections>
<shard-socket-timeout>2000</shard-socket-timeout>
<shard-connection-timeout>2000</shard-connection-timeout>
配置一個擴展後的 SearchHandlerExt:
<requestHandler name="standard2" class="solr.SearchHandlerExt" >
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="timeAllowed">2000</str>
<str name="defType">dismax</str>
<str name="qf">title^2 tag^1</str>
<str name="q.op">AND</str>
<str name="bf">10</str>
<str name="start">0</str>
<str name="rows">10</str>
<str name="debugQuery">on</str>
<str name="q">*:*</str>
<str name="shards">localhost:3456/solr,localhost:8080/solr</str>
<str name="shards.qt">single</str>
<str name="shards.tolerant">true</str>
</lst>
</requestHandler>