百萬級流水匯入與匯出(千萬級未測試)
阿新 • • 發佈:2018-11-11
百萬級流水寫入檔案,再從檔案中寫入資料庫
千萬級資料量未測試
Laravel 中進行的除錯
/**
 * Moves transaction-flow rows from `business_flows_now` to `business_flows`.
 *
 * The job works in two phases:
 *   1. Every row whose `trade_date` differs from today is streamed into a
 *      flat text file (fields separated by `|`, each record terminated by
 *      `!` followed by a newline).
 *   2. The file is read back in small batches, inserted into
 *      `business_flows`, and the exported rows are deleted from
 *      `business_flows_now`.
 *
 * NOTE(review): the flat-file format has no escaping, so a field value that
 * contains `|` or `!` cannot be round-tripped; such a record is rejected on
 * import instead of being silently mis-parsed.
 * NOTE(review): NULL column values are written as empty strings and are
 * therefore inserted as empty strings — confirm this is acceptable for the
 * target schema.
 */
class BusinessFlowsHistoryJob extends Job
{
    /** Rows fetched per query while exporting. */
    private static $exportChunkSize = 5000;

    /** Exported lines passed to insertBusinessFlow() per call. */
    private static $linesPerInsert = 100;

    /** Files of this size (bytes) or smaller are treated as "nothing to import". */
    private static $minFileSize = 1000;

    /**
     * Columns in the exact order they are written to and read from the file.
     * Export, import and the SELECT list are all derived from this one list
     * so they cannot drift apart.
     */
    private static $columns = [
        'id', 'mch_no', 'end_id', 'system_check_no', 'batch_no',
        'sign_path', 'created_at', 'updated_at', 'org_id', 'parent_org_id',
        'trade_date', 'trade_time', 'end_type', 'tel_no', 'out_card',
        'card_sqn', 'out_bank', 'in_card', 'in_bank', 'cups_no',
        'settle_date', 'amount', 'in_fee', 'fee_total', 'settle_amount',
        'ref_amount', 'input_mod', 'id_no', 'emv_flag', 'ic_data',
        'ic_len', 'old_trace', 'rsv_data', 'orn_data', 'acq_rsv',
        'txn_rsv1', 'txn_rsv2', 'txn_rsv3', 'accept_no', 'trade_no',
        'input_id', 'input_code', 'output_id', 'output_code', 'trade_name',
        'arrival_flag', 'user_type', 'correct_count', 'end_time', 'print_flag',
        'resp_no', 'type',
    ];

    /** Columns selected through TO_CHAR so they are exported with two decimals. */
    private static $amountColumns = [
        'amount', 'in_fee', 'fee_total', 'settle_amount', 'ref_amount',
    ];

    public function __construct()
    {
    }

    /**
     * Job entry point: archives every flow whose trade date is not today.
     *
     * The cutoff used to be "yesterday's trades only"; it is now "all records
     * whose trade_date differs from today", hence today's date is passed on.
     *
     * @return void
     */
    public function handle()
    {
        $date = date('Ymd');

        Log::info($date.':'.'交易流水歷史記錄匯入開始');

        if ($this->nowToHistory($date)) {
            Log::info($date.':'.'交易流水歷史記錄匯入完成');
        }
    }

    /**
     * Exports all rows with trade_date != $date to a file, then imports the
     * file into `business_flows` and deletes the exported rows.
     *
     * @param string $date Today's date formatted as Ymd; rows carrying this
     *                     trade_date are left untouched.
     * @return int 1 on success, 0 when there was nothing to import or an
     *             exception occurred (the exception is logged, not rethrown).
     */
    protected function nowToHistory($date)
    {
        try {
            $filePath = $this->prepareExportFile();
            $this->exportToFile($filePath, $date);

            // The file was just rewritten; make sure the size is not served from the stat cache.
            clearstatcache(true, $filePath);
            if (!file_exists($filePath) || filesize($filePath) <= self::$minFileSize) {
                Log::info("檔案不存在或者檔案的大小小於1000位元組");
                return 0;
            }

            // Import and purge must succeed or fail together: if batches were
            // committed one by one, a failure halfway would leave a partial
            // archive behind and the next run would insert those rows again.
            DB::transaction(function () use ($filePath, $date) {
                $this->importFromFile($filePath, $date);
                DB::table('business_flows_now')->where('trade_date', '!=', $date)->delete();
            });

            return 1;
        } catch (\Exception $e) {
            Log::error($date.':'.'執行交易流水歷史記錄匯入失敗,錯誤資訊: '.$e);
            return 0;
        }
    }

    /**
     * Parses a block of exported records and inserts them into
     * `business_flows` with a single multi-row insert.
     *
     * @param string $dataString One or more records, each terminated by `!`.
     * @param string $date       Unused; kept so existing call sites stay valid.
     * @return void
     * @throws \RuntimeException When a record does not contain exactly one
     *                           value per expected column.
     */
    private function insertBusinessFlow($dataString, $date)
    {
        $expected = count(self::$columns);
        $rows = [];

        foreach (explode('!', trim($dataString)) as $record) {
            $record = trim($record);
            if ($record === '') {
                continue; // trailing terminator or blank line
            }

            $fields = explode('|', $record);
            if (count($fields) !== $expected) {
                throw new \RuntimeException(sprintf(
                    'Malformed business flow record: expected %d fields, got %d',
                    $expected,
                    count($fields)
                ));
            }

            $rows[] = array_combine(self::$columns, $fields);
        }

        if ($rows === []) {
            return;
        }

        DB::table('business_flows')->insert($rows); // create the history records
    }

    /**
     * Ensures the export directory exists and returns the path of the export
     * file, which is named after yesterday's date.
     *
     * NOTE(review): the directory is resolved with public_path(); the file
     * contains card-number columns (out_card / in_card), so confirm that this
     * location is not reachable by web clients.
     *
     * @return string
     * @throws \RuntimeException When the directory cannot be created.
     */
    private function prepareExportFile()
    {
        $directory = public_path('BusinessFlowHistoryFile');

        // The second is_dir() covers another process creating the directory concurrently.
        if (!is_dir($directory) && !mkdir($directory, 0777, true) && !is_dir($directory)) {
            throw new \RuntimeException('Unable to create directory: '.$directory);
        }

        return $directory.'/'.date('Ymd', strtotime('-1 day')).'.txt';
    }

    /**
     * Streams every row with trade_date != $date into $filePath, replacing
     * any previous content of that file.
     *
     * @param string $filePath
     * @param string $date
     * @return void
     * @throws \RuntimeException When the file cannot be opened or written.
     */
    private function exportToFile($filePath, $date)
    {
        // Mode "w" truncates an existing file, so no prior unlink is needed.
        $handle = fopen($filePath, 'w');
        if ($handle === false) {
            throw new \RuntimeException('Unable to open file!');
        }

        $columns = self::$columns;

        try {
            BusinessFlowNow::where('trade_date', '!=', $date)
                ->select($this->exportSelect())
                ->chunk(self::$exportChunkSize, function ($rows) use ($handle, $columns, $filePath) {
                    foreach ($rows as $row) {
                        $fields = [];
                        foreach ($columns as $column) {
                            $fields[] = (string) $row->{$column};
                        }

                        if (fwrite($handle, implode('|', $fields).'!'."\n") === false) {
                            throw new \RuntimeException('Unable to write to file: '.$filePath);
                        }
                    }
                });
        } finally {
            fclose($handle);
        }
    }

    /**
     * Builds the SELECT list for the export: plain column names, except for
     * the amount columns which are wrapped in TO_CHAR(..., 'fm99999990.00').
     *
     * @return array
     */
    private function exportSelect()
    {
        $select = [];

        foreach (self::$columns as $column) {
            $select[] = in_array($column, self::$amountColumns, true)
                ? DB::raw("TO_CHAR({$column},'fm99999990.00') as {$column}")
                : $column;
        }

        return $select;
    }

    /**
     * Reads the export file line by line and hands it to
     * insertBusinessFlow() in batches of $linesPerInsert lines.
     *
     * @param string $filePath
     * @param string $date Forwarded unchanged to insertBusinessFlow().
     * @return void
     * @throws \RuntimeException When the file cannot be opened.
     */
    private function importFromFile($filePath, $date)
    {
        $handle = fopen($filePath, 'r');
        if ($handle === false) {
            throw new \RuntimeException('讀取檔案失敗!');
        }

        try {
            $lines = [];

            while (($line = fgets($handle)) !== false) {
                $lines[] = $line;

                if (count($lines) === self::$linesPerInsert) {
                    $this->insertBusinessFlow(implode('', $lines), $date);
                    $lines = [];
                }
            }

            // Flush whatever is left after the last full batch.
            if ($lines !== []) {
                $this->insertBusinessFlow(implode('', $lines), $date);
            }
        } finally {
            fclose($handle);
        }
    }
}