Hive & HBase JDBC with the Abstract Factory Pattern
Hive and HBase connections via the abstract factory pattern
Design goal: keep the connection settings and the Hive/HBase statements to run entirely in one XML file, then load that XML once and execute everything it describes.
I. The dwdom project
1. Create the dom4j project
1.1 Create the Dom interface
- Define an init method that initializes the DOM
- Define a get method that retrieves element content by XPath
package cn.kgc.datawarehouse.dom.util;

public interface Dom {
    // accept the path of an external xml file and complete initialization
    void init(String path) throws Exception; // an exception means failure, so no return value is needed
    // key: a string node name; multiple matches come back as a List, a single match as one value, hence the generic
    <T> T get(String key, String... subs);
}
1.2 Create the DomFactory class
1.2.1 Override the init method
- Validate that the file path can actually be read
- Initialize the reader, doc, and root fields
1.2.2 Override the get method
- Note whether a node carries an attribute value, because the job later uses that value to decide how to execute the node's text command
- Override get: join the arguments into an XPath expression, fetch the matching elements, and handle three cases: a unique element with no children, a unique element with children, and multiple elements
package cn.kgc.datawarehouse.dom.util;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

public abstract class DomFactory {
    // abstract: the factory itself is never instantiated
    public static Dom get() {
        return new Dom() {
            private SAXReader reader;
            private Document doc;
            private Element root;

            private File checkPath(String path) throws Exception {
                if (null == path) {
                    throw new NullPointerException("dom xml null error");
                }
                File xml = new File(path);
                if (!xml.exists()) {
                    throw new FileNotFoundException(path + " not exist error");
                }
                if (!xml.isFile()) {
                    throw new Exception(path + " not file error");
                }
                if (!xml.getName().endsWith(".xml")) {
                    throw new Exception(path + " not xml error");
                }
                if (!xml.canRead()) {
                    throw new Exception(path + " unreadable error");
                }
                return xml;
            }

            @Override
            public void init(String path) throws Exception {
                File xml = checkPath(path);
                reader = new SAXReader();
                doc = reader.read(xml);
                root = doc.getRootElement();
            }

            @Override
            public <T> T get(String key, String... subs) {
                StringBuilder builder = new StringBuilder("//");
                builder.append(key);
                for (String sub : subs) {
                    builder.append("/");
                    builder.append(sub);
                }
                // with the xpath assembled, look up the matching elements
                List<Element> list = doc.selectNodes(builder.toString());
                if (null != list && list.size() > 0) {
                    int subSize = 0;
                    if (list.size() == 1) {
                        // the match is unique
                        Element el = list.get(0);
                        // fetch the unique element's children
                        list = el.elements();
                        if ((subSize = list.size()) == 0) {
                            // leaf node with no children: extract its text
                            return ((T) el.getText());
                        }
                    }
                    // more than one match, or a unique match that has children
                    if (list.size() > 1 || subSize > 0) {
                        List<String> lst = new ArrayList<>(list.size());
                        for (Element element : list) {
                            lst.add(element.getText());
                        }
                        return ((T) lst);
                    }
                }
                return null;
            }
        };
    }
}
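To make the three retrieval cases concrete, here is a minimal, self-contained sketch. The element names (job, driver, hbase, prop), the temporary file, and placing the demo class in the same package are illustrative assumptions only; they are not part of the project source.
package cn.kgc.datawarehouse.dom.util;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class DomDemo {
    public static void main(String[] args) throws Exception {
        // write a throw-away xml file so the demo runs on its own
        String sample =
                "<job>\n" +
                "  <driver>org.apache.hive.jdbc.HiveDriver</driver>\n" +
                "  <hbase>\n" +
                "    <prop>hbase.zookeeper.quorum->single</prop>\n" +
                "    <prop>hbase.zookeeper.property.clientPort->2181</prop>\n" +
                "  </hbase>\n" +
                "</job>";
        Path xml = Files.createTempFile("dom-demo", ".xml");
        Files.write(xml, sample.getBytes(StandardCharsets.UTF_8));

        Dom dom = DomFactory.get();
        dom.init(xml.toString());

        String driver = dom.get("driver");            // unique leaf element -> its text
        List<String> cnf = dom.get("hbase");          // unique element with children -> the children's texts
        List<String> same = dom.get("hbase", "prop"); // //hbase/prop -> several elements -> their texts
        System.out.println(driver); // org.apache.hive.jdbc.HiveDriver
        System.out.println(cnf);    // [hbase.zookeeper.quorum->single, hbase.zookeeper.property.clientPort->2181]
        System.out.println(same);   // the same list, reached through the sub-path form
    }
}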
2. Package it as a jar
In the IDE, run Maven Projects -> Lifecycle -> install; this installs dwdom into the local Maven repository so the dwjob project below can depend on it.
II. The dwjob project
1. Add the pom dependencies
1.1 The custom Dom dependency
Besides the usual Hive, HBase, Hadoop, Log4j, and cluster repository entries, add the custom Dom dependency built above:
<!-- the custom dom module built above -->
<dependency>
    <groupId>cn.kgc.datawarehouse.dom</groupId>
    <artifactId>dwdom</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
1.2 Hadoop-related dependencies
Then pull in the other Hadoop-related jars:
<properties>
    <hadoop.version>2.6.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.14.2</hbase.version>
    <hive.version>1.1.0</hive.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <!-- hive-jdbc -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>${hive.version}</version>
    </dependency>
    <!-- hbase -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- hadoop: auth, common, hdfs, client, mapreduce -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>jdk.tools</artifactId>
                <groupId>jdk.tools</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
2. Hive JDBC
2.1 Create the Hive interface
Define an overloaded execute method:
- Overload 1: execute a single statement with optional parameters
- Overload 2: execute a list of SQL statements in one go
package cn.kgc.dw.com.hive;

import cn.kgc.dw.com.DW;

import java.util.List;

public interface Hive extends DW {
    boolean execute(String sql, Object... params);
    boolean execute(List<String> sql);
}
2.2 Create the HiveFactory class
Design idea: read the XML file through Dom, return an object of the Hive interface type, and implement its methods in an anonymous class. For the batch overload, a single connection is opened and closed once for the whole list of statements.
The code follows; underneath it is plain JDBC.
package cn.kgc.dw.com.hive;

import cn.kgc.datawarehouse.dom.util.Dom;
import org.apache.log4j.Logger;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

public class HiveFactory {
    public static Hive get(Dom dom) throws Exception {
        String driver = dom.get("driver");
        Class.forName(driver);
        String url = dom.get("url");
        String username = dom.get("username");
        String password = dom.get("password");
        return new Hive() {
            // use log4j to record hive sql execution results and exceptions
            private Logger logger = Logger.getLogger(Hive.class);

            private Connection getCon() throws SQLException {
                return DriverManager.getConnection(url, username, password);
            }

            private PreparedStatement getPst(Connection con, String sql, Object... params) throws SQLException {
                PreparedStatement pst = con.prepareStatement(sql);
                for (int i = 0; i < params.length; i++) {
                    pst.setObject(i + 1, params[i]);
                }
                return pst;
            }

            private boolean exe(Connection con, String sql, Object... params) {
                PreparedStatement pst = null;
                try {
                    pst = getPst(con, sql, params);
                    pst.execute();
                    // log successes as well
                    if (params.length > 0) {
                        sql += " PARAMS:" + Arrays.toString(params);
                    }
                    logger.info("execute " + sql + " successfully");
                    return true;
                } catch (SQLException e) {
                    // write the exception to the log instead of printing the stack trace
                    logger.error(e.getMessage());
                } finally {
                    // resources are released through the close method inherited from DW
                    close(pst);
                }
                return false;
            }

            @Override
            public boolean execute(String sql, Object... params) {
                Connection con = null;
                try {
                    con = getCon();
                    return exe(con, sql, params);
                } catch (SQLException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(con);
                }
                return false;
            }

            @Override
            public boolean execute(List<String> sql) {
                Connection con = null;
                try {
                    con = getCon();
                    // one connection serves the whole batch; stop at the first failure
                    for (String s : sql) {
                        if (!exe(con, s)) {
                            return false;
                        }
                    }
                    return true;
                } catch (SQLException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(con);
                }
                return false;
            }
        };
    }
}
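A possible wiring sketch follows. The database, table, and date values are made up, and the xml path is read from the program arguments; the only real requirement, taken from the factory code above, is that the config file contains driver, url, username and password leaf elements.
package cn.kgc.dw.com.hive;

import cn.kgc.datawarehouse.dom.util.Dom;
import cn.kgc.datawarehouse.dom.util.DomFactory;

import java.util.Arrays;

public class HiveDemo {
    public static void main(String[] args) throws Exception {
        Dom dom = DomFactory.get();
        dom.init(args[0]); // path to a config xml holding <driver>, <url>, <username>, <password>
        Hive hive = HiveFactory.get(dom);

        // overload 1: one parameterized statement
        hive.execute("insert into ods.demo select * from src where dt = ?", "2021-02-14");

        // overload 2: several statements sharing a single connection
        hive.execute(Arrays.asList(
                "create database if not exists ods",
                "create table if not exists ods.demo(id int, name string)"));
    }
}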
3. HBase JDBC
3.1 Create the HBase interface
Because issuing shell-style statements this way is inconvenient, HBase reads and writes are rarely driven through this channel; the interface in this example therefore only covers creating namespaces and tables.
package cn.kgc.dw.com.hbase;

import cn.kgc.dw.com.DW;

import java.util.List;

public interface HBase extends DW {
    boolean createNameSpace(List<String> names);
    boolean createTable(List<String> tables);
}
3.2 Create the HBaseFactory class
- The design mirrors the Hive factory, except that every HBase configuration entry is a key/value pair, whereas Hive only needs single values
- Watch out for leading and trailing whitespace in the XML; trim() strips it
Code:
package cn.kgc.dw.com.hbase;

import cn.kgc.datawarehouse.dom.util.Dom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.List;

public abstract class HBaseFactory {
    public static HBase get(Dom dom) {
        List<String> cnf = dom.get("hbase"); // equivalent to xpath="//hbase"
        Configuration config = new Configuration();
        for (String s : cnf) {
            // each entry is written as key->value in the xml
            String[] split = s.split("->");
            config.set(split[0], split[1]);
        }
        return new HBase() {
            private Logger logger = Logger.getLogger(HBase.class);

            private Connection getCon() throws IOException {
                return ConnectionFactory.createConnection(config);
            }

            @Override
            public boolean createNameSpace(List<String> names) {
                Connection con = null;
                Admin admin = null;
                try {
                    con = getCon();
                    admin = con.getAdmin();
                    for (String name : names) {
                        name = name.trim();
                        admin.createNamespace(NamespaceDescriptor.create(name).build());
                        logger.info("create namespace " + name + " succeed");
                    }
                    return true;
                } catch (IOException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(admin, con);
                }
                return false;
            }

            @Override
            public boolean createTable(List<String> tables) {
                Connection con = null;
                Admin admin = null;
                try {
                    con = getCon();
                    admin = con.getAdmin();
                    for (String table : tables) {
                        // format: tableName,family1,family2,...
                        table = table.trim();
                        String[] ps = table.split(",");
                        TableName tn = TableName.valueOf(ps[0]);
                        HTableDescriptor tbl = new HTableDescriptor(tn);
                        for (int i = 1; i < ps.length; i++) {
                            tbl.addFamily(new HColumnDescriptor(ps[i]));
                        }
                        admin.createTable(tbl);
                        logger.info("create table " + table + " succeed");
                    }
                    return true;
                } catch (IOException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(admin, con);
                }
                return false;
            }
        };
    }
}
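A usage sketch under the same assumptions: the xml contains an <hbase> element whose children hold key->value entries (the split("->") format read above), namespaces are plain names, and each table spec follows the tableName,family1,family2,... layout parsed above. The namespace and table names here are made up.
package cn.kgc.dw.com.hbase;

import cn.kgc.datawarehouse.dom.util.Dom;
import cn.kgc.datawarehouse.dom.util.DomFactory;

import java.util.Arrays;

public class HBaseDemo {
    public static void main(String[] args) throws Exception {
        Dom dom = DomFactory.get();
        dom.init(args[0]); // xml with <hbase> children such as hbase.zookeeper.quorum->single
        HBase hBase = HBaseFactory.get(dom);

        // namespaces are plain names
        hBase.createNameSpace(Arrays.asList("ods", "dwd"));

        // a namespace-qualified name such as dwd:demo is also accepted by TableName.valueOf
        hBase.createTable(Arrays.asList("dwd:demo,cf", "ods:raw,base,detail"));
    }
}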
4. Shared constants
4.1 Create the DW interface
- Holds the commonly used string constants
- Encapsulates the close method shared by the Hive and HBase implementations
package cn.kgc.dw.com;

public interface DW {
    String DW_LOG_DIR = "dw.log.dir";
    String LAYER = "layers";
    String TYPE_HQL = "hql";
    String TYPE_HBASE_CREATE_NS = "hbase->create->namespace";
    String TYPE_HBASE_CREATE_TABLE = "hbase->create->table";

    default void close(AutoCloseable... closeables) {
        for (AutoCloseable closeable : closeables) {
            if (null != closeable) {
                try {
                    closeable.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
5. The Job class
5.1 Create the DWJob class
5.2 Initialization
- Initialize the log output
- Declare the Hive, HBase, and Dom fields
- Initialize the Hive, HBase, and Dom objects and the log output path
5.3 Separate the statements to execute by type
- Define a small key/value holder class Sql; each instance stores a type and all the statements of that type
- Define a parse method that walks one batch of leaf-node commands and splits the Hive and HBase work by the type marker. Start with an empty list; if the first entry is Hive, create a Sql object for it and add the statement (Sql wraps the List's add method); if the next entry is still Hive, add it to the same object; if the next entry is HBase, push the finished Sql object into the list and start a new one; and so on until the whole batch is grouped, then append the last Sql object and return the list. A standalone sketch of this grouping rule follows.
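To make the grouping rule easy to see in isolation, here is a standalone sketch with made-up statements; the real parse method is a private member of DWJob and appears in the code further below.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParseSketch {
    public static void main(String[] args) {
        // each raw entry carries its type before the first ';' (hql, hbase->create->namespace, ...)
        List<String> raw = Arrays.asList(
                "hql;create database if not exists ods",
                "hql;create table if not exists ods.demo(id int)",
                "hbase->create->namespace;dwd",
                "hbase->create->table;dwd:demo,cf",
                "hql;insert into ods.demo select 1");

        List<String> types = new ArrayList<>();          // one entry per consecutive run
        List<List<String>> grouped = new ArrayList<>();  // the statements of each run
        for (String entry : raw) {
            String[] ps = entry.split(";");
            if (types.isEmpty() || !types.get(types.size() - 1).equals(ps[0])) {
                // first entry, or the type changed: start a new run
                types.add(ps[0]);
                grouped.add(new ArrayList<>());
            }
            grouped.get(grouped.size() - 1).add(ps[1].trim());
        }
        for (int i = 0; i < types.size(); i++) {
            System.out.println(types.get(i) + " -> " + grouped.get(i));
        }
        // four runs are printed: the two leading hql statements stay together, each hbase
        // command forms its own run, and the trailing hql opens a new run because only
        // consecutive entries of the same type are merged
    }
}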
5.4 The statement-execution method
- First fetch all batches (layer nodes) by XPath
- For each batch, fetch its child nodes, read their content (the SQL statements), and group them by type
- Loop again and dispatch each group with a switch-case on its type
5.5 The start method
- Initialize
- Execute
- Do not forget the log output at the start and end, otherwise there is nothing to check when something goes wrong
Code:
package cn.kgc.dw.com;

import cn.kgc.dw.com.hbase.HBase;
import cn.kgc.dw.com.hbase.HBaseFactory;
import cn.kgc.dw.com.hive.Hive;
import cn.kgc.dw.com.hive.HiveFactory;
import cn.kgc.datawarehouse.dom.util.Dom;
import cn.kgc.datawarehouse.dom.util.DomFactory;
import org.apache.log4j.Logger;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class DWJob {
    private static Logger logger = Logger.getLogger(DWJob.class);
    private Dom dom;
    private Hive hive;
    private HBase hBase;

    private void init(String xmlPath) throws Exception {
        dom = DomFactory.get();
        dom.init(xmlPath);
        // the xml file's parent directory (absolute path) becomes the log output directory
        System.setProperty(DW.DW_LOG_DIR, new File(xmlPath).getParentFile().getAbsolutePath());
        hive = HiveFactory.get(dom);
        hBase = HBaseFactory.get(dom);
    }

    class Sql {
        private String type;
        private List<String> sqls;

        Sql(String type) {
            this.type = type;
            sqls = new ArrayList<>();
        }

        public void add(String sql) {
            sqls.add(sql);
        }

        public String getType() {
            return type;
        }

        public List<String> getSqls() {
            return sqls;
        }
    }

    private String format(String sql) {
        return sql.trim();
    }

    private List<Sql> parse(List<String> sqls) {
        List<Sql> list = new ArrayList<>();
        Sql _sql = null;
        for (String sql : sqls) {
            // each entry looks like type;statement
            String[] ps = sql.split(";");
            if (null == _sql) {
                _sql = new Sql(ps[0]);
                _sql.add(format(ps[1]));
            } else if (_sql.getType().equals(ps[0])) {
                _sql.add(format(ps[1]));
            } else {
                list.add(_sql);
                _sql = new Sql(ps[0]);
                _sql.add(format(ps[1]));
            }
        }
        list.add(_sql);
        return list;
    }

    private void exeJob() {
        List<String> layers = dom.get(DW.LAYER);
        for (String layer : layers) { // one pass per layer
            logger.info("-----------------------start job layer:" + layer + "---------------------");
            List<String> sqls = dom.get(layer);
            List<Sql> parse = parse(sqls);
            for (Sql sql : parse) { // typed segments within the layer
                switch (sql.getType()) {
                    case DW.TYPE_HQL:
                        hive.execute(sql.getSqls());
                        break;
                    case DW.TYPE_HBASE_CREATE_NS:
                        hBase.createNameSpace(sql.getSqls());
                        break;
                    case DW.TYPE_HBASE_CREATE_TABLE:
                        hBase.createTable(sql.getSqls());
                        break;
                }
            }
        }
    }

    public void start(String xmlPath) {
        long time = new Date().getTime();
        try {
            init(xmlPath);
            logger.info("[ start dw_job_" + time + "]");
            exeJob();
            logger.info("[ end dw_job_" + time + "]");
        } catch (Exception e) {
            logger.error("[ error dw_job_" + time + "]" + e.getMessage());
        }
    }
}
6. Run
Toolbar -> Run -> Edit Configurations -> click + in the upper-left corner -> Application -> set Name to the main class name, Main class to the fully qualified main class, and Program arguments to the argument path (in this example, the path of the job xml).
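The walkthrough never shows the main class or the job xml itself. Below is a hedged sketch of both: a minimal launcher plus the xml layout it assumes. Only driver, url, username, password, hbase and layers are names the code actually reads; the Launcher class name, the root element, the prop/layer/step element names, the layer names, the host single, and the credentials are assumptions inferred from the factories and DWJob, not part of the original project.
package cn.kgc.dw.com;

/*
 * Assumed shape of the job xml passed as the program argument:
 *
 * <job>
 *   <driver>org.apache.hive.jdbc.HiveDriver</driver>
 *   <url>jdbc:hive2://single:10000</url>
 *   <username>root</username>
 *   <password>root</password>
 *   <hbase>
 *     <prop>hbase.zookeeper.quorum->single</prop>
 *     <prop>hbase.zookeeper.property.clientPort->2181</prop>
 *   </hbase>
 *   <layers>
 *     <layer>ods</layer>
 *     <layer>dwd</layer>
 *   </layers>
 *   <ods>
 *     <step>hbase->create->namespace;ods</step>
 *     <step>hbase->create->table;ods:demo,cf</step>
 *     <step>hql;create database if not exists ods</step>
 *   </ods>
 *   <dwd>
 *     <step>hql;create database if not exists dwd</step>
 *   </dwd>
 * </job>
 */
public class Launcher {
    public static void main(String[] args) {
        // args[0] comes from Program arguments: the path of the job xml
        new DWJob().start(args[0]);
    }
}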