kafka的connect實施
阿新 • • 發佈:2018-12-12
一.程式碼準備
GET http://localhost:8083/connectors/ header部分要設定key 和 value 分別是: Content-Type: application/json GET http://localhost:8083/connector-plugins/ POST http://localhost:8083/connectors/ ---------------------------------------- { "name": "hdfs-hive-sink-01", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "1", "topics": "test_source", "hdfs.url": "hdfs://localhost:9000", "flush.size": "1", "hive.integration":"true", "hive.metastore.uris":"thrift://localhost:9083", "schema.compatibility":"BACKWARD" } } -------------//這一串程式碼是放到body部分的,選擇row,放進去再post 在xshell中開啟一個終端,輸入以下程式碼: kafka-avro-console-producer --broker-list localhost:9092 --topic test_source --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' 然後就可以在下面屬於資料進行嘗試:(注意資料的格式一定是你上面寫的格式) {"f1":"f1_001"} {"f1":"f1_001"} 然後再開啟第二個終端輸入以下程式碼後回車: kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_source //在第一個終端中輸入的資料,第二個視窗就會及時更新 在以上的Post操作完成後就可以進行傳到hdfs的操作: GET http://localhost:8083/connectors/ GET http://localhost:8083/connectors/hdfs-hive-sink-01/status //檢查連線狀態 ,可省略 GET http://localhost:8083/connectors/hdfs-hive-sink-01/config //檢查配置資訊 ,可省略 然後再回到xshell開啟第三個終端: 輸入: bee show tables; //如果有多個數據庫要先show databases; use databases——name; 就可以看到根據在第一個終端輸入的資料相應的生成了一個表
二.檢查服務連線狀態、開啟所需的服務
1.看一下當前有哪些服務開啟
2.用ops命令可以檢視哪些命令對應哪些服務
3.用ops start kafka 開啟服務
4. 再檢視服務開啟狀態
5.如果是重啟服務可以用ops restart kafka
6.關閉服務用 ops stop kafka
7.用ops status 檢視當前還有哪些服務開著,需要關閉的可以用 kill -9 命令殺死
8. 也可以用 ops start mask後面跟0,1 來開啟指定的服務
三.連線操作
1.輸入第一個url進行試水(可省略)會返回版本資訊等
2.GET這個connector
注意寫header:
3.再GET connector-plugins
4.POST connector
注意:這個post操作要寫body,先選擇body-->>row -->>寫程式碼
5.解釋一下這個body
{ "name": "hdfs-hive-sink-01", "config": { "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "tasks.max": "1",//表示支援最大執行緒是一個 "topics": "test_source", //表示從"test_source"讀資料 "hdfs.url": "hdfs://localhost:9000",//給出hadoop hdfs的地址 "flush.size": "1",//多少個記錄往裡面寫一次 "hive.integration":"true",//表示支援hive "hive.metastore.uris":"thrift://localhost:9083",//給出hive的url "schema.compatibility":"BACKWARD" } }
6.然後回到xshell開啟一個視窗輸入相應命令:然後就可以輸入資料了{"f1":"f1_001"} {"f1":"f2_002"} {"f1":"f3_003"}
kafka-avro-console-producer --broker-list localhost:9092 --topic test_source --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
7.在第二個視窗執行以下命令然後回車,它就會自動接收第一個視窗輸入的資料({"f1":"f1_001"} {"f1":"f2_002"} {"f1":"f3_003"}
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_source
8.然後再GET connector
9.檢視連線狀態(可省略)
10.檢視相應的資訊(可省略)
11.回到終端開啟第三個連線視窗,輸入bee,然後show tables; 你就會看到生成此時hdfs根據第一個連線視窗輸入的資料生成了一個表,在第一個視窗有資料更新寫入時,這也會相應的改變
三.刪除相應的連線
今天操作的圖解