RabbitMQ Source Code Analysis: Connection

Versions

  1. com.rabbitmq:amqp-client:4.3.0
  2. RabbitMQ server version: 3.6.15

Connection

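  1. A connection here is simply a TCP/IP socket connection. One Connection carries multiple Channels, which implies that the Connection is designed to be long-lived and that every Channel has a unique identifier. Both client requests and server responses carry this identifier so that each side can tell which channel a frame belongs to.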
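
To make the "unique identifier" concrete: in AMQP 0-9-1 every frame starts with a 1-byte type, a 2-byte channel number and a 4-byte payload size, and ends with the frame-end byte 0xCE. The following is a minimal sketch of that layout; `FrameSketch` and its methods are hypothetical names used for illustration and are not part of amqp-client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not amqp-client code) that lays out a frame as AMQP 0-9-1 describes. */
final class FrameSketch {
    static final int FRAME_END = 0xCE;

    /** type (1 byte) + channel (2 bytes) + size (4 bytes) + payload + frame-end (1 byte). */
    static byte[] encode(int type, int channel, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(7 + payload.length + 1); // big-endian by default
        buf.put((byte) type);
        buf.putShort((short) channel);
        buf.putInt(payload.length);
        buf.put(payload);
        buf.put((byte) FRAME_END);
        return buf.array();
    }

    /** Reads the unsigned 16-bit channel number back out of an encoded frame. */
    static int channelOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getShort(1) & 0xFFFF;
    }

    public static void main(String[] args) {
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        // Two body frames (type 3) sharing one socket but addressed to different channels.
        System.out.println(channelOf(encode(3, 1, body))); // prints 1
        System.out.println(channelOf(encode(3, 7, body))); // prints 7
    }
}
```

Because the channel number travels in every frame header, the receiver can demultiplex frames from a single socket onto the right channel.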

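Connection process analysis

  1. The client opens a TCP/IP connection to the server and sends a protocol header. This is the only data the client sends that is not formatted as a method.
  2. The server responds with its protocol version and other properties, including the list of security mechanisms it supports (Start).
  3. The client selects a security mechanism (Start-Ok).
  4. The server starts the authentication process, which uses the SASL challenge-response model. It sends the client a challenge (Secure).
  5. The client sends an authentication response (Secure-Ok). For example, with the "plain" mechanism the response contains a login name and password. The server either repeats the challenge (Secure) or moves on to negotiation, sending a set of parameters such as the maximum frame size (Tune).
  6. The client accepts or lowers these parameters (Tune-Ok).
  7. The client formally opens the connection and selects a virtual host (Open).
  8. The server confirms that the virtual host is a valid choice (Open-Ok).
  9. The client now uses the connection as desired.
  10. One peer (client or server) ends the connection (Close).
  11. The other peer handshakes the connection end (Close-Ok).
  12. The server and the client close their socket connection.
  13. Connection flow diagram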
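
The steps above can be read as a small state machine. The sketch below models only the happy path listed here (in practice either peer may send Close at other points as well); `HandshakeSketch` and its `Step` names are hypothetical and only mirror the method names from the list.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the connection-level method sequence; not amqp-client code. */
final class HandshakeSketch {
    enum Step { HEADER, START, START_OK, SECURE, SECURE_OK, TUNE, TUNE_OK, OPEN, OPEN_OK, CLOSE, CLOSE_OK }

    // For each step, the steps that may legally follow it.
    private static final Map<Step, Set<Step>> NEXT = new EnumMap<>(Step.class);
    static {
        NEXT.put(Step.HEADER, EnumSet.of(Step.START));
        NEXT.put(Step.START, EnumSet.of(Step.START_OK));
        // The server may challenge (Secure) or go straight to negotiation (Tune).
        NEXT.put(Step.START_OK, EnumSet.of(Step.SECURE, Step.TUNE));
        NEXT.put(Step.SECURE, EnumSet.of(Step.SECURE_OK));
        NEXT.put(Step.SECURE_OK, EnumSet.of(Step.SECURE, Step.TUNE));
        NEXT.put(Step.TUNE, EnumSet.of(Step.TUNE_OK));
        NEXT.put(Step.TUNE_OK, EnumSet.of(Step.OPEN));
        NEXT.put(Step.OPEN, EnumSet.of(Step.OPEN_OK));
        NEXT.put(Step.OPEN_OK, EnumSet.of(Step.CLOSE));
        NEXT.put(Step.CLOSE, EnumSet.of(Step.CLOSE_OK));
        NEXT.put(Step.CLOSE_OK, EnumSet.noneOf(Step.class));
    }

    /** Returns true if the sequence starts with the protocol header and every transition is allowed. */
    static boolean isValid(Step... sequence) {
        if (sequence.length == 0 || sequence[0] != Step.HEADER) {
            return false;
        }
        for (int i = 1; i < sequence.length; i++) {
            if (!NEXT.get(sequence[i - 1]).contains(sequence[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // No Secure round trip: Start-Ok is followed directly by Tune.
        System.out.println(isValid(Step.HEADER, Step.START, Step.START_OK,
                Step.TUNE, Step.TUNE_OK, Step.OPEN, Step.OPEN_OK)); // prints true
        // Skipping Tune/Tune-Ok is not a legal sequence.
        System.out.println(isValid(Step.HEADER, Step.START, Step.START_OK, Step.OPEN)); // prints false
    }
}
```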

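Packet capture of the connection process

  1. We analyze the whole connection process from a packet capture, looking only at the AMQP protocol.

  2. First, the client sends the Protocol-Header.

  3. The server responds with Connection.Start.

  4. The client sends Connection.Start-Ok.

  5. The server sends Connection.Tune.

  6. The client sends Connection.Tune-Ok.

  7. The client sends Connection.Open.

  8. The server sends Connection.Open-Ok.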
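
For reference when reading the first captured packet: the AMQP 0-9-1 protocol header is eight bytes, the ASCII letters "AMQP" followed by 0, 0, 9, 1. A minimal sketch (the class name `ProtocolHeaderSketch` is hypothetical) that builds those bytes:

```java
import java.util.Arrays;

/** Hypothetical helper that builds the 8-byte AMQP 0-9-1 protocol header. */
final class ProtocolHeaderSketch {
    static byte[] protocolHeader() {
        // "AMQP", then a zero byte, then protocol major 0, minor 9, revision 1.
        return new byte[] {'A', 'M', 'Q', 'P', 0, 0, 9, 1};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(protocolHeader())); // prints [65, 77, 81, 80, 0, 0, 9, 1]
    }
}
```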

Source code analysis

  1. UML diagram
  2. Sequence diagram for creating a connection

ConnectionFactory

  1. The core method, newConnection()

    
        public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
                throws IOException, TimeoutException {
                if(this.metricsCollector == null) {
                    // A no-op collector: it records nothing
                    this.metricsCollector = new NoOpMetricsCollector();
                }
                // make sure we respect the provided thread factory
                // Create the FrameHandlerFactory from the configuration: blocking I/O (BIO) or NIO
                FrameHandlerFactory fhFactory = createFrameHandlerFactory();
                // This executor is used to dispatch deliveries to consumers (consumerWorkServiceExecutor)
                ConnectionParams params = params(executor);
                // set client-provided via a client property
                if (clientProvidedName != null) {
                    Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
                    properties.put("connection_name", clientProvidedName);
                    params.setClientProperties(properties);
                }
                // Automatic connection recovery is enabled by default
                if (isAutomaticRecoveryEnabled()) {
                    // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
                    AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
        
                    conn.init();
                    return conn;
                } else {
                    List<Address> addrs = addressResolver.getAddresses();
                    Exception lastException = null;
                    for (Address addr : addrs) {
                        try {
                            FrameHandler handler = fhFactory.create(addr);
                            AMQConnection conn = createConnection(params, handler, metricsCollector);
                            conn.start();
                            this.metricsCollector.newConnection(conn);
                            return conn;
                        } catch (IOException e) {
                            lastException = e;
                        } catch (TimeoutException te) {
                            lastException = te;
                        }
                    }
                    if (lastException != null) {
                        if (lastException instanceof IOException) {
                            throw (IOException) lastException;
                        } else if (lastException instanceof TimeoutException) {
                            throw (TimeoutException) lastException;
                        }
                    }
                    throw new IOException("failed to connect");
                }
            }
       
    
  2. createFrameHandlerFactory(), which builds the FrameHandlerFactory

      protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
               if(nio) {
                   if(this.frameHandlerFactory == null) {
                       if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
                           this.nioParams.setThreadFactory(getThreadFactory());
                       }
                       this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContext);
                   }
                   return this.frameHandlerFactory;
               } else {
                   return new SocketFrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
               }
       
           }    
    
    
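
The non-recovering branch of newConnection() follows a common pattern: try each resolved address in turn, remember the last failure, and rethrow it only if no address works. The sketch below isolates that pattern. `AddressFallbackSketch` and `Connector` are hypothetical stand-ins for illustration, and plain strings take the place of `Address` objects.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the "first reachable address wins" loop; not amqp-client code. */
final class AddressFallbackSketch {
    /** Stand-in for "create a frame handler and start a connection on this address". */
    interface Connector<T> {
        T connect(String address) throws IOException;
    }

    static <T> T connectToFirstAvailable(List<String> addresses, Connector<T> connector)
            throws IOException {
        IOException lastException = null;
        for (String address : addresses) {
            try {
                return connector.connect(address);
            } catch (IOException e) {
                lastException = e; // remember the failure and try the next address
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        // Reached only when the address list was empty.
        throw new IOException("failed to connect");
    }

    public static void main(String[] args) throws IOException {
        List<String> addresses = Arrays.asList("bad-host:5672", "good-host:5672");
        String result = AddressFallbackSketch.<String>connectToFirstAvailable(addresses, address -> {
            if (address.startsWith("bad")) {
                throw new IOException("unreachable: " + address);
            }
            return "connected to " + address;
        });
        System.out.println(result); // prints connected to good-host:5672
    }
}
```

One consequence of this design is that the caller only ever sees the exception from the last address tried; failures on earlier addresses are discarded.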

AMQConnection analysis

  1. Core startup code

         /**
             * Start up the connection, including the MainLoop thread.
             * Sends the protocol
             * version negotiation header, and runs through
             * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
             * calls Connection.Open and waits for the OpenOk. Sets heart-beat
             * and frame max values after tuning has taken place.
             * @throws IOException if an error is encountered
             * either before, or during, protocol negotiation;
             * sub-classes {@link ProtocolVersionMismatchException} and
             * {@link PossibleAuthenticationFailureException} will be thrown in the
             * corresponding circumstances. {@link AuthenticationFailureException}
             * will be thrown if the broker closes the connection with ACCESS_REFUSED.
             * If an exception is thrown, connection resources allocated can all be
             * garbage collected when the connection object is no longer referenced.
             *
             *
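             * After receiving the Connection.Tune method, the client must start sending heartbeats,
             * and after receiving Connection.Open it must start monitoring them. After receiving
             * Connection.Tune-Ok, the server must start sending and monitoring heartbeats.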
             */
            public void start()
                    throws IOException, TimeoutException {
                // Create the consumer work service
                initializeConsumerWorkService();
                // Create the heartbeat sender for the long-lived connection
                initializeHeartbeatSender();
                // Flag that the main loop is running
                this._running = true;
                // Make sure that the first thing we do is to send the header,
                // which should cause any socket errors to show up for us, rather
                // than risking them pop out in the MainLoop
        
                /**
                 * Send the header first so that any socket error surfaces here, which is better than discovering it in the MainLoop (main event loop).
                 */
                AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
                    new AMQChannel.SimpleBlockingRpcContinuation();
                // We enqueue an RPC continuation here without sending an RPC
                // request, since the protocol specifies that after sending
                // the version negotiation header, the client (connection
                // initiator) is to wait for a connection.start method to
                // arrive.
                _channel0.enqueueRpc(connStartBlocker);
                try {
                    // The following two lines are akin to AMQChannel's
                    // transmit() method for this pseudo-RPC.
                    _frameHandler.setTimeout(handshakeTimeout);
                    // Send the protocol header to start a new connection: 'AMQP' followed by the bytes 0, 0, 9, 1
                    _frameHandler.sendHeader();
                } catch (IOException ioe) {
                    _frameHandler.close();
                    throw ioe;
                }
                /**
                 * This is where the MainLoop is started (the source calls connection.startMainLoop()).
                 * Starting the connection from inside the frame handler's initialize() method: is this design reasonable?
                 */
        
                this._frameHandler.initialize(this);
        
                AMQP.Connection.Start connStart;
                AMQP.Connection.Tune connTune = null;
                try {
                    connStart =
                            (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();
        
                    _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());
        
                    Version serverVersion =
                            new Version(connStart.getVersionMajor(),
                                               connStart.getVersionMinor());
        
                    if (!Version.checkVersion(clientVersion, serverVersion)) {
                        throw new ProtocolVersionMismatchException(clientVersion,
                                                                          serverVersion);
                    }
                    // Mechanisms offered by the server; the returned data looks like: AMQPLAIN PLAIN
                    String[] mechanisms = connStart.getMechanisms().toString().split(" ");
                    SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
                    if (sm == null) {
                        throw new IOException("No compatible authentication mechanism found - " +
                                                      "server offered [" + connStart.getMechanisms() + "]");
                    }
        
                    LongString challenge = null;
                    LongString response = sm.handleChallenge(null, this.username, this.password);
        
                    do {
                        // Build Start-Ok (carrying the chosen mechanism) on the first pass, Secure-Ok on later passes
                        Method method = (challenge == null)
                                                ? new AMQP.Connection.StartOk.Builder()
                                                          .clientProperties(_clientProperties)
                                                          .mechanism(sm.getName())
                                                          .response(response)
                                                          .build()
                                                : new AMQP.Connection.SecureOk.Builder().response(response).build();
        
                        try {
                            Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
                            if (serverResponse instanceof AMQP.Connection.Tune) {
                                connTune = (AMQP.Connection.Tune) serverResponse;
                            } else {
                                challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                                response = sm.handleChallenge(challenge, this.username, this.password);
                            }
                        } catch (ShutdownSignalException e) {
                            Method shutdownMethod = e.getReason();
                            if (shutdownMethod instanceof AMQP.Connection.Close) {
                                AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
                                if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                                    throw new AuthenticationFailureException(shutdownClose.getReplyText());
                                }
                            }
                            throw new PossibleAuthenticationFailureException(e);
                        }
                    } while (connTune == null);
                } catch (TimeoutException te) {
                    _frameHandler.close();
                    throw te;
                } catch (ShutdownSignalException sse) {
                    _frameHandler.close();
                    throw AMQChannel.wrap(sse);
                } catch(IOException ioe) {
                    _frameHandler.close();
                    throw ioe;
                }
        
                try {
                    // Negotiate the maximum channel number using the following rule:
                    // (clientValue == 0 || serverValue == 0) ? Math.max(clientValue, serverValue) : Math.min(clientValue, serverValue);
                    int channelMax =
                        negotiateChannelMax(this.requestedChannelMax,
                                            connTune.getChannelMax());
                    // Create the channel manager
                    _channelManager = instantiateChannelManager(channelMax, threadFactory);
        
                    // Negotiate the maximum frame size
                    int frameMax =
                        negotiatedMaxValue(this.requestedFrameMax,
                                           connTune.getFrameMax());
                    this._frameMax = frameMax;
        
                    // Negotiate the heartbeat interval
                    int heartbeat =
                        negotiatedMaxValue(this.requestedHeartbeat,
                                           connTune.getHeartbeat());
        
                    // Apply the negotiated heartbeat interval so that heartbeats start being sent
                    setHeartbeat(heartbeat);
        
                    // Send Tune-Ok
                    _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                        .channelMax(channelMax)
                                        .frameMax(frameMax)
                                        .heartbeat(heartbeat)
                                      .build());
                    // Send Open to open the connection
                    _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                              .virtualHost(_virtualHost)
                                            .build());
                } catch (IOException ioe) {
                    _heartbeatSender.shutdown();
                    _frameHandler.close();
                    throw ioe;
                } catch (ShutdownSignalException sse) {
                    _heartbeatSender.shutdown();
                    _frameHandler.close();
                    throw AMQChannel.wrap(sse);
                }
        
                // We can now respond to errors having finished tailoring the connection
                this._inConnectionNegotiation = false;
        }
        
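
The negotiation rule quoted in the comments of start() treats 0 as "no limit": if either side asks for 0, the other side's value wins; otherwise the smaller value is used. A minimal standalone sketch of that rule follows. The class name `TuneNegotiationSketch` is hypothetical, and the numbers in `main` are illustrative values rather than measurements.

```java
/** Hypothetical standalone copy of the negotiation rule quoted in start(). */
final class TuneNegotiationSketch {
    /** 0 means "no limit", so a zero on either side defers to the other value. */
    static int negotiate(int clientValue, int serverValue) {
        return (clientValue == 0 || serverValue == 0)
                ? Math.max(clientValue, serverValue)
                : Math.min(clientValue, serverValue);
    }

    public static void main(String[] args) {
        System.out.println(negotiate(0, 2047));  // prints 2047: client has no preference
        System.out.println(negotiate(60, 580));  // prints 60: both set, the smaller wins
        System.out.println(negotiate(0, 0));     // prints 0: neither side sets a limit
    }
}
```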
    
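
The first handleChallenge(null, username, password) call in start() produces the initial SASL response sent in Start-Ok. For the PLAIN mechanism, RFC 4616 defines that response as an optional authorization identity, a NUL byte, the user name, another NUL byte, and the password. The sketch below builds that byte layout with an empty authorization identity; `PlainResponseSketch` is a hypothetical name, and the code illustrates the wire format, not the client's own implementation.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a SASL PLAIN initial response (RFC 4616). */
final class PlainResponseSketch {
    /** Empty authorization identity, then NUL + user name + NUL + password, UTF-8 encoded. */
    static byte[] plainResponse(String username, String password) {
        return ("\0" + username + "\0" + password).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] response = plainResponse("guest", "guest");
        System.out.println(response.length); // prints 12: two NUL bytes plus 5 + 5 characters
    }
}
```

Since the credentials travel unencrypted in this response, PLAIN is normally paired with TLS on untrusted networks.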

AutorecoveringConnection analysis

  1. TODO: to be continued…