dubbo心跳機制 (3)
阿新 • • 發佈:2018-11-19
此文已由作者趙計剛授權網易雲社群釋出。
歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。
二、consumer端心跳機制
//建立ExchangeClient,對第一次服務發現providers路徑下的相關url建立長連線 -->getClients(URL url) -->getSharedClient(URL url) -->ExchangeClient exchangeClient = initClient(url) -->Exchangers.connect(url, requestHandler) -->HeaderExchanger.connect(URL url, ExchangeHandler handler) -->new DecodeHandler(new HeaderExchangeHandler(handler))) -->Transporters.connect(URL url, ChannelHandler... handlers) -->NettyTransporter.connect(URL url, ChannelHandler listener) -->new NettyClient(url, listener) -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler))) -->getChannelCodec(url)//獲取Codec2,這裡是DubboCountCodec例項 -->doOpen()//開啟netty客戶端 -->doConnect()//連線服務端,建立長連線 -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient例項,needHeartbeat:true -->startHeatbeatTimer()//啟動心跳計數器
客戶端在initClient(url)中設定了heartbeat引數(預設為60s,使用者自己設定的方式見“一”中所講),如下:
1 /** 2 * Create new connection 3 */ 4 private ExchangeClient initClient(URL url) { 5 ... 6 // enable heartbeat by default 7 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); 8 9 ... 10 11 ExchangeClient client; 12 try { 13 // connection should be lazy 14 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { 15 client = new LazyConnectExchangeClient(url, requestHandler);16 } else { 17 client = Exchangers.connect(url, requestHandler); 18 } 19 } catch (RemotingException e) { 20 throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); 21 } 22 return client; 23 }
與provider類似,來看一下最後開啟心跳檢測的地方。
1 public class HeaderExchangeClient implements ExchangeClient { 2 private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); 3 private final Client client; 4 private final ExchangeChannel channel; 5 // heartbeat timer 6 private ScheduledFuture<?> heartbeatTimer; 7 // heartbeat(ms), default value is 0 , won't execute a heartbeat. 8 private int heartbeat; 9 private int heartbeatTimeout; 10 11 public HeaderExchangeClient(Client client, boolean needHeartbeat) { 12 if (client == null) { 13 throw new IllegalArgumentException("client == null"); 14 } 15 this.client = client; 16 this.channel = new HeaderExchangeChannel(client); 17 String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); 18 this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); 19 this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); 20 if (heartbeatTimeout < heartbeat * 2) { 21 throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); 22 } 23 if (needHeartbeat) { 24 startHeatbeatTimer(); 25 } 26 } 27 28 private void startHeatbeatTimer() { 29 stopHeartbeatTimer(); 30 if (heartbeat > 0) { 31 heartbeatTimer = scheduled.scheduleWithFixedDelay( 32 new HeartBeatTask(new HeartBeatTask.ChannelProvider() { 33 public Collection<Channel> getChannels() { 34 return Collections.<Channel>singletonList(HeaderExchangeClient.this); 35 } 36 }, heartbeat, heartbeatTimeout), 37 heartbeat, heartbeat, TimeUnit.MILLISECONDS); 38 } 39 } 40 41 private void stopHeartbeatTimer() { 42 if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { 43 try { 44 heartbeatTimer.cancel(true); 45 scheduled.purge(); 46 } catch (Throwable e) { 47 if (logger.isWarnEnabled()) { 48 logger.warn(e.getMessage(), e); 49 } 50 } 51 } 52 heartbeatTimer = null; 53 } 54 }
主要看一下startHeartbeatTimer()方法,與provider相同,只是provider是獲取NettyServer的所有的NettyChannel,而consumer只是獲取當前的物件。
consumer的handler處理鏈與provider完全相同。
最後來看一下consumer的重連機制:AbstractClient#reconnect
1 public void reconnect() throws RemotingException { 2 disconnect(); 3 connect(); 4 } 5 6 public void disconnect() { 7 connectLock.lock(); 8 try { 9 destroyConnectStatusCheckCommand(); 10 try { 11 Channel channel = getChannel(); 12 if (channel != null) { 13 channel.close(); 14 } 15 } catch (Throwable e) { 16 logger.warn(e.getMessage(), e); 17 } 18 try { 19 doDisConnect(); 20 } catch (Throwable e) { 21 logger.warn(e.getMessage(), e); 22 } 23 } finally { 24 connectLock.unlock(); 25 } 26 } 27 28 protected void connect() throws RemotingException { 29 connectLock.lock(); 30 try { 31 if (isConnected()) { 32 return; 33 } 34 initConnectStatusCheckCommand(); 35 doConnect(); 36 if (!isConnected()) { 37 throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " 38 + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() 39 + ", cause: Connect wait timeout: " + getTimeout() + "ms."); 40 } else { 41 if (logger.isInfoEnabled()) { 42 logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " 43 + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() 44 + ", channel is " + this.getChannel()); 45 } 46 } 47 reconnect_count.set(0); 48 reconnect_error_log_flag.set(false); 49 } catch (RemotingException e) { 50 throw e; 51 } catch (Throwable e) { 52 throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " 53 + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() 54 + ", cause: " + e.getMessage(), e); 55 } finally { 56 connectLock.unlock(); 57 } 58 }
程式碼比較簡單,先斷連,再連線。
更多網易技術、產品、運營經驗分享請點選。
相關文章:
【推薦】 BRVAH(讓RecyclerView變得更高效)(1)