貼一段 Java 讀取 HDFS、解壓 gz / zip / tar.gz 並保存到 HDFS 的代碼
阿新 • • 發佈:2018-08-10
arc edi res 解壓 文件路徑 ado trace pub config
package main.java;
import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.*;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;;
/**
 * Extracts {@code .tar.gz}, {@code .zip} and {@code .gz} archives that live on
 * HDFS and writes the extracted content back to HDFS.
 *
 * <p>Both the archive path and the output directory are HDFS paths. The
 * {@code un*File} methods are the entry points; the {@code defUn*File} methods
 * do the actual work against an already opened {@link FileSystem}.
 */
public class GZipHdfs {

    /** Size in bytes of the buffer used when copying entry content. */
    private static final int BUFFER_SIZE = 8192;

    /** Path of the archive this instance extracts. */
    String zipfileName = null;

    /**
     * Creates an extractor for one archive.
     *
     * @param fileName path of the archive file to extract
     */
    public GZipHdfs(String fileName) {
        this.zipfileName = fileName;
    }

    /**
     * Entry point: extracts a {@code .tar.gz} archive into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the archive (down to the file)
     * @param destDir     output directory
     * @return names of the archive entries that were extracted
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unTargzFile(String rarFileName, String destDir) throws IOException {
        // newInstance() hands out a private FileSystem, so closing it here cannot
        // break other users of the JVM-wide instance cached by FileSystem.get().
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnTargzFile(destDir, fs);
        }
    }

    /**
     * Extracts the {@code .tar.gz} archive named by {@link #zipfileName}.
     *
     * <p>Failures are printed and extraction stops; the returned list then
     * holds only the entries that were completely processed before the failure.
     *
     * @param outputDirectory directory the entries are written under
     * @param fs              file system used for both reading and writing
     * @return names of the entries that were extracted
     */
    public List<String> defUnTargzFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        try (InputStream raw = fs.open(new Path(zipfileName));
             InputStream gzipIn = new GZIPInputStream(new BufferedInputStream(raw));
             ArchiveInputStream tarIn =
                     new ArchiveStreamFactory().createArchiveInputStream("tar", gzipIn)) {
            TarArchiveEntry entry;
            while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) {
                String name = entry.getName();
                extractEntry(fs, outputDirectory, name, entry.isDirectory(), tarIn);
                extracted.add(name);
            }
        } catch (ArchiveException e) {
            // Caught before IOException so this compiles whether or not
            // ArchiveException is an IOException in the library version in use.
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /**
     * Entry point: extracts a {@code .zip} archive into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the archive (down to the file)
     * @param destDir     output directory
     * @return names of the archive entries that were extracted
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unZipFile(String rarFileName, String destDir) throws IOException {
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnZipFile(destDir, fs);
        }
    }

    /**
     * Extracts the {@code .zip} archive named by {@link #zipfileName}.
     *
     * <p>Failures are printed and extraction stops; the returned list then
     * holds only the entries that were completely processed before the failure.
     *
     * @param outputDirectory directory the entries are written under
     * @param fs              file system used for both reading and writing
     * @return names of the entries that were extracted
     */
    public List<String> defUnZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        try (InputStream raw = fs.open(new Path(zipfileName));
             ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(raw))) {
            ZipEntry entry;
            while ((entry = zipIn.getNextEntry()) != null) {
                String name = entry.getName();
                extractEntry(fs, outputDirectory, name, entry.isDirectory(), zipIn);
                extracted.add(name);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /**
     * Entry point: decompresses a single {@code .gz} file into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the compressed file (down to the file)
     * @param destDir     output directory
     * @return a list holding the name of the decompressed file
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unGZipFile(String rarFileName, String destDir) throws IOException {
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnGZipFile(destDir, fs);
        }
    }

    /**
     * Decompresses the {@code .gz} file named by {@link #zipfileName}. The
     * output file is named after the source file with a trailing {@code .gz}
     * removed.
     *
     * <p>Failures are printed; the returned list is then empty.
     *
     * @param outputDirectory directory the decompressed file is written to
     * @param fs              file system used for both reading and writing
     * @return a list holding the output file name, or an empty list on failure
     */
    public List<String> defUnGZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        String sourceName = new Path(zipfileName).getName();
        // Strip only the suffix; a blanket replace would also mangle names such
        // as "data.gz.part.gz".
        String name = sourceName.endsWith(".gz")
                ? sourceName.substring(0, sourceName.length() - ".gz".length())
                : sourceName;
        try (InputStream raw = fs.open(new Path(zipfileName));
             InputStream gzipIn = new GzipCompressorInputStream(new BufferedInputStream(raw))) {
            writeFile(fs, new Path(outputDirectory + "/" + name), gzipIn);
            extracted.add(name);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /** Creates {@code dir} on {@code fs} unless it already exists as a directory. */
    private static void ensureDirectory(FileSystem fs, String dir) throws IOException {
        Path path = new Path(dir);
        if (!fs.isDirectory(path)) {
            fs.mkdirs(path);
        }
    }

    /** Materializes one archive entry: a directory is created, a file is copied from {@code content}. */
    private static void extractEntry(FileSystem fs, String outputDirectory, String entryName,
                                     boolean directory, InputStream content) throws IOException {
        Path target = resolveEntry(outputDirectory, entryName);
        if (directory) {
            // Creating a file here would block every child entry of this directory.
            fs.mkdirs(target);
        } else {
            writeFile(fs, target, content);
        }
    }

    /** Maps an entry name to a path under {@code outputDirectory}, rejecting names that climb out of it. */
    private static Path resolveEntry(String outputDirectory, String entryName) throws IOException {
        StringBuilder target = new StringBuilder(outputDirectory);
        for (String segment : entryName.split("/")) {
            if (segment.isEmpty() || ".".equals(segment)) {
                continue; // "a//b" and "./a" address the same location as "a/b" and "a"
            }
            if ("..".equals(segment)) {
                throw new IOException(
                        "Archive entry escapes the output directory: " + entryName);
            }
            target.append('/').append(segment);
        }
        return new Path(target.toString());
    }

    /** Copies {@code content} until end of stream into a new file at {@code target}; {@code content} stays open. */
    private static void writeFile(FileSystem fs, Path target, InputStream content)
            throws IOException {
        try (OutputStream out = new BufferedOutputStream(fs.create(target))) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int read;
            while ((read = content.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }
}
import java.io.*;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.*;
import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.compress.archivers.ArchiveInputStream;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;;
/**
 * Extracts {@code .tar.gz}, {@code .zip} and {@code .gz} archives that live on
 * HDFS and writes the extracted content back to HDFS.
 *
 * <p>Both the archive path and the output directory are HDFS paths. The
 * {@code un*File} methods are the entry points; the {@code defUn*File} methods
 * do the actual work against an already opened {@link FileSystem}.
 */
public class GZipHdfs {

    /** Size in bytes of the buffer used when copying entry content. */
    private static final int BUFFER_SIZE = 8192;

    /** Path of the archive this instance extracts. */
    String zipfileName = null;

    /**
     * Creates an extractor for one archive.
     *
     * @param fileName path of the archive file to extract
     */
    public GZipHdfs(String fileName) {
        this.zipfileName = fileName;
    }

    /**
     * Entry point: extracts a {@code .tar.gz} archive into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the archive (down to the file)
     * @param destDir     output directory
     * @return names of the archive entries that were extracted
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unTargzFile(String rarFileName, String destDir) throws IOException {
        // newInstance() hands out a private FileSystem, so closing it here cannot
        // break other users of the JVM-wide instance cached by FileSystem.get().
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnTargzFile(destDir, fs);
        }
    }

    /**
     * Extracts the {@code .tar.gz} archive named by {@link #zipfileName}.
     *
     * <p>Failures are printed and extraction stops; the returned list then
     * holds only the entries that were completely processed before the failure.
     *
     * @param outputDirectory directory the entries are written under
     * @param fs              file system used for both reading and writing
     * @return names of the entries that were extracted
     */
    public List<String> defUnTargzFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        try (InputStream raw = fs.open(new Path(zipfileName));
             InputStream gzipIn = new GZIPInputStream(new BufferedInputStream(raw));
             ArchiveInputStream tarIn =
                     new ArchiveStreamFactory().createArchiveInputStream("tar", gzipIn)) {
            TarArchiveEntry entry;
            while ((entry = (TarArchiveEntry) tarIn.getNextEntry()) != null) {
                String name = entry.getName();
                extractEntry(fs, outputDirectory, name, entry.isDirectory(), tarIn);
                extracted.add(name);
            }
        } catch (ArchiveException e) {
            // Caught before IOException so this compiles whether or not
            // ArchiveException is an IOException in the library version in use.
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /**
     * Entry point: extracts a {@code .zip} archive into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the archive (down to the file)
     * @param destDir     output directory
     * @return names of the archive entries that were extracted
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unZipFile(String rarFileName, String destDir) throws IOException {
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnZipFile(destDir, fs);
        }
    }

    /**
     * Extracts the {@code .zip} archive named by {@link #zipfileName}.
     *
     * <p>Failures are printed and extraction stops; the returned list then
     * holds only the entries that were completely processed before the failure.
     *
     * @param outputDirectory directory the entries are written under
     * @param fs              file system used for both reading and writing
     * @return names of the entries that were extracted
     */
    public List<String> defUnZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        try (InputStream raw = fs.open(new Path(zipfileName));
             ZipInputStream zipIn = new ZipInputStream(new BufferedInputStream(raw))) {
            ZipEntry entry;
            while ((entry = zipIn.getNextEntry()) != null) {
                String name = entry.getName();
                extractEntry(fs, outputDirectory, name, entry.isDirectory(), zipIn);
                extracted.add(name);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /**
     * Entry point: decompresses a single {@code .gz} file into {@code destDir},
     * creating the directory if it does not exist yet.
     *
     * @param rarFileName path of the compressed file (down to the file)
     * @param destDir     output directory
     * @return a list holding the name of the decompressed file
     * @throws IOException if the file system cannot be opened or the output
     *                     directory cannot be prepared
     */
    public List<String> unGZipFile(String rarFileName, String destDir) throws IOException {
        try (FileSystem fs = FileSystem.newInstance(URI.create(destDir), new Configuration())) {
            ensureDirectory(fs, destDir);
            return new GZipHdfs(rarFileName).defUnGZipFile(destDir, fs);
        }
    }

    /**
     * Decompresses the {@code .gz} file named by {@link #zipfileName}. The
     * output file is named after the source file with a trailing {@code .gz}
     * removed.
     *
     * <p>Failures are printed; the returned list is then empty.
     *
     * @param outputDirectory directory the decompressed file is written to
     * @param fs              file system used for both reading and writing
     * @return a list holding the output file name, or an empty list on failure
     */
    public List<String> defUnGZipFile(String outputDirectory, FileSystem fs) {
        List<String> extracted = new LinkedList<String>();
        String sourceName = new Path(zipfileName).getName();
        // Strip only the suffix; a blanket replace would also mangle names such
        // as "data.gz.part.gz".
        String name = sourceName.endsWith(".gz")
                ? sourceName.substring(0, sourceName.length() - ".gz".length())
                : sourceName;
        try (InputStream raw = fs.open(new Path(zipfileName));
             InputStream gzipIn = new GzipCompressorInputStream(new BufferedInputStream(raw))) {
            writeFile(fs, new Path(outputDirectory + "/" + name), gzipIn);
            extracted.add(name);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return extracted;
    }

    /** Creates {@code dir} on {@code fs} unless it already exists as a directory. */
    private static void ensureDirectory(FileSystem fs, String dir) throws IOException {
        Path path = new Path(dir);
        if (!fs.isDirectory(path)) {
            fs.mkdirs(path);
        }
    }

    /** Materializes one archive entry: a directory is created, a file is copied from {@code content}. */
    private static void extractEntry(FileSystem fs, String outputDirectory, String entryName,
                                     boolean directory, InputStream content) throws IOException {
        Path target = resolveEntry(outputDirectory, entryName);
        if (directory) {
            // Creating a file here would block every child entry of this directory.
            fs.mkdirs(target);
        } else {
            writeFile(fs, target, content);
        }
    }

    /** Maps an entry name to a path under {@code outputDirectory}, rejecting names that climb out of it. */
    private static Path resolveEntry(String outputDirectory, String entryName) throws IOException {
        StringBuilder target = new StringBuilder(outputDirectory);
        for (String segment : entryName.split("/")) {
            if (segment.isEmpty() || ".".equals(segment)) {
                continue; // "a//b" and "./a" address the same location as "a/b" and "a"
            }
            if ("..".equals(segment)) {
                throw new IOException(
                        "Archive entry escapes the output directory: " + entryName);
            }
            target.append('/').append(segment);
        }
        return new Path(target.toString());
    }

    /** Copies {@code content} until end of stream into a new file at {@code target}; {@code content} stays open. */
    private static void writeFile(FileSystem fs, Path target, InputStream content)
            throws IOException {
        try (OutputStream out = new BufferedOutputStream(fs.create(target))) {
            byte[] buffer = new byte[BUFFER_SIZE];
            int read;
            while ((read = content.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }
}
貼一段 Java 讀取 HDFS、解壓 gz / zip / tar.gz 並保存到 HDFS 的代碼