python 遠端操作svn
阿新 • • 發佈:2021-09-28
SVN操作指令碼
安裝模組
pip install pywinrm
指令碼如下
#!/usr/bin/env python3
# coding=utf-8
# author:LJX
# describe: SVN repository authorization over WinRM
# createdate:2021.5.27
import json

try:
    import winrm
except ImportError:
    # pywinrm is only needed to open the remote session; keeping the import
    # soft lets the module be imported on machines that do not have it
    # installed. SVN.__init__ raises a clear error in that case.
    winrm = None


class SVN(object):
    """Administer an SVN server on a remote Windows host through WinRM.

    Every operation is a PowerShell cmdlet (``Get-SvnLocalUser``,
    ``New-SvnRepository``, ``Add-SvnAccessRule`` ...) executed remotely with
    ``winrm.Session.run_ps``.
    NOTE(review): these cmdlet names look like the VisualSVN Server
    PowerShell module - confirm against the target server.

    WinRM must be prepared on the Windows host first:

    1. Check the WinRM listener (the service is not started by default)::

           winrm enumerate winrm/config/listener

    2. Start the service (the network type has to be a private network)::

           winrm quickconfig

    3. Open the firewall (on a localized Windows use the localized rule
       group name)::

           netsh advfirewall firewall set rule group="Windows Remote Management" new enable=yes

    4. Run the quick configuration again::

           winrm quickconfig

    5. Verify the listener::

           winrm enumerate winrm/config/listener

    6. Allow Basic authentication for the WinRM service::

           winrm set winrm/config/service/auth @{Basic="true"}

    7. Allow unencrypted traffic for the WinRM service::

           winrm set winrm/config/service @{AllowUnencrypted="true"}

    The session below talks plain HTTP, and the setup above enables Basic
    auth with unencrypted traffic, so credentials cross the network
    unprotected. Use it on trusted networks only.
    """

    def __init__(self, host, port, user, passwd):
        """Open a WinRM session to ``http://host:port/wsman``.

        :param host: host name or IP address of the Windows server
        :param port: WinRM HTTP port
        :param user: Windows account used for the session
        :param passwd: password of that account
        :raises ImportError: if the ``pywinrm`` package is not installed
        """
        if winrm is None:
            raise ImportError("pywinrm is required: pip install pywinrm")
        self.host = host
        self.port = port
        self.user = user
        self.passwd = passwd
        self.winConn = winrm.Session(
            'http://{0}:{1}/wsman'.format(self.host, self.port),
            auth=(self.user, self.passwd))

    @staticmethod
    def _ps_quote(value):
        """Return *value* as a PowerShell single-quoted string literal."""
        # Inside single quotes PowerShell expands nothing; only the quote
        # character itself has to be doubled.
        return "'" + str(value).replace("'", "''") + "'"

    def exec_shell(self, command, msg):
        """Run a PowerShell command remotely and return its decoded stdout.

        :param command: PowerShell source to execute
        :param msg: human readable operation name used in the log output
        :return: standard output of the command as ``str``
        :raises RuntimeError: if the command exits with a non-zero status
        """
        ret = self.winConn.run_ps(command)
        # Any non-zero status is a failure. The original only handled 0 and
        # 1 and silently returned None for everything else.
        if ret.status_code != 0:
            raise RuntimeError(
                msg + "失敗:" + ret.std_err.decode("utf-8", errors="replace"))
        print(msg + "成功")
        return ret.std_out.decode()

    def parse_data(self, data, isAll=True):
        """Reduce decoded ``ConvertTo-Json`` output to object names.

        :param data: a list of objects, a single object (dict), or anything
            else when the cmdlet produced no usable output
        :param isAll: truthy when the caller listed every object, falsy
            when it looked one specific object up
        :return: list of ``Name`` values for list data; for a single object
            ``[name]`` when *isAll* is truthy and ``True`` otherwise; ``[]``
            for any other input
        """
        if isinstance(data, list):
            return [info["Name"] for info in data]
        if isinstance(data, dict):
            return [data["Name"]] if isAll else True
        return []

    def _query(self, command, msg, is_all, not_found_msg):
        """Run a ``Get-*`` cmdlet and parse its JSON; return [] on failure."""
        try:
            output = self.exec_shell(command, msg)
            if not output.strip():
                # The cmdlet succeeded but matched nothing.
                print(not_found_msg)
                return []
            return self.parse_data(json.loads(output), is_all)
        except Exception as e:
            # Lookups are deliberately best-effort: callers treat an empty
            # result as "does not exist". The cause is printed so that
            # connection problems are not mistaken for a missing object.
            print("{0}: {1}".format(not_found_msg, e))
            return []

    def get_SvnUser(self, name=''):
        """Look up local SVN users.

        :param name: user to look up; empty to list every user
        :return: list of user names when listing, a truthy value when the
            named user exists, ``[]`` when nothing was found or the query
            failed
        """
        return self._query(
            "Get-SvnLocalUser {0} | ConvertTo-Json".format(name),
            "查詢使用者", not name, "使用者不存在")

    def get_SvnGroup(self, name=''):
        """Look up local SVN groups.

        :param name: group to look up; empty to list every group
        :return: list of group names when listing, a truthy value when the
            named group exists, ``[]`` when nothing was found or the query
            failed
        """
        return self._query(
            "Get-SvnLocalGroup {0}| ConvertTo-Json".format(name),
            "查詢使用者組", not name, "使用者組不存在")

    def get_SvnGroupUser(self, name):
        """Return the member names of group *name* (``[]`` on failure)."""
        return self._query(
            "Get-SvnLocalGroupMember {0} | ConvertTo-Json".format(name),
            "使用者組中使用者查詢", True, "使用者組不存在")

    def get_Repository(self, name=''):
        """Look up repositories.

        :param name: repository to look up; empty to list every repository
        :return: list of repository names when listing, a truthy value when
            the named repository exists, ``[]`` when nothing was found or
            the query failed
        """
        return self._query(
            "Get-SvnRepository {0}| ConvertTo-Json".format(name),
            "查詢倉庫", not name, "倉庫不存在")

    def add_SvnUser(self, username, password):
        """Create a local SVN user, or offer to reset an existing password.

        If the user already exists the operator is asked on stdin whether
        the password should be reset.

        :param username: user name
        :param password: plain-text password for the account
        :return: ``True`` when the user exists afterwards, ``False`` when a
            remote command failed
        """
        # Single-quote the password so PowerShell does not expand ``$`` or
        # backticks and an embedded quote cannot break out of the literal.
        secure = "(ConvertTo-SecureString -String {0} -AsPlainText -Force)".format(
            self._ps_quote(password))
        try:
            if not self.get_SvnUser(name=username):
                self.exec_shell(
                    "New-SvnLocalUser -Name {0} -Password {1}".format(username, secure),
                    "建立使用者")
            elif input("使用者已存在是否重置密碼?Y/N").strip().lower() == "y":
                self.exec_shell(
                    "Set-SvnLocalUser -Name {0} -Password {1}".format(username, secure),
                    "重置密碼")
            return True
        except Exception as e:
            print(e)
            return False

    def add_SvnGroup(self, GroupName):
        """Create a local SVN group unless it already exists.

        :param GroupName: group name
        :return: ``True`` when the group exists afterwards, ``False`` when
            the remote command failed
        """
        try:
            if not self.get_SvnGroup(GroupName):
                self.exec_shell("New-SvnLocalGroup {0}".format(GroupName), "使用者組新增")
            else:
                print("使用者組已存在")
            return True
        except Exception as e:
            print(e)
            return False

    def add_UserGroup(self, GroupName, Username):
        """Add a user to an existing group.

        :param GroupName: target group
        :param Username: user to add
        :return: ``True`` when the user is a member afterwards, ``False``
            when the group does not exist or the remote command failed
        """
        try:
            if not self.get_SvnGroup(GroupName):
                print("使用者組不存在")
                return False
            if Username in self.get_SvnGroupUser(GroupName):
                print("使用者已存在組中")
            else:
                self.exec_shell(
                    "Add-SvnLocalGroupMember {0} (Get-SvnLocalUser {1})".format(
                        GroupName, Username),
                    "使用者組新增使用者")
            return True
        except Exception as e:
            print("新增失敗: {0}".format(e))
            return False

    def add_Repository(self, RepoName):
        """Create a repository unless it already exists.

        :param RepoName: repository name
        :return: ``True`` when the repository exists afterwards, ``False``
            when the remote command failed
        """
        try:
            if not self.get_Repository(RepoName):
                self.exec_shell("New-SvnRepository {0}".format(RepoName), "倉庫新增")
            else:
                print("倉庫已存在")
            return True
        except Exception as e:
            print(e)
            return False

    def add_GrantUser(self, Repository, username, access, password=None):
        """Grant a user access to the root path of a repository.

        :param Repository: repository name
        :param username: user receiving the permission
        :param access: ``ReadOnly``, ``NoAccess`` or ``ReadWrite``
        :param password: password used to create the user when it does not
            exist yet; without it a missing user cannot be created
        :return: ``True`` on success, ``False`` on failure
        """
        command = (
            "Add-SvnAccessRule -AuthorizationProfile SubversionLocal "
            "-Repository {0} -Path / -AccountName {1} -Access {2} -Force"
        ).format(Repository, username, access)
        try:
            if self.get_SvnUser(username):
                print("使用者已存在")
            else:
                print("使用者不存在,建立使用者")
                if password is None:
                    print("未提供密碼,無法建立使用者")
                    return False
                if not self.add_SvnUser(username, password):
                    print("使用者建立失敗")
                    return False
                print("使用者建立成功")
            self.exec_shell(command, "倉庫授權")
            return True
        except Exception as e:
            print("倉庫授權失敗: {0}".format(e))
            return False

    def add_GrantGroup(self, Repository, GroupName, access):
        """Grant a group access to the root path of a repository.

        The group is created first when it does not exist.

        :param Repository: repository name
        :param GroupName: group receiving the permission
        :param access: ``ReadOnly``, ``NoAccess`` or ``ReadWrite``
        :return: ``True`` on success, ``False`` on failure
        """
        command = 'Add-SvnAccessRule {0} -Path / -AccountId "@{1}" -Access {2}'.format(
            Repository, GroupName, access)
        try:
            if not self.get_SvnGroup(GroupName):
                print("使用者組不存在,建立使用者組")
                if not self.add_SvnGroup(GroupName):
                    print("使用者組建立失敗")
                    return False
                print("使用者組建立成功")
            self.exec_shell(command, "倉庫授權使用者組")
            return True
        except Exception as e:
            print("使用者組授權失敗: {0}".format(e))
            return False


def createObj():
    """Build the SVN client; replace the placeholders with real values."""
    svn = SVN(host="ip", port="port", user="user", passwd="password")
    return svn


if __name__ == "__main__":
    svn = createObj()
    params = {
        "Repository": "xxxx",
        "username": "xxxx",
        # access is one of: ReadOnly NoAccess ReadWrite
        "access": "ReadOnly",
    }
    # Grant a user permission on the repository.
    data = svn.add_GrantUser(Repository=params["Repository"],
                             username=params["username"],
                             access=params["access"])
    # To grant a group instead, call svn.add_GrantGroup with GroupName=...
    print(data)