1. 程式人生 > 其它 >Hive&HBase JDBC之抽象工廠模式

Hive&HBase JDBC之抽象工廠模式

技術標籤:HBaseHivedomjdbchivehbase大資料

文章目錄

抽象工廠模式的Hive和HBase連線

設計目的:通過將配置檔案和Hive、HBase的執行程式碼全部寫在xml檔案內,然後直接一次性載入讀取xml完成程式碼的執行

一、dwdom工程

1、新建dom4j工程

1.1 新建介面Dom

  • 定義初始化dom的init方法
  • 定義根據xpath獲取元素資訊的get方法
package cn.kgc.datawarehouse.dom.util;

public interface Dom {
    //接收外部xml檔案路徑,完成初始化
    void
init(String path) throws Exception;//出異常就是失敗,所以不需要返回值 //鍵:字串 //NodeName,拿到多個是list,拿到一個是Node,所以用泛型 <T> T get(String key,String...subs); }

1.2 新建類DomFactory

1.2.1 重寫init方法
  • 檔案讀取check
  • 初始化reader、doc、root
1.2.2 重寫get方法
  • 判斷節點是否有attribute屬性值,因為檔案會根據屬性值使用不同的方法執行其中的text命令
  • 重寫get方法:把傳入的引數拼接為xpath路徑;根據xpath獲取元素列表,分為【元素唯一無子節點、元素唯一有子節點、元素不唯一】三種情況
package cn.kgc.datawarehouse.dom.util;

import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

public abstract class DomFactory {
    //不給初始化
    public static Dom get() {
        return new Dom() {
            private SAXReader reader;
            private Document doc;
            private Element root;

            private File checkPath(String path) throws Exception {
                if (null == path) {
                    throw new NullPointerException(" dom xml null error");
                }
                File xml = new File(path);
                if (!xml.exists()) {
                    throw new FileNotFoundException(path + " not exist error");
                }
                if (!xml.isFile()) {
                    throw new Exception(path + " not file error");
                }
                if (!xml.getName().endsWith(".xml")) {
                    throw new Exception(path + " mot xml error");
                }
                if (!xml.canRead()) {
                    throw new Exception(path + " unreadable error");
                }
                return xml;
            }

            @Override
            public void init(String path) throws Exception {
                File xml = checkPath(path);
                reader = new SAXReader();
                doc = reader.read(xml);
                root = doc.getRootElement();
            }

            @Override
            public <T> T get(String key, String... subs) {
                StringBuilder builder = new StringBuilder("//");
                builder.append(key);
                for (String sub : subs) {
                    builder.append("/");
                    builder.append(sub);
                }
                //拼出來之後,檢索元素集合
                List<Element> list = doc.selectNodes(builder.toString());
                if (null != list && list.size() > 0) {
                    int subSize = 0;
                    //如果只有一個元素
                    if (list.size() == 1) {
                        //如果檢索元素唯一
                        Element el = list.get(0);
                        //獲取唯一元素的子元素集合
                        list = el.elements();
                        if ((subSize = list.size()) == 0) {
                            //葉節點,沒有子節點,提取文字
                            return ((T) el.getText());
                        }
                    }
                    //檢索元素集合超過1,或檢索元素唯一但存在子元素
                    if (list.size() > 1 || subSize > 0) {
                        List<String> lst = new ArrayList<>(list.size());
                        for (Element element : list) {
                            lst.add((element).getText());
                        }
                        return ((T) lst);
                    }
                }
                return null;
            }
        };
    }
}

2、打成jar包

MavenProject->Lifecycle->install的方式

二、dwjob工程

1、匯入pom依賴

1.1 自定義的Dom依賴

除了常規的Hive、HBase、Hadoop、Log4j、叢集管理repositories依賴之外,匯入自定義的Dom依賴,如下:

    <!--myself - dom-->
    <dependency>
      <groupId>cn.kgc.datawarehouse.dom</groupId>
      <artifactId>dwdom</artifactId>
      <version>1.0-SNAPSHOT</version>
    </dependency>

1.2 hadoop相關依賴

再打入其他hadoop相關jar包

<properties>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.2.0-cdh5.14.2</hbase.version>
        <hive.version>1.1.0</hive.version>
    </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
	<dependencies>
<!-- hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!--hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- hadoop-client,auth,common,hdfs,mapreduce -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jdk.tools</artifactId>
                    <groupId>jdk.tools</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>

2、Hive JDBC

2.1 新建介面Hive

定義執行方法execute,過載

  • 過載1:根據引數執行一條語句
  • 過載2:多條sql同時執行
package cn.kgc.dw.com.hive;

import cn.kgc.dw.com.DW;
import java.util.List;

public interface Hive extends DW {
    boolean execute(String sql,Object...params);
    boolean execute(List<String> sql);
}

2.2 新建類HiveFactory

設計思路:通過讀取xml檔案,返回介面Hive型別介面物件,然後重寫並呼叫其中的方法。全程con只連線關閉一次

直接放程式碼,原理就是簡單的JDBC

package cn.kgc.dw.com.hive;

import cn.kgc.datawarehouse.dom.util.Dom;
import org.apache.log4j.Logger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;

public class HiveFactory {
    public static Hive get(Dom dom) throws Exception {
        String driver = dom.get("driver");
        Class.forName(driver);
        String url = dom.get("url");
        String username = dom.get("username");
        String password = dom.get("password");
        return new Hive() {
            //引入log4j4j記錄hive sql執行異常,匯入日誌
            private Logger logger = Logger.getLogger(Hive.class);

            private Connection getCon() throws SQLException {
                return DriverManager.getConnection(url,username,password);
            }
            private PreparedStatement getPst(Connection con ,String sql,Object...params) throws SQLException {
                PreparedStatement pst = con.prepareStatement(sql);
                for (int i = 0; i < params.length; i++) {
                    pst.setObject(i+1,params[i]);
                }
                return pst;
            }

            private boolean exe(Connection con,String sql, Object... params) {
                PreparedStatement pst = null;
                try {
                    pst = getPst(con,sql,params);
                    pst.execute();
                    //成功了也寫進日誌裡
                    if (params.length>0){
                        sql +="PARAMS:"+ Arrays.toString(params);
                    }
                    logger.info("execute "+sql+" successfully");
                    return true;
                    //寫一個介面進行資源釋放
                } catch (SQLException e) {
//                    e.printStackTrace();
                    //把異常寫到日誌裡,增加log4j
                    logger.error(e.getMessage());
                }finally {
                    close(pst);
                }
                return false;
            }

            @Override
            public boolean execute(String sql, Object... params) {
                Connection con = null;
                try {
                    con = getCon();
                    //成功了也寫進日誌裡
                    return exe(con,sql,params);
                    //寫一個介面進行資源釋放
                } catch (SQLException e) {
//                    e.printStackTrace();
                    //把異常寫到日誌裡,增加log4j
                    logger.error(e.getMessage());
                }finally {
                    close(con);
                }
                return false;
            }

            @Override
            public boolean execute(List<String> sql) {
                Connection con =null;
                try {
                    con = getCon();
                    for (String s : sql) {
                        if (!exe(con,s)){
                            return false;
                        }
                    }
                    return true;
                } catch (SQLException e) {
                    logger.error(e.getMessage());
                }finally {
                    close(con);
                }
                return false;
            }
        };
    }
}

3、HBase JDBC

3.1 新建HBase介面

一般來說,因為命令語句的不方便,在HBase中很少操作增刪改查。因此本例介面中只寫了建NS和Table的操作

package cn.kgc.dw.com.hbase;

import cn.kgc.dw.com.DW;
import java.util.List;

public interface HBase extends DW {
    boolean createNameSpace(List<String> names);
    boolean createTable(List<String> tables);
}

3.2 新建類HBaseFactory

  • HBase的設計思路和Hive的基本一樣,只是HBase的配置資訊需要傳入一對的值,Hive只需要傳入Value即可
  • 需要注意xml檔案裡的前後空格,可以通過trim方法消除

程式碼如下:

package cn.kgc.dw.com.hbase;

import cn.kgc.datawarehouse.dom.util.Dom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;

public abstract class HBaseFactory {
    public static HBase get(Dom dom) {
        List<String> cnf = dom.get("hbase");//相當於 xpath="//hbase"
        Configuration config = new Configuration();
        for (String s : cnf) {
            String[] split = s.split("->");
            config.set(split[0], split[1]);
        }

        return new HBase() {
            private Logger logger = Logger.getLogger(HBase.class);

            private Connection getCon() throws IOException {
                return ConnectionFactory.createConnection(config);
            }

            @Override
            public boolean createNameSpace(List<String> names) {
                Connection con = null;
                Admin admin = null;
                try {
                    con = getCon();
                    admin = con.getAdmin();
                    for (String name : names) {
                        name = name.trim();
                        admin.createNamespace(NamespaceDescriptor.create(name).build());
                        logger.info("create namespace " + name + " succeed");
                    }
                    return true;
                } catch (IOException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(admin, con);
                }
                return false;
            }

            @Override
            public boolean createTable(List<String> tables) {
                Connection con = null;
                Admin admin = null;
                try {
                    con = getCon();
                    admin = con.getAdmin();
                    for (String table : tables) {
                        table=table.trim();
                        String[] ps = table.split(",");
                        TableName tn = TableName.valueOf(ps[0]);
                        HTableDescriptor tbl = new HTableDescriptor(tn);
                        for (int i = 1; i < ps.length; i++) {
                            tbl.addFamily(new HColumnDescriptor(ps[i]));
                        }
                        admin.createTable(tbl);
                        logger.info("create tabe "+table+" succeed");
                    }
                    return true;
                } catch (IOException e) {
                    logger.error(e.getMessage());
                } finally {
                    close(admin, con);
                }
                return false;
            }
        };
    }
}

4、通用常量配置

4.1 新建介面DW

  • 新增常用字串配置資訊
  • Hive和HBase的關閉方法封裝
package cn.kgc.dw.com;

public interface DW {
    String DW_LOG_DIR = "dw.log.dir";
    String LAYER = "layers";
    String TYPE_HQL="hql";
    String TYPE_HBASE_CREATE_NS="hbase->create->namespace";
    String TYPE_HBASE_CREATE_TABLE="hbase->create->table";

    default void close(AutoCloseable...closeables){
        for (AutoCloseable closeable : closeables) {
            if (null!=closeable) {
                try {
                    closeable.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5、Job類

5.1 新建類DWJob

5.2 初始化

  • 初始化日誌輸出
  • 定義Hive、HBase、Dom物件
  • 初始化Hive、HBase、Dom物件、日誌輸出路徑

5.3 對不同型別的執行語句進行區分

  • 定義一個鍵值儲存的類Sqls,每個物件儲存Type和該型別的所有sql語句
  • 定義一個方法parse迴圈讀取一批葉子節點下的Hive、HBase命令,根據屬性判斷區分Hive、HBase。先new一個list;若第一個是Hive,則new一個Sqls物件並裝進去;若第二個仍是Hive,add進去(Sqls類中封裝了List的add方法);若第二個是HBase,把前一個Sqls物件add到list中,重新new一個Sqls物件;以此類推,直到這一批次所有命令都被add到list中,返回list

5.4 執行語句的方法

  • 先根據xpath獲取所有批次(節點)
  • 對於每個批次(節點),獲取其中的所有子節點,獲取其中的內容(sql語句)並區分
  • 再次迴圈,使用switch case分類執行其中的語句

5.5 start啟動方法

  • 初始化
  • 執行
  • 最後別忘了日誌輸出,防止出了問題無從檢查

程式碼如下:

package cn.kgc.dw.com;

import cn.kgc.dw.com.hbase.HBase;
import cn.kgc.dw.com.hbase.HBaseFactory;
import cn.kgc.dw.com.hive.Hive;
import cn.kgc.dw.com.hive.HiveFactory;
import cn.kgc.datawarehouse.dom.util.Dom;
import cn.kgc.datawarehouse.dom.util.DomFactory;
import org.apache.log4j.Logger;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class DWJob {
    private static Logger logger = Logger.getLogger(DWJob.class);
    private Dom dom;
    private Hive hive;
    private HBase hBase;

    private void init(String xmlPath) throws Exception {
        dom = DomFactory.get();
        dom.init(xmlPath);
        System.setProperty(DW.DW_LOG_DIR, new File(xmlPath).getParentFile().getAbsolutePath());//拿到他上一級目錄的絕對路徑
        hive = HiveFactory.get(dom);
        hBase = HBaseFactory.get(dom);
    }

    class Sql {
        private String type;
        private List<String> sqls;

        Sql(String type) {
            this.type = type;
            sqls = new ArrayList<>();
        }

        public void add(String sql) {
            sqls.add(sql);
        }

        public String getType() {
            return type;
        }

        public List<String> getSqls() {
            return sqls;
        }
    }

    private String format(String sql){
        return sql.trim();
    }

    private List<Sql> parse(List<String> sqls){
        List<Sql> list = new ArrayList<>();
        Sql _sql = null;
        for (String sql : sqls) {
            String[] ps = sql.split(";");
            if (null == _sql) {
                _sql = new Sql(ps[0]);
                _sql.add(format(ps[1]));
            } else if(_sql.getType().equals(ps[0])) {
                _sql.add(format(ps[1]));
            } else {
                list.add(_sql);
                _sql = new Sql(ps[0]);
                _sql.add(format(ps[1]));
            }
        }
        list.add(_sql);
        return list;
    }

    private void exeJob() {
        List<String> layers = dom.get(DW.LAYER);
        for (String layer : layers) {//分層
            logger.info("-----------------------start job layer:"+layer+"---------------------");
            List<String> sqls = dom.get(layer);
            List<Sql> parse = parse(sqls);
            System.out.println("11111111111111");

            for (Sql sql : parse) {//層內分段
                switch (sql.getType()) {
                    case DW.TYPE_HQL:
                        hive.execute(sql.getSqls());
                        break;
                    case DW.TYPE_HBASE_CREATE_NS:
                        hBase.createNameSpace(sql.getSqls());
                        break;
                    case DW.TYPE_HBASE_CREATE_TABLE:
                        hBase.createTable(sql.getSqls());
                        break;
                }
            }
        }
    }

    public void start(String xmlPath) {
        long time = new Date().getTime();
        try {
            init(xmlPath);
            logger.info("[ start dw_job_"+time+"]");
            System.out.println("111");
            exeJob();
            logger.info("[ end dw_job_"+time+"]");
        } catch (Exception e) {
            logger.error("[ error dw_job_"+time+"]"+e.getMessage());
        }
    }
}

6、執行

ToolBar->Run->Edit Configuration->左上角±>Application->Name寫主類名->Main class寫主類路徑->Program arguments設定引數路徑(本例為xml路徑)

image-20210202015527768