前端和後臺進行大量資料同步的一個小記
阿新 • • 發佈:2019-02-08
最近在做一個前端收銀的專案,其中一塊很噁心的問題就是商品庫資料同步的問題,來記錄一下整個過程。
由於商品同步的資料量較大,所以採用了檔案同步的方式。
整個邏輯流程如下:
①下載服務端資料(zip壓縮包)→②解壓並解析裡面的檔案列表(files)→③跟本地資料庫的資料進行對比修改→④服務端資料對比修改完成後,打包本地差異資料
→⑤將本地的差異資料包上傳到服務端→⑥資料同步完成。
準備工作:
這邊Android端採用的資料庫框架是greenDao這個框架(確實很不錯,用起來很舒服,像我這種很討厭SQL程式設計的用起來居然也很順手~~~~)
整理一個檔案相關操作的幫助類:
public class FileCacheHelper { /** * 將資料存入指定檔案 * @param file 指定的檔案 * @param data 要儲存的資料 * @throws IOException */ public static void saveToCacheFile(File file, String data) throws IOException{ FileOutputStream outputStream = new FileOutputStream(file);因為整個過程涉及到很多io操作和其中還穿插著介面呼叫,這個時候rxJava的好處就體現出來,(輕鬆穿梭於各個執行緒之間,哈哈~~),下面是主要同步的過程:outputStream.write(data.getBytes()); outputStream.close(); } /** * 讀取檔案中的快取資料 * @param file * @return * @throws IOException */ public static String readFromCacheFile(File file) throws IOException{ String str = ""; StringBuffer data = new StringBuffer(); if(file.exists()){ FileInputStream inputStream = newFileInputStream(file); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); while((str=reader.readLine()) != null){ data.append(str); } inputStream.close(); } return data.toString(); } /** * 返回每一行的資料 * @param file * @return * @throws IOException */ public static List<String> readFromFile(File file) throws IOException { List<String> stringList = new ArrayList<>(); if(file.exists()){ FileInputStream inputStream = new FileInputStream(file); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String str = ""; while ((str=reader.readLine()) != null){ stringList.add(str); } inputStream.close(); } return stringList; } public static File createOrGetCacheFile(Context context,String fileName) throws IOException { String path = getCacahFilePath(context); path += File.separator+ fileName; Log.d("path", path); return new File(path); } //獲取外部(內部)資料儲存路徑 public static String getCacahFilePath(Context context){ String path = ""; if(context.getExternalCacheDir() != null){ path = context.getExternalCacheDir().getAbsolutePath(); //外部路徑 }else{ path = context.getFilesDir().getAbsolutePath(); //內部路徑 } return path; } //解壓檔案(獲取檔案列表) public static List<String> unzipFile(Context context, File file) throws IOException { List<String> fileNameList = new ArrayList<>(); ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(file)); ZipEntry zipEntry; String szName = ""; while ((zipEntry = zipInputStream.getNextEntry()) != null){ szName = zipEntry.getName(); if(!zipEntry.isDirectory()){ String destName = ""; if(szName.contains("/")){ String names[] = szName.split("/"); destName = names[names.length - 1]; }else{ destName = szName; } File fileDest = FileCacheHelper.createOrGetCacheFile(context,destName); File fileDir = new File(fileDest.getParent()); if(!fileDir.exists()){ fileDir.mkdirs(); } FileOutputStream fileOutputStream = new FileOutputStream(fileDest); byte[] buff = new byte[2048]; int length; while ((length = zipInputStream.read(buff)) != -1){ fileOutputStream.write(buff, 0 , length); fileOutputStream.flush(); } fileOutputStream.close(); fileNameList.add(destName); } } zipInputStream.close(); return fileNameList; } /** * Compress file and folder * @param srcFileString file or folder to be Compress * @param zipFileString the path name of result ZIP * @throws Exception */ public static void zipFolder(String srcFileString, String zipFileString)throws Exception { //create ZIP ZipOutputStream outZip = new ZipOutputStream(new FileOutputStream(zipFileString)); //create the file File file = new File(srcFileString); //compress ZipFiles(file.getParent()+File.separator, file.getName(), outZip); //finish and close outZip.finish(); outZip.close(); } /** * compress files * @param folderString * @param fileString * @param zipOutputSteam * @throws Exception */ private static void ZipFiles(String folderString, String fileString, ZipOutputStream zipOutputSteam)throws Exception{ if(zipOutputSteam == null) return; File file = new File(folderString+fileString); if (file.isFile()) { ZipEntry zipEntry = new ZipEntry(fileString); FileInputStream inputStream = new FileInputStream(file); zipOutputSteam.putNextEntry(zipEntry); int len; byte[] buffer = new byte[4096]; while((len=inputStream.read(buffer)) != -1) { zipOutputSteam.write(buffer, 0, len); } zipOutputSteam.closeEntry(); } else { //folder String fileList[] = file.list(); //no child file and compress if (fileList.length <= 0) { ZipEntry zipEntry = new ZipEntry(fileString+File.separator); zipOutputSteam.putNextEntry(zipEntry); zipOutputSteam.closeEntry(); } //child files and recursion for (int i = 0; i < fileList.length; i++) { ZipFiles(folderString, fileString+java.io.File.separator+fileList[i], zipOutputSteam); }//end of for } } //儲存的下載的檔案到本地 public static File saveFile(Context context,ResponseBody body){ InputStream inputStream; byte[] buff = new byte[2048]; int length; FileOutputStream fileOutputStream; File file = null; try { inputStream = body.byteStream(); file = FileCacheHelper.createOrGetCacheFile(context,"syncData.zip"); fileOutputStream = new FileOutputStream(file); while ((length = inputStream.read(buff)) != -1){ fileOutputStream.write(buff, 0 , length); fileOutputStream.flush(); } fileOutputStream.close(); }catch (Exception e){ e.printStackTrace(); } return file; }
Flowable<MposResponseMsg> syncResult = MposApiManager.getInstance().getApiService().syncDown(param.getMapParam()); syncResult.subscribeOn(Schedulers.newThread()) //發出資料同步的請求 .filter(new Predicate<MposResponseMsg>() { @Override public boolean test(MposResponseMsg mposResponseMsg) throws Exception { return mposResponseMsg.syncDownResponse.success; //收到請求成功的標識 } }) .flatMap(new Function<MposResponseMsg, Publisher<MposResponseMsg>>() { @Override public Publisher<MposResponseMsg> apply(MposResponseMsg mposResponseMsg) throws Exception { final SyncFlowParam syncFlowParam = new SyncFlowParam(); syncFlowParam.setBatchNo(mposResponseMsg.syncDownResponse.batchNo); return Flowable.interval(2, TimeUnit.SECONDS) .flatMap(new Function<Long, Publisher<MposResponseMsg>>() { @Override public Publisher<MposResponseMsg> apply(Long aLong) throws Exception { return MposApiManager.getInstance(). getApiService().syncFlow(syncFlowParam.getMapParam()); } //輪詢資料檔案打包介面,等待資料打包,2秒一次,上限50次 }) .take(50) .takeUntil(new Predicate<MposResponseMsg>() { @Override public boolean test(MposResponseMsg mposResponseMsg) throws Exception { return mposResponseMsg.sycnflowResponse.success } }); //等到返回打包完成的標識,不再輪詢,準備開始下載資料包 } }) .flatMap(new Function<MposResponseMsg, Publisher<ResponseBody>>() { @Override public Publisher<ResponseBody> apply(MposResponseMsg mposResponseMsg) throws Exception { String fileUrl = mposResponseMsg.sycnflowResponse.synchronizeFlowListJson.get(0).getPath(); return ApiManager.getInstance().getDownloadService().downloadSyncFile(fileUrl); } ---//開始下載資料包 }) .observeOn(Schedulers.io()) .map(new Function<ResponseBody, List<String>>() { @Override public List<String> apply(ResponseBody body) throws Exception { return FileCacheHelper.unzipFile(AboutActivity.this, FileCacheHelper.saveFile(AboutActivity.this, body)); } --//解壓資料包,拿到檔案列表 }) .flatMap(new Function<List<String>, Publisher<String>>() { @Override public Publisher<String> apply(List<String> strings) throws Exception { fileNames.addAll(strings); return Flowable.fromIterable(strings); } }) .map(new Function<String, File>() { @Override public File apply(String s) throws Exception { return FileCacheHelper.createOrGetCacheFile(AboutActivity.this, s); --//拿到單個數據檔案 } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new ApiSubscriberCallBack<File>() { @Override public void onSuccess(File file) { syncProduct(file); --//開始同步資料檔案 } @Override public void onComplete() { LoadingDialogHelper.dismiss(); } }); }下面是檔案同步,同步完成後,打包本地差異資料,然後上傳的伺服器:
//同步商品資料 private void syncProduct(File file){ LoadingDialogHelper.show(this); Flowable.just(file) .subscribeOn(Schedulers.io()) ...(中間一系列的變換操作,資料庫讀寫等) .subscribe(new ApiSubscriberCallBack<Product>() { @Override public void onSuccess(Product product) {Utils.updateProduct(product); } @Override public void onComplete() {Utils.deleteProdutDisabled(); uploadFiles(); //打包本地差異資料,準備上傳 } @Override public void onFailure(Throwable t) { Log.d("error", t.getMessage()); } }); }打包差異資料並上傳:
private void uploadFiles(){ LoadingDialogHelper.show(this); Flowable<MposResponseMsg> uploadFile = MposApiManager.getInstance().getApiService().syncUpload(new SyncUploadParam().getMapParam()); uploadFile.subscribeOn(Schedulers.newThread()) --//請求上傳檔案的介面 .filter(new Predicate<MposResponseMsg>() { @Override public boolean test(MposResponseMsg mposResponseMsg) throws Exception { return mposResponseMsg.syncUploadResponse.success; --//請求成功,準備打包資料 } }) .observeOn(Schedulers.io()) .flatMap(new Function<MposResponseMsg, Publisher<MposResponseMsg>>() { @Override public Publisher<MposResponseMsg> apply(MposResponseMsg mposResponseMsg) throws Exception { String bNo = mposResponseMsg.syncUploadResponse.batchNo; RequestBody batchNo = RequestBody.create(MediaType.parse("text/plain"), bNo); RequestBody storeNo = RequestBody.create(MediaType.parse("text/plain"),MPosApplication.getInstance().getStore_no()); File zipFile = FileCacheHelper.initSyncFiles(AboutActivity.this); RequestBody requestFile = RequestBody.create(MediaType.parse("multipart/form-data"), zipFile); MultipartBody.Part part = MultipartBody.Part.createFormData(bNo, bNo + ".zip", requestFile); return ApiManager.getInstance().getDownloadService().uploadSyncFile(part, batchNo, storeNo); } ---------------//打包資料,並進行上傳 }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new ApiSubscriberCallBack<MposResponseMsg>() { @Override public void onSuccess(MposResponseMsg mposResponseMsg) { LoadingDialogHelper.dismiss();showMessage1("資料同步完成"); } }); }整個核心過程,大概就是這些,剛用rxjava不久,有些過程寫的不是很好,以後還得多多熟悉rxjava這個東西,多用它的操作符,將這個過程寫的更加順滑。
tt