
Learning Flink from 0 to 1 (23): Reading from Hive and Writing to Hive with Flink

1. Reading: this was pieced together from material found online. The core is to implement

HCatInputFormat
HCatInputFormatBase

Both of these classes ultimately extend Flink's RichInputFormat:

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>

Download the jar that contains these classes (it can be found through a search engine or a Maven repository) and pull them out of it.


Dependencies (roughly these):

<!-- flink-hive dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>

Reading data from Hive:

package com.coder.flink.core.FlinkHive
 
 
import org.apache.flink.api.scala.ExecutionEnvironment
 
import org.apache.hadoop.conf.Configuration
import org.apache.flink.api.scala._
 
 
// Read data from Hive
object ReadHive {
  def main(args: Array[String]): Unit = {
 
      val conf = new Configuration()
      conf.set("hive.metastore.local", "false")
 
      conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
       // For a highly available metastore, this needs to be the nameservice address
//      conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")
 
      val env = ExecutionEnvironment.getExecutionEnvironment
 
      // TODO: return type
      val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))
 
      dataset.first(10).print()
//      env.execute("flink hive test")
 
 
  }
 
}
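
Note: HCatInputFormat above is the class pulled out of the HCatalog jar mentioned earlier, and TamAlert is not shown in the original code. A minimal sketch of what such a case class might look like (the field names and types are assumptions and must match the columns of the aijiami.test table):

// Hypothetical record type for the Hive table aijiami.test.
// Replace the fields with the table's real columns and types.
case class TamAlert(
  id: String,        // assumed column
  appName: String,   // assumed column
  createTime: String // assumed column
)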

The good news is that Flink 1.9 added native Hive read/write support. Before that, we can also read and write Hive through Hive JDBC, though performance is likely to be slower:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.1.0</version>
</dependency>
package com.coder.flink.core.FlinkHive;
 
import java.sql.*;
 
public class FlinkReadHive {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
 
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami","hive","hive");
        Statement st = con.createStatement();
        ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10");
        while (rs.next()){
            System.out.println(rs.getString(1) + "," + rs.getString(2));
        }
        rs.close();
        st.close();
        con.close();
 
 
    }
}
public class HiveApp {
     
    private static String driver = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://Master:10000/default";
    private static String user = "root"; // Anonymous access usually works; root is used here because the whole Hive installation was done as root
    private static String password = "";
 
    public static void main(String[] args) {
        ResultSet res = null;
         
        try {
            /**
             * Step 1: load the JDBC driver via reflection
             */
            Class.forName(driver);
             
            /**
             * Step 2: open a JDBC connection to Hive; the default port is 10000 and the default username/password are empty
             */
            Connection conn = DriverManager.getConnection(url, user, password); 
             
            /**
             * Step 3: create a Statement handle; all subsequent SQL operations go through it
             */
            Statement stmt = conn.createStatement();
             
            /**
             * The SQL operations follow.
             * Step 4.1: create the table, dropping it first if it already exists
             */
            String tableName = "testHiveDriverTable";
            stmt.execute("drop table if exists " + tableName );
            
             
            stmt.execute("create table " + tableName + " (id int, name string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
            /**
             * Step 4.2: query for the newly created table
             */
            String sql = "show tables '" + tableName + "'";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            if (res.next()) {
              System.out.println(res.getString(1));
            }
            /**
             * Step 4.3: query the schema of the created table
             */
            sql = "describe " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
          
            /**
             * Step 4.4: load data into the Hive table
             */
            String filepath = "/root/Documents/data/sql/testHiveDriver.txt";
            sql = "load data local inpath '" + filepath + "' into table " + tableName;
            System.out.println("Running: " + sql);
            stmt.execute(sql);
          
            /**
             * Step 4.5: query the data that was loaded into the table
             */
            sql = "select * from " + tableName;
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
            }
          
            /**
             * Step 4.6: run an aggregate over the table
             */
            sql = "select count(1) from " + tableName;   // select count(*)/count(1) launches a MapReduce job, so the YARN resource manager must be running: start-yarn.sh
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
           
            while (res.next()) {
              System.out.println("Total lines :" + res.getString(1));
            }    
             
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
         
         
 
    }
 
}
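
For the Flink 1.9 Hive support mentioned above, a minimal sketch of reading and writing Hive through HiveCatalog and the Table API could look like the following. It assumes Flink 1.9+ with the blink planner and the flink-connector-hive plus matching Hive dependencies on the classpath; the catalog name, database, hive-conf directory, Hive version string and table names are placeholders:

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveCatalogSketch {
  def main(args: Array[String]): Unit = {
    // Blink planner in batch mode (available from Flink 1.9)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
    val tableEnv = TableEnvironment.create(settings)

    // Catalog name, default database, hive-site.xml directory and Hive version are assumptions
    val hive = new HiveCatalog("myhive", "aijiami", "/etc/hive/conf", "2.3.4")
    tableEnv.registerCatalog("myhive", hive)
    tableEnv.useCatalog("myhive")

    // Read one Hive table and write the result into another (both table names are placeholders)
    tableEnv.sqlUpdate("INSERT INTO test_copy SELECT * FROM test")
    tableEnv.execute("flink hive catalog test")
  }
}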

 

A simple example of writing to HDFS:

package com.coder.flink.core.test_demo
 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem.WriteMode
 
object WriteToHDFS {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2. Define the data: stu(age, name, height)
    val stu: DataSet[(Int, String, String)] = env.fromElements(
      (19, "zhangsan","aaaa"),
      (1449, "zhangsan","aaaa"),
      (33, "zhangsan","aaaa"),
      (22, "zhangsan","aaaa")
    )
 
    // TODO: write to the local filesystem
    stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink程式碼/測試資料/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
 
    // TODO: write to HDFS as a text file; the path is created automatically if it does not exist
    stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",
      WriteMode.OVERWRITE)
    env.execute()
 
    // TODO: write a CSV file to HDFS
    // 3.1 Read the CSV file
    val inPath = "hdfs:///input/flink/sales.csv"
    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)
    val ds2 = env.readCsvFile[Sales](
      filePath = inPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      lenient = false,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3),
      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")
    )
    // 3.2 Write the CSV data to HDFS
    val outPath = "hdfs:///output/flink/datasink/sales.csv"
    ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",fieldDelimiter = "|", WriteMode.OVERWRITE)
 
    env.execute()
  }
}
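
To complete the "write into Hive" side, one option is to load the files Flink has written to HDFS into a Hive table over the same Hive JDBC connection shown earlier. A minimal sketch, assuming a target table sales_flink already exists with a matching '|' delimited schema (host, credentials, HDFS path and table name are placeholders):

import java.sql.DriverManager

object LoadFlinkOutputIntoHive {
  def main(args: Array[String]): Unit = {
    // Reuse the Hive JDBC pattern from above; the connection details are placeholders
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami", "hive", "hive")
    val st = con.createStatement()
    // LOAD DATA (without LOCAL) moves files that already sit in HDFS into the table's location
    st.execute("LOAD DATA INPATH '/output/flink/datasink/sales.csv' INTO TABLE sales_flink")
    st.close()
    con.close()
  }
}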