利用七牛雲解決PC端檔案同步
阿新 • • 發佈:2019-01-04
經常有一些文件需要在公司和家裡的兩地電腦間手工同步,很麻煩,於是利用七牛雲免費的空間來做服務端實現同步功能。
同時,為了文件內容保密,我採用cryptography模組(通過指定一個複雜的字串)對檔案加密,這樣即使別人獲取了七牛雲上的檔案也無法破解。
import os, time, uuid, requests
from qiniu import Auth, put_file, urlsafe_base64_encode, BucketManager
import qiniu.config
from cryptography.fernet import Fernet
from binascii import b2a_hex, a2b_hex
class Sync(object):
def __init__(self, userKey):
self.fernet = Fernet(b'dUTMoQzCwiamBkwNTGR4AXrw-xxxxxxx=')
self.qn = Auth('UhzOHyzt0MeL', 'GzTlY7c30WZw')
self.userKey = userKey
self.buckName = 'bsync'
self.tmpFile = os.path.join(os.environ['TEMP' ], self.userKey)
self.bmFile = os.environ['LOCALAPPDATA'] + '\\User Data\\'
def getRLUT(self):
'''get remote last update time'''
bucket = BucketManager(self.qn)
ret, obj, info = bucket.list(bucket=self.buckName, prefix=self.userKey+'/')
lastUpdateTime = ''
for i in ret['items']:
updateTime = i['key'].split('/')[1]
if updateTime > lastUpdateTime:
lastUpdateTime = updateTime
return lastUpdateTime
def put(self, timeStr):
with open(self.bmFile, 'r', encoding='utf8') as f:
content = self.fernet.encrypt(bytes(f.read(), encoding='utf8'))
with open(self.tmpFile, 'wb') as f:
f.write(content)
fileKey = self.genFileKey(timeStr)
token = self.qn.upload_token(self.buckName, fileKey, 3600)
ret, info = put_file(token, fileKey, self.tmpFile)
def get(self, timeStr):
fileKey = self.genFileKey(timeStr)
base_url = 'http://xxxxx.bkt.clouddn.com/%s' % (fileKey)
private_url = self.qn.private_download_url(base_url, expires=3600)
resp = requests.get(private_url)
with open(self.bmFile, 'w', encoding='utf8') as f:
content = self.fernet.decrypt(resp.content)
f.write(content.decode('utf8'))
def genFileKey(self, timeStr):
return self.userKey + '/' + timeStr
def run(self):
if not os.path.exists(self.bmFile):
return
remoteLastTimeStr = self.getRLUT()
localLastTimeStr = time.strftime('%Y%m%d%H%M%S', time.localtime(os.path.getmtime(self.bmFile)))
if remoteLastTimeStr < localLastTimeStr:
self.put(localLastTimeStr)
print("伺服器已更新為最新")
elif remoteLastTimeStr > localLastTimeStr:
self.get(remoteLastTimeStr)
print("本地已更新為最新")
else:
print("伺服器與本地狀態同步")
if __name__ == '__main__':
sb = Sync('test')
sb.run()