1. 程式人生 > 實用技巧 >SDN實驗---Ryu的應用開發(六)網路拓撲時延探測

SDN實驗---Ryu的應用開發(六)網路拓撲時延探測

一:預備知識

SDN實驗---Ryu的應用開發(五)網路拓撲發現

Ryu原始碼之模組功能分析

Ryu原始碼之拓撲發現原理分析

二:實驗原理

網路時延探測應用利用了Ryu自帶的Switches模組的資料,獲取到了LLDP資料傳送時的時間戳,然後和收到的時間戳進行相減,得到了LLDP資料包從控制器下發到交換機A,然後從交換機A到交換機B,再上報給控制器的時延T1,示例見圖1的藍色箭頭。

同理反向的時延T2由綠色的箭頭組成。

此外,控制器到交換機的往返時延由一個藍色箭頭和一個綠色箭頭組成,此部分時延由echo報文測試,分別為Ta,Tb。最後鏈路的前向後向平均時延T=(T1+T2-Ta-Tb)/2。

三:時延探測程式碼實現

(一)拓撲發現模組

from ryu.base import app_manager

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER #只是表示datapath資料路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from
ryu.lib.packet import packet,ethernet from ryu.topology import event,switches from ryu.topology.api import get_switch,get_link,get_host import threading,time,random DELAY_MONITOR_PERIOD = 5 LOCK = threading.RLock() class TopoDetect(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] def __init__(self,
*args,**kwargs): super(TopoDetect,self).__init__(*args,**kwargs) self.topology_api_app = self self.name = "topology" self.link_list = None self.switch_list = None self.host_list = None self.dpid2id = {} self.id2dpid = {} self.dpid2switch = {} self.ip2host = {} self.ip2switch = {} self.net_size = 0 self.net_topo = [] self.net_flag = False self.net_arrived = 0 self.monitor_thread = hub.spawn(self._monitor) def _monitor(self): """ 協程實現偽併發,探測拓撲狀態 """ while True: #print("------------------_monitor") self._host_add_handler(None) #主機單獨提取處理 self.get_topology(None) if self.net_flag: try: self.show_topology() except Exception as err: print("Please use cmd: pingall to detect topology and wait a moment") hub.sleep(DELAY_MONITOR_PERIOD) #5秒一次 @set_ev_cls(ofp_event.EventOFPSwitchFeatures,CONFIG_DISPATCHER) def switch_feature_handle(self,ev): """ datapath中有配置訊息到達 """ LOCK.acquire() self.net_arrived += 1 #表示有1個交換機到達 LOCK.release() print("------XXXXXXXXXXX------%d------XXXXXXXXXXX------------switch_feature_handle"%self.net_arrived) msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser match = ofp_parser.OFPMatch() actions = [ofp_parser.OFPActionOutput(ofproto.OFPP_CONTROLLER,ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath=datapath,priority=0,match=match,actions=actions,extra_info="config infomation arrived!!") def add_flow(self,datapath,priority,match,actions,idle_timeout=0,hard_timeout=0,extra_info=None): #print("------------------add_flow:") if extra_info != None: print(extra_info) ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser inst = [ofp_parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS,actions)] mod = ofp_parser.OFPFlowMod(datapath=datapath,priority=priority, idle_timeout=idle_timeout, hard_timeout=hard_timeout, match=match,instructions=inst) datapath.send_msg(mod); @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER) def packet_in_handler(self,ev): #print("------------------packet_in_handler") msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto ofp_parser = datapath.ofproto_parser dpid = datapath.id in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth_pkt = pkt.get_protocol(ethernet.ethernet) dst = eth_pkt.dst src = eth_pkt.src #self.logger.info("------------------Controller %s get packet, Mac address from: %s send to: %s , send from datapath: %s,in port is: %s" # ,dpid,src,dst,dpid,in_port) #self.get_topology(None) @set_ev_cls([event.EventHostAdd]) def _host_add_handler(self,ev): #主機資訊單獨處理,不屬於網路拓撲 self.host_list = get_host(self.topology_api_app) #3.需要使用pingall,主機通過與邊緣交換機連線,才能告訴控制器 #獲取主機資訊字典ip2host{ipv4:host object} ip2switch{ipv4:dpid} for i,host in enumerate(self.host_list): self.ip2switch["%s"%host.ipv4] = host.port.dpid self.ip2host["%s"%host.ipv4] = host events = [event.EventSwitchEnter, event.EventSwitchLeave, event.EventSwitchReconnected, event.EventPortAdd, event.EventPortDelete, event.EventPortModify, event.EventLinkAdd, event.EventLinkDelete] @set_ev_cls(events) def get_topology(self,ev): if not self.net_arrived: return #print("------+++++++++++------%d------+++++++++++------------get_topology"%self.net_arrived) LOCK.acquire() self.net_arrived -= 1 if self.net_arrived < 0: self.net_arrived = 0 LOCK.release() self.net_flag = False self.net_topo = [] print("-----------------get_topology") #獲取所有的交換機、鏈路 self.switch_list = get_switch(self.topology_api_app) #1.只要交換機與控制器聯通,就可以獲取 self.link_list = get_link(self.topology_api_app) #2.在ryu啟動時,加上--observe-links即可用於拓撲發現 #獲取交換機字典id2dpid{id:dpid} dpid2switch{dpid:switch object} for i,switch in enumerate(self.switch_list): self.id2dpid[i] = switch.dp.id self.dpid2id[switch.dp.id] = i self.dpid2switch[switch.dp.id] = switch #根據鏈路資訊,開始獲取拓撲資訊 self.net_size = len(self.id2dpid) #表示網路中交換機個數 for i in range(self.net_size): self.net_topo.append([0]*self.net_size) for link in self.link_list: src_dpid = link.src.dpid src_port = link.src.port_no dst_dpid = link.dst.dpid dst_port = link.dst.port_no try: sid = self.dpid2id[src_dpid] did = self.dpid2id[dst_dpid] except KeyError as e: print("--------------Error:get KeyError with link infomation(%s)"%e) return self.net_topo[sid][did] = [src_port,1] #注意:這裡1表示存在鏈路,後面可以修改為時延 self.net_topo[did][sid] = [dst_port,1] #注意:修改為列表,不要用元組,元組無法修改,我們後面要修改時延 self.net_flag = True #表示網路拓撲建立成功 def show_topology(self): print("-----------------show_topology") print("----------switch network----------") line_info = " " for i in range(self.net_size): line_info+=" s%-5d "%self.id2dpid[i] print(line_info) for i in range(self.net_size): line_info = "s%d "%self.id2dpid[i] for j in range(self.net_size): if self.net_topo[i][j] == 0: line_info+="%-22d"%0 else: line_info+="(%d,%.12f) "%tuple(self.net_topo[i][j]) print(line_info) print("----------host 2 switch----------") for key,val in self.ip2switch.items(): print("%s---s%d"%(key,val))
View Code

(二)模組匯入

from ryu.base import app_manager
from ryu.base.app_manager import lookup_service_brick

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath資料路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology.switches import Switches
from ryu.topology.switches import LLDPPacket

import time

(三)資料結構

ECHO_REQUEST_INTERVAL = 0.05
DELAY_DETECTING_PERIOD = 5

class DelayDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(DelayDetect,self).__init__(*args,**kwargs)
        self.name = "delay"

        self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick載入模組例項時,對於我們自己定義的app,我們需要在類中定義self.name。
        self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模組DelayDetect時,必須同時啟動自定義的模組!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

        self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會新增進去
        self.dpid2echoDelay = {} #記錄echo時延

        self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這裡單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay}

        self.detector_thread = hub.spawn(self._detector)

(四)協程獲取鏈路時延

    def _detector(self):
        """
        協程實現偽併發,探測鏈路時延
        """
        while True:
            if self.topology == None:
                self.topology = lookup_service_brick("topology")
            if self.topology.net_flag:
                print("------------------_detector------------------")
                self._send_echo_request()
                self.get_link_delay()
                if self.topology.net_flag:
                    try:
                        self.show_delay()
                    except Exception as err:
                        print("------------------Detect delay failure!!!------------------")
            hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次

(五)獲取Echo時延

    def _send_echo_request(self):
        """
        發生echo報文到datapath
        """
        for datapath in self.dpid2switch.values():
            parser = datapath.ofproto_parser
            echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間

            datapath.send_msg(echo_req)

            #重要!不要同時傳送echo請求,因為它幾乎同時會生成大量echo回覆。
            #在echo_reply_處理程式中處理echo reply時,會產生大量佇列等待延遲。
            hub.sleep(ECHO_REQUEST_INTERVAL)

    @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER])
    def echo_reply_handler(self,ev):
        """
        處理echo響應報文,獲取控制器到交換機的鏈路往返時延

              Controller
                  |    
     echo latency |  
                 `|‘ 
                   Switch        
        """
        now_timestamp = time.time()
        try:
            echo_delay = now_timestamp - eval(ev.msg.data)
            self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay
        except:
            return

(六)獲取LLDP時延

    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延
        """
                      Controller
                    |        /|\    
                   \|/         |
                Switch----->Switch
        """
        msg = ev.msg
        try:
            src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的埠)
            dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的訊息是由第二個(目的)交換機上傳過來的
            dst_inport = msg.match['in_port']
            if self.switches is None:
                self.switches = lookup_service_brick("switches") #獲取交換機模組例項

            #獲得key(Port類例項)和data(PortData類例項)
            for port in self.switches.ports.keys(): #開始獲取對應交換機埠的傳送時間戳
                if src_dpid == port.dpid and src_outport == port.port_no: #匹配key
                    port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData例項,內部儲存了傳送LLDP報文時的timestamp資訊
                    timestamp = port_data.timestamp
                    if timestamp:
                        delay = time.time() - timestamp
                        self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay)
        except:
            return

    def _save_delay_data(self,src,dst,src_port,lldpdealy):
        key = "%s-%s-%s"%(src,src_port,dst)
        self.src_sport_dst2Delay[key] = lldpdealy

(七)根據LLDP和Echo時延,更新網路拓撲圖中的權值資訊

    def get_link_delay(self):
        """
        更新圖中的權值資訊
        """
        print("--------------get_link_delay-----------")
        for src_sport_dst in self.src_sport_dst2Delay.keys():
                src,sport,dst = tuple(map(eval,src_sport_dst.split("-")))
                if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys():
                    sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst]
                    if self.topology.net_topo[sid][did] != 0:
                        if self.topology.net_topo[sid][did][0] == sport:
                            s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2;
                            if s_d_delay < 0: #注意:可能出現單向計算時延導致最後小於0,這是不允許的。則不進行更新,使用上一次原始值
                                continue
                            self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2

(八)顯示網路拓撲圖和Echo、LLDP時延資訊

    @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.dpid2switch:
                self.logger.debug('Register datapath: %016x', datapath.id)
                self.dpid2switch[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.dpid2switch:
                self.logger.debug('Unregister datapath: %016x', datapath.id)
                del self.dpid2switch[datapath.id]

        if self.topology == None:
            self.topology = lookup_service_brick("topology")
        print("-----------------------_state_change_handler-----------------------")
        print(self.topology.show_topology())
        print(self.switches)

    def show_delay(self):
        print("-----------------------show echo delay-----------------------")
        for key,val in self.dpid2echoDelay.items():
            print("s%d----%.12f"%(key,val))
        print("-----------------------show LLDP delay-----------------------")
        for key,val in self.src_sport_dst2Delay.items():
            print("%s----%.12f"%(key,val))

(九)全部程式碼

from ryu.base import app_manager
from ryu.base.app_manager import lookup_service_brick

from ryu.ofproto import ofproto_v1_3

from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,CONFIG_DISPATCHER,DEAD_DISPATCHER,HANDSHAKE_DISPATCHER #只是表示datapath資料路徑的狀態
from ryu.controller.handler import set_ev_cls

from ryu.lib import hub
from ryu.lib.packet import packet,ethernet

from ryu.topology.switches import Switches
from ryu.topology.switches import LLDPPacket

import time

ECHO_REQUEST_INTERVAL = 0.05
DELAY_DETECTING_PERIOD = 5

class DelayDetect(app_manager.RyuApp):
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    def __init__(self,*args,**kwargs):
        super(DelayDetect,self).__init__(*args,**kwargs)
        self.name = "delay"

        self.topology = lookup_service_brick("topology") #注意:我們使用lookup_service_brick載入模組例項時,對於我們自己定義的app,我們需要在類中定義self.name。
        self.switches = lookup_service_brick("switches") #此外,最重要的是:我們啟動本模組DelayDetect時,必須同時啟動自定義的模組!!! 比如:ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

        self.dpid2switch = {} #或者直接為{},也可以。下面_state_change_handler也會新增進去
        self.dpid2echoDelay = {}

        self.src_sport_dst2Delay = {} #記錄LLDP報文測量的時延。實際上可以直接更新,這裡單獨記錄,為了單獨展示 {”src_dpid-srt_port-dst_dpid“:delay}

        self.detector_thread = hub.spawn(self._detector)

    def _detector(self):
        """
        協程實現偽併發,探測鏈路時延
        """
        while True:
            if self.topology == None:
                self.topology = lookup_service_brick("topology")
            if self.topology.net_flag:
                print("------------------_detector------------------")
                self._send_echo_request()
                self.get_link_delay()
                if self.topology.net_flag:
                    try:
                        self.show_delay()
                    except Exception as err:
                        print("------------------Detect delay failure!!!------------------")
            hub.sleep(DELAY_DETECTING_PERIOD) #5秒一次

    def get_link_delay(self):
        """
        更新圖中的權值資訊
        """
        print("--------------get_link_delay-----------")
        for src_sport_dst in self.src_sport_dst2Delay.keys():
                src,sport,dst = tuple(map(eval,src_sport_dst.split("-")))
                if src in self.dpid2echoDelay.keys() and dst in self.dpid2echoDelay.keys():
                    sid,did = self.topology.dpid2id[src],self.topology.dpid2id[dst]
                    if self.topology.net_topo[sid][did] != 0:
                        if self.topology.net_topo[sid][did][0] == sport:
                            s_d_delay = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2;
                            if s_d_delay < 0: #注意:可能出現單向計算時延導致最後小於0,這是不允許的。則不進行更新,使用上一次原始值
                                continue
                            self.topology.net_topo[sid][did][1] = self.src_sport_dst2Delay[src_sport_dst]-(self.dpid2echoDelay[src]+self.dpid2echoDelay[dst])/2


    def _send_echo_request(self):
        """
        發生echo報文到datapath
        """
        for datapath in self.dpid2switch.values():
            parser = datapath.ofproto_parser
            echo_req = parser.OFPEchoRequest(datapath,data=bytes("%.12f"%time.time(),encoding="utf8")) #獲取當前時間

            datapath.send_msg(echo_req)

            #重要!不要同時傳送echo請求,因為它幾乎同時會生成大量echo回覆。
            #在echo_reply_處理程式中處理echo reply時,會產生大量佇列等待延遲。
            hub.sleep(ECHO_REQUEST_INTERVAL)

    @set_ev_cls(ofp_event.EventOFPEchoReply,[MAIN_DISPATCHER,CONFIG_DISPATCHER,HANDSHAKE_DISPATCHER])
    def echo_reply_handler(self,ev):
        """
        處理echo響應報文,獲取控制器到交換機的鏈路往返時延

              Controller
                  |    
     echo latency |  
                 `|‘ 
                   Switch        
        """
        now_timestamp = time.time()
        try:
            echo_delay = now_timestamp - eval(ev.msg.data)
            self.dpid2echoDelay[ev.msg.datapath.id] = echo_delay
        except:
            return


    @set_ev_cls(ofp_event.EventOFPPacketIn,MAIN_DISPATCHER)
    def packet_in_handler(self,ev): #處理到達的LLDP報文,從而獲得LLDP時延
        """
                      Controller
                    |        /|\    
                   \|/         |
                Switch----->Switch
        """
        msg = ev.msg
        try:
            src_dpid,src_outport = LLDPPacket.lldp_parse(msg.data) #獲取兩個相鄰交換機的源交換機dpid和port_no(與目的交換機相連的埠)
            dst_dpid = msg.datapath.id #獲取目的交換機(第二個),因為來到控制器的訊息是由第二個(目的)交換機上傳過來的
            dst_inport = msg.match['in_port']
            if self.switches is None:
                self.switches = lookup_service_brick("switches") #獲取交換機模組例項

            #獲得key(Port類例項)和data(PortData類例項)
            for port in self.switches.ports.keys(): #開始獲取對應交換機埠的傳送時間戳
                if src_dpid == port.dpid and src_outport == port.port_no: #匹配key
                    port_data = self.switches.ports[port] #獲取滿足key條件的values值PortData例項,內部儲存了傳送LLDP報文時的timestamp資訊
                    timestamp = port_data.timestamp
                    if timestamp:
                        delay = time.time() - timestamp
                        self._save_delay_data(src=src_dpid,dst=dst_dpid,src_port=src_outport,lldpdealy=delay)
        except:
            return

    def _save_delay_data(self,src,dst,src_port,lldpdealy):
        key = "%s-%s-%s"%(src,src_port,dst)
        self.src_sport_dst2Delay[key] = lldpdealy

    @set_ev_cls(ofp_event.EventOFPStateChange,[MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            if not datapath.id in self.dpid2switch:
                self.logger.debug('Register datapath: %016x', datapath.id)
                self.dpid2switch[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.dpid2switch:
                self.logger.debug('Unregister datapath: %016x', datapath.id)
                del self.dpid2switch[datapath.id]

        if self.topology == None:
            self.topology = lookup_service_brick("topology")
        print("-----------------------_state_change_handler-----------------------")
        print(self.topology.show_topology())
        print(self.switches)

    def show_delay(self):
        print("-----------------------show echo delay-----------------------")
        for key,val in self.dpid2echoDelay.items():
            print("s%d----%.12f"%(key,val))
        print("-----------------------show LLDP delay-----------------------")
        for key,val in self.src_sport_dst2Delay.items():
            print("%s----%.12f"%(key,val))
View Code

四:實驗測試

(一)啟動Ryu

ryu-manager ./TopoDetect.py ./DelayDetect.py --verbose --observe-links

(二)啟動mininet

sudo mn --topo=linear,4 --switch=ovsk --controller=remote --link=tc

注意:需要在mininet中使用pingall,才能使得交換機獲得host存在,從而使得控制器獲取host訊息!!

(三)結果顯示