Reading and Writing Hadoop SequenceFiles in Java
In this blog post I'll show you how to write a simple Hadoop client application in Java. The app handles reading, writing and copying Hadoop SequenceFiles on a local file system or a remote Hadoop file system (HDFS).
1. HadoopClient.java
package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

/**
 * This class handles interactions with Hadoop.
 *
 * @author nbashir
 */
@Component
public class HadoopClient {

    private static Configuration conf = new Configuration();
    private final static Logger logger = Logger.getLogger(HadoopClient.class);

    /**
     * Convert the lines of text in a file to binary and write to a Hadoop
     * sequence file.
     *
     * @param dataFile         File containing lines of text
     * @param sequenceFileName Name of the sequence file to create
     * @param hadoopFS         Hadoop file system
     *
     * @throws IOException
     */
    public static void writeToSequenceFile(File dataFile, String sequenceFileName, String hadoopFS) throws IOException {
        IntWritable key = null;
        BytesWritable value = null;
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        if ((conf != null) && (dataFile != null) && (dataFile.exists())) {
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(path),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()),
                    SequenceFile.Writer.keyClass(IntWritable.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class));
            List<String> lines = FileUtils.readLines(dataFile);
            for (int i = 0; i < lines.size(); i++) {
                value = new BytesWritable(lines.get(i).getBytes());
                key = new IntWritable(i);
                writer.append(key, value);
            }
            IOUtils.closeStream(writer);
        }
    }

    /**
     * Read a Hadoop sequence file on HDFS.
     *
     * @param sequenceFileName Name of the sequence file to read
     * @param hadoopFS         Hadoop file system
     *
     * @throws IOException
     */
    public static void readSequenceFile(String sequenceFileName, String hadoopFS) throws IOException {
        conf.set("fs.defaultFS", hadoopFS);
        Path path = new Path(sequenceFileName);
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, value)) {
            // Only the first getLength() bytes of the backing array are valid data.
            logger.info("key : " + key + " - value : " + new String(value.getBytes(), 0, value.getLength()));
        }
        IOUtils.closeStream(reader);
    }

    /**
     * Copy a local sequence file to a remote file on HDFS.
     *
     * @param from           Name of the sequence file to copy
     * @param to             Name of the sequence file to copy to
     * @param remoteHadoopFS HDFS host URI
     *
     * @throws IOException
     */
    public static void copySequenceFile(String from, String to, String remoteHadoopFS) throws IOException {
        conf.set("fs.defaultFS", remoteHadoopFS);
        FileSystem fs = FileSystem.get(conf);
        Path localPath = new Path(from);
        Path hdfsPath = new Path(to);
        boolean deleteSource = true;
        fs.copyFromLocalFile(deleteSource, localPath, hdfsPath);
        logger.info("Copied SequenceFile from: " + from + " to: " + to);
    }

    /**
     * Print all the values in the Hadoop HDFS configuration object.
     *
     * @param conf
     */
    public static void listHadoopConfiguration(Configuration conf) {
        int i = 0;
        logger.info("------------------------------------------------------------------------------------------");
        Iterator iterator = conf.iterator();
        while (iterator.hasNext()) {
            i++;
            // Each call to next() consumes one entry, so call it only once per iteration.
            logger.info(i + " - " + iterator.next());
        }
        logger.info("------------------------------------------------------------------------------------------");
    }
}
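Since all the methods are static, you do not need the Spring container to try the class out. A minimal sketch like the following exercises the writer and reader against the local file system, the same way the unit tests below do (HadoopClientDemo and the /tmp paths are just example names, not part of the project):

package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;

public class HadoopClientDemo {
    public static void main(String[] args) throws IOException {
        // Example paths; any readable text file and writable target will do.
        File textFile = new File("/tmp/demo-input.txt");
        String sequenceFile = "/tmp/demo.sgz";
        HadoopClient.writeToSequenceFile(textFile, sequenceFile, "file:///");
        HadoopClient.readSequenceFile(sequenceFile, "file:///");
    }
}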
2. HadoopClientTest.java

package com.noushin.hadoop.client;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
public class HadoopClientTest {

    @Autowired
    HadoopClient hadoopClient;

    String sequenceFileName = "/tmp/nb.sgz";
    String hadoopLocalFS = "file:///";
    String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";

    @Test
    public void testConfig() {
        Configuration conf = new Configuration();
        HadoopClient.listHadoopConfiguration(conf);
    }

    @Test
    public void testWriteSequenceFile() {
        String dataFileName = "/tmp/test.txt";
        try {
            int numOfLines = 20;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(dataFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testReadSequenceFile() {
        try {
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void testCopySequenceFileToRemoteHDFS() {
        String tempFileName = "/tmp/local-test.txt";
        String sequenceFileName = "/tmp/seqfile-record-compressed.sgz";
        String hadoopLocalFS = "file:///";
        String hadoopRemoteFS = "hdfs://stage-hadoop01:8020";
        try {
            int numOfLines = 5;
            String baseStr = "....Test...";
            List<String> lines = new ArrayList<String>();
            for (int i = 0; i < numOfLines; i++)
                lines.add(i + baseStr + UUID.randomUUID());
            File dataFile = new File(tempFileName);
            FileUtils.writeLines(dataFile, lines, true);
            Thread.sleep(2000);
            HadoopClient.writeToSequenceFile(dataFile, sequenceFileName, hadoopLocalFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopLocalFS);
            HadoopClient.copySequenceFile(sequenceFileName, sequenceFileName, hadoopRemoteFS);
            HadoopClient.readSequenceFile(sequenceFileName, hadoopRemoteFS);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
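Note that the empty @ContextConfiguration above makes Spring's test framework look for an XML context file next to the test class (by default HadoopClientTest-context.xml), which only needs a component scan of com.noushin.hadoop.client and is not shown here. If you prefer Java configuration, a minimal sketch like the following (HadoopClientTestConfig is just an example name) can be referenced with @ContextConfiguration(classes = HadoopClientTestConfig.class) instead:

package com.noushin.hadoop.client;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

// Hypothetical replacement for HadoopClientTest-context.xml: it simply
// component-scans this package so the HadoopClient bean can be autowired.
@Configuration
@ComponentScan("com.noushin.hadoop.client")
public class HadoopClientTestConfig {
}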
3. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.noushin.hadoop</groupId>
  <artifactId>client</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>hdpc</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop-client.version>2.0.0-cdh4.2.0</hadoop-client.version>
    <junit.version>4.10</junit.version>
    <log4j.version>1.2.17</log4j.version>
    <spring.version>3.2.0.RELEASE</spring.version>
  </properties>

  <dependencies>
    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop-client.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Logging -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
    </dependency>
    <!-- Spring -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>org.springframework.test</artifactId>
      <version>${spring.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>

If you are using Eclipse for development and testing like I do, you need to add the following step so you can compress your sequence files using GZip.
Notice that I am using Cloudera's Hadoop distribution in my pom file. To use GZip, I need to add the Hadoop native library to my development environment, which is Ubuntu 12.10:
sudo apt-get update; sudo apt-get install hadoop
This installs the Hadoop native libraries in /usr/lib/hadoop/lib/native. In Eclipse, open the project's Properties -> Java Build Path -> Libraries -> Maven Dependencies -> Native library location, and set "Location path" to /usr/lib/hadoop/lib/native. Also make sure the version of the hadoop-client dependency in your pom file matches the version of Hadoop installed on your system, otherwise you will get a runtime error:
ERROR nativeio.NativeIO: Unable to initialize NativeIO libraries
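If you are not sure whether the native libraries are actually being picked up, a quick sanity check such as the one below can save some guesswork. This is only a sketch: NativeLibCheck is a throwaway class name, while NativeCodeLoader and ZlibFactory come from Hadoop's own API.

package com.noushin.hadoop.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // True only if libhadoop was found on java.library.path.
        System.out.println("Native hadoop library loaded: " + NativeCodeLoader.isNativeCodeLoaded());
        // True only if the native zlib/gzip bindings can be used for compression.
        System.out.println("Native zlib loaded: " + ZlibFactory.isNativeZlibLoaded(conf));
    }
}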
To verify a sequence file was created on HDFS, log into one of your Hadoop nodes and run this command:

hadoop fs -ls /tmp/nb.sgz
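You can also check from the client side without logging into a node; hadoop fs -text /tmp/nb.sgz on a node will additionally decode the SequenceFile and print its records. The snippet below is a sketch along the same lines as copySequenceFile, reusing the hdfs://stage-hadoop01:8020 URI from the tests (VerifySequenceFile is just an example class name):

package com.noushin.hadoop.client;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VerifySequenceFile {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://stage-hadoop01:8020");
        FileSystem fs = FileSystem.get(conf);
        Path seqFile = new Path("/tmp/nb.sgz");
        if (fs.exists(seqFile)) {
            System.out.println("Found " + seqFile + ", size: " + fs.getFileStatus(seqFile).getLen() + " bytes");
        } else {
            System.out.println(seqFile + " does not exist");
        }
    }
}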
If you run into a problem and need to see what Hadoop is doing, turn on debug logging for the Hadoop classes by adding the following entry to your log4j.properties:

# Turn on hadoop logging
log4j.logger.org.apache.hadoop=DEBUG
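The same effect can be had at runtime through the log4j 1.2 API that is already on the classpath, which is handy when you only want verbose output for a single test run. A small sketch (the helper class name is made up):

package com.noushin.hadoop.client;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class DebugLogging {
    // Call once, e.g. at the start of a test, to enable Hadoop debug output.
    public static void enableHadoopDebugLogging() {
        Logger.getLogger("org.apache.hadoop").setLevel(Level.DEBUG);
    }
}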
To run Hive, log in as a Hadoop user such as oozie_job so that the environment is set up:
$ sudo su - oozie_job
To start the Hive shell:
$ hive
Now you can query data using SQL-like commands:
DESCRIBE my_transactions;
SELECT * FROM my_transactions WHERE year=2013 AND month=3 AND day=14;
To see where a partition points:
DESCRIBE EXTENDED my_transactions PARTITION(year=2013, month=3, day=28);
To create a partition so that Hive can find the data for its queries:
ALTER TABLE my_transactions ADD PARTITION(year=2013, month=3, day=26) LOCATION '/tmp/2013/03/26';
To drop a partition (you can then re-add it with the ADD PARTITION statement above to point it at a new location):
ALTER TABLE my_transactions DROP PARTITION(year=2013, month=3, day=26);