1. 程式人生 > >node csv檔案流讀取

node csv檔案流讀取

csv檔案流讀取,可以應對大檔案,資料截斷髮送,不會出現記憶體不足的情況
function readInAppEventReports(user,date,filePath, callback) {
    var reports = [];
    var google_facebook_reports=[];
    var other_reports=[];
    var item_other = {};
    var item_fg={};
    var dbFlag = true;
    var getCount = 0;
    var tCount = 0;     //api獲取資料條數;
var otherCount=0; var facebook_google_Count=0; var other_reports_send_config=5000; var google_facebook_reports_send_config=5000; current_site={ id : filePath.split('=')[1].split('&')[0] }; async.waterfall([ function (cb) { readStream = fs.createReadStream(filePath); console.log('---test'
, filePath); readStream.pipe(csv()) .on('data', function (data) { // readStream.pause(); //資料封裝,打包 var media_source=data['Media Source']; if (media_source==="googleadwords_int" || media_source==="Facebook Ads"
) { item_fg=packItemForInAppEventReportFacebookGoogle(data); if (item_fg.err) { console.log('item_fg.err',item_fg.err); return callback(null); } else { eventDataFilterFacebookGoogle(item_fg.value,google_facebook_reports); } } else { item_other = packItemForInAppEventReport(data); if (item_other.err) { console.log('item_other.err', item_other.err); return callback(null); } else { //資料重複過濾 eventDataFilter(item_other.value, other_reports); } } tCount++; //傳送其餘資料 async.waterfall([ function (fn) { if (other_reports.length >= other_reports_send_config && dbFlag) { console.log('on data... send before other_reports.length=',other_reports.length); dbFlag = false; readStream.pause(); var temOther = other_reports.slice(0, other_reports_send_config); Dbapi.sendMsg({ json: { actionid: 8220, noat: true, events: temOther } }, function (err, result) { console.log('other_reports Dbapi sendMsg', 8220, err, result); getCount += temOther.length; otherCount+=temOther.length; console.log('on data other_reports tCount', tCount, 'getCount', getCount,'otherCount',otherCount); other_reports.splice(0, temOther.length); console.log('on data... after other_reports.length=',other_reports.length); dbFlag = true; readStream.resume(); fn(err); }, 2); } else { fn(null); } }, function (fn) { //傳送facebook,google資料 // console.log('google_facebook_reports.length=',google_facebook_reports.length); if (google_facebook_reports.length >= google_facebook_reports_send_config && dbFlag) { console.log('on data ...send before google_facebook_reports.length=',google_facebook_reports.length); dbFlag = false; readStream.pause(); //傳送其餘資料 var temp_google_facebook_ = google_facebook_reports.slice(0, google_facebook_reports_send_config); Dbapi.sendEvent({ json: { actionid: 1001, noat: true, events: temp_google_facebook_ } }, function (err, result) { console.log('google_facebook_reports Dbapi sendMsg', 1001, err, result); getCount += temp_google_facebook_.length; facebook_google_Count+=temp_google_facebook_.length; console.log('on data google_facebook_reports tCount', tCount, 'getCount', getCount,'facebook_google_Count',facebook_google_Count); google_facebook_reports.splice(0, temp_google_facebook_.length); console.log('on data... send after google_facebook_reports.length=',google_facebook_reports.length); dbFlag = true; readStream.resume(); fn(err); }, 2); } else { fn(null); } } ],function (err) { if (err) console.log('readInAppEventReports on(data) ',err); if (tCount>=API_MAX) { reports.push(data); } }); }) .on('end', function () { var otherMsgSend=function (buff,cb) { Dbapi.sendMsg({ json: { actionid: 8220, noat: true, events: buff } }, function (err, result) { console.log('other_reports Dbapi sendMsg', 8220, err, result); getCount += buff.length; otherCount+=buff.length; console.log('other_reports *** tCount', tCount, 'getCount', getCount,'otherCount',otherCount); cb(err); }, 2); }; var fgMsgSend=function (buff,cb) { Dbapi.sendEvent({ json: { actionid: 1001, noat: true, events: buff } }, function (err, result) { console.log('google_facebook_reports Dbapi sendMsg', 1001, err, result); getCount += buff.length; facebook_google_Count+=buff.length; console.log('google_facebook_reports *** tCount', tCount, 'getCount', getCount,'facebook_google_Count',facebook_google_Count); cb(err); }, 2); }; async.waterfall([ function (fn) { console.log('on end read other_reports'); console.log('on end send before other_reports.length =',other_reports.length ); if (other_reports.length !== 0) { var flag=true; async.whilst( function () { return flag; }, function (fn_cb) { console.log('on end send..other_reports.length =',other_reports.length ); if (other_reports.length > other_reports_send_config){ flag=true; var buff=other_reports.splice(0,other_reports_send_config); otherMsgSend(buff,function (err) { fn_cb(err); }); } else { flag=false; return fn_cb(null) } }, function (err) { if (err) console.log(err); otherMsgSend(other_reports,function (error) { fn(error); }); }); } else { fn(null) } }, function (fn) { console.log('on end read google_facebook_reports'); console.log('on end send before google_facebook_reports.length=',google_facebook_reports.length); if (google_facebook_reports.length !==0) { var flag=true; async.whilst( function () { return flag; }, function (fn_cb) { console.log('on end send...google_facebook_reports.length=',google_facebook_reports.length); if (google_facebook_reports.length > google_facebook_reports_send_config) { flag=true; var buff=google_facebook_reports.splice(0,google_facebook_reports_send_config); fgMsgSend(buff,function (err) { fn_cb(err); }); } else { flag=false; return fn_cb(null); } }, function (err) { if (err) console.log(err); fgMsgSend(google_facebook_reports,function (error) { fn(error); }); }); } else { fn(null); } } ],function (err) { if (err) console.log('readInAppEventReports on(end) ',err); cb(null); }); }); }, function (cb) { // fs.unlink(filePath, function (err) { // cb(err); // }); cb(null); } ], function (err) { if (err) console.log(err); async.waterfall([ function (fn) { console.log('tCount=',tCount,'API_MAX=',API_MAX); if (tCount >= API_MAX - 20000) { console.log('readInAppEventReports API_MAX - 20000 Monitor ...'); var apiErr = Monitor.errFactory(); apiErr.err = '資料不完整'; apiErr.funct = readInstallsReport.toString(); apiErr.param = filePath; apiErr.level = 10; Monitor.send(apiErr); } fn(null); }, function (fn) { console.log('tCount=',tCount,'API_MAX=',API_MAX); if (tCount >= API_MAX) { console.log('readInAppEventReports data >',API_MAX/1000,'K'); // console.log('reports.length=',reports.length); // console.log(JSON.stringify(reports[0])); // var str='2017/7/10 23:59:59'; var detail_date=[]; detail_date=reports[0]['Event Time'].substr(10,6).split(':'); detail_date[0]=detail_date[0].replace(" ",""); if (parseInt(detail_date[1])<59) { detail_date[1]=parseInt(detail_date[1])+1; } if (parseInt(detail_date[1])===59) { detail_date[0]=parseInt(detail_date[0])+1; detail_date[1]='00'; } console.log('detail_date:',detail_date); var param={ detail_date:detail_date, website:current_site }; setTimeout(function () { inAppEventReport(user, date, param, function (err) { console.log('inAppEventReport end err', err); Monitor.send('read event report'); fn(err); }); },3000); } else if (tCount===6 || other_reports.length===0 || google_facebook_reports.length===0) { console.log('當前時間',new Date().toLocaleDateString(),'待獲取應用日期 date=',date,' website= ',current_site.id,'應用超過請求次數'); fn(null); } else { console.log('當前時間',new Date().toLocaleDateString(),'待獲取應用日期 date=',date,' website= ',current_site.id,'該天沒有資料'); fn(null); } } ],function (err) { callback(err); }); }); }
集合遍歷,eachLimit裡面的方法會並行
            async.eachLimit(users[user].websites, users[user].websites.length, function (website, cb) {
                console.log('inAppEventReports user=',user,'date=',date,'website=',website);
                loadInAppEventReports(user, website, date, function (err, path) {
                    if (path) filesPath = filesPath.concat(path);
                    console.log('loadInAppEventReports end', website.id, 'err', err, new Date().toString());
                    cb(null);
                })
            }, function (err) {
                cba(err, filesPath);
            });
可以將limit設定為1 使其序列
    async.eachLimit(dateList.slice(0,6), 1, function (dateItem, callback) {
        console.log('waterfall-start', dateItem);
        async.waterfall([
            function (cb) {
                mainFunction(dateItem, dateItem, function (err) {
                    cb(null);
                });
            }
        ], function (err) {
            callback(null);
        });
    }, function (err) {
        console.error('history end err', err);
    });

相關推薦

node csv檔案讀取

csv檔案流讀取,可以應對大檔案,資料截斷髮送,不會出現記憶體不足的情況 function readInAppEventReports(user,date,filePath, callback) { var reports = []; var g

node 操作檔案 fs 同步與非同步 檔案的寫入與讀取

fs fs ( File System ) 檔案系統 在node中通過fs模組來和系統中的檔案進行互動 通過fs模組可以對磁碟中的檔案做各種增刪改查的操作 寫入檔案 1.同

CSV檔案讀取,TensorFlow和pandas

csv檔案的讀取,有兩種方法:呼叫pandas庫函式或者直接用TensorFlow讀取, 1、呼叫pandas data.csv是自己隨便搞的一個數據檔案,資料樣例和讀取程式碼如下: import tensorflow as tf import pandas as pd def

CSV檔案讀取jpg圖片的URL地址並多執行緒批量下載

很多時候,我們的網站上傳圖片時並沒有根據內容進行資料夾分類,甚至會直接儲存到阿里雲的OSS或是七牛雲等雲端儲存上。這樣,當我們需要打包圖片時,就需要從資料庫找尋分類圖片,通過CURL進行下載。我最近剛剛完成了一個這樣的任務,覺得會比較常用,就把程式放到了github上分享給大家,希望大家能夠喜歡。 do

FFMPEG音視訊解碼流程&MP4音視訊檔案讀取(轉)

1.播放多媒體檔案步驟 通常情況下,我們下載的視訊檔案如MP4,MKV、FLV等都屬於封裝格式,就是把音視訊資料按照相應的規範,打包成一個文字檔案。我們可以使用MediaInfo這個工具檢視媒體檔案的相關資訊。 所以當我們播放一個媒體檔案時,通常需要經過以下幾個步驟

【120】TensorFlow 從CSV檔案讀取資料並訓練線性迴歸模型(面向新手)

正文開始。 學習 TensorFlow 讓我的思維發生了變化。 計算機本質上是一種數學的工具,而我在學習程式設計的時候,思維也不可避免地收到了影響。傳統的程式設計思想,常常認為程式就應該像數學定理或者數學函式一樣,給出一個確定的結果。這是一種基於邏輯推導

C語言fread()函式:讀檔案函式(從檔案讀取資料)

相關函式:fopen, fwrite, fseek, fscanf標頭檔案:#include <stdio.h>定義函式:size_t fread(void * ptr, size_t size, size_t nmemb, FILE * stream);函式說

Java從CSV檔案讀取資料和寫入

.CSV檔案是以逗號分割的資料倉儲,讀取資料時從每一行中讀取一條資料元祖,也就是一條資料,再用字元分割的方式獲取表中的每一個數據項。 package com.conn.csv;

CSV檔案讀取

下午接到專案,讀取一個csv檔案並寫入到資料庫中,把資料存入資料庫中比較簡單,可以參考市面上的寫法,或者用一個物件,存放讀取的資料,批量插入資料庫。 但是下午的資料容量較大,據說有2000W條日處理量,而且要求cpu儘量滿格。 好吧,又是一個坑爹又艱難的專案。言歸正傳,來搞

CSV檔案讀取資料分割問題

CSV檔案預設用英文逗號作為列分隔符,換行符作為行分隔符。 有時欄位裡含有,和換行符就麻煩了,資料輸出會出現混亂。這時可以使用雙引號"來將每個欄位內容括起來,CSV預設認為由""括起來的內容是一個欄位, 這時不管欄位內容裡有除"之外字元的任何字元都可以按原來形式引用。 sp

csv檔案讀取

標頭檔案 XCFileStream.h#pragma once #include <string> #include <list> #include <vector> using namespace std; #include <w

c# 去掉檔案讀取的txt檔案中的空格

百度沒有查到,試驗出一個簡單的方法如下dr[0] = data[0].Trim();//去除字串中的空格private void button1_Click(object sender, EventArgs e) { //建立一個開啟檔

ifstream 檔案讀取格式資料出現的問題

在檔案中均為數字的時候,需要將這些數字按整型讀取,可以直接利用檔案流進行讀取,但是一旦檔案中存在非數字字元的時候,比如出現字串就會出現異常。因此我們需要處理這種情況。下面這段程式可以處理這個問題。其實問題的關鍵就是在in.ignore()函式。該函式的原型為: istr

CSV檔案準確讀取兩種思路

通過查詢網上資料,發現有兩種解析思路:a.通過pattern分割各欄位,b.逐字元讀取並判斷,當然還有通過第三方Jar包來解析的方法。 1.通過Pattern準確分割欄位(Reference:csv檔案讀取) package xufei; import j

《xls json csv 檔案讀取

#coding=utf-8 import xlrd import json import csv #地址前用'\'轉譯符要加 workbook=xlrd.open_workbook('D:/untitled/1022/date.xls') #提取表格名稱 sheets=workbook.sheet_n

android讀取csv檔案資料

csv檔案是一種表格形式的檔案,如果把檔案字尾名改為.txt,會發現同一行資料之間是用英文“,”隔開的。 如何讀取csv檔案以便把資料存入資料庫呢,特別是csv檔案中有些資料是空? csv檔案如下: 把檔案字尾名改為.txt後如下: 電錶id,電錶編號,模組地址,描述,所屬站點名稱,

IO讀取資料檔案,將資料寫入資料庫,並記錄資料匯入日誌

流程分析: 資料型別: ROUTE_ID,LXBM,ROAD_NAME,SRC_LON,SRC_LAT,DEST_LON,DEST_LAT 10000,G50,滬渝高速,115.8605349,30.08934467,115.5437817,30.08898601 10001,G

改良昨天的指令碼,讀取CSV檔案生成散點圖

需要讀取的CSV檔案,資料參考以下表頭順序 讀取成功後生成散點圖檔案 “散點圖.html" 開啟網頁檔案時,同目錄下需要有 echarts.min.js 散點圖效果:   程式碼如下: # -*- coding: utf-8 -*- """ 讀取cs

從minio中讀取檔案進行下載檔案

一、獲取Minio連線     public static String minioUrl;         public static String minioUsername;    

Java使用opencsv 讀取csv檔案

maven依賴 <!-- https://mvnrepository.com/artifact/com.opencsv/opencsv --> <dependency> <groupId>com.opencsv</group