Apache Flink: Reading Key Data Formats and SQL Support
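Event merging currently comes in two forms. Real-time merging merges data and generates events from the data in Kafka. Periodic merging merges data and generates events from the data in Hive.
For SQL-based merging, the input/output data sources that Spark Streaming supports, and how Flink handles each of them, are as follows: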
Data source | Flink support |
---|---|
Kafka | A schema must be defined |
HDFS (parquet/csv/textfile) | Parquet must be read with AvroParquetInputFormat; csv/textfile can be read with readCsvFile and a text-file input format |
Hive | 1. The Hive metastore service must be running to provide the Thrift metastore interface; 2. Reading depends on flink-hcatalog |
JDBC (PostgreSQL) | JDBCInputFormat |
The following sections test whether Apache Flink supports each of these formats.
1.Kafka
First, define a POJO class to represent the records in the stream read from Kafka:
package com.flinklearn.models;

/**
 * Created by dell on 2018/10/23.
 */
public class TamAlert {
    private String msg;

    public TamAlert() {}

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
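Flink only treats a class as a POJO if it meets several conditions:

- the class is public (and not a non-static inner class);
- it has a public no-argument constructor;
- every field is either public or exposed through a getter and setter.

Otherwise Flink handles it as a black-box generic type serialized with Kryo, and field-level access is lost. The TamAlert class above satisfies these rules. The sketch below is a rough, stdlib-only reflection check of those rules, useful before registering a class as a table. PojoRulesCheck and BadAlert are hypothetical names, and Flink's own TypeExtractor remains the real authority.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: approximates Flink's documented POJO rules; it is not Flink's TypeExtractor. */
public class PojoRulesCheck {

    /** Same shape as the TamAlert POJO above. */
    public static class TamAlert {
        private String msg;
        public TamAlert() {}
        public String getMsg() { return msg; }
        public void setMsg(String msg) { this.msg = msg; }
    }

    /** Breaks two rules: no public no-arg constructor, and a private field without accessors. */
    public static class BadAlert {
        private String msg;
        public BadAlert(String msg) { this.msg = msg; }
    }

    /** Returns a description of every rule the class violates (empty list = looks like a POJO). */
    public static List<String> violations(Class<?> clazz) {
        List<String> issues = new ArrayList<>();
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) {
            issues.add("class is not public");
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(mods)) {
            issues.add("class is a non-static inner class");
        }
        try {
            clazz.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            issues.add("no public no-argument constructor");
        }
        for (Field field : clazz.getDeclaredFields()) {
            int fm = field.getModifiers();
            // Static, transient, synthetic and public fields need no accessors
            if (field.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm) || Modifier.isPublic(fm)) {
                continue;
            }
            String name = field.getName();
            String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            boolean getter = hasPublicMethod(clazz, "get" + cap, 0) || hasPublicMethod(clazz, "is" + cap, 0);
            boolean setter = hasPublicMethod(clazz, "set" + cap, 1);
            if (!getter || !setter) {
                issues.add("field '" + name + "' is not public and lacks a getter/setter pair");
            }
        }
        return issues;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, int paramCount) {
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(name) && m.getParameterCount() == paramCount) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("TamAlert: " + violations(TamAlert.class));
        System.out.println("BadAlert: " + violations(BadAlert.class));
    }
}
```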
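Next, running SQL on a Flink DataStream works quite differently from Spark. In Spark you keep applying transform and registerTempTable to the stream. In Flink, by contrast, you must first convert the DataStream into a Table before you can run SQL on it, and to apply further transformations you have to convert the Table back into a DataStream.
The code is as follows: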
package com.flinklearn.main

import java.util.Properties

import com.alibaba.fastjson.JSON
import com.flinklearn.models.TamAlert
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/**
 * Created by dell on 2018/10/22.
 */
class Main {
  def startApp(): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "brokerserver")
    properties.setProperty("group.id", "com.flinklearn.main.Main")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read from Kafka to get a stream, parsing each JSON message into a TamAlert
    val stream: DataStream[TamAlert] = env
      .addSource(new FlinkKafkaConsumer010[String]("mytopic", new SimpleStringSchema(), properties))
      .map(line => {
        var rtn: TamAlert = null
        try {
          val temp = JSON.parseObject(line).getJSONObject("payload")
          val alert = new TamAlert()
          alert.setMsg(temp.getString("msg"))
          // Assign only after parsing succeeded, so a missing payload cannot leak a half-filled record
          rtn = alert
        } catch {
          case ex: Exception => ex.printStackTrace()
        }
        rtn
      })
      .filter(line => line != null)

    // Register the stream as table temp_alert and print the msg column
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataStream("temp_alert", stream, 'msg)
    val httpTable = tableEnv.sqlQuery("select msg from temp_alert")
    // The query returns a single String column, so convert it to a DataStream[String]
    val httpStream = tableEnv.toAppendStream[String](httpTable)
    httpStream.print()
    env.execute("Kafka sql test.")
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    new Main().startApp()
  }
}
2.HDFS
2.1 Parquet
Unlike Spark, Apache Flink has no convenient read.parquet() API for Parquet data on HDFS; the files have to be read with AvroParquetInputFormat. The steps are as follows:
1. Add the corresponding dependencies to pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
2. Define the schema in an .avsc file:
{"namespace": "com.flinklearn.models",
"type": "record",
"name": "AvroTamAlert",
"fields": [
{"name": "msg", "type": ["string","null"]}
]
}
3. Use avro-tools to generate the corresponding Java class and copy the Java file into the project (AvroTamAlert.java in this example).
4. Read the Parquet files with AvroParquetInputFormat:
package com.flinklearn.main

import com.flinklearn.models.AvroTamAlert
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.parquet.avro.AvroParquetInputFormat

/**
 * Created by dell on 2018/10/23.
 */
class Main {
  def startApp(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val job = Job.getInstance()
    // Wrap Parquet's Hadoop InputFormat; each record arrives as a Java Tuple2[Void, AvroTamAlert]
    val dIf = new HadoopInputFormat[Void, AvroTamAlert](new AvroParquetInputFormat[AvroTamAlert](), classOf[Void], classOf[AvroTamAlert], job)
    FileInputFormat.addInputPath(job, new Path("/user/hive/warehouse/xx.db/yy"))
    AvroParquetInputFormat.setAvroReadSchema(job, AvroTamAlert.getClassSchema)
    // The schema only defines msg (a nullable string), so project that single field
    val dataset = env.createInput(dIf).map(line => Option(line.f1.getMsg).map(_.toString).orNull)
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", dataset, 'msg)
    val table = tableEnv.sqlQuery("select msg from tmp_table")
    // In the DataSet API print() triggers execution, so no env.execute() call follows
    tableEnv.toDataSet[String](table).print()
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    new Main().startApp()
  }
}
2.2 CSV
The configuration parameters that need to be added are described in section 2.3.
package com.flinklearn.main

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Each line is expected to hold two comma-separated fields: name,num
    val dataset: DataSet[(String, Integer)] = env.readCsvFile[(String, Integer)]("hdfs://ip:8020/mytest")
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", dataset, 'name, 'num)
    val table = tableEnv.sqlQuery("select name,num from tmp_table")
    val rtnDataset = tableEnv.toDataSet[(String, Integer)](table)
    // print() already executes the job; a later env.execute() would fail because no new sinks were defined
    rtnDataset.print()
  }
}
2.3 TextFile
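Several key parameters must be added to flink-conf.yaml:
The first parameter specifies the Hadoop configuration files.
The second parameter switches Flink to legacy mode. Because Flink 1.6.1 is built with Scala 2.11, using the Scala API causes problems (JobGraph generation fails; the exact cause is still unclear).
The third parameter specifies the class loading order (without it, an "HDFS block cannot be read" error is reported).
In addition, flink-hadoop-compatibility_2.11-1.6.1.jar must be placed in the flink/lib directory; packaging it through the pom does not work.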
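The three settings are described but not shown explicitly. A minimal pre-flight check can catch a missing entry before the job is submitted. The sketch below reads flink-conf.yaml roughly the way Flink 1.x does: one flat `key: value` pair per line, with `#` starting a comment. It then reports which required keys are absent.

The key names are assumptions chosen to match the three descriptions in Flink 1.6 terms, not the author's confirmed settings:

- `fs.hdfs.hadoopconf` for the Hadoop configuration directory;
- `mode` for legacy mode;
- `classloader.resolve-order` for class loading order.

FlinkConfCheck is a hypothetical name, and the path in the sample is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check for the flink-conf.yaml entries described in section 2.3. */
public class FlinkConfCheck {

    // Assumed key names for the three settings (not confirmed by the original post)
    static final List<String> REQUIRED_KEYS =
            Arrays.asList("fs.hdfs.hadoopconf", "mode", "classloader.resolve-order");

    /** Parses flat "key: value" lines, dropping everything after '#'; invalid lines are skipped. */
    public static Map<String, String> parse(String content) {
        Map<String, String> conf = new HashMap<>();
        for (String line : content.split("\n")) {
            String withoutComment = line.split("#", 2)[0].trim();
            if (withoutComment.isEmpty()) {
                continue;
            }
            String[] kv = withoutComment.split(": ", 2);
            if (kv.length < 2) {
                continue; // no "key: value" separator on this line
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (!key.isEmpty() && !value.isEmpty()) {
                conf.put(key, value);
            }
        }
        return conf;
    }

    /** Returns the required keys that do not appear in the parsed configuration. */
    public static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (!conf.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "# hypothetical excerpt of flink-conf.yaml",
                "fs.hdfs.hadoopconf: /etc/hadoop/conf",
                "mode: legacy",
                "classloader.resolve-order: parent-first");
        System.out.println("missing: " + missingKeys(parse(sample)));
    }
}
```

In practice the content would come from reading the real flink-conf.yaml under the Flink conf directory, for example with java.nio.file.Files.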
Once all of the above is done, files on HDFS can be read correctly:
package com.flinklearn.main

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Read the text files through Hadoop's CombineTextInputFormat; each record is (byte offset, line)
    val dataset: DataSet[(LongWritable, Text)] = env.createInput(HadoopInputs.readHadoopFile[LongWritable, Text](
      new CombineTextInputFormat,
      classOf[LongWritable],
      classOf[Text],
      "/mytest"
    ))
    // Keep only lines of the form name,num
    val transDataset = dataset.map(line => {
      val lines = line._2.toString.split(",")
      if (lines.length == 2) {
        (lines(0), lines(1).toInt)
      } else {
        null
      }
    }).filter(line => line != null)
    // count() runs a separate job just to count the records
    println(transDataset.count())
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerDataSet("tmp_table", transDataset, 'name, 'num)
    val table = tableEnv.sqlQuery("select name,num from tmp_table")
    val rtnDataset = tableEnv.toDataSet[(String, Integer)](table)
    // print() triggers execution, so env.execute() is not called afterwards
    rtnDataset.print()
  }
}
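The map in the TextFile example drops lines that do not split into exactly two fields. However, `lines(1).toInt` still throws a NumberFormatException when the second field is not numeric, which would make the task fail. The sketch below is a stdlib-only Java version of a stricter per-line parse that skips such lines instead. It also trims whitespace around fields, which the original does not. NameNumParser and NameNum are hypothetical names. Logic like parse() could be called from a Flink flatMap so that malformed lines simply produce no output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: parses "name,num" lines and skips malformed ones instead of throwing. */
public class NameNumParser {

    /** Mirrors the (name, num) tuple registered as tmp_table in section 2.3. */
    public static final class NameNum {
        public final String name;
        public final int num;

        NameNum(String name, int num) {
            this.name = name;
            this.num = num;
        }
    }

    /** Returns the record, or empty if the line lacks exactly two fields or num is not an integer. */
    public static Optional<NameNum> parse(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields, so "a," has two fields
        if (fields.length != 2) {
            return Optional.empty();
        }
        try {
            return Optional.of(new NameNum(fields[0].trim(), Integer.parseInt(fields[1].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric or empty num field
        }
    }

    /** Parses every line and keeps only the well-formed records. */
    public static List<NameNum> parseAll(List<String> lines) {
        List<NameNum> out = new ArrayList<>();
        for (String line : lines) {
            parse(line).ifPresent(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<NameNum> rows = parseAll(Arrays.asList("alice,3", "bob,x", "broken", "carol, 7"));
        for (NameNum row : rows) {
            System.out.println(row.name + " -> " + row.num);
        }
    }
}
```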
3.Hive
2. Add the dependencies to the pom file:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>com.jolbox</groupId>
<artifactId>bonecp</artifactId>
<version>0.8.0.RELEASE</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-hive-bundle</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>0.9.3</version>
<type>pom</type>
</dependency>
3. Add all of the following jars to flink/lib:
accumulo-core-1.6.0.jar derby-10.11.1.1.jar hive-serde-1.2.0.jar mail-1.4.1.jar
accumulo-fate-1.6.0.jar derbyclient-10.14.2.0.jar hive-service-1.2.0.jar maven-scm-api-1.4.jar
accumulo-start-1.6.0.jar eigenbase-properties-1.1.5.jar hive-shims-0.20S-1.2.0.jar maven-scm-provider-svn-commons-1.4.jar
accumulo-trace-1.6.0.jar flink-dist_2.11-1.6.1.jar hive-shims-0.23-1.2.0.jar maven-scm-provider-svnexe-1.4.jar
activation-1.1.jar flink-hadoop-compatibility_2.11-1.6.1.jar hive-shims-1.2.0.jar netty-3.7.0.Final.jar
ant-1.9.1.jar flink-python_2.11-1.6.1.jar hive-shims-common-1.2.0.jar opencsv-2.3.jar
ant-launcher-1.9.1.jar flink-shaded-hadoop2-uber-1.6.1.jar hive-shims-scheduler-1.2.0.jar oro-2.0.8.jar
antlr-2.7.7.jar geronimo-annotation_1.0_spec-1.1.1.jar hive-testutils-1.2.0.jar paranamer-2.3.jar
antlr-runtime-3.4.jar geronimo-jaspic_1.0_spec-1.0.jar httpclient-4.4.jar parquet-hadoop-bundle-1.6.0.jar
apache-curator-2.6.0.pom geronimo-jta_1.1_spec-1.1.1.jar httpcore-4.4.jar parquet-hive-bundle-1.6.0.jar
apache-log4j-extras-1.2.17.jar groovy-all-2.1.6.jar ivy-2.4.0.jar pentaho-aggdesigner-algorithm-5.1.5-jhyde.jar
asm-commons-3.1.jar guava-14.0.1.jar janino-2.7.6.jar php
asm-tree-3.1.jar guava-15.0.jar jcommander-1.32.jar plexus-utils-1.5.6.jar
avro-1.7.5.jar hamcrest-core-1.1.jar jdo-api-3.0.1.jar postgresql-42.0.0.jar
bonecp-0.8.0.RELEASE.jar hive-accumulo-handler-1.2.0.jar jetty-all-7.6.0.v20120127.jar py
calcite-avatica-1.2.0-incubating.jar hive-ant-1.2.0.jar jetty-all-server-7.6.0.v20120127.jar regexp-1.3.jar
calcite-core-1.2.0-incubating.jar hive-beeline-1.2.0.jar jline-2.12.jar servlet-api-2.5.jar
calcite-linq4j-1.2.0-incubating.jar hive-cli-1.2.0.jar joda-time-2.5.jar slf4j-log4j12-1.7.7.jar
curator-client-2.6.0.jar hive-common-1.2.0.jar jpam-1.1.jar snappy-java-1.0.5.jar
curator-framework-2.6.0.jar hive-contrib-1.2.0.jar json-20090211.jar ST4-4.0.4.jar
curator-recipes-2.6.0.jar hive-exec-1.2.0.jar jsr305-3.0.0.jar stax-api-1.0.1.jar
datanucleus-api-jdo-3.2.1.jar hive-hbase-handler-1.2.0.jar jta-1.1.jar stringtemplate-3.2.1.jar
datanucleus-api-jdo-3.2.6.jar hive-hcatalog-core-1.2.2.jar junit-4.11.jar super-csv-2.2.0.jar
datanucleus-core-3.2.10.jar hive-hwi-1.2.0.jar libfb303-0.9.2.jar tempus-fugit-1.1.jar
datanucleus-core-3.2.2.jar hive-jdbc-1.2.0.jar libthrift-0.9.2.jar velocity-1.5.jar
datanucleus-rdbms-3.2.1.jar hive-jdbc-1.2.0-standalone.jar log4j-1.2.16.jar xz-1.0.jar
datanucleus-rdbms-3.2.9.jar hive-metastore-1.2.0.jar log4j-1.2.17.jar zookeeper-3.4.6.jar
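4. Download Hive 1.2.0 (or whichever version you need), copy hive-site.xml into the hive/conf directory, and start the Hive Thrift metastore (for example with hive --service metastore).
5. Hive tables can now be read: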
package com.flinklearn.main

import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hive.hcatalog.data.HCatRecord

/**
 * Created by dell on 2018/10/25.
 */
object Main {
  def main(args: Array[String]): Unit = {
    // Point HCatalog at the remote Thrift metastore instead of an embedded one
    val conf = new Configuration()
    conf.set("hive.metastore.local", "false")
    conf.set("hive.metastore.uris", "thrift://ip:9083")
    val env = ExecutionEnvironment.getExecutionEnvironment
    // HCatInputFormat emits HCatRecord (or Flink tuples via asFlinkTuples()), not arbitrary model classes
    val dataset = env.createInput(new HCatInputFormat[HCatRecord]("db", "tb", conf))
    // print() triggers execution; no env.execute() call is needed afterwards
    dataset.first(10).print()
  }
}
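Step 4 requires a running Thrift metastore at the address configured in hive.metastore.uris. When the Hive job hangs or fails to connect, a quick reachability check helps separate network problems from Flink problems. The sketch below, using only the Java standard library, parses a single thrift://host:port URI and tries to open a TCP connection within a timeout.

It only shows that something is listening on that port, not that it is a healthy Hive metastore. A comma-separated list of URIs is not handled. MetastoreProbe and isReachable are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/** Hypothetical helper: checks that the host:port in a thrift:// metastore URI accepts TCP connections. */
public class MetastoreProbe {

    /** Returns true if a TCP connection to the URI's host and port succeeds within timeoutMs. */
    public static boolean isReachable(String metastoreUri, int timeoutMs) {
        URI uri = URI.create(metastoreUri);
        if (!"thrift".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("expected thrift://host:port, got " + metastoreUri);
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        String uri = args.length > 0 ? args[0] : "thrift://localhost:9083";
        System.out.println(uri + " reachable: " + isReachable(uri, 2000));
    }
}
```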
4.JDBC
package com.flinklearn.main;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

/**
 * Created by dell on 2018/10/29.
 */
public class Main {
    public static void main(String[] args) {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // One TypeInformation per selected column: xx and yy are both read as strings
            JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
                    .setDrivername("org.postgresql.Driver")
                    .setDBUrl("jdbc:postgresql://ip:port/nsc")
                    .setUsername("username")
                    .setPassword("password")
                    .setQuery("select xx,yy from zz")
                    .setRowTypeInfo(new RowTypeInfo(TypeInformation.of(String.class), TypeInformation.of(String.class)))
                    .finish();
            DataSource<Row> source = env.createInput(inputFormat);
            // print() executes the job itself; a following env.execute() would fail because no new sinks were defined
            source.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}