檔案上傳到七牛雲oss並重新整理cdn
阿新 • • 發佈:2020-10-22
目錄
參考文件
https://github.com/qiniu/python-sdk/blob/master/examples
示例程式碼
"""Upload a project's build directory to a Qiniu bucket, then refresh its CDN.

Usage:
    python <this_script> <project_name>

``<project_name>`` must be a key of ``qiniu_oss_bucket``. Credentials are read
from the ``QINIU_ACCESS_KEY`` and ``QINIU_SECRET_KEY`` environment variables
so that no secret lives in the source file.
"""
import datetime
import os
import subprocess
import sys
import time

import qiniu
from qiniu import Auth, put_file, build_batch_copy, BucketManager, CdnManager

# Per-project settings: the target bucket, the CDN domain, and the paths to
# refresh after an upload. A path ending in "/" is refreshed as a directory,
# anything else as a single file URL.
qiniu_oss_bucket = {
    "xxxx": {
        "bucket": "xxx-xxxx",
        "cdndomain": "https://xxx-xxxx-cdn.xxxxxx.com",
        "addr": [
            "/index.html",
        ],
    }
}

# Empty when no argument was given; main() rejects that before doing any work.
project_name = sys.argv[1] if len(sys.argv) > 1 else ""
# Local directory whose contents are mirrored into the bucket.
deploy_dir = "/root/deploy-db/zwl/{}".format(project_name)
# Timestamp used to name the backup copy of dist.zip.
now_time = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')


def _get_auth():
    """Return a Qiniu ``Auth`` built from environment credentials.

    Exits the process with an error message when either variable is unset.
    """
    access_key = os.environ.get("QINIU_ACCESS_KEY")
    secret_key = os.environ.get("QINIU_SECRET_KEY")
    if not access_key or not secret_key:
        sys.exit("QINIU_ACCESS_KEY and QINIU_SECRET_KEY must be set")
    return Auth(access_key=access_key, secret_key=secret_key)


def judage_file_type(domain_url, url_lst):
    """Split refresh paths into full file URLs and full directory URLs.

    Args:
        domain_url: CDN domain prefix prepended to every path.
        url_lst: Paths relative to the domain; a trailing "/" marks a
            directory refresh, anything else a file refresh.

    Returns:
        A ``(file_lst, dir_lst)`` tuple of absolute URLs.
    """
    file_lst = [domain_url + path for path in url_lst if not path.endswith('/')]
    dir_lst = [domain_url + path for path in url_lst if path.endswith('/')]
    return file_lst, dir_lst


def refresh_cdn():
    """Refresh the CDN URLs/directories configured for ``project_name``.

    Returns:
        ``{"code": "200", ...}`` when both the HTTP response and the API
        payload report 200, otherwise ``{"code": "500", ...}``.
    """
    cdn_manager = CdnManager(_get_auth())
    project = qiniu_oss_bucket[project_name]
    file_lst, dir_lst = judage_file_type(project["cdndomain"], project["addr"])

    ret, info = cdn_manager.refresh_urls_and_dirs(file_lst, dir_lst)
    print("ret1: ", ret)
    print("ret2: ", info)

    # The body may be missing when the request itself failed, so guard the
    # lookup instead of subscripting it blindly.
    succeeded = (
        info.status_code == 200
        and ret is not None
        and ret.get('code') == 200
    )
    if succeeded:
        return {"code": "200", "msg": "重新整理成功"}
    return {"code": "500", "msg": "重新整理失敗"}


def time_exchange():
    """Return the local time 0.04 hours (144 seconds) from now.

    The result is a naive ``datetime`` truncated to whole seconds.
    """
    now = datetime.datetime.now().replace(microsecond=0)
    return now + datetime.timedelta(hours=0.04)


def upload_oss_file_to_qiniu(file):
    """Upload one local file to the project's bucket, overwriting same keys.

    The object key is the file's path relative to ``deploy_dir``.

    Args:
        file: Absolute path of the local file to upload.

    Returns:
        True when the upload response status is 200, False otherwise.
    """
    deadline = time_exchange()
    policy = {
        "insertOnly": 0,  # overwrite an existing object with the same key
        # Qiniu defines the policy deadline as a Unix timestamp, not a
        # datetime object.
        # NOTE(review): the SDK's upload_token() also accepts `expires` and
        # may ignore a policy-supplied deadline — confirm which one applies.
        "deadline": int(time.mktime(deadline.timetuple())),
    }
    bucket_name = qiniu_oss_bucket[project_name]["bucket"]
    # A relative path stays correct even when the project name occurs more
    # than once in the absolute path.
    oss_filename = os.path.relpath(file, deploy_dir)

    token = _get_auth().upload_token(
        bucket_name, key=oss_filename, policy=policy
    )
    ret, info = put_file(token, oss_filename, file)
    if info.status_code != 200:
        sys.stderr.write("upload failed -> {}: {}\n".format(file, info))
        return False
    return True


def upload_directory(directory):
    """Recursively upload every file below ``directory``.

    A file named ``dist.zip`` is first renamed to ``.dist-<now_time>`` in the
    same directory and uploaded under that new name; if the rename fails the
    process exits with status 1.

    Args:
        directory: Directory to walk; expected to be ``deploy_dir`` or a
            directory below it.

    Returns:
        The number of files whose upload failed.
    """
    failures = 0
    for name in os.listdir(directory):
        path = os.path.join(directory, name)

        if name == "dist.zip":
            new_path = os.path.join(directory, ".dist-" + now_time)
            # Argument list instead of a shell string: paths containing
            # spaces or shell metacharacters are passed through verbatim.
            command = ["/usr/bin/mv", path, new_path]
            print("command: ", " ".join(command))
            if subprocess.call(command) != 0:
                print("dist命名失敗,停止上傳檔案")
                sys.exit(1)
            print("dist.zip備份成功,更名為{}".format("dist-" + now_time))
            if not upload_oss_file_to_qiniu(new_path):
                failures += 1
        elif os.path.isdir(path):
            failures += upload_directory(path)
        else:
            print("正在上傳檔案 -> " + path)
            if not upload_oss_file_to_qiniu(path):
                failures += 1
    return failures


def main():
    """Validate the project argument, upload its files, refresh the CDN."""
    if project_name not in qiniu_oss_bucket:
        sys.exit(
            "usage: {} <project_name>  (known projects: {})".format(
                sys.argv[0], ", ".join(sorted(qiniu_oss_bucket))
            )
        )
    if not os.path.isdir(deploy_dir):
        sys.exit("deploy directory not found: {}".format(deploy_dir))

    failed_uploads = upload_directory(deploy_dir)

    # Refresh once, after the whole tree has been walked.
    refresh_cdn_result = refresh_cdn()
    # refresh_cdn() reports its code as a string, so compare with a string.
    if refresh_cdn_result['code'] != "200":
        sys.exit(1)
    if failed_uploads:
        sys.exit("{} file(s) failed to upload".format(failed_uploads))


if __name__ == "__main__":
    main()