1. 程式人生 > >kafka的connect實施

kafka的connect實施

一.程式碼準備

GET   http://localhost:8083/connectors/

header部分要設定key 和 value 分別是: Content-Type:  application/json

GET   http://localhost:8083/connector-plugins/

POST  http://localhost:8083/connectors/
----------------------------------------
{
  "name": "hdfs-hive-sink-01",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_source",
    "hdfs.url": "hdfs://localhost:9000",
    "flush.size": "1",
    "hive.integration":"true",
	"hive.metastore.uris":"thrift://localhost:9083",
	"schema.compatibility":"BACKWARD"
  }
}
-------------//這一串程式碼是放到body部分的,選擇row,放進去再post


在xshell中開啟一個終端,輸入以下程式碼:

kafka-avro-console-producer --broker-list localhost:9092 --topic test_source --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

然後就可以在下面屬於資料進行嘗試:(注意資料的格式一定是你上面寫的格式)
{"f1":"f1_001"}
{"f1":"f1_001"}

然後再開啟第二個終端輸入以下程式碼後回車:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_source
//在第一個終端中輸入的資料,第二個視窗就會及時更新

在以上的Post操作完成後就可以進行傳到hdfs的操作:

GET     http://localhost:8083/connectors/


GET     http://localhost:8083/connectors/hdfs-hive-sink-01/status 
//檢查連線狀態 ,可省略  
GET     http://localhost:8083/connectors/hdfs-hive-sink-01/config
//檢查配置資訊 ,可省略 


然後再回到xshell開啟第三個終端:
輸入:
bee
show tables;  //如果有多個數據庫要先show databases; use databases——name;
就可以看到根據在第一個終端輸入的資料相應的生成了一個表

二.檢查服務連線狀態、開啟所需的服務

1.看一下當前有哪些服務開啟

2.用ops命令可以檢視哪些命令對應哪些服務

3.用ops start kafka 開啟服務

4. 再檢視服務開啟狀態

5.如果是重啟服務可以用ops restart kafka 

6.關閉服務用 ops stop kafka

 

7.用ops status 檢視當前還有哪些服務開著,需要關閉的可以用 kill -9 命令殺死

 

8. 也可以用 ops start mask後面跟0,1 來開啟指定的服務

三.連線操作

1.輸入第一個url進行試水(可省略)會返回版本資訊等

2.GET這個connector

注意寫header: 

3.再GET connector-plugins

4.POST connector

注意:這個post操作要寫body,先選擇body-->>row -->>寫程式碼

5.解釋一下這個body

{
  "name": "hdfs-hive-sink-01",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",//表示支援最大執行緒是一個
    "topics": "test_source",  //表示從"test_source"讀資料
    "hdfs.url": "hdfs://localhost:9000",//給出hadoop hdfs的地址
    "flush.size": "1",//多少個記錄往裡面寫一次
    "hive.integration":"true",//表示支援hive
 "hive.metastore.uris":"thrift://localhost:9083",//給出hive的url
 "schema.compatibility":"BACKWARD"
  }
}

6.然後回到xshell開啟一個視窗輸入相應命令:然後就可以輸入資料了{"f1":"f1_001"} {"f1":"f2_002"} {"f1":"f3_003"}

kafka-avro-console-producer --broker-list localhost:9092 --topic test_source --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

7.在第二個視窗執行以下命令然後回車,它就會自動接收第一個視窗輸入的資料({"f1":"f1_001"} {"f1":"f2_002"} {"f1":"f3_003"}

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_source

8.然後再GET connector

9.檢視連線狀態(可省略)

10.檢視相應的資訊(可省略)

11.回到終端開啟第三個連線視窗,輸入bee,然後show tables; 你就會看到生成此時hdfs根據第一個連線視窗輸入的資料生成了一個表,在第一個視窗有資料更新寫入時,這也會相應的改變

三.刪除相應的連線

今天操作的圖解