hadoop生態系統學習之路(六)hive的簡單使用
一、hive的基本概念與原理
Hive是基於Hadoop之上的資料倉庫,可以儲存、查詢和分析儲存在 Hadoop 中的大規模資料。Hive 定義了簡單的類 SQL 查詢語言,稱為 HQL,它允許熟悉 SQL 的使用者查詢資料,允許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 無法完成的複雜的分析工作。Hive 沒有專門的資料格式。
hive的訪問方式:
hive的執行原理:
二、hive的常用命令
連線進入hive:hive
刪除資料庫:drop database if exists qyk_test cascade;如下圖:
然後,我們使用create database qyk_test;建立一個qyk_test的資料庫,如下:
接下來,我們執行create table user_info(id bigint, account string, name string, age int) row format delimited fields terminated by ‘\t’;建立一張表,如下:
我們可以執行describe user_info;查看錶結構,如下:
然後,我們使用create table user_info_tmp like user_info;建立一個和user_info一樣結構的臨時表,如下:
然後我們準備一個檔案user_info.txt,以製表符分隔,如下
11 1200.0 qyk1 21
22 1301 qyk2 22
33 1400.0 qyk3 23
44 1500.0 qyk4 24
55 1210.0 qyk5 25
66 124 qyk6 26
77 1233 qyk7 27
88 15011 qyk8 28
接下來執行load data local inpath ‘/tmp/user_info.txt’ into table user_info;可看到如下:
然後執行select * from user_info;可看到:
然後,我們執行insert into table user_info_tmp select id, account, name, age from user_info;可以看到:
這裡,hive將此語句的執行轉為MR,最後將資料入到user_info_tmp。
然後,我們執行select count(*) from user_info_tmp;可看到:
同樣的是將sql轉為mr執行。
最後,執行insert overwrite table user_info select * from user_info where 1=0;清空表資料。
執行drop table user_info_tmp;便可刪除表,如下:
好了,基本命令就講到這兒,關於外部表、分割槽、桶以及儲存格式相關的概念大家也可以去研究下。
三、編寫MR將資料直接入到hive
此MR只有Mapper,沒有reducer。直接在mapper輸出到hive表。
pom需新增依賴:
<!-- hcatalog相關jar -->
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-server-extensions</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-pig-adapter</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat-java-client</artifactId>
<version>${hive.version}</version>
</dependency>
Mapper類:
/**
* Project Name:mr-demo
* File Name:HiveStoreMapper.java
* Package Name:org.qiyongkang.mr.hivestore
* Date:2016年4月4日下午10:02:07
* Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
*
*/
package org.qiyongkang.mr.hivestore;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
/**
* ClassName:HiveStoreMapper <br/>
* Function: Mapper類. <br/>
* Date: 2016年4月4日 下午10:02:07 <br/>
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class HiveStoreMapper extends Mapper<Object, Text, WritableComparable<Object>, HCatRecord> {
private HCatSchema schema = null;
//每個mapper例項,執行一次
@Override
protected void setup(Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
throws IOException, InterruptedException {
schema = HCatOutputFormat.getTableSchema(context.getConfiguration());
}
@Override
protected void map(Object key, Text value, Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
throws IOException, InterruptedException {
//每行以製表符分隔 id, account, name, age
String[] strs = value.toString().split("\t");
HCatRecord record = new DefaultHCatRecord(4);
//id,通過列下表
record.set(0, Long.valueOf(strs[0]));
//account
record.set(1, strs[1]);
//name
record.set(2, strs[2]);
//age,通過欄位名稱
record.set("age", schema, Integer.valueOf(strs[3]));
//寫入到hive
context.write(null, record);
}
public static void main(String[] args) {
String value = "1 1200 qyk 24";
String[] strs = value.toString().split("\t");
for (int i = 0; i < strs.length; i++) {
System.out.println(strs[i]);
}
}
}
主類:
/**
* Project Name:mr-demo
* File Name:LoadDataToHiveMR.java
* Package Name:org.qiyongkang.mr.hivestore
* Date:2016年4月4日下午9:55:42
* Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
*
*/
package org.qiyongkang.mr.hivestore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
/**
* ClassName:LoadDataToHiveMR <br/>
* Function: MR將資料直接入到hive. <br/>
* Date: 2016年4月4日 下午9:55:42 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class LoadDataToHiveMR {
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
Job job = Job.getInstance(conf, "hive store");
job.setJarByClass(LoadDataToHiveMR.class);
// 設定Mapper
job.setMapperClass(HiveStoreMapper.class);
// 由於沒有reducer,這裡設定為0
job.setNumReduceTasks(0);
// 設定輸入檔案路徑
FileInputFormat.addInputPath(job, new Path("/qiyongkang/input"));
// 指定Mapper的輸出
job.setMapOutputKeyClass(WritableComparable.class); // map
job.setMapOutputValueClass(DefaultHCatRecord.class);// map
//設定要入到hive的資料庫和表
HCatOutputFormat.setOutput(job, OutputJobInfo.create("qyk_test", "user_info", null));
//這裡注意是使用job.getConfiguration(),不能直接使用conf
HCatSchema hCatSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
HCatOutputFormat.setSchema(job, hCatSchema);
//設定輸出格式類
job.setOutputFormatClass(HCatOutputFormat.class);
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
然後,我們使用maven打個包,上傳到伺服器。
然後,我們準備一個user_info.txt,上傳至hdfs中的/qiyongkang/input下:
11 1200.0 qyk1 21
22 1301 qyk2 22
33 1400.0 qyk3 23
44 1500.0 qyk4 24
55 1210.0 qyk5 25
66 124 qyk6 26
77 1233 qyk7 27
88 15011 qyk8 28
注意以製表符\t分隔。
然後執行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,在jobhistory可以看到:
其實,hive的元資料是放在hdfs上,執行hadoop fs -ls /user/hive/warehouse可以看到:
然後,我們在hive命令列執行 select * from user_info;可以看到:
說明資料從hdfs寫入到hive成功。
四、使用java jdbc連線Thrift Server查詢元資料
接下來,我們使用java編寫一個客戶端,來查詢剛才入到hive裡面的資料,程式碼如下:
package org.hive.demo;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
public class HiveStoreClient {
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
private static String url = "jdbc:hive2://172.31.25.8:10000/qyk_test";
private static String user = "hive";
private static String password = "";
private static final Logger log = Logger.getLogger(HiveStoreClient.class);
@SuppressWarnings("rawtypes")
public static void main(String[] args) {
Connection conn = null;
Statement stmt = null;
ResultSet res = null;
try {
//載入驅動
Class.forName(driverName);
//獲取連線
conn = DriverManager.getConnection(url, user, password);
stmt = conn.createStatement();
// select * query
String sql = "select * from user_info";
System.out.println("Running: " + sql);
//執行查詢
res = stmt.executeQuery(sql);
//處理結果集
List list = convertList(res);
System.out.println("總記錄:" + list);
//獲取總個數
sql = "select count(1) from user_info";
System.out.println("Running: " + sql);
res = stmt.executeQuery(sql);
while (res.next()) {
System.out.println("總個數:" + res.getString(1));
}
} catch (ClassNotFoundException e) {
e.printStackTrace();
log.error(driverName + " not found!", e);
System.exit(1);
} catch (SQLException e) {
e.printStackTrace();
log.error("Connection error!", e);
System.exit(1);
} finally {
try {
res.close();
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
/**
*
* convertList:將結果集轉換成map. <br/>
*
* @author qiyongkang
* @param rs
* @return
* @throws SQLException
* @since JDK 1.6
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public static List convertList(ResultSet rs) throws SQLException {
List list = new ArrayList();
ResultSetMetaData md = rs.getMetaData();
int columnCount = md.getColumnCount(); //Map rowData;
while (rs.next()) { //rowData = new HashMap(columnCount);
Map rowData = new HashMap();
for (int i = 1; i <= columnCount; i++) {
rowData.put(md.getColumnName(i), rs.getObject(i));
}
list.add(rowData);
}
return list;
}
}
執行後,可以看到控制檯輸出如下:
開始的異常可以忽略。可以看到資料,說明是成功的。
好了,hive就講到這兒了。其實,hive還可以同步hbase的資料,還可以將hive的表資料同步到impala,因為它們都是使用相同的元資料,這個在後面的博文中再進行介紹。