1. 程式人生 > >confluent+mysql實現實時資料交換

confluent+mysql實現實時資料交換

2014 年的時候,Kafka 的三個主要開發人員從 LinkedIn 出來創業,開了一家叫作 Confluent 的公司。和其他大資料公司類似,Confluent 的產品叫作 Confluent Platform。這個產品的核心是 Kafka,分為三個版本:Confluent Open Source、Confluent Enterprise 和 Confluent Cloud。

安裝jdbc-mysql-driver

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5
.1
.39.tar.gz sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile source /etc/profile

安裝confluent

下載confluent的tar包解壓安裝。

cd /usr/local
# tar zxvf confluent.tar.gz

confluent平臺各元件的預設埠號

ComponentDefault Port
Zookeeper 2181
Apache Kafka brokers (plain text)9092
Schema Registry REST API8081
REST Proxy8082
Kafka Connect REST API8083
Confluent Control Center9021

confluent的mysql資料來源配置

建立一個confluent從mysql載入資料的配置檔案quickstart-mysql.properties

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:
mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true #資料表白名單 #table.whitelist=t1 mode=timestamp+incrementing timestamp.column.name=modified incrementing.column.name=id #topic的字首,confulent平臺會為每張表建立一個topic,topic的名稱為字首+表名 topic.prefix=mysql-test-


自定義查詢模式:

如果使用上面的配置來啟動服務,則confluent平臺將會監測拉取所有表的資料,有時候可能並不需要這樣做,confulent平臺提供了自定義查詢模式。配置參考如下:

#User defined connector instance name
name=mysql-whitelist-timestamp-source
#The classimplementingtheconnector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
#Maximum number of tasks to run for this connector instance
tasks.max=10

connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data


query模式下使用where查詢語句容易造成kafka拼接sql錯誤,最好採用join

1.啟動zookeeper

因為zookeeper是一個長期的服務,最好在後臺執行,同時需要有寫許可權到/var/lib在這一步以及之後的步驟,如果沒有許可權請檢視安裝confulent的使用者是否具有/var/lib的寫許可權

# cd /usr/local/confulent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
# 以守護程序方式啟動
# sudo confluent-3.2.2/bin/zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

停止zookeeper

$ ./bin/zookeeper-server-stop

2.啟動kafka

# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &

停止kafka服務

./bin/kafka-server-stop

3.啟動Schema Registry

# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &

停止schema-registry

# ./bin/schema-registry-stop

4.啟動監聽mysql資料的producer

# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/quickstart-mysql.properties &

5.啟動消費資料的consumer

# cd /usr/local/confluent-3.2.2
#./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic mysql-test-t1 --from-beginning

測試sql

DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年內', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年內', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共經濟', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

資料插入語句

INSERT INTO `t1` (name,createtime,modified)VALUES ('公共經濟2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

插入新資料後將會在consumer端實時輸出我們插入的資料

{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000}
{"id":8,"name":{"string":"公共經濟"},"createtime":1501212790000,"modified":1500388483000}
{"id":9,"name":{"string":"公共經濟1"},"createtime":1501212790000,"modified":1500388483000}
{"id":10,"name":{"string":"公共經濟2"},"createtime":1501212790000,"modified":1500388483000}

關於confluent的使用國內目前使用似乎很少,相關的中文文件也極少。本文是去年7月份我在做實時資料交換技術調研是根據官方文件實踐的記錄。


相關推薦

confluent+mysql實現實時資料交換

2014 年的時候,Kafka 的三個主要開發人員從 LinkedIn 出來創業,開了一家叫作 Confluent 的公司。和其他大資料公司類似,Confluent 的產品叫作 Confluent Platform。這個產品的核心是 Kafka,分為三個版本:Confluen

基於echarts實現實時資料傳輸效果

效果圖: 程式碼片段: const targetCoord = [1000, 140] const curveness = 0.2 const linesData = [] const categories = [{ name: '流入中', itemStyle: {

使用AS2(http)協議實現商用資料交換B2B (一) [譯]

前言 公司的 B2B 系統要使用 AS2 這種古老的協議跟客戶做對接,主要是國外客戶,國內基本上都是 FTP。網上關於 AS2 的文章和 github 上可用的輪子都非常少,所以我翻譯了一些 AS2 的文章供參考學習。Applicability Statement

使用hibernate連結MySql實現新增資料功能

開發工具: MyEclipse2013 , 資料庫: MySql 1.首先, 在資料庫中建立資料庫 , 我使用的資料庫工具是SQLyog. 建立如下資料庫: 資料庫建立完成後開啟MyEcl

php與mysql實現使用者資料的增刪改查

首先可建立一個儲存常量的config.php: <?php define('MYSQL_HOST','localhost'); define('MYSQL_USER','root'); define('MYSQL_PW','');然後再建立一個儲存函式的檔案funct

使用Hibernate連線MySQL實現新增資料功能

開發工具: MyEclipse2013 , 資料庫: MySql 1.首先, 在資料庫中建立資料庫 , 我使用的資料庫工具是SQLyog. 建立如下資料庫: 資料庫建立完成後開啟MyEclispe 2.建立Web Project 2.1: 第一項: 導包 需要匯

Swoole WebSocket 實現mysql實時資料展示

最近在學習swoole,自己完成下論壇裡留下的作業,通過swoole_websocket實時展示mysql資料,有個遺留問題,如何判斷mysql是否有增刪改,我想到的方法有: 1、在應用層中有增刪改時,發一個訊息到訊息佇列來監聽。 2、解析mysql bin-log日誌。 3、觸發器。 現在的解決辦

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇)

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇) 2018年03月04日 23:05:29 冰 河 閱讀數:1602更多 所屬專欄: Hadoop生態 版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https:/

canal實戰(一):canal連線kafka實現實時同步mysql資料

前面已經介紹過了canal-kafka的應用。canal-kafka是把kafka作為客戶端,嵌入到canal中,並且在canal基礎上對原始碼進行了修改,以達到特定的實現canal到kafka的傳送。 canal-kafka是阿里雲最近更新的一個新的

Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(程式案例篇)

一、前言二、簡單介紹為了方便,這裡我們只是簡單的向/home/flume/log.log中追加單詞,每行一個單詞,利用Storm接收每個單詞,將單詞計數更新到資料庫,具體的邏輯為,如果資料庫中沒有相關單詞,則將資料插入資料庫,如果存在相關單詞,則更新資料庫中的計數。具體SQL

c++實現資料交換的方法

c++實現資料交換的方法有四種: 1、通過指標完成: template void swap(T *px, T *py){ T temp = *px; *px = *py; *py = temp; } 其呼叫形式:swap(&px, &py) 2通過引用完成:(c++特有

樹莓派/PC實現實時攝像頭資料共享(Python—picamera)

上次實驗使用Python—OpenCV實現,發現傳輸效果並不是很理想,接下來使用Python和picamera實現樹莓派/PC實時攝像頭資料共享,主要也可分為伺服器和客戶端兩部分。 伺服器Demo如下: import numpy as np import cv2 import socke

樹莓派/PC實現實時攝像頭資料共享(Python—OpenCV)

使用Python和OpenCV實現樹莓派/PC實時攝像頭資料共享,主要分為伺服器和客戶端兩部分。 伺服器Demo如下: #伺服器端 import socket import threading import struct import time import cv2 import nu

用Fluent實現MySQL到ODPS資料整合

安裝ruby 首先通過 /etc/issue 命令檢視當前使用centos是哪個版本: [[email protected] ~]$  cat /etc/issue 由於centos版本是6.6,安裝ruby時就要選擇在ce

solr 7+tomcat 8 + mysql實現solr 7基本使用(安裝、整合中文分詞器、定時同步資料庫資料以及專案整合)

基本說明 Solr是一個開源專案,基於Lucene的搜尋伺服器,一般用於高階的搜尋功能; solr還支援各種外掛(如中文分詞器等),便於做多樣化功能的整合; 提供頁面操作,檢視日誌和配置資訊,功能全面。 solr 7 + tomcat 8實現solr 7的安裝 Sol

關於使用python來實現mysql自動生成資料

注:環境 windows 7 旗艦版 python 3.6.4 xlrd模組 pymysql模組 mysql 8.0.12 前幾天拿到一個專案需要在資料庫建立‘一堆’的表!於是就有了一個偷懶的想法! 經過努力終於完成了‘乞丐版’程式碼如下: # -*-

mysql實現跨伺服器查詢資料

在日常的開發中經常進行跨資料庫進行查詢資料。 同伺服器下跨資料庫進行查詢在表前加上資料庫名就可以查詢到資料。 mysql跨伺服器進行查詢提供了FEDERATED引擎進行對映表,然後進行查詢。 mysql資料庫federated引擎是關閉的,首先需要先啟用該引擎。

阿里如何實現海量資料實時分析?

阿里妹導讀:隨著資料量的快速增長,越來越多的企業迎來業務資料化時代,資料成為了最重要的生產資料和業務升級依據。本文由阿里AnalyticDB團隊出品,近萬字長文,首次深度解讀阿里在海量資料實時分析領域的多項核心技術。   數字經濟時代已經來臨,希望能和業界同行共同探索,加速行

WebSocket實現實時推送資料到前端

@Component @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer{ @Resource goodsWebSocketHandler handler;

詳細步驟!!!idea+springboot+mybatis+jsp+bootstrap實現mysql查詢出資料並顯示(原始碼)

實現效果: 資料庫對應資料: 開發環境: IntelliJ IDEA 2017.2.5 x64 java version "1.8.0_151" x64 mysql 6.0.11-alpha-community x64 步驟: 1.建立工程: file--new-