node csv檔案流讀取
csv檔案流讀取,可以應對大檔案,資料截斷髮送,不會出現記憶體不足的情況
function readInAppEventReports(user,date,filePath, callback) {
var reports = [];
var google_facebook_reports=[];
var other_reports=[];
var item_other = {};
var item_fg={};
var dbFlag = true;
var getCount = 0;
var tCount = 0; //api獲取資料條數;
var otherCount=0;
var facebook_google_Count=0;
var other_reports_send_config=5000;
var google_facebook_reports_send_config=5000;
current_site={
id : filePath.split('=')[1].split('&')[0]
};
async.waterfall([
function (cb) {
readStream = fs.createReadStream(filePath);
console.log('---test' , filePath);
readStream.pipe(csv())
.on('data', function (data) {
// readStream.pause();
//資料封裝,打包
var media_source=data['Media Source'];
if (media_source==="googleadwords_int" || media_source==="Facebook Ads" ) {
item_fg=packItemForInAppEventReportFacebookGoogle(data);
if (item_fg.err) {
console.log('item_fg.err',item_fg.err);
return callback(null);
} else {
eventDataFilterFacebookGoogle(item_fg.value,google_facebook_reports);
}
} else {
item_other = packItemForInAppEventReport(data);
if (item_other.err) {
console.log('item_other.err', item_other.err);
return callback(null);
} else {
//資料重複過濾
eventDataFilter(item_other.value, other_reports);
}
}
tCount++;
//傳送其餘資料
async.waterfall([
function (fn) {
if (other_reports.length >= other_reports_send_config && dbFlag) {
console.log('on data... send before other_reports.length=',other_reports.length);
dbFlag = false;
readStream.pause();
var temOther = other_reports.slice(0, other_reports_send_config);
Dbapi.sendMsg({
json: {
actionid: 8220,
noat: true,
events: temOther
}
}, function (err, result) {
console.log('other_reports Dbapi sendMsg', 8220, err, result);
getCount += temOther.length;
otherCount+=temOther.length;
console.log('on data other_reports tCount', tCount, 'getCount', getCount,'otherCount',otherCount);
other_reports.splice(0, temOther.length);
console.log('on data... after other_reports.length=',other_reports.length);
dbFlag = true;
readStream.resume();
fn(err);
}, 2);
} else {
fn(null);
}
},
function (fn) {
//傳送facebook,google資料
// console.log('google_facebook_reports.length=',google_facebook_reports.length);
if (google_facebook_reports.length >= google_facebook_reports_send_config && dbFlag) {
console.log('on data ...send before google_facebook_reports.length=',google_facebook_reports.length);
dbFlag = false;
readStream.pause();
//傳送其餘資料
var temp_google_facebook_ = google_facebook_reports.slice(0, google_facebook_reports_send_config);
Dbapi.sendEvent({
json: {
actionid: 1001,
noat: true,
events: temp_google_facebook_
}
}, function (err, result) {
console.log('google_facebook_reports Dbapi sendMsg', 1001, err, result);
getCount += temp_google_facebook_.length;
facebook_google_Count+=temp_google_facebook_.length;
console.log('on data google_facebook_reports tCount', tCount, 'getCount', getCount,'facebook_google_Count',facebook_google_Count);
google_facebook_reports.splice(0, temp_google_facebook_.length);
console.log('on data... send after google_facebook_reports.length=',google_facebook_reports.length);
dbFlag = true;
readStream.resume();
fn(err);
}, 2);
} else {
fn(null);
}
}
],function (err) {
if (err) console.log('readInAppEventReports on(data) ',err);
if (tCount>=API_MAX) {
reports.push(data);
}
});
})
.on('end', function () {
var otherMsgSend=function (buff,cb) {
Dbapi.sendMsg({
json: {
actionid: 8220,
noat: true,
events: buff
}
}, function (err, result) {
console.log('other_reports Dbapi sendMsg', 8220, err, result);
getCount += buff.length;
otherCount+=buff.length;
console.log('other_reports *** tCount', tCount, 'getCount', getCount,'otherCount',otherCount);
cb(err);
}, 2);
};
var fgMsgSend=function (buff,cb) {
Dbapi.sendEvent({
json: {
actionid: 1001,
noat: true,
events: buff
}
}, function (err, result) {
console.log('google_facebook_reports Dbapi sendMsg', 1001, err, result);
getCount += buff.length;
facebook_google_Count+=buff.length;
console.log('google_facebook_reports *** tCount', tCount, 'getCount', getCount,'facebook_google_Count',facebook_google_Count);
cb(err);
}, 2);
};
async.waterfall([
function (fn) {
console.log('on end read other_reports');
console.log('on end send before other_reports.length =',other_reports.length );
if (other_reports.length !== 0) {
var flag=true;
async.whilst(
function () {
return flag;
},
function (fn_cb) {
console.log('on end send..other_reports.length =',other_reports.length );
if (other_reports.length > other_reports_send_config){
flag=true;
var buff=other_reports.splice(0,other_reports_send_config);
otherMsgSend(buff,function (err) {
fn_cb(err);
});
} else {
flag=false;
return fn_cb(null)
}
},
function (err) {
if (err) console.log(err);
otherMsgSend(other_reports,function (error) {
fn(error);
});
});
} else {
fn(null)
}
},
function (fn) {
console.log('on end read google_facebook_reports');
console.log('on end send before google_facebook_reports.length=',google_facebook_reports.length);
if (google_facebook_reports.length !==0) {
var flag=true;
async.whilst(
function () {
return flag;
},
function (fn_cb) {
console.log('on end send...google_facebook_reports.length=',google_facebook_reports.length);
if (google_facebook_reports.length > google_facebook_reports_send_config) {
flag=true;
var buff=google_facebook_reports.splice(0,google_facebook_reports_send_config);
fgMsgSend(buff,function (err) {
fn_cb(err);
});
} else {
flag=false;
return fn_cb(null);
}
},
function (err) {
if (err) console.log(err);
fgMsgSend(google_facebook_reports,function (error) {
fn(error);
});
});
} else {
fn(null);
}
}
],function (err) {
if (err) console.log('readInAppEventReports on(end) ',err);
cb(null);
});
});
},
function (cb) {
// fs.unlink(filePath, function (err) {
// cb(err);
// });
cb(null);
}
], function (err) {
if (err) console.log(err);
async.waterfall([
function (fn) {
console.log('tCount=',tCount,'API_MAX=',API_MAX);
if (tCount >= API_MAX - 20000) {
console.log('readInAppEventReports API_MAX - 20000 Monitor ...');
var apiErr = Monitor.errFactory();
apiErr.err = '資料不完整';
apiErr.funct = readInstallsReport.toString();
apiErr.param = filePath;
apiErr.level = 10;
Monitor.send(apiErr);
}
fn(null);
},
function (fn) {
console.log('tCount=',tCount,'API_MAX=',API_MAX);
if (tCount >= API_MAX) {
console.log('readInAppEventReports data >',API_MAX/1000,'K');
// console.log('reports.length=',reports.length);
// console.log(JSON.stringify(reports[0]));
// var str='2017/7/10 23:59:59';
var detail_date=[];
detail_date=reports[0]['Event Time'].substr(10,6).split(':');
detail_date[0]=detail_date[0].replace(" ","");
if (parseInt(detail_date[1])<59) {
detail_date[1]=parseInt(detail_date[1])+1;
}
if (parseInt(detail_date[1])===59) {
detail_date[0]=parseInt(detail_date[0])+1;
detail_date[1]='00';
}
console.log('detail_date:',detail_date);
var param={
detail_date:detail_date,
website:current_site
};
setTimeout(function () {
inAppEventReport(user, date, param, function (err) {
console.log('inAppEventReport end err', err);
Monitor.send('read event report');
fn(err);
});
},3000);
} else if (tCount===6 || other_reports.length===0 || google_facebook_reports.length===0) {
console.log('當前時間',new Date().toLocaleDateString(),'待獲取應用日期 date=',date,' website= ',current_site.id,'應用超過請求次數');
fn(null);
} else {
console.log('當前時間',new Date().toLocaleDateString(),'待獲取應用日期 date=',date,' website= ',current_site.id,'該天沒有資料');
fn(null);
}
}
],function (err) {
callback(err);
});
});
}
集合遍歷,eachLimit裡面的方法會並行
async.eachLimit(users[user].websites, users[user].websites.length, function (website, cb) {
console.log('inAppEventReports user=',user,'date=',date,'website=',website);
loadInAppEventReports(user, website, date, function (err, path) {
if (path) filesPath = filesPath.concat(path);
console.log('loadInAppEventReports end', website.id, 'err', err, new Date().toString());
cb(null);
})
}, function (err) {
cba(err, filesPath);
});
可以將limit設定為1 使其序列
async.eachLimit(dateList.slice(0,6), 1, function (dateItem, callback) {
console.log('waterfall-start', dateItem);
async.waterfall([
function (cb) {
mainFunction(dateItem, dateItem, function (err) {
cb(null);
});
}
], function (err) {
callback(null);
});
}, function (err) {
console.error('history end err', err);
});
相關推薦
node csv檔案流讀取
csv檔案流讀取,可以應對大檔案,資料截斷髮送,不會出現記憶體不足的情況 function readInAppEventReports(user,date,filePath, callback) { var reports = []; var g
node 操作檔案流 fs 同步與非同步 流式檔案的寫入與讀取
fs fs ( File System ) 檔案系統 在node中通過fs模組來和系統中的檔案進行互動 通過fs模組可以對磁碟中的檔案做各種增刪改查的操作 寫入檔案 1.同
CSV檔案的讀取,TensorFlow和pandas
csv檔案的讀取,有兩種方法:呼叫pandas庫函式或者直接用TensorFlow讀取, 1、呼叫pandas data.csv是自己隨便搞的一個數據檔案,資料樣例和讀取程式碼如下: import tensorflow as tf import pandas as pd def
從CSV檔案中讀取jpg圖片的URL地址並多執行緒批量下載
很多時候,我們的網站上傳圖片時並沒有根據內容進行資料夾分類,甚至會直接儲存到阿里雲的OSS或是七牛雲等雲端儲存上。這樣,當我們需要打包圖片時,就需要從資料庫找尋分類圖片,通過CURL進行下載。我最近剛剛完成了一個這樣的任務,覺得會比較常用,就把程式放到了github上分享給大家,希望大家能夠喜歡。 do
FFMPEG音視訊解碼流程&MP4音視訊檔案流讀取(轉)
1.播放多媒體檔案步驟 通常情況下,我們下載的視訊檔案如MP4,MKV、FLV等都屬於封裝格式,就是把音視訊資料按照相應的規範,打包成一個文字檔案。我們可以使用MediaInfo這個工具檢視媒體檔案的相關資訊。 所以當我們播放一個媒體檔案時,通常需要經過以下幾個步驟
【120】TensorFlow 從CSV檔案中讀取資料並訓練線性迴歸模型(面向新手)
正文開始。 學習 TensorFlow 讓我的思維發生了變化。 計算機本質上是一種數學的工具,而我在學習程式設計的時候,思維也不可避免地收到了影響。傳統的程式設計思想,常常認為程式就應該像數學定理或者數學函式一樣,給出一個確定的結果。這是一種基於邏輯推導
C語言fread()函式:讀檔案函式(從檔案流讀取資料)
相關函式:fopen, fwrite, fseek, fscanf標頭檔案:#include <stdio.h>定義函式:size_t fread(void * ptr, size_t size, size_t nmemb, FILE * stream);函式說
Java從CSV檔案中讀取資料和寫入
.CSV檔案是以逗號分割的資料倉儲,讀取資料時從每一行中讀取一條資料元祖,也就是一條資料,再用字元分割的方式獲取表中的每一個數據項。 package com.conn.csv;
CSV檔案的讀取
下午接到專案,讀取一個csv檔案並寫入到資料庫中,把資料存入資料庫中比較簡單,可以參考市面上的寫法,或者用一個物件,存放讀取的資料,批量插入資料庫。 但是下午的資料容量較大,據說有2000W條日處理量,而且要求cpu儘量滿格。 好吧,又是一個坑爹又艱難的專案。言歸正傳,來搞
CSV檔案中讀取資料分割問題
CSV檔案預設用英文逗號作為列分隔符,換行符作為行分隔符。 有時欄位裡含有,和換行符就麻煩了,資料輸出會出現混亂。這時可以使用雙引號"來將每個欄位內容括起來,CSV預設認為由""括起來的內容是一個欄位, 這時不管欄位內容裡有除"之外字元的任何字元都可以按原來形式引用。 sp
csv檔案的讀取類
標頭檔案 XCFileStream.h#pragma once #include <string> #include <list> #include <vector> using namespace std; #include <w
c# 去掉檔案流讀取的txt檔案中的空格
百度沒有查到,試驗出一個簡單的方法如下dr[0] = data[0].Trim();//去除字串中的空格private void button1_Click(object sender, EventArgs e) { //建立一個開啟檔
ifstream 檔案流讀取格式資料出現的問題
在檔案中均為數字的時候,需要將這些數字按整型讀取,可以直接利用檔案流進行讀取,但是一旦檔案中存在非數字字元的時候,比如出現字串就會出現異常。因此我們需要處理這種情況。下面這段程式可以處理這個問題。其實問題的關鍵就是在in.ignore()函式。該函式的原型為: istr
CSV檔案準確讀取兩種思路
通過查詢網上資料,發現有兩種解析思路:a.通過pattern分割各欄位,b.逐字元讀取並判斷,當然還有通過第三方Jar包來解析的方法。 1.通過Pattern準確分割欄位(Reference:csv檔案讀取) package xufei; import j
《xls json csv 檔案讀取》
#coding=utf-8 import xlrd import json import csv #地址前用'\'轉譯符要加 workbook=xlrd.open_workbook('D:/untitled/1022/date.xls') #提取表格名稱 sheets=workbook.sheet_n
android讀取csv檔案資料
csv檔案是一種表格形式的檔案,如果把檔案字尾名改為.txt,會發現同一行資料之間是用英文“,”隔開的。 如何讀取csv檔案以便把資料存入資料庫呢,特別是csv檔案中有些資料是空? csv檔案如下: 把檔案字尾名改為.txt後如下: 電錶id,電錶編號,模組地址,描述,所屬站點名稱,
IO流讀取資料檔案,將資料寫入資料庫,並記錄資料匯入日誌
流程分析: 資料型別: ROUTE_ID,LXBM,ROAD_NAME,SRC_LON,SRC_LAT,DEST_LON,DEST_LAT 10000,G50,滬渝高速,115.8605349,30.08934467,115.5437817,30.08898601 10001,G
改良昨天的指令碼,讀取CSV檔案生成散點圖
需要讀取的CSV檔案,資料參考以下表頭順序 讀取成功後生成散點圖檔案 “散點圖.html" 開啟網頁檔案時,同目錄下需要有 echarts.min.js 散點圖效果: 程式碼如下: # -*- coding: utf-8 -*- """ 讀取cs
從minio中讀取檔案流進行下載檔案
一、獲取Minio連線 public static String minioUrl; public static String minioUsername;
Java使用opencsv 讀取csv檔案
maven依賴 <!-- https://mvnrepository.com/artifact/com.opencsv/opencsv --> <dependency> <groupId>com.opencsv</group