監控 HDFS 系統的檔案狀態
阿新 • • 發佈:2018-11-08
package com.zx.dao; import com.zx.utils.PropertiesUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSInotifyEventInputStream; import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.inotify.Event; import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Properties; //監控hdfs系統的檔案狀態 public class MonitorHdfs extends Thread { private ArrayList<String> fileList = new ArrayList<String>(); public void getFileStatus() throws IOException, InterruptedException, MissingEventsException { Properties properties = PropertiesUtils.getProperties("spark-conf.properties"); String hdfsPath = (String) properties.get("hdfsPath"); HdfsAdmin admin = new HdfsAdmin( URI.create(hdfsPath), new Configuration() ); DFSInotifyEventInputStream eventStream = admin.getInotifyEventStream(); while( true ) { EventBatch events = eventStream.take(); for( Event event : events.getEvents() ) { System.out.println("======================================================="); System.out.println( "event type = " + event.getEventType() ); switch( event.getEventType() ) { case CREATE: Event.CreateEvent createEvent = (Event.CreateEvent) event; System.out.println( " path = " + createEvent.getPath() ); String filePath = createEvent.getPath(); if(filePath.contains("/upload/")){ if(filePath.contains("._COPYING_")){ filePath = filePath.substring(0,filePath.length()-10); } this.fileList.add(filePath); } for(String str:fileList){ System.out.println(str); } break; case CLOSE: Event.CloseEvent closeEvent = (Event.CloseEvent) event; System.out.println( " path = " + closeEvent.getPath() ); break; case APPEND: Event.AppendEvent appendEvent = (Event.AppendEvent) event; System.out.println( " path = " + appendEvent.getPath() ); break; case RENAME: Event.RenameEvent renameEvent = (Event.RenameEvent) event; 
System.out.println( " srcPath = " + renameEvent.getSrcPath() ); System.out.println( " dstPath = " + renameEvent.getDstPath() ); break; case METADATA: Event.MetadataUpdateEvent metadataUpdateEvent = (Event.MetadataUpdateEvent) event; System.out.println( " path = " + metadataUpdateEvent.getPath() ); break; case UNLINK: Event.UnlinkEvent unlinkEvent = (Event.UnlinkEvent) event; System.out.println( " path = " + unlinkEvent.getPath() ); break; default: break; } System.out.println("======================================================="); } } } public ArrayList<String> getFileList() { return fileList; } public void setFileList(ArrayList<String> fileList) { this.fileList = fileList; } public void clearFileList(){ this.fileList.clear(); } }