1. 程式人生 > >Python3.5 調用Ansible 執行命令

Python3.5 調用Ansible 執行命令

tuple med sin load nor gis sources usr 分享圖片

ansible.py
技術分享圖片
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import tempfile
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import
Play from ansible.executor.playbook_executor import PlaybookExecutor from ansible.executor.task_queue_manager import TaskQueueManager from ansible.plugins.callback import CallbackBase class AnsibleHost: def __init__(self, host, port=None, connection=None, ssh_user=None, ssh_pass=None): self.host
= host self.port = port self.ansible_connection = connection self.ansible_ssh_user = ssh_user self.ansible_ssh_pass = ssh_pass def __str__(self): result = ansible_ssh_host= + str(self.host) if self.port: result += ansible_ssh_port=
+ str(self.port) if self.ansible_connection: result += ansible_connection= + str(self.ansible_connection) if self.ansible_ssh_user: result += ansible_ssh_user= + str(self.ansible_ssh_user) if self.ansible_ssh_pass: result += ansible_ssh_pass= + str(self.ansible_ssh_pass) return result class AnsibleTaskResultCallback(CallbackBase): def __init__(self, display=None, option=None): super().__init__(display, option) self.result = None self.error_msg = None def v2_runner_on_ok(self, result): res = getattr(result, _result) self.result = res self.error_msg = res.get(stderr) def v2_runner_on_failed(self, result, ignore_errors=None): if ignore_errors: return res = getattr(result, _result) self.error_msg = res.get(stderr, ‘‘) + res.get(msg) def runner_on_unreachable(self, host, result): if result.get(unreachable): self.error_msg = host + : + result.get(msg, ‘‘) def v2_runner_item_on_failed(self, result): res = getattr(result, _result) self.error_msg = res.get(stderr, ‘‘) + res.get(msg) class AnsibleTask: def __init__(self, hosts, extra_vars=None): self.hosts = hosts self._validate() self.hosts_file = None self._generate_hosts_file() Options = namedtuple(Options, [connection, module_path, forks, become, become_method, become_user, check, diff, host_key_checking, listhosts, listtasks, listtags, syntax]) self.options = Options(connection=ssh, module_path=None, forks=10, become=None, become_method=None, become_user=None, check=False, diff=False, host_key_checking=False, listhosts=None, listtasks=None, listtags=None, syntax=None) self.loader = DataLoader() self.passwords = dict(vault_pass=secret) self.inventory = InventoryManager(loader=self.loader, sources=[self.hosts_file]) self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory) if extra_vars: self.variable_manager.extra_vars = extra_vars def _generate_hosts_file(self): self.hosts_file = tempfile.mktemp() with open(self.hosts_file, w+, encoding=utf-8) as file: hosts = [] i_temp = 0 for host in self.hosts: hosts.append(server + str(i_temp) + + str(host)) i_temp += 1 file.write(\n.join(hosts)) def _validate(self): if not self.hosts: raise Exception(hosts不能為空) if not isinstance(self.hosts, list): raise Exception(hosts只能為list<AnsibleHost>數組) for host in self.hosts: if not isinstance(host, AnsibleHost): raise Exception(host類型必須為AnsibleHost) def exec_shell(self, command): source = {hosts: all, gather_facts: no, tasks: [ {action: {module: shell, args: command}, register: shell_out}]} play = Play().load(source, variable_manager=self.variable_manager, loader=self.loader) results_callback = AnsibleTaskResultCallback() tqm = None try: tqm = TaskQueueManager( inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords, stdout_callback=results_callback ) tqm.run(play) if results_callback.error_msg: raise Exception(results_callback.error_msg) return results_callback.result except: raise finally: if tqm is not None: tqm.cleanup() def exec_playbook(self, playbooks): results_callback = AnsibleTaskResultCallback() playbook = PlaybookExecutor(playbooks=playbooks, inventory=self.inventory, variable_manager=self.variable_manager, loader=self.loader, options=self.options, passwords=self.passwords) setattr(getattr(playbook, _tqm), _stdout_callback, results_callback) playbook.run() if results_callback.error_msg: raise Exception(results_callback.error_msg) return results_callback.result def __del__(self): if self.hosts_file: os.remove(self.hosts_file)
View Code

test.py
技術分享圖片
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from ansible import AnsibleTask, AnsibleHost

if __name__ == "__main__":
    task = AnsibleTask([AnsibleHost(127.0.0.1, 22, ssh, root, password)])
    task.exec_playbook([/install.yml, init.yml])
    task.exec_shell(echo "abc"))
View Code

Python3.5 調用Ansible 執行命令