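OpenStack Neutron Source Code Analysis: ovs-neutron-agent Startup Walkthrough
Statement:
Reposting of this blog is welcome, but please retain the original author information!
Author: Lin Kai, Huawei cloud computing engineer
Team: OpenStack community team, Huawei Hangzhou R&D Center
This article was compiled and summarized in the course of my own study. Given limited time and ability, mistakes are inevitable; corrections are welcome!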
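OpenStack Neutron is the project dedicated to providing networking services for OpenStack. For an introduction to each Neutron component, see this blog post: http://www.openstack.cn/p1745.html.
Quoting its description of the L2 Agent: the L2 Agent usually runs on the hypervisor and communicates with neutron-server over RPC. It watches for and reports device changes, creates new devices to keep network segments correct, applies security group rules, and so on. The OVS Agent, for example, uses Open vSwitch to implement VLAN, GRE, and VxLAN network isolation, and also controls how network traffic is forwarded.
This post walks through the startup source code of the OVS Agent component in Neutron.
The overall startup flow of the OVS Agent is shown in the figure below:
Now let's start the actual walkthrough of the OVS Agent startup source code.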
(1) main() in /neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
def main():
    cfg.CONF.register_opts(ip_lib.OPTS)
    common_config.init(sys.argv[1:])
    common_config.setup_logging(cfg.CONF)
    q_utils.log_opt_values(LOG)

    try:
        agent_config = create_agent_config_map(cfg.CONF)
    except ValueError as e:
        LOG.error(_('%s Agent terminated!'), e)
        sys.exit(1)

    is_xen_compute_host = 'rootwrap-xen-dom0' in agent_config['root_helper']
    if is_xen_compute_host:
        # Force ip_lib to always use the root helper to ensure that ip
        # commands target xen dom0 rather than domU.
        cfg.CONF.set_default('ip_lib_force_root', True)

    agent = OVSNeutronAgent(**agent_config)  # (1)
    signal.signal(signal.SIGTERM, agent._handle_sigterm)

    # Start everything.
    LOG.info(_("Agent initialized successfully, now running... "))
    agent.daemon_loop()  # (2)
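In the code above, the two most important calls are (1) and (2). Call (1) instantiates the OVS Agent and performs all of its initialization; call (2) loops indefinitely, checking a set of states and performing the corresponding operations whenever a state changes.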
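The control flow of main() can be illustrated with a small, self-contained sketch. ToyAgent and toy_main are hypothetical stand-ins, not Neutron code, and the sketch assumes that the SIGTERM handler does nothing more than clear the run_daemon_loop flag so that the loop ends after the current iteration.

```python
import signal


class ToyAgent(object):
    """Hypothetical stand-in for OVSNeutronAgent (not Neutron code)."""

    def __init__(self, max_iterations=3):
        # Step (1): all initialization happens in the constructor.
        self.run_daemon_loop = True
        self.iter_num = 0
        self.max_iterations = max_iterations

    def _handle_sigterm(self, signum, frame):
        # Assumed behaviour: ask the loop to stop instead of exiting abruptly.
        self.run_daemon_loop = False

    def daemon_loop(self):
        # Step (2): keep iterating until the flag is cleared.
        while self.run_daemon_loop:
            self.iter_num += 1
            if self.iter_num >= self.max_iterations:
                # Simulate an operator sending SIGTERM to the process.
                self._handle_sigterm(signal.SIGTERM, None)
        return self.iter_num


def toy_main():
    agent = ToyAgent()
    # Register the handler before entering the loop, as main() does.
    signal.signal(signal.SIGTERM, agent._handle_sigterm)
    return agent.daemon_loop()


if __name__ == "__main__":
    print("iterations run:", toy_main())
```

The point of the flag-based design is that a termination request never interrupts an iteration halfway through; the loop simply does not start another one.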
Let's first look closely at call (1), the instantiation of the OVS Agent, and see what initialization it performs.

def __init__(self, integ_br, tun_br, local_ip,
             bridge_mappings, root_helper,
             polling_interval, tunnel_types=None,
             veth_mtu=None, l2_population=False,
             enable_distributed_routing=False,
             minimize_polling=False,
             ovsdb_monitor_respawn_interval=(
                 constants.DEFAULT_OVSDBMON_RESPAWN),
             arp_responder=False,
             use_veth_interconnection=False):
    super(OVSNeutronAgent, self).__init__()
    self.use_veth_interconnection = use_veth_interconnection
    self.veth_mtu = veth_mtu
    self.root_helper = root_helper
    self.available_local_vlans = set(moves.xrange(q_const.MIN_VLAN_TAG,
                                                  q_const.MAX_VLAN_TAG))
    self.tunnel_types = tunnel_types or []
    self.l2_pop = l2_population
    # TODO(ethuleau): Change ARP responder so it's not dependent on the
    #                 ML2 l2 population mechanism driver.
    # enable_distributed_routing: whether distributed routing is enabled
    self.enable_distributed_routing = enable_distributed_routing
    self.arp_responder_enabled = arp_responder and self.l2_pop
    self.agent_state = {
        'binary': 'neutron-openvswitch-agent',
        'host': cfg.CONF.host,
        'topic': q_const.L2_AGENT_TOPIC,
        'configurations': {'bridge_mappings': bridge_mappings,
                           'tunnel_types': self.tunnel_types,
                           'tunneling_ip': local_ip,
                           'l2_population': self.l2_pop,
                           'arp_responder_enabled':
                           self.arp_responder_enabled,
                           'enable_distributed_routing':
                           self.enable_distributed_routing},
        'agent_type': q_const.AGENT_TYPE_OVS,
        'start_flag': True}

    # Keep track of int_br's device count for use by _report_state()
    self.int_br_device_count = 0

    self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper)
    # setup_integration_br: set up the integration bridge (int_br),
    # create patch ports and remove all existing flow rules,
    # then add the basic flow rules
    self.setup_integration_br()  # (1)
    # Stores port update notifications for processing in main rpc loop
    self.updated_ports = set()
    # setup_rpc does the following:
    #   sets plugin_rpc, used to communicate with neutron-server
    #   sets state_rpc, used to report the agent state
    #   sets connection, used to receive messages from neutron-server
    #   starts the periodic state report
    self.setup_rpc()  # (2)
    self.bridge_mappings = bridge_mappings
    # Set up the physical network bridges and link them to br-int
    self.setup_physical_bridges(self.bridge_mappings)  # (3)
    self.local_vlan_map = {}
    self.tun_br_ofports = {p_const.TYPE_GRE: {},
                           p_const.TYPE_VXLAN: {}}

    self.polling_interval = polling_interval
    self.minimize_polling = minimize_polling
    self.ovsdb_monitor_respawn_interval = ovsdb_monitor_respawn_interval

    if tunnel_types:
        self.enable_tunneling = True
    else:
        self.enable_tunneling = False
    self.local_ip = local_ip
    self.tunnel_count = 0
    self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
    self.dont_fragment = cfg.CONF.AGENT.dont_fragment
    self.tun_br = None
    self.patch_int_ofport = constants.OFPORT_INVALID
    self.patch_tun_ofport = constants.OFPORT_INVALID
    if self.enable_tunneling:
        # The patch_int_ofport and patch_tun_ofport are updated
        # here inside the call to setup_tunnel_br
        self.setup_tunnel_br(tun_br)

    self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(  # (4)
        self.context,
        self.plugin_rpc,
        self.int_br,
        self.tun_br,
        self.patch_int_ofport,
        self.patch_tun_ofport,
        cfg.CONF.host,
        self.enable_tunneling,
        self.enable_distributed_routing)

    self.dvr_agent.setup_dvr_flows_on_integ_tun_br()

    # Collect additional bridges to monitor
    self.ancillary_brs = self.setup_ancillary_bridges(integ_br, tun_br)

    # Security group agent support
    self.sg_agent = OVSSecurityGroupAgent(self.context,  # (5)
                                          self.plugin_rpc,
                                          root_helper)
    # Initialize iteration counter
    self.iter_num = 0
    self.run_daemon_loop = True  # (6)
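A few attributes in the constructor are derived from its arguments rather than copied. The sketch below reproduces only that derivation in plain Python. derive_settings is a hypothetical helper, and the two VLAN constants are assumed values standing in for q_const.MIN_VLAN_TAG and q_const.MAX_VLAN_TAG.

```python
MIN_VLAN_TAG = 1      # assumed value of q_const.MIN_VLAN_TAG
MAX_VLAN_TAG = 4094   # assumed value of q_const.MAX_VLAN_TAG


def derive_settings(tunnel_types=None, l2_population=False,
                    arp_responder=False):
    """Mirror a few derived attributes of the constructor (toy version)."""
    tunnel_types = tunnel_types or []
    return {
        # range() excludes its upper bound, as xrange() does in the original.
        'available_local_vlans': set(range(MIN_VLAN_TAG, MAX_VLAN_TAG)),
        # Tunneling is enabled as soon as one tunnel type is configured.
        'enable_tunneling': bool(tunnel_types),
        # The ARP responder is only active together with l2_population.
        'arp_responder_enabled': arp_responder and l2_population,
    }


if __name__ == "__main__":
    settings = derive_settings(['vxlan'], arp_responder=True)
    print(settings['enable_tunneling'], settings['arp_responder_enabled'])
```

Note that asking for arp_responder without l2_population leaves the responder disabled, which matches the TODO comment in the constructor about the dependency on the l2 population mechanism driver.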
In the constructor, the steps marked (1)-(6) carry out the important initialization work. Let's start with step (1), self.setup_integration_br():
def setup_integration_br(self):
    """
    Set up the integration bridge.
    Create patch ports and remove all existing flow rules.
    Add the basic flow rules.
    """
    # Ensure the integration bridge is created.
    # ovs_lib.OVSBridge.create() will run
    #   ovs-vsctl -- --may-exist add-br BRIDGE_NAME
    # which does nothing if bridge already exists.
    self.int_br.create()
    self.int_br.set_secure_mode()

    # Delete the patch port (del-port)
    self.int_br.delete_port(cfg.CONF.OVS.int_peer_patch_port)
    # Remove all flow rules via ovs-ofctl
    self.int_br.remove_all_flows()
    # switch all traffic using L2 learning
    # (a flow rule with priority 1 and actions=normal)
    self.int_br.add_flow(priority=1, actions="normal")
    # Add a canary flow to int_br to track OVS restarts
    # (priority 0, actions=drop)
    self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0,
                         actions="drop")
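The function sets up the integration bridge br-int; the comments in the code describe the individual operations. After br-int is created, the existing flow rules are removed and two basic flow rules are added. What are they for? The first, with priority 1 and actions=normal, makes br-int forward traffic between the devices attached to it like an ordinary L2 learning switch. The second, with priority 0 and actions=drop in the canary table, is used to detect OVS restarts; it is analyzed later in the discussion of the loop.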
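To make the role of the canary flow more concrete, here is a minimal in-memory model. ToyBridge is hypothetical and does not talk to Open vSwitch; the table number is an assumed value used only for illustration, and the sketch assumes that a restarted Open vSwitch does not keep previously installed flows.

```python
class ToyBridge(object):
    """Hypothetical in-memory model of a bridge's flow tables."""

    CANARY_TABLE = 23  # assumed table number, for illustration only

    def __init__(self):
        self.flows = []  # each flow is a dict: table, priority, actions

    def add_flow(self, table=0, priority=0, actions="drop"):
        self.flows.append({'table': table, 'priority': priority,
                           'actions': actions})

    def remove_all_flows(self):
        self.flows = []

    def setup(self):
        # Same order as setup_integration_br: wipe, then add the base flows.
        self.remove_all_flows()
        self.add_flow(priority=1, actions="normal")
        self.add_flow(table=self.CANARY_TABLE, priority=0, actions="drop")

    def simulate_ovs_restart(self):
        # Assumption: installed flows do not survive a restart.
        self.flows = []

    def canary_present(self):
        return any(f['table'] == self.CANARY_TABLE for f in self.flows)


if __name__ == "__main__":
    br = ToyBridge()
    br.setup()
    print("canary after setup:", br.canary_present())
    br.simulate_ovs_restart()
    print("canary after restart:", br.canary_present())
```

Because the canary sits in its own table and only the agent installs it, its absence is a cheap signal that the bridge has to be set up again.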
Next, let's look at step (2), self.setup_rpc():
def setup_rpc(self):
    self.agent_id = 'ovs-agent-%s' % cfg.CONF.host
    self.topic = topics.AGENT
    # Set up plugin_rpc, used to communicate with neutron-server
    self.plugin_rpc = OVSPluginApi(topics.PLUGIN)
    # Set up state_rpc, used to report the agent state
    self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

    # Set up connection and add consumers to receive messages
    # from neutron-server
    # RPC network init
    self.context = context.get_admin_context_without_session()
    # Handle updates from service
    self.endpoints = [self]
    # Define the listening consumers for the agent
    consumers = [[topics.PORT, topics.UPDATE],
                 [topics.NETWORK, topics.DELETE],
                 [constants.TUNNEL, topics.UPDATE],
                 [topics.SECURITY_GROUP, topics.UPDATE],
                 [topics.DVR, topics.UPDATE]]
    if self.l2_pop:
        consumers.append([topics.L2POPULATION,
                          topics.UPDATE, cfg.CONF.host])
    self.connection = agent_rpc.create_consumers(self.endpoints,
                                                 self.topic,
                                                 consumers)
    # Start the periodic heartbeat report
    report_interval = cfg.CONF.AGENT.report_interval
    if report_interval:
        heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        heartbeat.start(interval=report_interval)
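From the code we can see that this function sets up plugin_rpc for communicating with neutron-server, state_rpc for reporting agent state, and connection for receiving messages from neutron-server, and then starts the periodic heartbeat report, whose interval defaults to 30s. On the Neutron server side, rpc_listeners are started to listen for messages from agents. When a heartbeat arrives, the server updates the timestamp stored in the database. If the timestamp stops being updated and the current time minus the last recorded timestamp exceeds the default agent_down_time=75s, the agent is considered down.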
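The liveness rule described above boils down to one timestamp comparison. The following sketch shows that comparison with the two defaults quoted in the text; is_agent_down and missed_reports_tolerated are hypothetical helpers written for this post, not functions from Neutron.

```python
from datetime import datetime, timedelta

REPORT_INTERVAL = 30   # seconds, default quoted in the text
AGENT_DOWN_TIME = 75   # seconds, default quoted in the text


def is_agent_down(heartbeat_timestamp, now, agent_down_time=AGENT_DOWN_TIME):
    """Return True if the last heartbeat is older than agent_down_time."""
    return now - heartbeat_timestamp > timedelta(seconds=agent_down_time)


def missed_reports_tolerated(report_interval=REPORT_INTERVAL,
                             agent_down_time=AGENT_DOWN_TIME):
    """How many full report intervals fit into the down-time window."""
    return agent_down_time // report_interval


if __name__ == "__main__":
    last_seen = datetime(2015, 1, 1, 12, 0, 0)
    later = last_seen + timedelta(seconds=80)
    print("down after 80s:", is_agent_down(last_seen, later))
    print("intervals tolerated:", missed_reports_tolerated())
```

With these defaults the window covers two full report intervals, so a single lost heartbeat does not by itself mark the agent as down.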
Next comes step (3), self.setup_physical_bridges(self.bridge_mappings):
def setup_physical_bridges(self, bridge_mappings):
    '''Setup the physical network bridges.

    Creates physical network bridges and links them to the
    integration bridge using veths.

    :param bridge_mappings: map physical network names to bridge names.
    '''
    # Set up the physical network bridges and connect them to br-int
    # with veth pairs or patch ports.
    self.phys_brs = {}
    self.int_ofports = {}
    self.phys_ofports = {}
    ip_wrapper = ip_lib.IPWrapper(self.root_helper)
    ovs_bridges = ovs_lib.get_bridges(self.root_helper)
    for physical_network, bridge in bridge_mappings.iteritems():
        LOG.info(_("Mapping physical network %(physical_network)s to "
                   "bridge %(bridge)s"),
                 {'physical_network': physical_network,
                  'bridge': bridge})
        # setup physical bridge
        if bridge not in ovs_bridges:
            LOG.error(_("Bridge %(bridge)s for physical network "
                        "%(physical_network)s does not exist. Agent "
                        "terminated!"),
                      {'physical_network': physical_network,
                       'bridge': bridge})
            sys.exit(1)
        br = ovs_lib.OVSBridge(bridge, self.root_helper)
        br.remove_all_flows()
        br.add_flow(priority=1, actions="normal")
        self.phys_brs[physical_network] = br

        # interconnect physical and integration bridges using veth/patchs
        # Example for br-eth1: the old ports are deleted and
        # int-br-eth1 and phy-br-eth1 are created; the result can be
        # inspected with ovs-vsctl show.
        int_if_name = self.get_peer_name(constants.PEER_INTEGRATION_PREFIX,
                                         bridge)
        phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
                                          bridge)
        self.int_br.delete_port(int_if_name)
        br.delete_port(phys_if_name)
        if self.use_veth_interconnection:
            if ip_lib.device_exists(int_if_name, self.root_helper):
                ip_lib.IPDevice(int_if_name,
                                self.root_helper).link.delete()
                # Give udev a chance to process its rules here, to avoid
                # race conditions between commands launched by udev rules
                # and the subsequent call to ip_wrapper.add_veth
                utils.execute(['/sbin/udevadm', 'settle', '--timeout=10'])
            # Add the veth pair with ip link add (run through
            # ip netns exec when a namespace is involved)
            int_veth, phys_veth = ip_wrapper.add_veth(int_if_name,
                                                      phys_if_name)
            int_ofport = self.int_br.add_port(int_veth)
            phys_ofport = br.add_port(phys_veth)
        else:
            # Create patch ports without associating them in order to block
            # untranslated traffic before association
            int_ofport = self.int_br.add_patch_port(
                int_if_name, constants.NONEXISTENT_PEER)
            phys_ofport = br.add_patch_port(
                phys_if_name, constants.NONEXISTENT_PEER)

        self.int_ofports[physical_network] = int_ofport
        self.phys_ofports[physical_network] = phys_ofport

        # block all untranslated traffic between bridges
        self.int_br.add_flow(priority=2, in_port=int_ofport,
                             actions="drop")
        br.add_flow(priority=2, in_port=phys_ofport, actions="drop")

        if self.use_veth_interconnection:
            # enable veth to pass traffic
            int_veth.link.set_up()
            phys_veth.link.set_up()
            if self.veth_mtu:
                # set up mtu size for veth interfaces
                int_veth.link.set_mtu(self.veth_mtu)
                phys_veth.link.set_mtu(self.veth_mtu)
        else:
            # associate patch ports to pass traffic
            self.int_br.set_db_attribute('Interface', int_if_name,
                                         'options:peer', phys_if_name)
            br.set_db_attribute('Interface', phys_if_name,
                                'options:peer', int_if_name)
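setup_physical_bridges sets up the physical bridges br-eth*. Note that it does not create them: if a bridge listed in bridge_mappings does not already exist, the agent logs an error and exits. As with br-int, all existing flow rules are removed first and a normal flow rule is added to forward traffic. What differs from br-int comes next: use_veth_interconnection decides whether the bridge is connected to br-int through a veth pair or through patch ports. The ports are created, drop flow rules are installed to block traffic between the bridges, and only then are the veth devices brought up or the patch ports associated so that traffic can pass.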
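The ordering matters here: the drop rules are in place before the link can carry any traffic. The sketch below lists the steps for one bridge in order. peer_names and interconnect_plan are hypothetical helpers, the step names are made up for illustration, and the int- and phy- prefixes are inferred from the int-br-eth1 and phy-br-eth1 example in the code comments; device name length limits are ignored.

```python
def peer_names(bridge, int_prefix="int-", phys_prefix="phy-"):
    """Build the two port names; length limits are ignored in this sketch."""
    return int_prefix + bridge, phys_prefix + bridge


def interconnect_plan(bridge, use_veth):
    """Return the ordered steps for linking `bridge` to br-int."""
    int_if, phys_if = peer_names(bridge)
    # Stale ports from a previous run are removed first.
    steps = [("delete_port", "br-int", int_if),
             ("delete_port", bridge, phys_if)]
    if use_veth:
        steps.append(("add_veth", int_if, phys_if))
    else:
        # Patch ports start without a real peer, so no traffic can pass.
        steps.append(("add_patch_ports_unassociated", int_if, phys_if))
    # Drop rules go in before the link is brought up or associated.
    steps.append(("add_drop_flow", "br-int", int_if))
    steps.append(("add_drop_flow", bridge, phys_if))
    if use_veth:
        steps.append(("set_link_up", int_if, phys_if))
    else:
        steps.append(("associate_patch_peers", int_if, phys_if))
    return steps


if __name__ == "__main__":
    for step in interconnect_plan("br-eth1", use_veth=False):
        print(step)
```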
Steps (4) and (5) initialize the DVR Agent (distributed virtual router agent) and the Security Group Agent, which handle DVR and security groups respectively; they will be covered in a later post.
Finally, run_daemon_loop is set to True so that the polling loop can run. With run_daemon_loop set to True, main() calls daemon_loop(), which in turn calls rpc_loop(). Let's see what rpc_loop does.

def rpc_loop(self, polling_manager=None):
    if not polling_manager:
        polling_manager = polling.AlwaysPoll()

    # Initial state
    sync = True
    ports = set()
    updated_ports_copy = set()
    ancillary_ports = set()
    tunnel_sync = True
    ovs_restarted = False
    # Enter the loop
    while self.run_daemon_loop:
        start = time.time()
        port_stats = {'regular': {'added': 0,
                                  'updated': 0,
                                  'removed': 0},
                      'ancillary': {'added': 0,
                                    'removed': 0}}
        LOG.debug(_("Agent rpc_loop - iteration:%d started"),
                  self.iter_num)
        if sync:
            LOG.info(_("Agent out of sync with plugin!"))
            ports.clear()
            ancillary_ports.clear()
            sync = False
            polling_manager.force_polling()
        # Decide whether OVS was restarted by checking whether the
        # canary flow installed earlier on br-int is still present
        ovs_restarted = self.check_ovs_restart()
        if ovs_restarted:
            ......
        # Notify the plugin of tunnel IP
        if self.enable_tunneling and tunnel_sync:
            ......
        if self._agent_has_updates(polling_manager) or ovs_restarted:
            try:
                LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                            "starting polling. Elapsed:%(elapsed).3f"),
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                updated_ports_copy = self.updated_ports
                self.updated_ports = set()
                reg_ports = (set() if ovs_restarted else ports)
                port_info = self.scan_ports(reg_ports,
                                            updated_ports_copy)  # (1)
                LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                            "port information retrieved. "
                            "Elapsed:%(elapsed).3f"),
                          {'iter_num': self.iter_num,
                           'elapsed': time.time() - start})
                # Secure and wire/unwire VIFs and update their status
                # on Neutron server
                if (self._port_info_has_changes(port_info) or
                    self.sg_agent.firewall_refresh_needed() or
                    ovs_restarted):
                    LOG.debug(_("Starting to process devices in:%s"),
                              port_info)
                    # If treat devices fails - must resync with plugin
                    sync = self.process_network_ports(port_info,
                                                      ovs_restarted)  # (2)
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                "ports processed. Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})
                    port_stats['regular']['added'] = (
                        len(port_info.get('added', [])))
                    port_stats['regular']['updated'] = (
                        len(port_info.get('updated', [])))
                    port_stats['regular']['removed'] = (
                        len(port_info.get('removed', [])))
                ports = port_info['current']
                # Treat ancillary devices if they exist
                if self.ancillary_brs:
                    port_info = self.update_ancillary_ports(
                        ancillary_ports)
                    LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                "ancillary port info retrieved. "
                                "Elapsed:%(elapsed).3f"),
                              {'iter_num': self.iter_num,
                               'elapsed': time.time() - start})

                    if port_info:
                        rc = self.process_ancillary_network_ports(
                            port_info)
                        LOG.debug(_("Agent rpc_loop - iteration:"
                                    "%(iter_num)d - ancillary ports "
                                    "processed. Elapsed:%(elapsed).3f"),
                                  {'iter_num': self.iter_num,
                                   'elapsed': time.time() - start})
                        ancillary_ports = port_info['current']
                        port_stats['ancillary']['added'] = (
                            len(port_info.get('added', [])))
                        port_stats['ancillary']['removed'] = (
                            len(port_info.get('removed', [])))
                        sync = sync | rc

                polling_manager.polling_completed()
            except Exception:
                LOG.exception(_("Error while processing VIF ports"))
                # Put the ports back in self.updated_port
                self.updated_ports |= updated_ports_copy
                sync = True

        # sleep till end of polling interval
        elapsed = (time.time() - start)
        LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d "
                    "completed. Processed ports statistics: "
                    "%(port_stats)s. Elapsed:%(elapsed).3f"),
                  {'iter_num': self.iter_num,
                   'port_stats': port_stats,
                   'elapsed': elapsed})
        if (elapsed < self.polling_interval):
            time.sleep(self.polling_interval - elapsed)
        else:
            LOG.debug(_("Loop iteration exceeded interval "
                        "(%(polling_interval)s vs. %(elapsed)s)!"),
                      {'polling_interval': self.polling_interval,
                       'elapsed': elapsed})
        self.iter_num = self.iter_num + 1
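One detail of rpc_loop that is easy to miss is how it protects pending port updates when an iteration fails: it swaps in a fresh set before processing and merges the old one back on error. The sketch below isolates that pattern. ToyUpdateTracker is a hypothetical class for illustration, not part of Neutron.

```python
class ToyUpdateTracker(object):
    """Hypothetical holder for port-update notifications."""

    def __init__(self):
        self.updated_ports = set()

    def port_update(self, port_id):
        # Called by the RPC side whenever the server reports a change.
        self.updated_ports.add(port_id)

    def run_iteration(self, process):
        # Take ownership of the pending updates and start a fresh set,
        # so that notifications arriving during processing are kept apart.
        updated_ports_copy = self.updated_ports
        self.updated_ports = set()
        try:
            process(updated_ports_copy)
            return False   # no resync needed
        except Exception:
            # Put the unprocessed ports back for the next iteration.
            self.updated_ports |= updated_ports_copy
            return True    # ask for a resync


if __name__ == "__main__":
    tracker = ToyUpdateTracker()
    tracker.port_update("port-1")
    failed = tracker.run_iteration(lambda ports: 1 / 0)
    print("resync:", failed, "pending:", tracker.updated_ports)
```

Merging with |= rather than assigning means that updates received while the failed iteration was running are kept alongside the restored ones.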
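rpc_loop repeatedly checks a set of states and acts on them. Its most important job is to scan the port information in the OVS database and process it, so let's look at call (1) to see how that port information is collected: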
def scan_ports(self, registered_ports, updated_ports=None):
    # Read the ports currently configured in the OVS database
    # (via ovs-vsctl)
    cur_ports = self.int_br.get_vif_port_set()
    self.int_br_device_count = len(cur_ports)
    port_info = {'current': cur_ports}
    if updated_ports is None:
        updated_ports = set()
    # Add registered ports whose VLAN has changed
    updated_ports.update(self.check_changed_vlans(registered_ports))
    if updated_ports:
        # Some updated ports might have been removed in the
        # meanwhile, and therefore should not be processed.
        # In this case the updated port won't be found among
        # current ports.
        updated_ports &= cur_ports
        # Record the updated ports
        if updated_ports:
            port_info['updated'] = updated_ports

    # FIXME(salv-orlando): It's not really necessary to return early
    # if nothing has changed.
    if cur_ports == registered_ports:
        # No added or removed ports to set, just return here
        return port_info

    # Record the newly added ports
    port_info['added'] = cur_ports - registered_ports
    # Remove all the known ports not found on the integration bridge
    port_info['removed'] = registered_ports - cur_ports
    return port_info
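Stripped of the OVS calls, scan_ports is plain set arithmetic. Here is a pure-function sketch of it with a small worked example. toy_scan_ports is a hypothetical name, and the VLAN check done by check_changed_vlans is left out to keep the example self-contained.

```python
def toy_scan_ports(cur_ports, registered_ports, updated_ports=None):
    """Pure-function version of the set arithmetic in scan_ports."""
    port_info = {'current': cur_ports}
    # Only ports that still exist can be reported as updated.
    updated = set(updated_ports or ()) & cur_ports
    if updated:
        port_info['updated'] = updated
    if cur_ports == registered_ports:
        # Nothing was added or removed since the last iteration.
        return port_info
    port_info['added'] = cur_ports - registered_ports
    port_info['removed'] = registered_ports - cur_ports
    return port_info


if __name__ == "__main__":
    info = toy_scan_ports(cur_ports={"a", "b", "c"},
                          registered_ports={"b", "c", "d"},
                          updated_ports={"b", "x"})
    for key in sorted(info):
        print(key, sorted(info[key]))
```

In this example port a is new, port d has disappeared, and the update for x is discarded because x is no longer on the bridge.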
Once port_info has been obtained, the ports are actually processed according to it; this happens in call (2), process_network_ports.

def process_network_ports(self, port_info, ovs_restarted):
    resync_a = False
    resync_b = False
    # Collect the updated and the newly added ports
    devices_added_updated = (port_info.get('added', set()) |
                             port_info.get('updated', set()))
    if devices_added_updated:
        start = time.time()
        try:
            # treat_devices_added_or_updated handles added and updated
            # ports in the same way:
            #   ports that could not be processed are collected with
            #   skipped_devices.append(device);
            #   the others go through treat_vif_port, which binds the
            #   port to its net_uuid/lsw_id and sets up the matching
            #   flow rules.
            skipped_devices = self.treat_devices_added_or_updated(
                devices_added_updated, ovs_restarted)
            LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                        "treat_devices_added_or_updated completed. "
                        "Skipped %(num_skipped)d devices of "
                        "%(num_current)d devices currently available. "
                        "Time elapsed: %(elapsed).3f"),
                      {'iter_num': self.iter_num,
                       'num_skipped': len(skipped_devices),
                       'num_current': len(port_info['current']),
                       'elapsed': time.time() - start})
            # Update the list of current ports storing only those which
            # have been actually processed.
            port_info['current'] = (port_info['current'] -
                                    set(skipped_devices))
        except DeviceListRetrievalError:
            # Need to resync as there was an error with server
            # communication.
            LOG.exception(_("process_network_ports - iteration:%d - "
                            "failure while retrieving port details "
                            "from server"), self.iter_num)
            resync_a = True
    if 'removed' in port_info:
        start = time.time()
        # Handle removed ports by sending an RPC call to the
        # Neutron server
        resync_b = self.treat_devices_removed(port_info['removed'])
        LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                    "treat_devices_removed completed in %(elapsed).3f"),
                  {'iter_num': self.iter_num,
                   'elapsed': time.time() - start})
    # If one of the above operations fails => resync with plugin
    return (resync_a | resync_b)
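As the code and its comments show, process_network_ports handles port addition, removal, and update. Back in rpc_loop, the loop then checks whether the polling interval has elapsed; if not, it sleeps for the remaining time and then starts the next iteration.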
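The two small decisions at the end of an iteration, whether to resync and how long to sleep, can be written as pure functions. remaining_sleep and needs_resync are hypothetical names used only for this sketch.

```python
def remaining_sleep(elapsed, polling_interval):
    """Seconds left to sleep so that one iteration lasts polling_interval."""
    if elapsed < polling_interval:
        return polling_interval - elapsed
    # The iteration overran the interval: start the next one immediately.
    return 0


def needs_resync(resync_added_updated, resync_removed):
    """A failure in either phase triggers a full resync with the plugin."""
    return resync_added_updated | resync_removed


if __name__ == "__main__":
    print("sleep for:", remaining_sleep(elapsed=0.5, polling_interval=2))
    print("resync:", needs_resync(False, True))
```

An iteration that takes longer than polling_interval is not compensated for; the loop only logs the overrun and carries on.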
This completes the walkthrough of the OVS Agent startup source code.