阿里雲Python-SDK管理安全組和RDS例項IP白名單
阿新 • • 發佈:2019-01-27
- 安裝SDK
#pip install aliyun-python-sdk-rds
#pip install aliyun-python-sdk-ecs
如果安裝報錯可能需要安裝python-dev
#rpm -ivh python-dev
- 程式碼示例:
#!/usr/bin/env python
# coding=utf-8
from aliyunsdkcore import client
from aliyunsdkecs.request.v20140526 import DescribeSecurityGroupAttributeRequest
from aliyunsdkecs.request.v20140526 import AuthorizeSecurityGroupRequest
from aliyunsdkecs.request.v20140526 import AuthorizeSecurityGroupEgressRequest
from aliyunsdkecs.request.v20140526 import RevokeSecurityGroupRequest
from aliyunsdkecs.request.v20140526 import RevokeSecurityGroupEgressRequest
from aliyunsdkecs.request.v20140526 import DescribeSecurityGroupsRequest
from aliyunsdkrds.request.v20140815 import ModifySecurityIpsRequest
class Alis():
def __init__(self, AccessKey, AccessSecret, RegionId):
self.AccessKey = AccessKey
self.AccessSecret = AccessSecret
self.RegionId = RegionId
def client(self):
c = client.AcsClient(self.AccessKey, self.AccessSecret, self.RegionId)
return c
def describeSecurityGroupsRequest(self):
request = DescribeSecurityGroupsRequest.DescribeSecurityGroupsRequest()
request.set_accept_format('json')
return request
def describeSecurityGroupAttributeRequest(self,SecurityGroupId,NicType='internet',Direction='ingress'):
"""查詢安全組規則
"""
request = DescribeSecurityGroupAttributeRequest.DescribeSecurityGroupAttributeRequest()
request.set_SecurityGroupId(SecurityGroupId)
request.set_accept_format('json')
request.set_NicType(NicType)
request.set_Direction(Direction)
return request
def authorizeSecurityGroupRequest(self, SecurityGroupId, IpProtocol, Direction, PortRange, SourceCidrIp, DestCidrIp, Priority=1):
"""授權安全組規則
"""
if Direction == 'ingress':
request = AuthorizeSecurityGroupRequest.AuthorizeSecurityGroupRequest()
elif Direction == 'egress':
request = AuthorizeSecurityGroupEgressRequest.AuthorizeSecurityGroupEgressRequest()
else:
raise NameError("The specified parameter 'Direction' is not valid.")
request.set_SecurityGroupId(SecurityGroupId)
request.set_IpProtocol(IpProtocol)
request.set_PortRange(PortRange)
if SourceCidrIp:
request.set_SourceCidrIp(SourceCidrIp)
if DestCidrIp:
request.set_DestCidrIp(DestCidrIp)
request.set_Priority(Priority)
request.set_Description("內部IP訪問")
request.set_Policy('accept')
request.set_accept_format('json')
return request
def revokeSecurityGroupRequest(self, SecurityGroupId, IpProtocol, Direction, PortRange, SourceCidrIp, DestCidrIp, Priority=1):
"""刪除安全組規則
"""
if Direction == 'ingress':
request = RevokeSecurityGroupRequest.RevokeSecurityGroupRequest()
elif Direction == 'egress':
request = RevokeSecurityGroupEgressRequest.RevokeSecurityGroupEgressRequest()
else:
raise NameError("The specified parameter 'Direction' is not valid.")
request.set_SecurityGroupId(SecurityGroupId)
request.set_IpProtocol(IpProtocol)
request.set_PortRange(PortRange)
if SourceCidrIp:
request.set_SourceCidrIp(SourceCidrIp)
if DestCidrIp:
request.set_DestCidrIp(DestCidrIp)
request.set_Priority(Priority)
request.set_Policy('accept')
request.set_accept_format('json')
return request
def modifySecurityIpsRequest(self, DBInstanceId, SecurityIps, DBInstanceIPArrayName, DBInstanceIPArrayAttribute):
"""修改資料庫例項白名單
"""
request = ModifySecurityIpsRequest.ModifySecurityIpsRequest()
request.set_DBInstanceId(DBInstanceId)
request.set_SecurityIps(SecurityIps)
request.set_DBInstanceIPArrayName(DBInstanceIPArrayName)
request.set_DBInstanceIPArrayAttribute(DBInstanceIPArrayAttribute)
return request
if __name__ == '__main__':
ali = Alis(AccessKey, AccessSecret, RegionId) # 阿里雲後臺建立
clt = ali.client()
req = ali.revokeSecurityGroupRequest(securityGroupId, ipProtocol, direction,
portRange, sourceCidrIp, destCidrIp, priority)
res = clt.do_action_with_exception(req)
print("刪除安全組%s" % res)
req = ali.authorizeSecurityGroupRequest(securityGroupId, ipProtocol, direction,
portRange, sourceCidrIp, destCidrIp, priority)
res = clt.do_action_with_exception(req)
print("新增安全組%s" % res)
req = ali.modifySecurityIpsRequest(dbInstanceId, securityIps,
dbInstanceIPArrayName, dbInstanceIPArrayAttribute)
res = clt.do_action_with_exception(req)
print("修改RDS白名單%s" % res)