國科大大資料作業 Hash based distinct HDFS&Hbase 程式碼
阿新 • • 發佈:2019-01-03
PLEASE save your code and data to a portable drive!!!
WARNING: this VM will be cleaned without notice after you log out.
Your code and data on the VM will get lost!!!1. start hdfs and hbase$ start-dfs.sh
$ start-hbase.sh2. stop hdfs and hbase$ stop-hbase.sh
$ stop-dfs.sh3. hdfs directory is ~/work/hdfs4. To compile your java code MyCode.java (implementing class MyCode)
$ javac MyCode.java, then run it
$ java MyCode <args>5. compile and run HDFSTest.java$ javac HDFSTest.java
$ java HDFSTest6. compile and run HBaseTest.java$ javac HBaseTest.java
$ java HBaseTest
check if we have successfully created mytable and put the new row
start hbase shell and run command in hbase shell$ hbase shellhbase(main):001:0> scan 'mytable'
ROW COLUMN+CELL
abc column=mycf:a, timestamp=1428459927307, value=789
1 row(s) in 1.8950 secondshbase(main):002:0> disable 'mytable'
0 row(s) in 1.9050 secondshbase(main):003:0> drop 'mytable'
0 row(s) in 1.2320 secondshbase(main):004:0> exit----------------------------------------------------程式碼如下import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.Set;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;import org.apache.log4j.*;/**
* A class Reading data from a text document on the HDFS,and store it into HBase.
* For example:
*
* java Hw1Grp4 R=/hw1/part.tbl select:R7,gt,1800 distinct:R3,R4,R5
*
* @author Zhang Fengfang
* @version 1.0.0
* @see
* @see
*/
public class Hw1Grp4 {

    /**
     * Separator inserted between projected column values when building the
     * distinct key. Input fields are produced by splitting each line on '|',
     * so a field can never contain this character and the key is unambiguous.
     * (Previously the columns were concatenated with no separator, so e.g.
     * ["ab","c"] and ["a","bc"] collapsed into the same key and one of the
     * two distinct records was silently dropped.)
     */
    private static final String KEY_SEPARATOR = "|";

    /** Distinct selected records, keyed by the joined projected column values. */
    private static Map<String, ArrayList<String>> datamap = new HashMap<String, ArrayList<String>>();

    /**
     * Reads a '|'-separated table from HDFS, keeps only rows whose column
     * {@code selectNum} satisfies the comparison {@code cmd} against
     * {@code middle}, and stores the distinct projections of the columns in
     * {@code distinctnum} into {@link #datamap}.
     *
     * @param filename    full HDFS URI of the input table
     * @param cmd         comparison operator: gt, ge, eq, lt, le or ne
     * @param selectNum   index of the column used for the selection
     * @param middle      constant the selected column is compared against
     * @param colnum      number of projected (distinct) columns
     * @param distinctnum indexes of the projected columns
     * @throws IOException        if the HDFS file cannot be read
     * @throws URISyntaxException if {@code filename} is not a valid URI
     */
    public static void HDFSRead(String filename, String cmd, int selectNum, double middle,
                                int colnum, int[] distinctnum)
            throws IOException, URISyntaxException {
        // Configure HDFS access and open the input file.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(filename), conf);
        Path path = new Path(filename);
        FSDataInputStream inStream = fs.open(path);
        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
        try {
            String s;
            while ((s = in.readLine()) != null) {
                String[] items = s.split("\\|"); // fields are '|'-separated
                double value = Double.parseDouble(items[selectNum]);
                if (!matches(cmd, value, middle)) {
                    continue; // row rejected by the selection predicate
                }
                // Build an unambiguous distinct key from the projected columns.
                StringBuilder key = new StringBuilder();
                for (int i = 0; i < colnum; i++) {
                    if (i > 0) {
                        key.append(KEY_SEPARATOR);
                    }
                    key.append(items[distinctnum[i]]);
                }
                if (datamap.containsKey(key.toString())) {
                    continue; // duplicate projection: already recorded
                }
                System.out.println(key.toString());
                ArrayList<String> record = new ArrayList<String>();
                for (int i = 0; i < colnum; i++) {
                    record.add(items[distinctnum[i]]);
                }
                datamap.put(key.toString(), record);
            }
        } finally {
            // Closing the reader also closes the underlying FSDataInputStream.
            in.close();
            fs.close();
        }
    }

    /**
     * Evaluates one of the six comparison operators against a constant.
     * Exits the program with a non-zero status on an unknown operator.
     *
     * @param cmd    operator name: gt, ge, eq, lt, le or ne
     * @param value  left-hand side of the comparison
     * @param middle right-hand side of the comparison
     * @return true when the comparison holds
     */
    private static boolean matches(String cmd, double value, double middle) {
        if (cmd.equals("gt")) return value > middle;
        if (cmd.equals("ge")) return value >= middle;
        if (cmd.equals("eq")) return value == middle;
        if (cmd.equals("lt")) return value < middle;
        if (cmd.equals("le")) return value <= middle;
        if (cmd.equals("ne")) return value != middle;
        System.out.println("cmd error");
        System.exit(1); // invalid operator is an error: exit with non-zero status
        return false;   // unreachable
    }

    /**
     * Program entrance. Expected arguments, e.g.:
     * {@code java Hw1Grp4 R=/hw1/part.tbl select:R7,gt,1800 distinct:R3,R4,R5}
     *
     * @param args table path, selection spec and distinct spec as shown above
     */
    public static void main(String[] args) throws IOException {
        // "R=/path" -> full HDFS URI.
        String filename = "hdfs://localhost:9000" + args[0].substring(2);
        // "select:R7,gt,1800" -> selected column index, operator, constant.
        String[] operates = args[1].split(",");
        int r = Integer.parseInt(operates[0].substring(8)); // skip "select:R"
        String select = operates[1];
        double compare = Double.parseDouble(operates[2]);
        // "distinct:R2,R3,R5" -> projected column indexes.
        String[] distincts = args[2].split(",");
        int l = distincts.length;
        int[] num = new int[l];
        num[0] = Integer.parseInt(distincts[0].substring(10)); // skip "distinct:R"
        for (int i = 1; i < l; i++) {
            num[i] = Integer.parseInt(distincts[i].substring(1)); // skip "R"
        }
        try {
            HDFSRead(filename, select, r, compare, l, num);
            HBaseSave(l, num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates (or recreates) HBase table "Result" with column family "res" and
     * writes every distinct record from {@link #datamap}, one table row per
     * record. Row keys are sequential integers; column qualifiers are
     * "R&lt;columnIndex&gt;".
     *
     * @param colnum      number of projected columns per record
     * @param distinctnum indexes of the projected columns (used as qualifiers)
     * @throws IOException if HBase cannot be reached
     */
    public static void HBaseSave(int colnum, int[] distinctnum) throws IOException {
        String tableName = "Result";
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        htd.addFamily(new HColumnDescriptor("res")); // single column family "res"

        Configuration configuration = HBaseConfiguration.create();
        HBaseAdmin hAdmin = new HBaseAdmin(configuration);
        if (hAdmin.tableExists(tableName)) {
            // Drop stale results from a previous run before recreating.
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
            System.out.println("Table: " + tableName + " already exists and deleted");
        }
        hAdmin.createTable(htd);
        System.out.println("table: " + tableName + " created successfully");
        hAdmin.close();

        HTable table = new HTable(configuration, tableName);
        int row = 0;
        for (Map.Entry<String, ArrayList<String>> entry : datamap.entrySet()) {
            System.out.println("save: " + entry.getKey() + ":" + entry.getValue());
            Put put = new Put(String.valueOf(row).getBytes());
            ArrayList<String> record = entry.getValue();
            for (int i = 0; i < colnum; i++) {
                put.add("res".getBytes(),
                        ("R" + String.valueOf(distinctnum[i])).getBytes(),
                        record.get(i).getBytes());
            }
            // Issue one put per row; the original sent the partially built Put
            // once per column, re-writing the same row repeatedly.
            table.put(put);
            row++;
        }
        table.close();
        System.out.println("put successfully");
    }
}
WARNING: this VM will be cleaned without notice after you log out.
Your code and data on the VM will get lost!!!1. start hdfs and hbase$ start-dfs.sh
$ start-hbase.sh2. stop hdfs and hbase$ stop-hbase.sh
$ stop-dfs.sh3. hdfs directory is ~/work/hdfs4. To compile your java code MyCode.java (implementing class MyCode)
$ javac MyCode.java, then run it
$ java MyCode <args>5. compile and run HDFSTest.java$ javac HDFSTest.java
$ java HDFSTest6. compile and run HBaseTest.java$ javac HBaseTest.java
$ java HBaseTest
check if we have successfully created mytable and put the new row
start hbase shell and run command in hbase shell$ hbase shellhbase(main):001:0> scan 'mytable'
ROW COLUMN+CELL
abc column=mycf:a, timestamp=1428459927307, value=789
1 row(s) in 1.8950 secondshbase(main):002:0> disable 'mytable'
0 row(s) in 1.9050 secondshbase(main):003:0> drop 'mytable'
0 row(s) in 1.2320 secondshbase(main):004:0> exit----------------------------------------------------程式碼如下import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import java.util.Set;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;import org.apache.log4j.*;/**
* A class Reading data from a text document on the HDFS,and store it into HBase.
* For example:
*
* java Hw1Grp4 R=/hw1/part.tbl select:R7,gt,1800 distinct:R3,R4,R5
*
* @author Zhang Fengfang
* @version 1.0.0
* @see
* @see
*/
public class Hw1Grp4 {

    /**
     * Separator inserted between projected column values when building the
     * distinct key. Input fields are produced by splitting each line on '|',
     * so a field can never contain this character and the key is unambiguous.
     * (Previously the columns were concatenated with no separator, so e.g.
     * ["ab","c"] and ["a","bc"] collapsed into the same key and one of the
     * two distinct records was silently dropped.)
     */
    private static final String KEY_SEPARATOR = "|";

    /** Distinct selected records, keyed by the joined projected column values. */
    private static Map<String, ArrayList<String>> datamap = new HashMap<String, ArrayList<String>>();

    /**
     * Reads a '|'-separated table from HDFS, keeps only rows whose column
     * {@code selectNum} satisfies the comparison {@code cmd} against
     * {@code middle}, and stores the distinct projections of the columns in
     * {@code distinctnum} into {@link #datamap}.
     *
     * @param filename    full HDFS URI of the input table
     * @param cmd         comparison operator: gt, ge, eq, lt, le or ne
     * @param selectNum   index of the column used for the selection
     * @param middle      constant the selected column is compared against
     * @param colnum      number of projected (distinct) columns
     * @param distinctnum indexes of the projected columns
     * @throws IOException        if the HDFS file cannot be read
     * @throws URISyntaxException if {@code filename} is not a valid URI
     */
    public static void HDFSRead(String filename, String cmd, int selectNum, double middle,
                                int colnum, int[] distinctnum)
            throws IOException, URISyntaxException {
        // Configure HDFS access and open the input file.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(filename), conf);
        Path path = new Path(filename);
        FSDataInputStream inStream = fs.open(path);
        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
        try {
            String s;
            while ((s = in.readLine()) != null) {
                String[] items = s.split("\\|"); // fields are '|'-separated
                double value = Double.parseDouble(items[selectNum]);
                if (!matches(cmd, value, middle)) {
                    continue; // row rejected by the selection predicate
                }
                // Build an unambiguous distinct key from the projected columns.
                StringBuilder key = new StringBuilder();
                for (int i = 0; i < colnum; i++) {
                    if (i > 0) {
                        key.append(KEY_SEPARATOR);
                    }
                    key.append(items[distinctnum[i]]);
                }
                if (datamap.containsKey(key.toString())) {
                    continue; // duplicate projection: already recorded
                }
                System.out.println(key.toString());
                ArrayList<String> record = new ArrayList<String>();
                for (int i = 0; i < colnum; i++) {
                    record.add(items[distinctnum[i]]);
                }
                datamap.put(key.toString(), record);
            }
        } finally {
            // Closing the reader also closes the underlying FSDataInputStream.
            in.close();
            fs.close();
        }
    }

    /**
     * Evaluates one of the six comparison operators against a constant.
     * Exits the program with a non-zero status on an unknown operator.
     *
     * @param cmd    operator name: gt, ge, eq, lt, le or ne
     * @param value  left-hand side of the comparison
     * @param middle right-hand side of the comparison
     * @return true when the comparison holds
     */
    private static boolean matches(String cmd, double value, double middle) {
        if (cmd.equals("gt")) return value > middle;
        if (cmd.equals("ge")) return value >= middle;
        if (cmd.equals("eq")) return value == middle;
        if (cmd.equals("lt")) return value < middle;
        if (cmd.equals("le")) return value <= middle;
        if (cmd.equals("ne")) return value != middle;
        System.out.println("cmd error");
        System.exit(1); // invalid operator is an error: exit with non-zero status
        return false;   // unreachable
    }

    /**
     * Program entrance. Expected arguments, e.g.:
     * {@code java Hw1Grp4 R=/hw1/part.tbl select:R7,gt,1800 distinct:R3,R4,R5}
     *
     * @param args table path, selection spec and distinct spec as shown above
     */
    public static void main(String[] args) throws IOException {
        // "R=/path" -> full HDFS URI.
        String filename = "hdfs://localhost:9000" + args[0].substring(2);
        // "select:R7,gt,1800" -> selected column index, operator, constant.
        String[] operates = args[1].split(",");
        int r = Integer.parseInt(operates[0].substring(8)); // skip "select:R"
        String select = operates[1];
        double compare = Double.parseDouble(operates[2]);
        // "distinct:R2,R3,R5" -> projected column indexes.
        String[] distincts = args[2].split(",");
        int l = distincts.length;
        int[] num = new int[l];
        num[0] = Integer.parseInt(distincts[0].substring(10)); // skip "distinct:R"
        for (int i = 1; i < l; i++) {
            num[i] = Integer.parseInt(distincts[i].substring(1)); // skip "R"
        }
        try {
            HDFSRead(filename, select, r, compare, l, num);
            HBaseSave(l, num);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates (or recreates) HBase table "Result" with column family "res" and
     * writes every distinct record from {@link #datamap}, one table row per
     * record. Row keys are sequential integers; column qualifiers are
     * "R&lt;columnIndex&gt;".
     *
     * @param colnum      number of projected columns per record
     * @param distinctnum indexes of the projected columns (used as qualifiers)
     * @throws IOException if HBase cannot be reached
     */
    public static void HBaseSave(int colnum, int[] distinctnum) throws IOException {
        String tableName = "Result";
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
        htd.addFamily(new HColumnDescriptor("res")); // single column family "res"

        Configuration configuration = HBaseConfiguration.create();
        HBaseAdmin hAdmin = new HBaseAdmin(configuration);
        if (hAdmin.tableExists(tableName)) {
            // Drop stale results from a previous run before recreating.
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
            System.out.println("Table: " + tableName + " already exists and deleted");
        }
        hAdmin.createTable(htd);
        System.out.println("table: " + tableName + " created successfully");
        hAdmin.close();

        HTable table = new HTable(configuration, tableName);
        int row = 0;
        for (Map.Entry<String, ArrayList<String>> entry : datamap.entrySet()) {
            System.out.println("save: " + entry.getKey() + ":" + entry.getValue());
            Put put = new Put(String.valueOf(row).getBytes());
            ArrayList<String> record = entry.getValue();
            for (int i = 0; i < colnum; i++) {
                put.add("res".getBytes(),
                        ("R" + String.valueOf(distinctnum[i])).getBytes(),
                        record.get(i).getBytes());
            }
            // Issue one put per row; the original sent the partially built Put
            // once per column, re-writing the same row repeatedly.
            table.put(put);
            row++;
        }
        table.close();
        System.out.println("put successfully");
    }
}