使用hdfs進行資料採集學習記錄
阿新 • • 發佈:2018-12-10
使用hdfs進行資料定時採集
邏輯過程:
- 這是個定時任務,使用timer進行定時任務的進行。新建collectdata類並繼承Timer類,在collectdata類中進行run方法的重寫。
- 將待採集資料移動到toupload目錄中,然後連線hdfs客戶端,在hdfs客戶端中利用日期新建目錄,將toupload中的帶採集資料上傳到相應的目錄中,然後將toupload中的資料移動到backup目錄。
- 在上述過程中,如果對應目錄不存在,則會相應的建立目錄。
在實現程式碼過程中,有部分java知識需要學習鞏固:
- 輸出格式化的日期:
//使用SimpleDateFormat類進行格式設定 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); //再使用format方法對日期進行格式應用。 sdf.format(new Date());
- 篩選檔案方法
//首先需要一個File,其擁有listFiles()方法能夠以陣列形式返回當前檔案目錄下的檔案物件 File srcDir = new File("e:/logs"); File[] listFiles = srcDir.listFiles(new FilenameFilter(){ @Override public boolean accept(File dir,String name){ if(name.endsWith("access.log")){ return true; } return false; } }); //在上述listFiles()方法中新增篩選器,對篩選器的accept()方法進行重寫,即可實現篩選功能。返回true的檔案即被篩選出來。
- FileUtils類可以很方便的實現檔案的移動,並且設定是否自動建立資料夾。此外,它還有其它對檔案進行操作。
FileUtils.moveFileToDirectory(srcDir,dstDir,createDstDir);
重寫datacollect部分程式碼如下:
其中由於log4j日誌部分無法使用,使用系統輸出直接輸出到控制檯。
package datacollect; import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class CollectTask extends TimerTask{ @Override public void run(){ try { Logger logger = Logger.getLogger("logRollingFile"); FileUtils.move //設定採集日期 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String day = sdf.format(new Date()); File srcDir = new File("e:/logs/accesslog/"); File[] listFiles = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { if(name.endsWith("access.log")){ return true; } return false; } }); System.out.println("要採集的檔案:"+listFiles.toString()); //記錄日誌 // logger.info("探測到如下檔案需要採集:"+Arrays.toString(listFiles)); //將要採集的檔案移動到待上傳目錄中 File toUploadDir = new File("e:/logs/toUpload/"); // if(!toUploadDir.exists()){ // toUploadDir.mkdirs(); // System.out.println("建立待上傳目錄成功"); // } for(File file:listFiles){ FileUtils.moveFileToDirectory(file, toUploadDir, true);; System.out.println("移動到待上傳目錄:"+toUploadDir.getAbsolutePath()); } // logger.info("上述檔案移動到了待上傳目錄"+toUploadDir.getAbsolutePath()); //構造HDFS客戶端物件 FileSystem fs = FileSystem.get(new URI("hdfs://192.168.111.130:9000"), new Configuration(), "root"); File[] toUploadFiles = toUploadDir.listFiles(); //檢查HDFS客戶端中的目錄是否存在,不存在則建立 Path hdfsDstPath = new Path("/logs"+day); if(!fs.exists(hdfsDstPath)){ fs.mkdirs(hdfsDstPath); } System.out.println("HDFS客戶端構造成功!"); //檢查本地備份目錄是否存在,不存在則建立 File backupDir = new File("e:/logs/backupDir/"+day+"/") ; // if(!backupDir.exists()){ // backupDir.mkdirs(); // System.out.println("本地備份目錄建立成功"); // } for(File file:toUploadFiles){ Path dstPath = new Path(hdfsDstPath+"/access_log_"+UUID.randomUUID()+".log"); fs.copyFromLocalFile(new Path(file.getAbsolutePath().toString()),dstPath); // logger.info("檔案傳輸到HDFS完成:"+file.getAbsolutePath()+"--->"+dstPath); System.out.println("檔案傳輸到HDFS完成:"+file.getAbsolutePath()+"--->"+dstPath); FileUtils.moveFileToDirectory(file, backupDir, true); // logger.info("檔案備份成功:"+file.getAbsolutePath()+"--->"+backupDir); System.out.println("檔案備份成功:"+file.getAbsolutePath()+"--->"+backupDir); } } catch (Exception e) { e.printStackTrace(); } } }