七牛雲-python sdk 下載完整實現
阿新 • • 發佈:2019-02-20
目標:
- 熟悉七牛python sdk 的使用
- 呼叫七牛的domain/list 介面獲取空間繫結的域名列表
- 通過七牛python sdk 實現下載
概述:
七牛提供了python sdk, 可以進行上傳、下載等功能; 本文主要是將一些知識點連線起來,在熟悉python 的基礎上,同時實現下載的功能。 涉及到的一些小的知識點有:匯入模組、正則表示式、字串切片、requests包、熟悉七牛sdk、熟悉七牛獲取空間繫結域名列表的介面,以及Python File的一些知識; 雖然在本文的程式碼示例中體現的不是很多,但是在程式編寫的過程中, 對於上述的知識點都有了很多的鞏固。
程式流程圖:
程式示例:
from qiniu import Auth, urlsafe_base64_encode
from common.AccountMgr import AccountMgr
import requests, json, array, re, os
'''
author: xuhuanchao
date: 2017-07-11
AccountMgr 是自己定義的模組,存放了AK, SK 等賬號資訊
'''
accountMgr = AccountMgr()
accessKey = accountMgr.getAccessKey()
secretKey = accountMgr.getSecretKey()
#通過AK, SK 建立 auth物件
auth = Auth(accessKey, secretKey)
def __getDownloadUrl(bucketName, key):
'''
獲取下載的URL
:param bucketName: 空間名稱
:param key: 空間儲存的檔名稱
:return:
'''
flag = __isPrivateBucket(bucketName)
domainList = getDomainByBucket(bucketName)
print(domainList)
domain = ''
for i in range(len(domainList)):
pattern = re.compile("[a-z0-9.]*clouddn.com]")
match = pattern.match(domainList[i])
if match:
continue
else:
domain = domainList[i]
#如果空間沒有繫結自定義域名, 則使用測試域名
if(domain == ''):
domain = domainList[0]
baseUrl = "http://" + domain + "/" + key
if(flag):
downloadUrl = auth.private_download_url(baseUrl)
else:
downloadUrl = baseUrl
return downloadUrl
def __isPrivateBucket(bucketName):
'''
判斷是否是私有空間, 用admin 開頭命名的空間都是 私有空間; tips: 自己的業務控制
:param bucketName: 空間名稱
:return: bool
'''
if bucketName == "admin" or bucketName[0:5] == "admin":
return True
else:
return False
def getDomainByBucket(bucketName):
'''
通過七牛雲介面, 獲取空間繫結的域名
參考文件:https://developer.qiniu.com/kodo/api/1612/bucket-domainlist
:param bucketName: 空間名稱
:return: list of bucket's domain
'''
connector = "/v6/domain/list?tbl={0}".format(bucketName)
url = "http://api.qiniu.com" + connector
try:
accessToken = auth.token_of_request(connector)
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": "QBox " + accessToken
}
resp = requests.get(url, headers=headers)
resp.raise_for_status()
if(resp.status_code == requests.codes.ok):
return list(resp.json())
except Exception as e:
print("出現異常...資訊如下:\n" + e)
def download(bucketName, key, localPath):
'''
開始下載
:param bucketName: 空間名稱
:param key: 下載的檔名稱
:param localPath: 儲存到本地的路徑
:return: None
'''
downloadUrl = __getDownloadUrl(bucketName, key)
print(downloadUrl)
resp = requests.get(downloadUrl)
path = localPath + "/" + key
with open(path, 'wb+') as f:
f.write(resp.content)
if(__name__ == "__main__"):
bucketName = "test-bucket"
key = "stage.jpg"
localPath = "/Users/ryanxu/Documents"
download(bucketName, key, localPath)