# 基於Ryu的流量採集程式碼實現 (Ryu-based traffic collection implementation)
import math
import time

import xlwt
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.lib import hub
from ryu.ofproto import ofproto_v1_3
# Module-level feature store filled by the event handlers in this file.
# Keyed by datapath id; each value is a dict whose 'flow_feature' entry holds
# the per-flow feature dict most recently computed for that datapath.
FEATURE = {}
class FlowFeatureExaction(app_manager.RyuApp):
    """Ryu application that polls flow statistics and derives per-flow features.

    A background green thread sends an OFPFlowStatsRequest to every registered
    datapath once per ``MONITOR_INTERVAL`` seconds.  Each reply is turned into
    a feature dict per flow entry (counters, rates, asymmetry, ...), stored in
    ``self.flow_feature`` and in the module-level ``FEATURE`` dict, and the
    whole ``FEATURE`` dict is rewritten to the file named by ``FEATURE_FILE``.
    """

    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    # Seconds to sleep between two rounds of flow-stats requests.
    MONITOR_INTERVAL = 10
    # Path of the text file the feature snapshot is written to.
    FEATURE_FILE = "feature"

    def __init__(self, *args, **kwargs):
        """Initialise the per-app state and start the polling thread."""
        super(FlowFeatureExaction, self).__init__(*args, **kwargs)
        self.name = 'flow_feature_exaction'
        # dpid -> datapath object, for switches currently in MAIN_DISPATCHER.
        self.datapaths = {}
        # ipv4_src -> ipv4_dst -> {'pkt_count': ..., 'byte_count': ...}
        self.src_to_dst = {}
        # Human-readable dump lines of the flow entries seen since the last
        # polling round.
        self.flows = []
        # dpid -> flow_no -> feature dict.
        self.flow_feature = {}
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange,
                [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Register a datapath when it connects and drop it when it dies."""
        datapath = ev.datapath
        dpid = datapath.id
        if ev.state == MAIN_DISPATCHER:
            if dpid not in self.datapaths:
                self.logger.debug('register datapath: %016x', dpid)
                self.datapaths[dpid] = datapath
                FEATURE.setdefault(dpid, {}).setdefault('flow_feature', {})
        elif ev.state == DEAD_DISPATCHER:
            if dpid in self.datapaths:
                self.logger.debug('unregister datapath: %016x', dpid)
                del self.datapaths[dpid]
                # Tolerate a missing entry instead of raising KeyError.
                FEATURE.pop(dpid, None)

    def _monitor(self):
        """Poll every known datapath forever, once per MONITOR_INTERVAL."""
        while True:
            for dp in self.datapaths.values():
                self.send_flow_stats_request(dp)
            self.flows = []
            hub.sleep(self.MONITOR_INTERVAL)

    def send_flow_stats_request(self, datapath):
        """Send an OFPFlowStatsRequest with default arguments to *datapath*."""
        parser = datapath.ofproto_parser
        datapath.send_msg(parser.OFPFlowStatsRequest(datapath))

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def flow_stats_reply_handler(self, ev):
        """Turn a flow-stats reply into per-flow features and persist them.

        NOTE(review): a flow is identified only by its position in the reply
        body, so the previous sample used for the rate computation belongs to
        whatever entry occupied that position last time - confirm whether a
        match-based key is wanted instead.
        """
        current_time = time.time()
        dpid = ev.msg.datapath.id
        dp_flows = self.flow_feature.setdefault(dpid, {})

        flow_counter = 0
        for flow_no, stat in enumerate(ev.msg.body):
            self.flows.append('time=%s datapath=%d table_id=%s '
                              'duration_sec=%d duration_nsec=%d '
                              'priority=%d '
                              'idle_timeout=%d hard_timeout=%d flags=0x%04x '
                              'cookie=%d packet_count=%d byte_count=%d '
                              'match=%s instructions=%s ' %
                              (time.time(), dpid, stat.table_id,
                               stat.duration_sec, stat.duration_nsec,
                               stat.priority,
                               stat.idle_timeout, stat.hard_timeout,
                               stat.flags,
                               stat.cookie, stat.packet_count,
                               stat.byte_count,
                               stat.match, stat.instructions))

            feature = self._build_feature(
                dpid, stat, dp_flows.get(flow_no), current_time)
            dp_flows[flow_no] = feature

            # Remember the counters per (src, dst) pair so the reverse
            # direction can be looked up for the asymmetry features.
            src = feature['ipv4_src']
            if src != 0:
                dst = feature['ipv4_dst']
                self.src_to_dst.setdefault(src, {})[dst] = {
                    'pkt_count': stat.packet_count,
                    'byte_count': stat.byte_count,
                }
            flow_counter = flow_no + 1

        for feature in dp_flows.values():
            self._update_switch_level_features(feature, flow_counter)

        # setdefault: a reply may arrive for a datapath that has no FEATURE
        # entry (never registered, or already removed).
        FEATURE.setdefault(dpid, {})['flow_feature'] = dp_flows
        self._dump_features()

    def _build_feature(self, dpid, stat, previous, current_time):
        """Return the feature dict for one flow-stats entry.

        *previous* is the feature dict stored for the same flow number by an
        earlier reply, or None when there is none.
        """
        if previous is None:
            prev_packets = prev_bytes = prev_switch_count = 0
            # No earlier sample: average over the flow's own lifetime rather
            # than over the time elapsed since the Unix epoch.
            interval = stat.duration_sec + stat.duration_nsec / 1e9
        else:
            prev_packets = previous['packet_count']
            prev_bytes = previous['byte_count']
            prev_switch_count = previous['switch_flow_count']
            interval = current_time - previous['time']

        delta_packets = stat.packet_count - prev_packets
        delta_bytes = stat.byte_count - prev_bytes
        if interval > 0:
            pkt_rate = delta_packets / float(interval)
            byte_rate = delta_bytes / float(interval)
        else:
            # Zero or negative interval: no meaningful rate, and no
            # ZeroDivisionError.
            pkt_rate = byte_rate = 0

        feature = {
            'datapath': dpid,
            'duration': stat.duration_sec,
            'priority': stat.priority,
            'packet_count': stat.packet_count,
            'byte_count': stat.byte_count,
            'ipv4_src': 0,
            'ipv4_dst': 0,
            'in_port': 0,
            'output': self._first_output_port(stat),
            'pkt_rate': pkt_rate,
            'byte_rate': byte_rate,
            'pkt_asym': 0,
            'byte_asym': 0,
            'growth_flow_interval': 0,
            # Carried over so the growth since the previous reply can be
            # computed once the whole reply has been counted.
            'switch_flow_count': prev_switch_count,
            'mean_bytes_per_pkt': 0,
            'time': current_time,
        }
        # Match fields overwrite the zero placeholders and add any other
        # field present in the match.
        for key, value in stat.match.items():
            feature[key] = value
        if delta_packets != 0:
            # Same value as byte_rate / pkt_rate, without the interval.
            feature['mean_bytes_per_pkt'] = float(delta_bytes) / delta_packets
        return feature

    @staticmethod
    def _first_output_port(stat):
        """Return the port of the first action carrying one, or 0 if none."""
        for instruction in stat.instructions:
            for action in getattr(instruction, 'actions', ()):
                port = getattr(action, 'port', None)
                if port is not None:
                    return port
        return 0

    def _update_switch_level_features(self, feature, flow_counter):
        """Fill in the flow-count growth and the asymmetry of one feature dict."""
        feature['growth_flow_interval'] = (
            flow_counter - feature['switch_flow_count'])
        feature['switch_flow_count'] = flow_counter

        # Counters recorded for the opposite direction (dst -> src), if any.
        reverse = self.src_to_dst.get(
            feature['ipv4_dst'], {}).get(feature['ipv4_src'])
        if reverse is not None:
            # float() keeps the ratio fractional under Python 2 as well.
            feature['pkt_asym'] = (
                float(reverse['pkt_count']) / (1 + feature['packet_count']))
            feature['byte_asym'] = (
                float(reverse['byte_count']) / (1 + feature['byte_count']))

    def _dump_features(self):
        """Rewrite FEATURE_FILE with the current content of FEATURE."""
        with open(self.FEATURE_FILE, 'w') as data:
            for dpid, features in FEATURE.items():
                data.write('DatapathId ' + str(dpid) + ':{')
                for feature_type, value in features.items():
                    data.write('\n ' + str(feature_type) + ':')
                    data.write(str(value) + '}\n')
                data.write('\n')