1. 程式人生 > >前端和後臺進行大量資料同步的一個小記

前端和後臺進行大量資料同步的一個小記

    最近在做一個前端收銀的專案,其中一塊很噁心的問題就是商品庫資料同步的問題,來記錄一下整個過程。

    由於商品同步的資料量較大,所以採用了檔案同步的方式。

    整個邏輯流程如下:

     ①下載服務端資料(zip壓縮包)→②解壓並解析裡面的檔案列表(files)→③跟本地資料庫的資料進行對比修改→④服務端資料對比修改完成後,打包本地差異資料

     →⑤將本地的差異資料包上傳到服務端→⑥資料同步完成。

    準備工作:

    這邊Android端採用的資料庫框架是greenDao這個框架(確實很不錯,用起來很舒服,像我這種很討厭SQL程式設計的用起來居然也很順手~~~~)

    整理一個檔案相關操作的幫助類:

public class FileCacheHelper {
   /**
    * 將資料存入指定檔案
    * @param file 指定的檔案
    * @param data 要儲存的資料
    * @throws IOException
    */
public static void saveToCacheFile(File file, String data) throws IOException{
      FileOutputStream outputStream = new FileOutputStream(file);
outputStream.write(data.getBytes()); outputStream.close(); } /** * 讀取檔案中的快取資料 * @param file * @return * @throws IOException */ public static String readFromCacheFile(File file) throws IOException{ String str = ""; StringBuffer data = new StringBuffer(); if(file.exists()){ FileInputStream inputStream = new
FileInputStream(file); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); while((str=reader.readLine()) != null){ data.append(str); } inputStream.close(); } return data.toString(); } /** * 返回每一行的資料 * @param file * @return * @throws IOException */ public static List<String> readFromFile(File file) throws IOException { List<String> stringList = new ArrayList<>(); if(file.exists()){ FileInputStream inputStream = new FileInputStream(file); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String str = ""; while ((str=reader.readLine()) != null){ stringList.add(str); } inputStream.close(); } return stringList; } public static File createOrGetCacheFile(Context context,String fileName) throws IOException { String path = getCacahFilePath(context); path += File.separator+ fileName; Log.d("path", path); return new File(path); } //獲取外部(內部)資料儲存路徑 public static String getCacahFilePath(Context context){ String path = ""; if(context.getExternalCacheDir() != null){ path = context.getExternalCacheDir().getAbsolutePath(); //外部路徑 }else{ path = context.getFilesDir().getAbsolutePath(); //內部路徑 } return path; } //解壓檔案(獲取檔案列表) public static List<String> unzipFile(Context context, File file) throws IOException { List<String> fileNameList = new ArrayList<>(); ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(file)); ZipEntry zipEntry; String szName = ""; while ((zipEntry = zipInputStream.getNextEntry()) != null){ szName = zipEntry.getName(); if(!zipEntry.isDirectory()){ String destName = ""; if(szName.contains("/")){ String names[] = szName.split("/"); destName = names[names.length - 1]; }else{ destName = szName; } File fileDest = FileCacheHelper.createOrGetCacheFile(context,destName); File fileDir = new File(fileDest.getParent()); if(!fileDir.exists()){ fileDir.mkdirs(); } FileOutputStream fileOutputStream = new FileOutputStream(fileDest); byte[] buff = new byte[2048]; int length; while ((length = zipInputStream.read(buff)) != -1){ fileOutputStream.write(buff, 0 , length); fileOutputStream.flush(); } fileOutputStream.close(); fileNameList.add(destName); } } zipInputStream.close(); return fileNameList; } /** * Compress file and folder * @param srcFileString file or folder to be Compress * @param zipFileString the path name of result ZIP * @throws Exception */ public static void zipFolder(String srcFileString, String zipFileString)throws Exception { //create ZIP ZipOutputStream outZip = new ZipOutputStream(new FileOutputStream(zipFileString)); //create the file File file = new File(srcFileString); //compress ZipFiles(file.getParent()+File.separator, file.getName(), outZip); //finish and close outZip.finish(); outZip.close(); } /** * compress files * @param folderString * @param fileString * @param zipOutputSteam * @throws Exception */ private static void ZipFiles(String folderString, String fileString, ZipOutputStream zipOutputSteam)throws Exception{ if(zipOutputSteam == null) return; File file = new File(folderString+fileString); if (file.isFile()) { ZipEntry zipEntry = new ZipEntry(fileString); FileInputStream inputStream = new FileInputStream(file); zipOutputSteam.putNextEntry(zipEntry); int len; byte[] buffer = new byte[4096]; while((len=inputStream.read(buffer)) != -1) { zipOutputSteam.write(buffer, 0, len); } zipOutputSteam.closeEntry(); } else { //folder String fileList[] = file.list(); //no child file and compress if (fileList.length <= 0) { ZipEntry zipEntry = new ZipEntry(fileString+File.separator); zipOutputSteam.putNextEntry(zipEntry); zipOutputSteam.closeEntry(); } //child files and recursion for (int i = 0; i < fileList.length; i++) { ZipFiles(folderString, fileString+java.io.File.separator+fileList[i], zipOutputSteam); }//end of for } } //儲存的下載的檔案到本地 public static File saveFile(Context context,ResponseBody body){ InputStream inputStream; byte[] buff = new byte[2048]; int length; FileOutputStream fileOutputStream; File file = null; try { inputStream = body.byteStream(); file = FileCacheHelper.createOrGetCacheFile(context,"syncData.zip"); fileOutputStream = new FileOutputStream(file); while ((length = inputStream.read(buff)) != -1){ fileOutputStream.write(buff, 0 , length); fileOutputStream.flush(); } fileOutputStream.close(); }catch (Exception e){ e.printStackTrace(); } return file; }
因為整個過程涉及到很多io操作和其中還穿插著介面呼叫,這個時候rxJava的好處就體現出來,(輕鬆穿梭於各個執行緒之間,哈哈~~),下面是主要同步的過程:
Flowable<MposResponseMsg> syncResult = MposApiManager.getInstance().getApiService().syncDown(param.getMapParam()); 
syncResult.subscribeOn(Schedulers.newThread())                     //發出資料同步的請求
            .filter(new Predicate<MposResponseMsg>() {
                @Override
public boolean test(MposResponseMsg mposResponseMsg) throws Exception {
                    return mposResponseMsg.syncDownResponse.success;   //收到請求成功的標識
}
            })
            .flatMap(new Function<MposResponseMsg, Publisher<MposResponseMsg>>() {
                @Override
public Publisher<MposResponseMsg> apply(MposResponseMsg mposResponseMsg) throws Exception {
                    final SyncFlowParam syncFlowParam = new SyncFlowParam();
syncFlowParam.setBatchNo(mposResponseMsg.syncDownResponse.batchNo);
                    return Flowable.interval(2, TimeUnit.SECONDS)
                            .flatMap(new Function<Long, Publisher<MposResponseMsg>>() {
                                @Override
public Publisher<MposResponseMsg> apply(Long aLong) throws Exception {
                                    return MposApiManager.getInstance().
                                           getApiService().syncFlow(syncFlowParam.getMapParam());
}     //輪詢資料檔案打包介面,等待資料打包,2秒一次,上限50次   
                            })                                        
                            .take(50)
                            .takeUntil(new Predicate<MposResponseMsg>() {
                                @Override
public boolean test(MposResponseMsg mposResponseMsg) throws Exception {
                                    return mposResponseMsg.sycnflowResponse.success }
                            });  //等到返回打包完成的標識,不再輪詢,準備開始下載資料包
}
            })
            .flatMap(new Function<MposResponseMsg, Publisher<ResponseBody>>() {
                @Override
public Publisher<ResponseBody> apply(MposResponseMsg mposResponseMsg) throws Exception {
                    String fileUrl = mposResponseMsg.sycnflowResponse.synchronizeFlowListJson.get(0).getPath();
                    return ApiManager.getInstance().getDownloadService().downloadSyncFile(fileUrl);
}                                                    ---//開始下載資料包
            })
            .observeOn(Schedulers.io())
            .map(new Function<ResponseBody, List<String>>() {
                @Override
public List<String> apply(ResponseBody body) throws Exception {
                    return FileCacheHelper.unzipFile(AboutActivity.this, FileCacheHelper.saveFile(AboutActivity.this, body));
}                                                   --//解壓資料包,拿到檔案列表
            })
            .flatMap(new Function<List<String>, Publisher<String>>() {
                @Override
public Publisher<String> apply(List<String> strings) throws Exception {
                    fileNames.addAll(strings);
                    return Flowable.fromIterable(strings);        
}
            })
            .map(new Function<String, File>() {
                @Override
public File apply(String s) throws Exception {
                    return FileCacheHelper.createOrGetCacheFile(AboutActivity.this, s);    --//拿到單個數據檔案
}                                                  
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new ApiSubscriberCallBack<File>() {
                @Override
public void onSuccess(File file) { 
                   syncProduct(file);                                             --//開始同步資料檔案
                }

                @Override
public void onComplete() {
                    LoadingDialogHelper.dismiss();
                }
            });
}
下面是檔案同步,同步完成後,打包本地差異資料,然後上傳的伺服器:
//同步商品資料
private void syncProduct(File file){
    LoadingDialogHelper.show(this);
Flowable.just(file)
            .subscribeOn(Schedulers.io())
            ...(中間一系列的變換操作,資料庫讀寫等)
            .subscribe(new ApiSubscriberCallBack<Product>() {
                @Override
public void onSuccess(Product product) {Utils.updateProduct(product);
}

                @Override
public void onComplete() {Utils.deleteProdutDisabled();
uploadFiles();      //打包本地差異資料,準備上傳
}

                @Override
public void onFailure(Throwable t) {
                    Log.d("error", t.getMessage());
}
            });
}
打包差異資料並上傳:
private void uploadFiles(){
    LoadingDialogHelper.show(this);
Flowable<MposResponseMsg> uploadFile = MposApiManager.getInstance().getApiService().syncUpload(new SyncUploadParam().getMapParam());
uploadFile.subscribeOn(Schedulers.newThread())           --//請求上傳檔案的介面
            .filter(new Predicate<MposResponseMsg>() {
                @Override
public boolean test(MposResponseMsg mposResponseMsg) throws Exception {
                    return mposResponseMsg.syncUploadResponse.success;   --//請求成功,準備打包資料
}
            })
            .observeOn(Schedulers.io())
            .flatMap(new Function<MposResponseMsg, Publisher<MposResponseMsg>>() {
                @Override
public Publisher<MposResponseMsg> apply(MposResponseMsg mposResponseMsg) throws Exception {
                    String bNo = mposResponseMsg.syncUploadResponse.batchNo;
RequestBody batchNo = RequestBody.create(MediaType.parse("text/plain"), bNo);
RequestBody storeNo = RequestBody.create(MediaType.parse("text/plain"),MPosApplication.getInstance().getStore_no());
File zipFile = FileCacheHelper.initSyncFiles(AboutActivity.this);
RequestBody requestFile = RequestBody.create(MediaType.parse("multipart/form-data"), zipFile);
MultipartBody.Part part = MultipartBody.Part.createFormData(bNo, bNo + ".zip", requestFile);
                    return ApiManager.getInstance().getDownloadService().uploadSyncFile(part, batchNo, storeNo);
}         ---------------//打包資料,並進行上傳
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new ApiSubscriberCallBack<MposResponseMsg>() {
                @Override
public void onSuccess(MposResponseMsg mposResponseMsg) {
                    LoadingDialogHelper.dismiss();showMessage1("資料同步完成");
}
            });
}
整個核心過程,大概就是這些,剛用rxjava不久,有些過程寫的不是很好,以後還得多多熟悉rxjava這個東西,多用它的操作符,將這個過程寫的更加順滑。

tttt