1. 程式人生 > python saltstack api

python saltstack api

# -*- coding:utf-8 -*-
import sys
import json
import pycurl
from io import BytesIO

class PyCurl(object):
    """Single-use HTTP POST helper built on a ``pycurl.Curl`` handle.

    The handle is configured in ``__init__`` and closed at the end of
    ``request`` (on success and on failure), so every instance can perform
    exactly one request.
    """

    def __init__(self, url, **kwargs):
        """Create and configure the curl handle.

        Args:
            url: Full URL the POST request is sent to.
            **kwargs: Optional settings.
                header: List of raw ``"Name:value"`` request header lines.
                verify_ssl: When true, verify the server certificate and
                    host name. Defaults to False, which keeps the original
                    behaviour of skipping TLS verification.
        """
        self.url = url
        self.header = kwargs.get("header", None)
        # SECURITY: verification is off unless the caller opts in, exactly as
        # before. Disabling it allows man-in-the-middle attacks; pass
        # verify_ssl=True whenever the server has a trusted certificate.
        self.verify_ssl = bool(kwargs.get("verify_ssl", False))

        self.curl = pycurl.Curl()
        self.curl.setopt(self.curl.URL, self.url)
        # Do not mix the response headers into the body output; they are
        # captured separately through HEADERFUNCTION in request().
        self.curl.setopt(self.curl.HEADER, False)
        # Always issue a POST.
        self.curl.setopt(self.curl.POST, True)
        self.curl.setopt(pycurl.SSL_VERIFYPEER, 1 if self.verify_ssl else 0)
        self.curl.setopt(pycurl.SSL_VERIFYHOST, 2 if self.verify_ssl else 0)
        if self.header:
            # Extra request header lines supplied by the caller.
            self.curl.setopt(self.curl.HTTPHEADER, self.header)

    @staticmethod
    def _normalize_timeout(timeout):
        """Return ``timeout`` as a positive int, or None when it is unusable.

        Accepts ints and integer strings such as ``"30"``; everything else
        (None, floats, booleans, zero, negatives, junk strings) yields None,
        meaning "do not set a timeout".
        """
        if isinstance(timeout, bool):
            return None
        if isinstance(timeout, str):
            try:
                timeout = int(timeout)
            except ValueError:
                return None
        if isinstance(timeout, int) and timeout > 0:
            return timeout
        return None

    def request(self, data=None, timeout=None):
        """Send the POST request and collect the response.

        Args:
            data: Request body as ``str`` or ``bytes``. Any other type
                (including None) sends no explicit POST fields.
            timeout: Overall timeout in seconds as a positive int or integer
                string; other values leave the timeout unset.

        Returns:
            False when the transfer fails with ``pycurl.error``; otherwise a
            dict with the keys ``http_code``, ``header`` (raw bytes),
            ``body`` (raw bytes) and ``url``.
        """
        # On Python 3 pycurl only accepts ASCII-only str, so text is encoded
        # up front. On Python 2 ``str`` already is ``bytes`` and is passed on
        # untouched.
        if isinstance(data, str) and not isinstance(data, bytes):
            data = data.encode("utf-8")
        if isinstance(data, bytes):
            self.curl.setopt(pycurl.POSTFIELDS, data)

        header_buf = BytesIO()
        body_buf = BytesIO()
        # Force a brand-new connection instead of a cached one ...
        self.curl.setopt(self.curl.FRESH_CONNECT, True)
        # ... and drop it once the transfer is done.
        self.curl.setopt(self.curl.FORBID_REUSE, True)

        seconds = self._normalize_timeout(timeout)
        if seconds is not None:
            self.curl.setopt(self.curl.TIMEOUT, seconds)

        # Response headers and body are written into separate buffers.
        self.curl.setopt(self.curl.HEADERFUNCTION, header_buf.write)
        self.curl.setopt(self.curl.WRITEFUNCTION, body_buf.write)

        try:
            try:
                self.curl.perform()
            except pycurl.error:
                return False
            http_code = self.curl.getinfo(self.curl.HTTP_CODE)
        finally:
            # Release the handle on every path, including failed transfers.
            self.curl.close()

        return {
            "http_code": http_code,
            "header": header_buf.getvalue(),
            "body": body_buf.getvalue(),
            "url": self.url,
        }

class SaltApi(object):
    """Small client for the Salt HTTP API (salt-api).

    Every public call logs in first (``token_id``), then POSTs a JSON payload
    with the resulting ``X-Auth-Token`` header through ``post``.
    """

    def __init__(self, **kwargs):
        """Store connection settings.

        Args:
            **kwargs: Optional settings.
                timeout: Request timeout in seconds (default 300).
                header: List of raw header lines sent with every request
                    (default ``["Content-Type:application/json"]``).
                url: Base URL of the API (default
                    ``"https://192.168.44.20:8888"``).
                username: Login user (default ``"saltapi"``).
                password: Login password (default ``"password"``).
        """
        self.timeout = kwargs.get("timeout", 300)
        header = kwargs.get("header")
        if header is None:
            header = ["Content-Type:application/json"]
        # Keep a private copy so a caller-owned list is never modified.
        self.header = list(header)
        # The defaults below are the values that used to be hard-coded.
        self.__url = kwargs.get("url", "https://192.168.44.20:8888")
        self.__username = kwargs.get("username", "saltapi")
        self.__password = kwargs.get("password", "password")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _request_headers(self, token):
        """Return a fresh header list for one request, plus ``token`` if set."""
        headers = list(self.header)
        if token:
            headers.append(str(token))
        return headers

    def _interpret_result(self, result):
        """Turn a ``PyCurl.request`` result into the value ``post`` returns."""
        # Transport failure: hand the falsy value straight back.
        if not result:
            return result
        if result["http_code"] != 200:
            self.response = "response code %s" % result["http_code"]
            return self.response
        payload = json.loads(result["body"].decode("utf-8"))
        if "error" in payload and payload["error"]:
            self.response = "%s(%s)" % (payload["error"]["data"], payload["error"]["code"])
            return self.response
        return payload

    @staticmethod
    def _extract_token(result):
        """Pull the auth token out of a login response or raise clearly."""
        if not isinstance(result, dict):
            # ``post`` returned False (no response) or an error string.
            raise RuntimeError("salt-api login failed: %s" % (result or "no response"))
        try:
            return result['return'][0]['token']
        except (KeyError, IndexError, TypeError):
            raise KeyError("salt-api login response carries no token: %r" % (result,))

    def _call(self, **payload):
        """Log in, then POST ``payload`` to the API root with the new token."""
        token = 'X-Auth-Token:%s' % self.token_id()
        return self.post(token=token, **payload)

    # ------------------------------------------------------------------
    # Low-level API
    # ------------------------------------------------------------------
    def token_id(self):
        """Log in with the configured credentials and return the token.

        Raises:
            RuntimeError: The login request failed or returned an error.
            KeyError: The login response does not contain a token.
        """
        credentials = {'eauth': 'pam', 'username': self.__username, 'password': self.__password}
        result = self.post(prefix="/login/", **credentials)
        # The token is deliberately not printed: it is a credential.
        self.__token_id = self._extract_token(result)
        return self.__token_id

    def post(self, prefix="/", token=None, **data):
        """POST ``data`` as JSON to ``<base url><prefix>``.

        Args:
            prefix: Path appended to the base URL.
            token: Optional complete header line (``"X-Auth-Token:..."``).
            **data: Payload fields, serialised with ``json.dumps``.

        Returns:
            The decoded JSON response on success; False when the transfer
            fails; an error string (also stored in ``self.response``) for a
            non-200 status or when the response carries an ``error`` entry.
        """
        url = self.__url + prefix
        # The payload is not printed: for logins it contains the password.
        curl = PyCurl(url, header=self._request_headers(token))
        result = curl.request(data=json.dumps(data), timeout=self.timeout)
        return self._interpret_result(result)

    # ------------------------------------------------------------------
    # Key management (wheel client)
    # ------------------------------------------------------------------
    def all_key(self):
        """Return ``(minions, minions_pre)`` from ``key.list_all``.

        ``minions`` holds the accepted keys, ``minions_pre`` the keys still
        waiting for acceptance.
        """
        content = self._call(client='wheel', fun='key.list_all')
        keys = content['return'][0]['data']['return']
        return keys['minions'], keys['minions_pre']

    def accept_key(self, node_name):
        """Accept the key of ``node_name`` and return the ``success`` flag."""
        content = self._call(client='wheel', fun='key.accept', match=node_name)
        print(content)
        return content['return'][0]['data']['success']

    def delete_key(self, node_name):
        """Delete the key of ``node_name`` and return the ``success`` flag."""
        content = self._call(client='wheel', fun='key.delete', match=node_name)
        return content['return'][0]['data']['success']

    # ------------------------------------------------------------------
    # Remote execution
    # ------------------------------------------------------------------
    # tgt_type -- the type of tgt. Allowed values:
    #     glob        - Bash glob completion - Default
    #     pcre        - Perl style regular expression
    #     list        - Python list of hosts
    #     grain       - Match based on a grain comparison
    #     grain_pcre  - Grain comparison with a regex
    #     pillar      - Pillar data comparison
    #     pillar_pcre - Pillar data comparison with a regex
    #     nodegroup   - Match on nodegroup
    #     range       - Use a Range server for matching
    #     compound    - Pass a compound match string
    #     ipcidr      - Match based on Subnet (CIDR notation) or IPv4 address.
    #
    # Changed in version 2017.7.0: Renamed from expr_form to tgt_type
    #
    # Commands can be run against any of these target types, for example:
    #
    #     sapi = SaltApi()
    #     ret = sapi.host_remote_func(tgt="ba*", fun='test.ping', tgt_type='pcre')
    #     print('%s\n%s' % ('pcre', ret))
    #     ret = sapi.host_remote_func(tgt="bamboo,Min-Centos1", fun='test.ping', tgt_type='list')
    #     print('%s\n%s' % ('list', ret))
    #     ret = sapi.host_remote_func(tgt="*", fun='test.ping')
    #     print('%s\n%s' % ('glob', ret))
    #     ret = sapi.host_remote_func(tgt="os:CentOS", fun='test.ping', tgt_type='grain')
    #     print('%s\n%s' % ('grain', ret))
    #     # NOTE(review): the compound target below was mangled by e-mail
    #     # obfuscation in the published source; this is a reconstruction.
    #     ret = sapi.host_remote_func(tgt="G@os:CentOS or G@os:Ubuntu", fun='test.ping', tgt_type='compound')
    #     print('%s\n%s' % ('compound', ret))
    #     ret = sapi.host_remote_func(tgt="192.168.44.0/24", fun='test.ping', tgt_type='ipcidr')
    def host_remote_func(self, tgt, fun, tgt_type='glob'):
        """Run module function ``fun`` on the hosts matched by ``tgt``.

        Equivalent to::

            curl -k https://<ip>:8080/ -H "Accept: application/x-yaml"
                 -H "X-Auth-Token:<token>"
                 -d client='local' -d tgt='*' -d fun='test.ping'

        which answers with something like::

            return:
            - iZ28r91y66hZ: true
              node2.minion: true

        Returns:
            The first entry of the response's ``return`` list.
        """
        content = self._call(client='local', tgt=tgt, fun=fun, tgt_type=tgt_type)
        return content['return'][0]

    def group_remote_func(self, tgt, fun):
        """Run module function ``fun`` on nodegroup ``tgt``."""
        # 'tgt_type' replaces the pre-2017.7.0 'expr_form' key so that this
        # agrees with the host_* methods of this class.
        content = self._call(client='local', tgt=tgt, fun=fun, tgt_type='nodegroup')
        print(content)
        return content['return'][0]

    def host_remote_execution_module(self, tgt, fun, arg, tgt_type='glob'):
        """Run ``fun`` with argument ``arg`` on the hosts matched by ``tgt``.

        Example::

            print(salt_aa.host_remote_execution_module('*', 'cmd.run', 'ifconfig'))
        """
        content = self._call(client='local', tgt=tgt, fun=fun, arg=arg, tgt_type=tgt_type)
        return content['return'][0]

    def group_remote_execution_module(self, tgt, fun, arg):
        """Run ``fun`` with argument ``arg`` on nodegroup ``tgt``."""
        content = self._call(client='local', tgt=tgt, fun=fun, arg=arg, tgt_type='nodegroup')
        return content['return'][0]

    # ------------------------------------------------------------------
    # States
    # ------------------------------------------------------------------
    def host_sls(self, tgt, arg, tgt_type='glob'):
        """Apply ``state.sls`` ``arg`` on hosts; return the whole response."""
        return self._call(client='local', tgt=tgt, fun='state.sls', arg=arg, tgt_type=tgt_type)

    def group_sls(self, tgt, arg):
        """Apply ``state.sls`` ``arg`` on nodegroup ``tgt``; return ``jid``.

        NOTE(review): this uses the synchronous 'local' client yet reads a
        'jid' key from the result, as the async variants do. Confirm that the
        server really returns a 'jid' entry here.
        """
        content = self._call(client='local', tgt=tgt, fun='state.sls', arg=arg, tgt_type='nodegroup')
        return content['return'][0]['jid']

    def host_sls_async(self, tgt, arg, tgt_type='glob'):
        """Start ``state.sls`` ``arg`` on hosts asynchronously; return the jid."""
        content = self._call(client='local_async', tgt=tgt, fun='state.sls', arg=arg, tgt_type=tgt_type)
        return content['return'][0]['jid']

    def group_sls_async(self, tgt, arg):
        """Start ``state.sls`` ``arg`` on a nodegroup asynchronously; return the jid."""
        content = self._call(client='local_async', tgt=tgt, fun='state.sls', arg=arg, tgt_type='nodegroup')
        return content['return'][0]['jid']

    def server_group_pillar(self, tgt, arg, **kwargs):
        """Apply ``state.sls`` on a nodegroup, passing ``kwargs`` as ``kwarg``.

        Prints and returns the first entry of the ``return`` list
        (previously it was only printed).
        """
        content = self._call(client='local', tgt=tgt, fun='state.sls', arg=arg,
                             tgt_type='nodegroup', kwarg=kwargs)
        jid = content['return'][0]
        print(jid)
        return jid

    def server_hosts_pillar(self, tgt, arg, **kwargs):
        """Apply ``state.sls`` on hosts, passing ``kwargs`` as ``kwarg``."""
        content = self._call(client="local", tgt=tgt, fun="state.sls", arg=arg, kwarg=kwargs)
        jid = content['return'][0]
        print(content)
        return jid

    # ------------------------------------------------------------------
    # Jobs (runner client)
    # ------------------------------------------------------------------
    def jobs_all_list(self):
        """Print and return the response of ``jobs.list_jobs``."""
        content = self._call(client="runner", fun="jobs.list_jobs")
        print(content)
        return content

    def jobs_jid_status(self, jid):
        """Print and return the response of ``jobs.lookup_jid`` for ``jid``."""
        content = self._call(client="runner", fun="jobs.lookup_jid", jid=jid)
        print(content)
        return content


# if __name__ == '__main__':
#     sa = SaltApi()
#     print (sa.host_remote_func("bamboo", 'test.ping', 'list'))