1. 程式人生 > 其它 >python 遠端操作svn

python 遠端操作svn

SVN操作指令碼

安裝模組

pip install pywinrm

指令碼如下


#!/usr/bin/env python3
# coding=utf-8
# author:LJX
# describe:倉庫授權
# createdate:2021.5.27

import winrm
import json


class SVN(object):
    """Administer an SVN server on a remote Windows host over WinRM.

    Each public method builds one of the ``*-Svn*`` PowerShell cmdlets
    (presumably the VisualSVN Server module -- confirm against the target
    host) and runs it remotely through ``winrm.Session.run_ps``.

    WinRM setup notes for the target host:

    1. Check the WinRM listener (not started by default):
       ``winrm enumerate winrm/config/listener``
    2. Start the service (switch the network type to private first):
       ``winrm quickconfig``
    3. Open the firewall (the rule group name is locale dependent):
       ``netsh advfirewall firewall set rule group="Windows 遠端管理" new enable=yes``
    4. Start: ``winrm quickconfig``
    5. Verify: ``winrm enumerate winrm/config/listener``
    6. Enable basic auth for the WinRM service:
       ``winrm set winrm/config/service/auth  @{Basic="true"}``
    7. Allow unencrypted traffic for the WinRM service:
       ``winrm set winrm/config/service  @{AllowUnencrypted="true"}``

    NOTE(review): user, group and repository names are interpolated into the
    PowerShell command unquoted, so they must come from a trusted source.
    """

    def __init__(self, host, port, user, passwd):
        """Store the connection settings and create the WinRM session.

        :param host: host name or IP address of the Windows server
        :param port: WinRM HTTP port
        :param user: account used for authentication
        :param passwd: password of that account
        """
        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.winConn = winrm.Session(
            'http://{0}:{1}/wsman'.format(self.host, self.port),
            auth=(self.user, self.passwd))

    @staticmethod
    def _ps_quote(value):
        """Return ``value`` as a single-quoted PowerShell string literal."""
        # Single quotes switch off ``$``/backtick expansion; an embedded
        # single quote is escaped by doubling it.
        return "'" + str(value).replace("'", "''") + "'"

    def exec_shell(self, command, msg):
        """Run a PowerShell command remotely and return its decoded stdout.

        :param command: PowerShell source to execute
        :param msg: label used in the success / failure message
        :return: standard output of the command as ``str``
        :raises Exception: when the command exits with a non-zero status
        """
        ret = self.winConn.run_ps(command)
        if ret.status_code != 0:
            # Any non-zero exit is a failure, not only exit code 1.  Decode
            # leniently so a badly encoded stderr cannot hide the real error.
            raise Exception(
                msg + "失敗:" + ret.std_err.decode("utf-8", errors="replace"))
        print(msg + "成功")
        return ret.std_out.decode()

    def parse_data(self, data, isAll=True):
        """Reduce decoded ``ConvertTo-Json`` output to names or a flag.

        :param data: decoded JSON; a list of objects or a single object
        :param isAll: truthy when the query was not restricted to one name
        :return: list of ``Name`` values when ``data`` is a list or ``isAll``
            is truthy; ``True`` when a single object was found by name;
            ``None`` for any other input
        """
        if isinstance(data, list):
            # Also covers a named query that matched several objects.
            return [info["Name"] for info in data]
        if isAll:
            return [data["Name"]]
        if isinstance(data, dict):
            return True
        return None

    def _query(self, command, msg, missing_msg, isAll):
        """Run a query cmdlet and parse its JSON output.

        Any failure (remote error, empty or invalid JSON, unexpected shape)
        is reported with ``missing_msg`` and mapped to an empty list; callers
        rely on ``[]`` meaning "not found".
        """
        try:
            data = json.loads(self.exec_shell(command, msg))
            return self.parse_data(data, isAll)
        except Exception:
            print(missing_msg)
            return []

    def get_SvnUser(self, name=''):
        """Look up SVN users.

        :param name: user name; empty to list every user
        :return: list of names, ``True`` when the single named user exists,
            or ``[]`` when nothing was found or the query failed
        """
        return self._query(
            "Get-SvnLocalUser {0} | ConvertTo-Json".format(name),
            "查詢使用者", "使用者不存在", not name)

    def get_SvnGroup(self, name=''):
        """Look up SVN groups.

        :param name: group name; empty to list every group
        :return: list of names, ``True`` when the single named group exists,
            or ``[]`` when nothing was found or the query failed
        """
        return self._query(
            """Get-SvnLocalGroup {0}| ConvertTo-Json""".format(name),
            "查詢使用者組", "使用者組不存在", not name)

    def get_SvnGroupUser(self, name):
        """Return the member names of group ``name`` (``[]`` on failure)."""
        return self._query(
            "Get-SvnLocalGroupMember {0} | ConvertTo-Json".format(name),
            "使用者組中使用者查詢", "使用者組不存在", True)

    def get_Repository(self, name=''):
        """Look up repositories.

        :param name: repository name; empty to list every repository
        :return: list of names, ``True`` when the single named repository
            exists, or ``[]`` when nothing was found or the query failed
        """
        return self._query(
            """Get-SvnRepository {0}| ConvertTo-Json""".format(name),
            "查詢倉庫", "倉庫不存在", not name)

    def add_SvnUser(self, username, password):
        """Create a user, or offer to reset the password of an existing one.

        :param username: user name
        :param password: plain-text password
        :return: ``True`` on success (including a declined reset),
            ``False`` when the remote command failed
        """
        secure = "(ConvertTo-SecureString -String {0} -AsPlainText -Force)".format(
            self._ps_quote(password))
        try:
            if self.get_SvnUser(name=username) == []:
                self.exec_shell(
                    "New-SvnLocalUser -Name {0} -Password {1}".format(username, secure),
                    "建立使用者")
            elif input("使用者已存在是否重置密碼?Y/N").strip().lower() == "y":
                self.exec_shell(
                    "Set-SvnLocalUser -Name {0} -Password {1}".format(username, secure),
                    "重置密碼")
            return True
        except Exception as e:
            print(e)
            return False

    def add_SvnGroup(self, GroupName):
        """Create a group unless it already exists.

        :param GroupName: group name
        :return: ``True`` when the group exists afterwards, ``False`` on failure
        """
        try:
            if self.get_SvnGroup(GroupName) == []:
                self.exec_shell("""New-SvnLocalGroup {0}""".format(GroupName), "使用者組新增")
            else:
                print("使用者組已存在")
            return True
        except Exception as e:
            print(e)
            return False

    def add_UserGroup(self, GroupName, Username):
        """Add a user to an existing group.

        :param GroupName: group name
        :param Username: user name
        :return: ``True`` when the user is a member afterwards, ``False`` when
            the group does not exist or the remote command failed
        """
        try:
            if self.get_SvnGroup(GroupName) == []:
                print("使用者組不存在")
                return False
            if Username in self.get_SvnGroupUser(GroupName):
                print("使用者已存在組中")
            else:
                self.exec_shell(
                    "Add-SvnLocalGroupMember {0} (Get-SvnLocalUser {1})".format(GroupName, Username),
                    "使用者組新增使用者")
            return True
        except Exception:
            print("新增失敗")
            return False

    def add_Repository(self, RepoName):
        """Create a repository unless it already exists.

        :param RepoName: repository name
        :return: ``True`` when the repository exists afterwards, ``False`` on failure
        """
        try:
            if self.get_Repository(RepoName) == []:
                self.exec_shell("""New-SvnRepository {0}""".format(RepoName), "倉庫新增")
            else:
                print("倉庫已存在")
            return True
        except Exception as e:
            print(e)
            return False

    def add_GrantUser(self, Repository, username, access, password=None):
        """Grant a user access to the root of a repository.

        :param Repository: repository name
        :param username: user name
        :param access: ReadOnly, NoAccess or ReadWrite
        :param password: password used to create the user when it does not
            exist yet; without it a missing user cannot be created
        :return: ``True`` on success, ``False`` on failure
        """
        command = """Add-SvnAccessRule -AuthorizationProfile SubversionLocal -Repository {0} -Path /  -AccountName {1} -Access {2} -Force""".format(
            Repository, username, access)
        try:
            if self.get_SvnUser(username):
                print("使用者已存在")
            else:
                print("使用者不存在,建立使用者")
                if password is None:
                    print("未提供密碼,無法建立使用者")
                    return False
                if not self.add_SvnUser(username, password):
                    print("使用者建立失敗")
                    return False
                print("使用者建立成功")
            self.exec_shell(command, "倉庫授權")
            return True
        except Exception as e:
            print(e)
            return False

    def add_GrantGroup(self, Repository, GroupName, access):
        """Grant a group access to the root of a repository.

        The group is created first when it does not exist.

        :param Repository: repository name
        :param GroupName: group name
        :param access: ReadOnly, NoAccess or ReadWrite
        :return: ``True`` on success, ``False`` on failure
        """
        command = '''Add-SvnAccessRule {0} -Path  /  -AccountId "@{1}" -Access {2}'''.format(
            Repository, GroupName, access)
        try:
            if not self.get_SvnGroup(GroupName):
                print("使用者組不存在,建立使用者組")
                if not self.add_SvnGroup(GroupName):
                    print("使用者組建立失敗")
                    return False
                print("使用者組建立成功")
            self.exec_shell(command, "倉庫授權使用者組")
            return True
        except Exception:
            print("使用者組授權失敗")
            return False

def createObj():
    """Build an ``SVN`` client from the connection settings below.

    The literal values are placeholders; replace them with the real host,
    port, account and password before running the script.
    """
    connection = {
        "host": "ip",
        "port": "port",
        "user": "user",
        "passwd": "password",
    }
    return SVN(**connection)


if __name__ == "__main__":
    client = createObj()
    # Access level is one of: ReadOnly, NoAccess, ReadWrite.
    params = dict(Repository="xxxx", username="xxxx", access="ReadOnly")
    # Grant the user access to the repository; the dict keys match the
    # parameter names of add_GrantUser, so it can be unpacked directly.
    outcome = client.add_GrantUser(**params)
    # To grant a group instead, call add_GrantGroup with
    # Repository=params["Repository"], GroupName=params["username"],
    # access=params["access"].
    print(outcome)