1. 程式人生 > >使用hdfs進行資料採集學習記錄

使用hdfs進行資料採集學習記錄

使用hdfs進行資料定時採集

邏輯過程

  • 這是個定時任務,使用timer進行定時任務的進行。新建collectdata類並繼承Timer類,在collectdata類中進行run方法的重寫。
  • 將待採集資料移動到toupload目錄中,然後連線hdfs客戶端,在hdfs客戶端中利用日期新建目錄,將toupload中的帶採集資料上傳到相應的目錄中,然後將toupload中的資料移動到backup目錄。
  • 在上述過程中,如果對應目錄不存在,則會相應的建立目錄。

在實現程式碼過程中,有部分java知識需要學習鞏固:

  1. 輸出格式化的日期:
//使用SimpleDateFormat類進行格式設定
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
//再使用format方法對日期進行格式應用。
sdf.format(new Date());
  1. 篩選檔案方法
//首先需要一個File,其擁有listFiles()方法能夠以陣列形式返回當前檔案目錄下的檔案物件
File srcDir = new File("e:/logs");
File[] listFiles = srcDir.listFiles(new FilenameFilter(){
  @Override
  public boolean accept(File dir,String name){
      if(name.endsWith("access.log")){
          return true;
      }
      return false;
  }
});
//在上述listFiles()方法中新增篩選器,對篩選器的accept()方法進行重寫,即可實現篩選功能。返回true的檔案即被篩選出來。
  1. FileUtils類可以很方便的實現檔案的移動,並且設定是否自動建立資料夾。此外,它還有其它對檔案進行操作。
FileUtils.moveFileToDirectory(srcDir,dstDir,createDstDir);

重寫datacollect部分程式碼如下:

其中由於log4j日誌部分無法使用,使用系統輸出直接輸出到控制檯。

package datacollect;

import java.io.File;
import java.io.FilenameFilter;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

public class CollectTask extends TimerTask{
	
	
	@Override
	public void run(){
	 try {
		Logger logger = Logger.getLogger("logRollingFile");
		FileUtils.move
			
		//設定採集日期
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
		String day = sdf.format(new Date());
		
		File srcDir = new File("e:/logs/accesslog/");
		File[] listFiles = srcDir.listFiles(new FilenameFilter() {
		
			
			@Override
			public boolean accept(File dir, String name) {
				if(name.endsWith("access.log")){
					return true;
				}
				return false;
			}
		});
		
		System.out.println("要採集的檔案:"+listFiles.toString());
		//記錄日誌
//		logger.info("探測到如下檔案需要採集:"+Arrays.toString(listFiles));
		
		
		//將要採集的檔案移動到待上傳目錄中
		File toUploadDir = new File("e:/logs/toUpload/");
		
//		if(!toUploadDir.exists()){
//			toUploadDir.mkdirs();
//			System.out.println("建立待上傳目錄成功");
//		}
			
		for(File file:listFiles){
			FileUtils.moveFileToDirectory(file, toUploadDir, true);;
			System.out.println("移動到待上傳目錄:"+toUploadDir.getAbsolutePath());
		}
		
//		logger.info("上述檔案移動到了待上傳目錄"+toUploadDir.getAbsolutePath());

		//構造HDFS客戶端物件
			FileSystem fs = FileSystem.get(new URI("hdfs://192.168.111.130:9000"), new Configuration(), "root");
			File[] toUploadFiles = toUploadDir.listFiles();
			    //檢查HDFS客戶端中的目錄是否存在,不存在則建立
			Path hdfsDstPath = new Path("/logs"+day);
			if(!fs.exists(hdfsDstPath)){
				fs.mkdirs(hdfsDstPath);
			}
			System.out.println("HDFS客戶端構造成功!");
			
			    //檢查本地備份目錄是否存在,不存在則建立
			File backupDir = new File("e:/logs/backupDir/"+day+"/") ;
//			if(!backupDir.exists()){
//				backupDir.mkdirs();
//				System.out.println("本地備份目錄建立成功");
//			}
		
			for(File file:toUploadFiles){
				Path dstPath = new Path(hdfsDstPath+"/access_log_"+UUID.randomUUID()+".log");
				fs.copyFromLocalFile(new Path(file.getAbsolutePath().toString()),dstPath);
//				logger.info("檔案傳輸到HDFS完成:"+file.getAbsolutePath()+"--->"+dstPath);
				System.out.println("檔案傳輸到HDFS完成:"+file.getAbsolutePath()+"--->"+dstPath);
				FileUtils.moveFileToDirectory(file, backupDir, true);
//				logger.info("檔案備份成功:"+file.getAbsolutePath()+"--->"+backupDir);
				System.out.println("檔案備份成功:"+file.getAbsolutePath()+"--->"+backupDir);
				}
			
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		
		
	}
}