1. 程式人生 > 利用七牛雲解決PC端檔案同步

利用七牛雲解決PC端檔案同步

經常有一些文件需要在公司和家裡的兩地電腦間手工同步,很麻煩,於是利用七牛雲免費的空間來做服務端實現同步功能。
同時,為了文件內容保密,我採用cryptography模組(通過指定一個複雜的字串)對檔案加密,這樣即使別人獲取了七牛雲上的檔案也無法破解。

import os, time, uuid, requests
from qiniu import Auth, put_file, urlsafe_base64_encode, BucketManager
import qiniu.config
from cryptography.fernet import Fernet
from binascii import b2a_hex, a2b_hex


class Sync(object):
    """Keep one local file in sync with a Qiniu bucket, encrypted with Fernet.

    Every upload is stored under the object key ``<userKey>/<timestamp>``,
    where the timestamp is the local file's modification time formatted as
    ``YYYYmmddHHMMSS``. ``run()`` compares the newest remote timestamp with
    the local modification time and uploads or downloads accordingly.
    """

    # Timestamp layout used in object keys. It is fixed-width and ordered
    # from year down to second, so plain string comparison is chronological.
    _TIME_FORMAT = '%Y%m%d%H%M%S'

    def __init__(self, userKey):
        """Set up the cipher, the Qiniu credentials and the file locations.

        Args:
            userKey: Prefix for this user's objects in the bucket; also used
                as the name of the temporary file holding the ciphertext.

        Raises:
            KeyError: If the ``TEMP`` or ``LOCALAPPDATA`` environment
                variable is not set.
        """
        # NOTE(review): the Fernet key and the Qiniu access/secret keys are
        # hard-coded; consider loading them from the environment or a config
        # file kept out of version control.
        self.fernet = Fernet(b'dUTMoQzCwiamBkwNTGR4AXrw-xxxxxxx=')
        self.qn = Auth('UhzOHyzt0MeL', 'GzTlY7c30WZw')
        self.userKey = userKey
        self.buckName = 'bsync'
        self.tmpFile = os.path.join(os.environ['TEMP'], self.userKey)
        # NOTE(review): this value ends with a path separator, so it looks
        # like a directory (possibly a redacted path) although put()/get()
        # open it as a file. Confirm the intended file name.
        self.bmFile = os.environ['LOCALAPPDATA'] + '\\User Data\\'

    @staticmethod
    def _latestTimeStr(items):
        """Return the greatest timestamp found in a list of bucket entries.

        Args:
            items: Iterable of dicts with a ``'key'`` entry shaped like
                ``<userKey>/<timestamp>``.

        Returns:
            The largest timestamp string, or ``''`` when there is none.
        """
        latest = ''
        for item in items:
            parts = item['key'].split('/')
            if len(parts) > 1 and parts[1] > latest:
                latest = parts[1]
        return latest

    def _markLocalMtime(self, timeStr):
        """Set the local file's access/modification time to ``timeStr``.

        Raises:
            ValueError: If ``timeStr`` does not match ``_TIME_FORMAT``.
        """
        stamp = time.mktime(time.strptime(timeStr, self._TIME_FORMAT))
        os.utime(self.bmFile, (stamp, stamp))

    def getRLUT(self):
        """Get the remote last update time.

        Returns:
            The newest timestamp among this user's objects, or ``''`` when
            the bucket holds none.

        Raises:
            RuntimeError: If the bucket listing request fails.
        """
        bucket = BucketManager(self.qn)
        prefix = self.userKey + '/'
        lastUpdateTime = ''
        marker = None
        while True:
            ret, eof, info = bucket.list(
                bucket=self.buckName, prefix=prefix, marker=marker)
            if ret is None:
                # A failed listing must not be mistaken for an empty bucket,
                # otherwise run() would upload over newer remote data.
                raise RuntimeError(
                    'failed to list bucket %r: %s' % (self.buckName, info))
            lastUpdateTime = max(
                lastUpdateTime, self._latestTimeStr(ret.get('items', [])))
            # Listings are paged; continue from the marker until the end.
            marker = ret.get('marker')
            if eof or not marker:
                return lastUpdateTime

    def put(self, timeStr):
        """Encrypt the local file and upload it under ``timeStr``.

        Args:
            timeStr: Timestamp used as the second component of the object key.

        Raises:
            RuntimeError: If the upload is rejected or fails.
        """
        with open(self.bmFile, 'r', encoding='utf8') as f:
            content = self.fernet.encrypt(bytes(f.read(), encoding='utf8'))
        fileKey = self.genFileKey(timeStr)
        token = self.qn.upload_token(self.buckName, fileKey, 3600)
        try:
            with open(self.tmpFile, 'wb') as f:
                f.write(content)
            ret, info = put_file(token, fileKey, self.tmpFile)
        finally:
            # The ciphertext copy is only needed for the duration of the upload.
            if os.path.exists(self.tmpFile):
                os.remove(self.tmpFile)
        if ret is None:
            raise RuntimeError('failed to upload %r: %s' % (fileKey, info))

    def get(self, timeStr):
        """Download the object stored under ``timeStr`` and decrypt it locally.

        The payload is fetched and decrypted before the local file is opened
        for writing, so a failed download or decryption leaves the local file
        untouched. Afterwards the local modification time is set to
        ``timeStr`` so that the next run() sees both sides as equal instead
        of uploading the just-downloaded content again.

        Args:
            timeStr: Timestamp identifying the remote object.

        Raises:
            requests.HTTPError: If the download returns an error status.
            cryptography.fernet.InvalidToken: If the payload cannot be
                decrypted with the configured key.
        """
        fileKey = self.genFileKey(timeStr)
        base_url = 'http://xxxxx.bkt.clouddn.com/%s' % (fileKey)
        private_url = self.qn.private_download_url(base_url, expires=3600)
        resp = requests.get(private_url, timeout=30)
        resp.raise_for_status()
        text = self.fernet.decrypt(resp.content).decode('utf8')
        with open(self.bmFile, 'w', encoding='utf8') as f:
            f.write(text)
        self._markLocalMtime(timeStr)

    def genFileKey(self, timeStr):
        """Return the object key ``<userKey>/<timeStr>``."""
        return self.userKey + '/' + timeStr

    def run(self):
        """Synchronise once: upload if local is newer, download if remote is.

        Does nothing when the local file does not exist.
        """
        if not os.path.exists(self.bmFile):
            return
        remoteLastTimeStr = self.getRLUT()
        # NOTE(review): timestamps are local wall-clock time; this assumes
        # all machines share a time zone and have reasonably accurate clocks.
        localLastTimeStr = time.strftime(
            self._TIME_FORMAT, time.localtime(os.path.getmtime(self.bmFile)))
        if remoteLastTimeStr < localLastTimeStr:
            self.put(localLastTimeStr)
            print("伺服器已更新為最新")
        elif remoteLastTimeStr > localLastTimeStr:
            self.get(remoteLastTimeStr)
            print("本地已更新為最新")
        else:
            print("伺服器與本地狀態同步")


if __name__ == '__main__':
    sb = Sync('test')
    sb.run()