Flume將MySQL表資料存入到HDFS
Flume將MySQL表資料存入到HDFS
一、建立MySQL表
-- ---------------------------- -- Table structure for t_name -- ---------------------------- DROP TABLE IF EXISTS `t_name`; CREATE TABLE `t_name` ( `id` int(11) NOT NULL AUTO_INCREMENT, `sip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `dip` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `sport` int(11) NULL DEFAULT NULL, `dport` int(11) NULL DEFAULT NULL, `protocol` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL, `flowvalue` int(11) NULL DEFAULT NULL, `createtime` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 21 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of t_name -- ---------------------------- INSERT INTO `t_name` VALUES (1, '76.58.692.49', '23.28.380.27', 53, 17, 'TCP', 10, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (2, '36.30.754.95', '21.19.847.60', 92, 61, 'TCP', 56, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (3, '29.65.205.81', '61.41.360.21', 79, 44, 'TCP', 45, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (4, '69.65.715.32', '90.60.887.73', 82, 89, 'TCP', 25, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (5, '92.51.427.29', '86.42.538.10', 98, 96, 'TCP', 11, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (6, '10.43.459.69', '42.16.826.51', 77, 32, 'TCP', 53, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (7, '40.52.822.52', '37.87.208.90', 79, 77, 'TCP', 12, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (8, '99.49.363.76', '53.13.402.25', 81, 90, 'TCP', 30, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (9, '94.90.526.47', '80.29.188.65', 29, 62, 'TCP', 62, '2019-03-05 12:25:47'); INSERT INTO `t_name` VALUES (10, '37.84.816.99', '31.64.935.94', 27, 45, 'TCP', 30, '2019-03-05 12:25:47');
二、建立相關狀態檔案與HDFS目標目錄
1. 建立狀態檔案
mkdir /home/lwenhao/flume
cd /home/lwenhao/flume
touch sql-source.status
chmod -R 777 /home/lwenhao/flume
2. 建立HDFS目錄
hdfs fs -mkdir /flume/mysql
hdfs fs -chmod -R 777 /flume/mysql
三、匯入JAR包
我安裝的是MySQL5.7
版本需要flume-ng-sql-source-1.x.x.jar
與mysql-connector-java-5.x.x-bin.jar
下載完成之後,把這兩個jar包複製到/apache-flume-1.9.0-bin/lib/
四、配置Flume
# Channel名稱 agent.channels = ch1 # Sink名稱 agent.sinks = HDFS # Source名稱 agent.sources = sql-source # Agent的channel型別 agent.channels.ch1.type = memory # Source對應的channel名稱 agent.sources.sql-source.channels = ch1 # Source型別 agent.sources.sql-source.type = org.keedio.flume.source.SQLSource # 資料庫URL agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://192.168.1.69:3306/t_hadoop # 資料庫使用者名稱 agent.sources.sql-source.hibernate.connection.user = root # 資料庫密碼 agent.sources.sql-source.hibernate.connection.password = root # 資料庫表名 agent.sources.sql-source.table = t_name # 查詢的列 agent.sources.sql-source.columns.to.select = * # 查詢的列 agent.sources.sql-source.incremental.column.name = id # 增量初始值 agent.sources.sql-source.incremental.value = 0 # 發起查詢的時間間隔,單位是毫秒 agent.sources.sql-source.run.query.delay=5000 # 狀態檔案路徑 agent.sources.sql-source.status.file.path = /home/lwenhao/flume # 狀態檔名稱 agent.sources.sql-source.status.file.name = sql-source.status # Sink對應的channel名稱 agent.sinks.HDFS.channel = ch1 # Sink型別 agent.sinks.HDFS.type = hdfs # Sink路徑 agent.sinks.HDFS.hdfs.path = hdfs://192.168.1.67:9001/flume/mysql # 流資料的檔案型別 agent.sinks.HDFS.hdfs.fileType = DataStream # 資料寫入格式 agent.sinks.HDFS.hdfs.writeFormat = Text # 目標檔案輪轉大小,單位是位元組 agent.sinks.HDFS.hdfs.rollSize = 268435456 # hdfs sink間隔多長將臨時檔案滾動成最終目標檔案,單位是秒;如果設定成0,則表示不根據時間來滾動檔案 agent.sinks.HDFS.hdfs.rollInterval = 0 # 當events資料達到該數量時候,將臨時檔案滾動成目標檔案;如果設定成0,則表示不根據events資料來滾動檔案 agent.sinks.HDFS.hdfs.rollCount = 0
五、 啟動flume
bin/flume-ng agent --conf conf/ --name agent --conf-file conf/flume-conf.conf
六、 效果
相關推薦
Flume將MySQL表資料存入到HDFS
浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>
Flume將MySQL表資料存入到HBase
浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>
利用Flume將MySQL表資料準實時抽取到HDFS、MySQL、Kafka
軟體版本號 jdk1.8、apache-flume-1.6.0-bin、kafka_2.8.0-0.8.0、zookeeper-3.4.5叢集環境安裝請先測試; 參考以下作者資訊,特此感謝;http://blog.csdn.net/wzy0623/article/detail
利用Flume將MySQL表資料準實時抽取到HDFS
一、為什麼要用到Flume 在以前搭建HAWQ資料倉庫實驗環境時,我使用Sqoop抽取從MySQL資料庫增量抽取資料到HDFS,然後用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成資料抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapRe
使用Flume將MySQL表資料實時抽取到hadoop
一、為什麼要用到Flume 在以前搭建HAWQ資料倉庫實驗環境時,我使用Sqoop抽取從MySQL資料庫增量抽取資料到HDFS,然後用HAWQ的外部表進行訪問。這種方式只需要很少量的配置即可完成資料抽取任務,但缺點同樣明顯,那就是實時性。Sqoop使用MapReduce
Windows64環境下 使用Flume將Mysql增量資料同步到Kafka
一.軟體準備1.jdk1.74.maven 下載地址 二.安裝並啟動Kafka1.安裝kafka此步驟看文章,比較詳細,相信不會有問題2.按順序啟動kafka(windows cmd下) 2.1
用sqoop將mysql的資料匯入到hive表中,原理分析
Sqoop 將 Mysql 的資料匯入到 Hive 中 準備Mysql 資料 如圖所示,準備一張表,資料隨便造一些,當然我這裡的資料很簡單。 編寫命令 編寫引數檔案 個人習慣問題,我喜歡把引數寫到檔案裡,然後再命令列引用。 vim mysql-info, #
python將mysql表中資料抽取到另一個mysql庫中,持續更新抽取到oracle中
import MySQLdb import ConfigParser class Mysql2Mysql(object): def getConn(self,filename,dbname): cf = ConfigParser.ConfigPars
用sqoop將mysql的資料匯入到hive表中
用sqoop將mysql的資料匯入到hive表中 1:先將mysql一張表的資料用sqoop匯入到hdfs中 準備一張表 需求 將 bbs_product 表中的前100條資料導 匯出來 只要id brand_id和 na
應對sharding-jdbc結合mybatis實現分庫分表功能 分表的聯合查詢採用將mysql的資料同步到elasticsearch進行篩選
應對sharding-jdbc結合mybatis實現分庫分表功能 分表的聯合查詢採用將mysql的資料同步到elasticsearch進行篩選 安裝操作指南:(1)、(2) 其中windows目錄展示如下: 版本控制:1. 需要jdk:1.8(1.8.0_60)
mysql 表資料操作
INSERT INTO USER VALUES(10,'root','root','[email protected]'); INSERT INTO USER(username,password,email) VALUES('admin','123','[email pr
資料匯入終章:如何將HBase的資料匯入HDFS?
我們的最終目標是將資料匯入Hadoop,在之前的章節中,我們介紹瞭如何將傳統關係資料庫的資料匯入Hadoop,本節涉及到了HBase。HBase是一種實時分散式資料儲存系統,通常位於與Hadoop叢集相同的硬體上,或者與Hadoop叢集緊密相連,能夠直接在MapReduce中使用HBase資料,或將
通過管道傳輸快速將MySQL的資料匯入Redis(自己做過測試)
通過管道傳輸快速將MySQL的資料匯入Redis 通過管道傳輸pipe將MySQL資料批量匯入Redis 自Redis 2.6以上版本起,Redis支援快速大批量匯入資料,即官網的Redis Mass Insertion,即
Python以太坊互動將區塊鏈資料存入sql資料庫
關於區塊鏈介紹性的研討會通常以易於理解的點對點網路和銀行分類賬這類故事開頭,然後直接跳到編寫智慧合約,這顯得非常突兀。因此,想象自己走進叢林,想象以太坊區塊鏈是一個你即將研究的奇怪生物。今天我們將觀察該生物,並與其進行互動然後將有關它的所有資料收集到一個集中儲存中供自己使用。 進行第一次設
利用shell將mysql中資料匯出到檔案和執行mysql語句
利用mysqldump匯出mysql資料 匯出指定條件的資料庫 命令格式 mysqldump -u使用者名稱 -p密碼 -h主機 -P埠 資料庫名 表名 --where "sql語句" > 路徑 示例程式碼 #!/bin/bash #變數定義 host="127.0.
ABAP 通過控制代碼將內表資料寫入記憶體,然後在SMARTFORM中呼叫
首先要在SMARTFORM 中定義控制代碼。在全域性設定-》表格介面中定義PRT_HANDLER1和PRT_HANDLER2. 在全域性定義中定義和程式中內表結構相同的內表,注意在全域性資料中要定義工作區方便呼叫。 然後在程式中呼叫SMARTFORM,利用S
如何將mysql表結構匯出成Excel格式的(並帶備註)
方法一: 1.使用一個MySQL管理工具:SQLyog,點選選單欄“資料庫”下拉的最後一項: 匯出的格式如下: 2.要想轉成Excel格式的只需手動將該表複製到Excel中去。 方法二: 1.以下用的是Navicat Premium,可以換成
用Nginx採集日誌通過flume將日誌檔案儲存到HDFS上
安裝Tomcat 到官網下載apache-tomcat-7.0.69 開啟eclipse->window->preferences->server->runtime environments 編寫專案 Nginx
將MySQL中資料匯入到MongoDB中
第一步: 將user表從MySQL中匯出,右鍵,點選匯出嚮導,選擇格式為xlsx。 第二步: 匯出完成後,雙擊開啟user.xlsx,將user.xlsx另存為csv格式的檔案。(切記不可直接修改後綴名,會導致亂碼,無法匯入到MongoDB中,血的教訓) 第三步:
Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇)
Storm之——Storm+Kafka+Flume+Zookeeper+MySQL實現資料實時分析(環境搭建篇) 2018年03月04日 23:05:29 冰 河 閱讀數:1602更多 所屬專欄: Hadoop生態 版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https:/