A Getting-Started Guide to the kafka-connect-hive Sink Plugin
kafka-connect-hive is a plugin for reading from and writing to Hive, built on the Kafka Connect platform. It consists of a source part and a sink part. The source part reads data from Hive tables, and Kafka Connect writes that data into other storage layers, for example streaming data from Hive into Elasticsearch. The sink part writes data into Hive tables: Kafka Connect reads data from a third-party data source (such as MySQL) and writes it into a Hive table.
Here I use the kafka-connect-hive plugin developed by Landoop; the project documentation is at Hive Sink. The rest of this guide shows how to use the plugin's sink part.
Environment
Apache Kafka 2.11-2.1.0
Confluent-5.1.0
Apache Hadoop 2.6.3
Apache Hive 1.2.1
Java 1.8
Features
Supports KCQL routing queries, allowing all or a subset of the fields in a Kafka topic to be written into a Hive table (see the example statement after this list)
Supports dynamic partitioning based on a given field
Supports full and incremental data synchronization; partial updates are not supported
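For example, a KCQL statement of the following shape writes only the city and state fields from a topic into a Hive table and partitions by state. This is only an illustration of the syntax; the actual statement used in this guide appears later in the connector configuration:
insert into cities_orc select city, state from hive_sink_orc PARTITIONBY state STOREAS ORC WITH_PARTITIONING = DYNAMIC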
Getting Started
Starting the Dependencies
1. Start Kafka:
cd kafka_2.11-2.1.0
bin/kafka-server-start.sh config/server.properties &
2. Start the Schema Registry:
cd confluent-5.1.0
bin/schema-registry-start etc/schema-registry/schema-registry.properties &
The Schema Registry component manages the schemas of Kafka topics and keeps every version a schema has evolved through, which helps resolve compatibility issues between old and new data schemas. Because we use the Apache Avro library to serialize Kafka keys and values, we depend on the Schema Registry component; it runs with its default configuration here.
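To confirm the Schema Registry is up, its REST API (listening on the default port 8081 used below) can be queried, for example to list the registered subjects:
curl http://localhost:8081/subjects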
3. Start Kafka Connect:
Edit the connect-avro-distributed.properties file under the confluent-5.1.0/etc/schema-registry directory; after the changes the file looks like this:
# Sample configuration for a distributed Kafka Connect worker that uses Avro serialization and
# integrates with the Schema Registry. This sample configuration assumes a local installation of
# Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The group ID is a unique identifier for the set of workers that form a single Kafka Connect
# cluster
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# Internal Storage Topics.
#
# Kafka Connect distributed workers store the connector and task configurations, connector offsets,
# and connector statuses in three internal topics. These topics MUST be compacted.
# When the Kafka Connect distributed worker starts, it will check for these topics and attempt to create them
# as compacted topics if they don't yet exist, using the topic name, replication factor, and number of partitions
# as specified in these properties, and other topic-specific settings inherited from your brokers'
# auto-creation settings. If you need more control over these other topic-specific settings, you may want to
# manually create these topics before starting Kafka Connect distributed workers.
#
# The following properties set the names of these three internal topics for storing configs, offsets, and status.
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
# The following properties set the replication factor for the three internal topics, defaulting to 3 for each
# and therefore requiring a minimum of 3 brokers in the cluster. Since we want the examples to run with
# only a single broker, we set the replication factor here to just 1. That's okay for the examples, but
# ALWAYS use a replication factor of AT LEAST 3 for production environments to reduce the risk of
# losing connector offsets, configurations, and status.
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
# The config storage topic must have a single partition, and this cannot be changed via properties.
# Offsets for all connectors and tasks are written quite frequently and therefore the offset topic
# should be highly partitioned; by default it is created with 25 partitions, but adjust accordingly
# with the number of connector tasks deployed to a distributed worker cluster. Kafka Connect records
# the status less frequently, and so by default the topic is created with 5 partitions.
#offset.storage.partitions=25
#status.storage.partitions=5
# The offsets, status, and configurations are written to the topics using converters specified through
# the following required properties. Most users will always want to use the JSON converter without schemas.
# Offset and config data is never visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=0.0.0.0
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=0.0.0.0
#rest.advertised.port=8083
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
plugin.path=/kafka/confluent-5.1.0/plugins/lib
The plugin.path parameter must be set here; it specifies the directory where Kafka Connect plugin packages are stored.
Download kafka-connect-hive-1.2.1-2.1.0-all.tar.gz, extract it, copy kafka-connect-hive-1.2.1-2.1.0-all.jar into the directory specified by plugin.path, and then start Kafka Connect with the following commands:
cd confluent-5.1.0
bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties
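Once the worker is running, you can optionally verify through the Connect REST API (default port 8083) that the Hive sink connector was loaded from plugin.path:
curl http://localhost:8083/connector-plugins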
Preparing Test Data
1. On the Hive server, run the following commands with beeline:
# Create the hive_connect database
create database hive_connect;
# Create the cities_orc table
use hive_connect;
create table cities_orc (city string, state string, population int, country string) stored as orc;
2. Use Postman to submit the kafka-connect-hive sink configuration to Kafka Connect:
URL: localhost:8083/connectors/
Request method: POST
Request body:
{
  "name": "hive-sink-example",
  "config": {
    "name": "hive-sink-example",
    "connector.class": "com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector",
    "tasks.max": 1,
    "topics": "hive_sink_orc",
    "connect.hive.kcql": "insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_PARTITIONING = DYNAMIC",
    "connect.hive.database.name": "hive_connect",
    "connect.hive.hive.metastore": "thrift",
    "connect.hive.hive.metastore.uris": "thrift://quickstart.cloudera:9083",
    "connect.hive.fs.defaultFS": "hdfs://quickstart.cloudera:9001",
    "connect.hive.error.policy": "NOOP",
    "connect.progress.enabled": true
  }
}
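If you prefer the command line to Postman, the same request can be sent with curl. Here the request body above is assumed to be saved in a file named hive-sink-example.json (a name chosen just for this example); the second call checks the connector's status afterwards:
curl -X POST -H "Content-Type: application/json" --data @hive-sink-example.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/hive-sink-example/status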
Running the Test and Checking the Results
3. Start a Kafka producer and write the test data; the Scala test code is as follows:
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.junit.Test

class AvroTest {
  /**
   * Tests producing data to Kafka in Avro format.
   * See https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
   */
  @Test
  def testProducer: Unit = {
    // Configure the Kafka broker address, the serializers and the Schema Registry address
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[io.confluent.kafka.serializers.KafkaAvroSerializer])
    props.put("schema.registry.url", "http://localhost:8081")
    // Define the schema
    val schemaString = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"state\",\"type\":\"string\"},{\"name\":\"population\",\"type\":\"int\"},{\"name\":\"country\",\"type\":\"string\"}]}"
    val parser = new Schema.Parser()
    val schema = parser.parse(schemaString)
    // Build the test records
    val avroRecord1 = new GenericData.Record(schema)
    avroRecord1.put("city", "Philadelphia")
    avroRecord1.put("state", "PA")
    avroRecord1.put("population", 1568000)
    avroRecord1.put("country", "USA")
    val avroRecord2 = new GenericData.Record(schema)
    avroRecord2.put("city", "Chicago")
    avroRecord2.put("state", "IL")
    avroRecord2.put("population", 2705000)
    avroRecord2.put("country", "USA")
    val avroRecord3 = new GenericData.Record(schema)
    avroRecord3.put("city", "New York")
    avroRecord3.put("state", "NY")
    avroRecord3.put("population", 8538000)
    avroRecord3.put("country", "USA")
    // Produce the records
    val producer = new KafkaProducer[String, GenericData.Record](props)
    try {
      val recordList = List(avroRecord1, avroRecord2, avroRecord3)
      val key = "key1"
      for (elem <- recordList) {
        val record = new ProducerRecord("hive_sink_orc", key, elem)
        for (i <- 0 to 100) {
          val ack = producer.send(record).get()
          println(s"${ack.toString} written to partition ${ack.partition.toString}")
        }
      }
    } catch {
      case e: Throwable => e.printStackTrace()
    } finally {
      // When you're finished producing records, flush the producer to make sure everything has been written to Kafka,
      // then close it to free its resources.
      producer.flush()
      producer.close()
    }
  }
}
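As a rough sketch of the build configuration the test class above assumes (the artifact versions follow the environment listed at the top of this guide and may need adjusting), the sbt dependencies might look like this:
// Confluent's Maven repository hosts the kafka-avro-serializer artifact
resolvers += "Confluent" at "https://packages.confluent.io/maven/"
libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "2.1.0",
  "io.confluent" % "kafka-avro-serializer" % "5.1.0",
  "org.apache.avro" % "avro" % "1.8.2",
  "junit" % "junit" % "4.12" % Test,
  "com.novocode" % "junit-interface" % "0.11" % Test // lets sbt run the JUnit test
)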
4. Query the Hive data with beeline:
use hive_connect;
select * from cities_orc;
Part of the output is shown below:
+------------------+------------------------+---------------------+-------------------+--+
| cities_orc.city | cities_orc.population | cities_orc.country | cities_orc.state |
+------------------+------------------------+---------------------+-------------------+--+
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Chicago | 2705000 | USA | IL |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
| Philadelphia | 1568000 | USA | PA |
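Since the KCQL statement partitions the table by state, the partitions created by the sink can also be inspected in beeline:
show partitions cities_orc;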
Configuration Reference
KCQL Configuration
The options supported in connect.hive.kcql are described below (a combined example follows the list):
WITH_FLUSH_INTERVAL: long; the time interval, in milliseconds, at which files are committed
WITH_FLUSH_SIZE: long; the number of records written to HDFS before a commit is performed
WITH_FLUSH_COUNT: long; the number of records not yet committed to HDFS before a commit is performed
WITH_SCHEMA_EVOLUTION: string, default MATCH; the compatibility policy between the Hive schema and the schema of the Kafka topic records. The Hive connector uses this policy to add or remove fields
WITH_TABLE_LOCATION: string; the HDFS location where the Hive table is stored. If not specified, Hive's default configuration is used
WITH_OVERWRITE: boolean; whether to overwrite existing records in the Hive table. When enabled, the existing table is dropped first and then recreated
PARTITIONBY: List<String>; the partition fields. When specified, partition values are taken from the specified columns
WITH_PARTITIONING: string, default STRICT; how partitions are created. The two modes are DYNAMIC and STRICT: DYNAMIC creates partitions based on the fields specified in PARTITIONBY, while STRICT requires all partitions to have been created already
AUTOCREATE: boolean; whether to create the table automatically
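As a sketch of how several of these options can be combined in a single statement (the clause syntax beyond what the connector configuration above already uses is assumed here, so check the plugin documentation before relying on it):
insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_COUNT = 1000 WITH_SCHEMA_EVOLUTION = MATCH WITH_PARTITIONING = DYNAMIC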
Kafka Connect Configuration
The Kafka Connect configuration options are described below:
name: string; the name of the connector, unique across the entire Kafka Connect cluster
topics: string; the name of the topic holding the data, which must match the topic name in the KCQL statement
tasks.max: int, default 1; the number of tasks for the connector
connector.class: string; the connector class name, which must be com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
connect.hive.kcql: string; the KCQL statement used by the connector
connect.hive.database.name: string; the name of the Hive database
connect.hive.hive.metastore: string; the network protocol used to connect to the Hive metastore
connect.hive.hive.metastore.uris: string; the connection address of the Hive metastore
connect.hive.fs.defaultFS: string; the HDFS address