【ODL-Openstack學習系列-03】-openflowplugin be版本連線分析
0 準備
- 版本號:0.4.2 be版本 openflowplugin
1、連線入口
//01-連線發起源於初始化 OpenFlowPluginProviderImpl
org/opendaylight/openflowplugin/impl/OpenFlowPluginProviderImpl.java
initialize() --> startSwitchConnections() --> switchConnectionPrv.startup()
-------------------------------------------- -------
//02-初始化伺服器
org/opendaylight/openflowjava/protocol/impl/core/SwitchConnectionProviderImpl.java
public ListenableFuture<Boolean> startup(); ---》
private ServerFacade createAndConfigureServer();
//03-用於訊息解析和通道處理的的TcpChannelInitializer
1.1 核心channelHandler註冊程式碼
此註冊位置,包含了序列化器註冊,連結成功配置、訊息解析處理的多重註冊:
-
ConnectionFacade和ConnectionConductor類建立和互相註冊引用;
-
註冊tls處理器
-
註冊編碼器;
-
註冊解碼器;
-
註冊訊息處理邏輯器;
@Override
protected void initChannel(final SocketChannel ch) {
if (ch.remoteAddress() != null) {
final InetAddress switchAddress = ch.remoteAddress().getAddress ();
final int port = ch.localAddress().getPort();
final int remotePort = ch.remoteAddress().getPort();
LOGGER.debug("Incoming connection from (remote address): {}:{} --> :{}",
switchAddress.toString(), remotePort, port);
if (!getSwitchConnectionHandler().accept(switchAddress)) {
ch.disconnect();
LOGGER.debug("Incoming connection rejected");
return;
}
}
LOGGER.debug("Incoming connection accepted - building pipeline");
allChannels.add(ch);
ConnectionFacade connectionFacade = null;
//---------------------------------------------
//1-ConnectionFacade建立
connectionFacade = connectionAdapterFactory.createConnectionFacade(ch, null, useBarrier());
try {
LOGGER.debug("calling plugin: {}", getSwitchConnectionHandler());
//2-ConnectionConductor初始化
getSwitchConnectionHandler().onSwitchConnected(connectionFacade);
connectionFacade.checkListeners();
ch.pipeline().addLast(PipelineHandlers.IDLE_HANDLER.name(), new IdleHandler(getSwitchIdleTimeout(), TimeUnit.MILLISECONDS));
boolean tlsPresent = false;
// If this channel is configured to support SSL it will only support SSL
if (getTlsConfiguration() != null) {
tlsPresent = true;
final SslContextFactory sslFactory = new SslContextFactory(getTlsConfiguration());
final SSLEngine engine = sslFactory.getServerContext().createSSLEngine();
engine.setNeedClientAuth(true);
engine.setUseClientMode(false);
List<String> suitesList = getTlsConfiguration().getCipherSuites();
if (suitesList != null && !suitesList.isEmpty()) {
LOGGER.debug("Requested Cipher Suites are: {}", suitesList);
String[] suites = suitesList.toArray(new String[suitesList.size()]);
engine.setEnabledCipherSuites(suites);
LOGGER.debug("Cipher suites enabled in SSLEngine are: {}", engine.getEnabledCipherSuites().toString());
}
final SslHandler ssl = new SslHandler(engine);
final Future<Channel> handshakeFuture = ssl.handshakeFuture();
final ConnectionFacade finalConnectionFacade = connectionFacade;
handshakeFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(final Future<? super Channel> future) throws Exception {
finalConnectionFacade.fireConnectionReadyNotification();
}
});
ch.pipeline().addLast(PipelineHandlers.SSL_HANDLER.name(), ssl);
}
ch.pipeline().addLast(PipelineHandlers.OF_FRAME_DECODER.name(),
new OFFrameDecoder(connectionFacade, tlsPresent));
ch.pipeline().addLast(PipelineHandlers.OF_VERSION_DETECTOR.name(), new OFVersionDetector());
final OFDecoder ofDecoder = new OFDecoder();
ofDecoder.setDeserializationFactory(getDeserializationFactory());
//註冊編碼器
ch.pipeline().addLast(PipelineHandlers.OF_DECODER.name(), ofDecoder);
final OFEncoder ofEncoder = new OFEncoder();
ofEncoder.setSerializationFactory(getSerializationFactory());
ch.pipeline().addLast(PipelineHandlers.OF_ENCODER.name(), ofEncoder);
//註冊訊息處理器
ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
if (!tlsPresent) {
connectionFacade.fireConnectionReadyNotification();
}
} catch (final Exception e) {
LOGGER.warn("Failed to initialize channel", e);
ch.close();
}
}
2 分析ConnectionFacade和ConnectionConductor
根據上面所示,當有裝置連線時候,通過netty框架呼叫initChannel會根據通道生成一個ConnectionFacade和ConnectionConductor,be2版本的程式碼中沒有藉助專門的map儲存這兩個根據裝置的類,目測是由netty的channelHandler註冊時候引用儲存;
-
ConnectionFacade為ConnectionAdapter介面的繼承介面,實現類為ConnectionAdapterImpl,含有連線通道的基本資訊,包括Channel、InetSocketAddress等,同時又新增瞭如下監聽器成員變數:
- ConnectionReadyListener
- OpenflowProtocolListener
- SystemNotificationsListener
-
ConnectionConductor則為刪除提到的監聽的實現類,此類實現瞭如下介面:
- OpenflowProtocolListener--接收訊息處理類,包含如下工作:
- multipart訊息回覆;
- 錯誤訊息處理;
- 埠狀態訊息處理;
- 心跳訊息回覆;
- Experimenter訊息處理;
- 流表移除訊息處理;
- 握手訊息處理;
- PacketIn訊息處理;
- SystemNotificationsListener--處理裝置事件,包括
- 裝置斷連處理;
- 裝置閒置處理;
- ConnectionConductor--連線引導器,提供基本連線方法;
- 初始化
- 獲取版本
- 獲取連線引導器狀態;
- 斷連裝置;
- 設定回話上下文;
- 獲取ConnectionAdapter
- 設定處理佇列器;
- 設定錯誤處理器;
- ConnectionReadyListener--連線成功監聽器
- 開啟心跳程序;
- HandshakeListener--握手訊息監聽器
- 處理握手成功;--註冊回話
- 處理握手失敗;
- 設定握手上下文;
- NotificationEnqueuer--訂閱訊息排隊;
如上闡述,ConnectionConductor類為實際的連接回話處理、訊息回覆的關鍵類,後續3章分析以下其訊息接收的相關處理流程;
- OpenflowProtocolListener--接收訊息處理類,包含如下工作:
3 訊息接受處理分析
3.1 註冊位置
//01-註冊處理handler
ch.pipeline().addLast(PipelineHandlers.DELEGATING_INBOUND_HANDLER.name(), new DelegatingInboundHandler(connectionFacade));
3.2 呼叫流程
org/opendaylight/openflowjava/protocol/impl/core/DelegatingInboundHandler.java
public void channelRead(final ChannelHandlerContext ctx, final Object msg);
-----------------------------------
public void consume(final DataObject message);
-----------------------------------
protected abstract void consumeDeviceMessage(DataObject message);
----------------------------------
實現類:
org/opendaylight/openflowjava/protocol/impl/core/connection/ConnectionAdapterImpl.java
根據訊息型別,進入各個訊息處理和事件處理的監聽器;
4 openflow連線連線建立流程
整個連線建立流程,可以分析如下所示:
如上流程圖所示,雖然整個連線會話通道的建立過程為非同步,但是仍然符合openflow通道建立的一般規則,按照下面順序完成:
1. 接收hello訊息;
2. 傳送反向hello訊息;
3. 協商版本;
4. 傳送心跳訊息;
5. 建立通道session;
6. 產生node;
幾個關鍵處理如下所示:
握手資訊處理
org/opendaylight/openflowplugin/openflow/md/core/HandshakeManagerImpl.java
監聽session生成node
org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java
5 連線通道介面
SessionManagerOFImpl-->ConjunctSessionManager
獲取類方式:
OFSessionUtil.getSessionManager();