
Storm Troubleshooting and Tuning: SessionTimeout


Both the Storm log and the ZooKeeper (zk) log contain the following connection-timeout message:

Unable to read additional data from client sessionid 0x364f4b88098081e, likely client has closed socket

zk:

2018-08-15 22:44:36,552 [myid:2] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@…] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x264f4b839520805, likely client has closed socket
	at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
	at java.lang.Thread.run(Thread.java:748)
2018-08-15 22:44:36,552 [myid:2] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@…] - Closed socket connection for client /xxx.xxx.xxx.83:40611 which had sessionid 0x264f4b839520805
2018-08-15 22:44:36,552 [myid:2] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@…] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x364f4b88098081e, likely client has closed socket
	at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
	at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
	at java.lang.Thread.run(Thread.java:748)
2018-08-15 22:44:36,552 [myid:2] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@…] - Closed socket connection for client /xxx.xxx.xxx.83:57330 which had sessionid 0x364f4b88098081e

The ZooKeeper server lost its session with the client: the session ID is 0x364f4b88098081e and the client machine is xxx.xxx.xxx.83.

The corresponding Storm log prints the same kind of message: Unable to read additional data from server xxxx ...

This shows that the session between Storm and ZooKeeper timed out.

So how should the timeout be set, and how large should it be?

First, look at the ZooKeeper server configuration:

# set the basic time unit (tick) to 8000 ms
tickTime=8000
initLimit=20
syncLimit=10

The tick is set to 8000 ms. Now consider two other ZooKeeper parameters:

minSessionTimeout

maxSessionTimeout

If these two parameters are not set, the allowed session timeout range defaults to 2 * tickTime through 20 * tickTime, i.e. 16000 ms to 160000 ms.

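If the session timeout requested by a client falls outside this range (< 16000 ms or > 160000 ms), the server resets it to the minimum or maximum timeout, respectively.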
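The clamping rule can be sketched in Java as below. This is a minimal illustration assuming the defaults just described (minimum 2 * tickTime, maximum 20 * tickTime); the class SessionTimeoutNegotiation and its methods are hypothetical names, not part of ZooKeeper's API.

```java
// Hypothetical class: illustrates the clamping rule, not ZooKeeper's source code.
final class SessionTimeoutNegotiation {
    // Defaults that apply when minSessionTimeout / maxSessionTimeout are not set.
    static int defaultMin(int tickTime) {
        return 2 * tickTime;
    }

    static int defaultMax(int tickTime) {
        return 20 * tickTime;
    }

    // Clamp the client's requested timeout into [min, max].
    static int negotiate(int requestedMs, int tickTime) {
        int min = defaultMin(tickTime);
        int max = defaultMax(tickTime);
        if (requestedMs < min) {
            return min;
        }
        if (requestedMs > max) {
            return max;
        }
        return requestedMs;
    }

    public static void main(String[] args) {
        int tickTime = 8000; // tickTime=8000 from the configuration above
        System.out.println(negotiate(120000, tickTime)); // inside the range: kept
        System.out.println(negotiate(10000, tickTime));  // below 16000: raised to the minimum
        System.out.println(negotiate(200000, tickTime)); // above 160000: lowered to the maximum
    }
}
```

With tickTime=8000, main prints 120000, 16000 and 160000, so a requested value of 120000 ms stays unchanged.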

Now look at the class org.apache.storm.shade.org.apache.zookeeper.ClientCnxn in storm-core-1.1.0.jar (the shaded ZooKeeper client):

void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {
    ClientCnxn.this.negotiatedSessionTimeout = _negotiatedSessionTimeout;
    // A non-positive negotiated timeout means the server considers the session expired
    if(ClientCnxn.this.negotiatedSessionTimeout <= 0) {
        ClientCnxn.this.state = States.CLOSED;
        ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Expired, (String)null));
        ClientCnxn.this.eventThread.queueEventOfDeath();
        throw new ClientCnxn.SessionExpiredException("Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(ClientCnxn.this.sessionId) + " has expired");
    } else {
        if(!ClientCnxn.this.readOnly && isRO) {
            ClientCnxn.LOG.error("Read/write client got connected to read-only server");
        }

        // Read timeout: two thirds of the negotiated session timeout
        ClientCnxn.this.readTimeout = ClientCnxn.this.negotiatedSessionTimeout * 2 / 3;
        // Connect timeout: the negotiated timeout divided by the number of servers in the connect string
        ClientCnxn.this.connectTimeout = ClientCnxn.this.negotiatedSessionTimeout / ClientCnxn.this.hostProvider.size();
        ClientCnxn.this.hostProvider.onConnected();
        ClientCnxn.this.sessionId = _sessionId;
        ClientCnxn.this.sessionPasswd = _sessionPasswd;
        ClientCnxn.this.state = isRO?States.CONNECTEDREADONLY:States.CONNECTED;
        ClientCnxn.this.seenRwServerBefore |= !isRO;
        ClientCnxn.LOG.info("Session establishment complete on server " + this.clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(ClientCnxn.this.sessionId) + ", negotiated timeout = " + ClientCnxn.this.negotiatedSessionTimeout + (isRO?" (READ-ONLY mode)":""));
        KeeperState eventState = isRO?KeeperState.ConnectedReadOnly:KeeperState.SyncConnected;
        ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, eventState, (String)null));
    }
}

public int getSessionTimeout() {
    return this.negotiatedSessionTimeout;
}

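By this point the connection has already been lost. If the server reports negotiatedSessionTimeout <= 0 when the client reconnects (meaning the session has already expired), the client throws the exception "Unable to reconnect to ZooKeeper service, session 0x<sessionId> has expired".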
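The two timeouts derived in onConnected(), and the expiry check, can be reproduced with a small sketch. DerivedTimeouts is a hypothetical class, and the ensemble size of 3 is an assumption for illustration only; the post does not say how many ZooKeeper servers are in the connect string.

```java
// Hypothetical class: reproduces the arithmetic in onConnected(), not ZooKeeper's source code.
final class DerivedTimeouts {
    // Read timeout: two thirds of the negotiated session timeout (integer division).
    static int readTimeout(int negotiatedMs) {
        return negotiatedMs * 2 / 3;
    }

    // Connect timeout: the negotiated timeout divided by the number of servers.
    static int connectTimeout(int negotiatedMs, int serverCount) {
        return negotiatedMs / serverCount;
    }

    // A non-positive negotiated timeout signals an expired session.
    static boolean expired(int negotiatedMs) {
        return negotiatedMs <= 0;
    }

    public static void main(String[] args) {
        int negotiated = 120000;
        int servers = 3; // assumption: three servers in the connect string
        System.out.println("readTimeout=" + readTimeout(negotiated));
        System.out.println("connectTimeout=" + connectTimeout(negotiated, servers));
        System.out.println("expired(0)=" + expired(0));
    }
}
```

For a 120000 ms session this prints readTimeout=80000, connectTimeout=40000 and expired(0)=true.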

The run() method of ClientCnxn's inner SendThread class:

        public void run() {
            this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
            this.clientCnxnSocket.updateNow();
            this.clientCnxnSocket.updateLastSendAndHeard();
            long lastPingRwServer = System.currentTimeMillis();
            boolean var3 = true;

            while(ClientCnxn.this.state.isAlive()) {
                try {
                    if(!this.clientCnxnSocket.isConnected()) {
                        if(!this.isFirstConnect) {
                            try {
                                Thread.sleep((long)this.r.nextInt(1000));
                            } catch (InterruptedException var11) {
                                ClientCnxn.LOG.warn("Unexpected exception", var11);
                            }
                        }

                        if(ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive()) {
                            break;
                        }

                        this.startConnect();
                        this.clientCnxnSocket.updateLastSendAndHeard();
                    }

                    int to;
                    if(ClientCnxn.this.state.isConnected()) {
                        if(ClientCnxn.this.zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if(ClientCnxn.this.zooKeeperSaslClient.getSaslState() == SaslState.INITIAL) {
                                try {
                                    ClientCnxn.this.zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException var10) {
                                    ClientCnxn.LOG.error("SASL authentication with Zookeeper Quorum member failed: " + var10);
                                    ClientCnxn.this.state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }

                            KeeperState authState = ClientCnxn.this.zooKeeperSaslClient.getKeeperState();
                            if(authState != null) {
                                if(authState == KeeperState.AuthFailed) {
                                    ClientCnxn.this.state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else if(authState == KeeperState.SaslAuthenticated) {
                                    sendAuthEvent = true;
                                }
                            }

                            if(sendAuthEvent) {
                                ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, authState, (String)null));
                            }
                        }

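                        // Connected: time left before readTimeout passes with nothing heard from the server
                        to = ClientCnxn.this.readTimeout - this.clientCnxnSocket.getIdleRecv();
                    } else {
                        // Still connecting: time left within connectTimeout
                        to = ClientCnxn.this.connectTimeout - this.clientCnxnSocket.getIdleRecv();
                    }

                    // Nothing heard within the allowed time: treat the session as timed out
                    if(to <= 0) {
                        throw new ClientCnxn.SessionTimeoutException("Client session timed out, have not heard from server in " + this.clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
                    }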

                    if(ClientCnxn.this.state.isConnected()) {
                        int timeToNextPing = ClientCnxn.this.readTimeout / 2 - this.clientCnxnSocket.getIdleSend() - (this.clientCnxnSocket.getIdleSend() > 1000?1000:0);
                        if(timeToNextPing > 0 && this.clientCnxnSocket.getIdleSend() <= 10000) {
                            if(timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        } else {
                            this.sendPing();
                            this.clientCnxnSocket.updateLastSend();
                        }
                    }

                    if(ClientCnxn.this.state == States.CONNECTEDREADONLY) {
                        long now = System.currentTimeMillis();
                        int idlePingRwServer = (int)(now - lastPingRwServer);
                        if(idlePingRwServer >= this.pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            // The decompiler rendered this constant as a char literal (0xEA60 = 60000 ms)
                            this.pingRwTimeout = Math.min(2 * this.pingRwTimeout, 60000);
                            this.pingRwServer();
                        }

                        to = Math.min(to, this.pingRwTimeout - idlePingRwServer);
                    }

                    this.clientCnxnSocket.doTransport(to, ClientCnxn.this.pendingQueue, ClientCnxn.this.outgoingQueue, ClientCnxn.this);
                } catch (Throwable var12) {
                    if(ClientCnxn.this.closing) {
                        if(ClientCnxn.LOG.isDebugEnabled()) {
                            ClientCnxn.LOG.debug("An exception was thrown while closing send thread for session 0x" + Long.toHexString(ClientCnxn.this.getSessionId()) + " : " + var12.getMessage());
                        }
                        break;
                    }

                    if(var12 instanceof ClientCnxn.SessionExpiredException) {
                        ClientCnxn.LOG.info(var12.getMessage() + ", closing socket connection");
                    } else if(var12 instanceof ClientCnxn.SessionTimeoutException) {
                        ClientCnxn.LOG.info(var12.getMessage() + ", closing socket connection and attempting reconnect");
                    } else if(var12 instanceof ClientCnxn.EndOfStreamException) {
                        ClientCnxn.LOG.info(var12.getMessage() + ", closing socket connection and attempting reconnect");
                    } else if(var12 instanceof ClientCnxn.RWServerFoundException) {
                        ClientCnxn.LOG.info(var12.getMessage());
                    } else {
                        ClientCnxn.LOG.warn("Session 0x" + Long.toHexString(ClientCnxn.this.getSessionId()) + " for server " + this.clientCnxnSocket.getRemoteSocketAddress() + ", unexpected error" + ", closing socket connection and attempting reconnect", var12);
                    }

                    this.cleanup();
                    if(ClientCnxn.this.state.isAlive()) {
                        ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Disconnected, (String)null));
                    }

                    this.clientCnxnSocket.updateNow();
                    this.clientCnxnSocket.updateLastSendAndHeard();
                }
            }

            this.cleanup();
            this.clientCnxnSocket.close();
            if(ClientCnxn.this.state.isAlive()) {
                ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.Disconnected, (String)null));
            }

            ZooTrace.logTraceMessage(ClientCnxn.LOG, ZooTrace.getTextTraceLevel(), "SendThread exitedloop.");
        }

When to <= 0, the client throws a SessionTimeoutException and logs Client session timed out, have not heard from server in .....
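The decision at the core of run() can be isolated into a small, testable sketch. TimeoutCheck and its methods are hypothetical; the message format is copied from the decompiled code above, and the sample values (readTimeout 80000 ms, connectTimeout 40000 ms) assume a 120000 ms session timeout and a 3-server connect string.

```java
// Hypothetical simulation of the SendThread timeout check; not ZooKeeper's code.
final class TimeoutCheck {
    static final class SessionTimeoutException extends Exception {
        SessionTimeoutException(String msg) {
            super(msg);
        }
    }

    // Remaining wait time: readTimeout when connected, connectTimeout otherwise.
    static int remaining(boolean connected, int readTimeout, int connectTimeout, int idleRecv) {
        return (connected ? readTimeout : connectTimeout) - idleRecv;
    }

    // Throws when nothing has been heard from the server within the allowed time.
    static int check(boolean connected, int readTimeout, int connectTimeout, int idleRecv, long sessionId)
            throws SessionTimeoutException {
        int to = remaining(connected, readTimeout, connectTimeout, idleRecv);
        if (to <= 0) {
            throw new SessionTimeoutException("Client session timed out, have not heard from server in "
                    + idleRecv + "ms for sessionid 0x" + Long.toHexString(sessionId));
        }
        return to;
    }

    public static void main(String[] args) {
        long sid = 0x364f4b88098081eL;
        try {
            System.out.println(check(true, 80000, 40000, 30000, sid)); // 50000 ms still left
            check(true, 80000, 40000, 85000, sid);                      // 85 s of silence: throws
        } catch (SessionTimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The first call leaves 50000 ms of wait time; the second, after 85000 ms of silence, throws with the same message the real client logs before closing the socket and attempting to reconnect.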

Based on this, the timeouts were set as follows:

On the Storm side, the session timeout is set to 120000 ms:

storm.zookeeper.session.timeout: 120000

storm.zookeeper.connection.timeout: 90000

On the Kafka side, the session timeout is also set to 120000 ms:

zookeeper.session.timeout.ms=120000
zookeeper.connection.timeout.ms=60000

In the source code: to = ClientCnxn.this.readTimeout - this.clientCnxnSocket.getIdleRecv();

where readTimeout = negotiatedSessionTimeout * 2 / 3; with our setting of 120000 ms, that is 120000 * 2 / 3 = 80000 ms,

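and getIdleRecv() returns the current time minus the last time the client heard from the server (lastHeard, e.g. the last heartbeat response).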

If to <= 0, the client throws the exception Client session timed out, have not heard from server in .....

The corresponding method in org.apache.storm.shade.org.apache.zookeeper.ClientCnxnSocket:

// Milliseconds since the client last heard from the server
int getIdleRecv() {
    return (int)(this.now - this.lastHeard);
}
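getIdleRecv() depends on two cached timestamps, now and lastHeard. The sketch below mimics that bookkeeping with an injected clock so the arithmetic can be checked deterministically. IdleTracker is a hypothetical class; updateNow() is named after the call seen in run() above, while updateLastHeard() is a hypothetical name for "record that a response arrived".

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the now/lastHeard bookkeeping behind getIdleRecv().
final class IdleTracker {
    private final LongSupplier clock; // injected clock, in milliseconds
    private long now;
    private long lastHeard;

    IdleTracker(LongSupplier clock) {
        this.clock = clock;
        updateNow();
        lastHeard = now;
    }

    // Refresh the cached "current time".
    void updateNow() {
        now = clock.getAsLong();
    }

    // Record that a response from the server has just been received.
    void updateLastHeard() {
        lastHeard = now;
    }

    // Milliseconds since the last response, as in getIdleRecv().
    int getIdleRecv() {
        return (int) (now - lastHeard);
    }

    public static void main(String[] args) {
        long[] t = {0L};
        IdleTracker tracker = new IdleTracker(() -> t[0]);
        t[0] = 30000;
        tracker.updateNow();
        tracker.updateLastHeard(); // a reply arrives at 30 s
        t[0] = 115000;
        tracker.updateNow();       // silence until 115 s
        System.out.println(tracker.getIdleRecv());
    }
}
```

A reply heard at 30 s followed by silence until 115 s gives an idle time of 85000 ms, which exceeds the 80000 ms readTimeout computed above, so the session would time out.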

 

Once the new settings take effect, the log shows:

Established session 0x164f4b8390b0b88 with negotiated timeout 120000 for client /xxx.xxx.xxxx.83:44569
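To confirm that the new value actually took effect, the negotiated timeout can be pulled out of that log line. The regex-based NegotiatedTimeoutParser below is a hypothetical helper that assumes the line format shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the negotiated timeout from an
// "Established session ... with negotiated timeout ..." log line.
final class NegotiatedTimeoutParser {
    private static final Pattern ESTABLISHED =
            Pattern.compile("Established session (0x[0-9a-f]+) with negotiated timeout (\\d+)");

    // Returns the negotiated timeout in ms, or -1 if the line does not match.
    static int negotiatedTimeout(String line) {
        Matcher m = ESTABLISHED.matcher(line);
        return m.find() ? Integer.parseInt(m.group(2)) : -1;
    }

    public static void main(String[] args) {
        String line = "Established session 0x164f4b8390b0b88 with negotiated timeout 120000"
                + " for client /xxx.xxx.xxxx.83:44569";
        System.out.println(negotiatedTimeout(line));
    }
}
```

For the line above this prints 120000, matching the configured Storm and Kafka values.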