大資料之sqoop
4 大資料協作框架之sqoop
- Hadoop 2.x
- HDFS
- YARN
- MapReduce
- Zookeeper
- Hive
- 對日誌型別的海量資料
- hdfs
- mr, hive - hql
一、面臨的問題
第一個問題:hdfs
-
檔案來源於哪裡?
-
資料怎樣儲存到hdfs ? 海量
-
現實中的資料來源於兩個方便
- RDBMS(Oracle,MySQL,DB2....) >>>> sqoop(SQL to HADOOP)(底層都是mapreduce)
- 檔案(apache,nginx日誌資料) >>>> Flume(實時抽取資料)
第二個問題:排程任務
- 對資料分析的任務 job,至少都是上千(網際網路公司)的,我們怎樣去排程(排程任務)
- 怎樣執行,多長執行一次,執行頻率
- 某一些業務的分析,需要許多job任務共同完成,相互依賴關係,工作流怎樣去排程 ?
- 可用框架
- Oozie :apache的
- 宙斯:阿里的
第三個問題:監控
- hadoop 2.x 生態系統中重要的框架,8個,怎樣統一的監控?
- 統一的 web ui 接面,管理框架,監控框架
- Hue
二、回顧
- hadoop 2.x
- HDFS
- yarn
- mapreduce
- Hive
三、Sqoop 概述
Sqoop工作機制是將匯入或到處命令翻譯成mapreduce程式來實現。在翻譯出的mapreduce
3.1 Sqoop 架構
- Hdaoop 生態系統包括:HDFS、Hive、Hbase等
- RDBMS體系包括:Mysql、Orcale、DB2等
- Sqoop的理解:SQL 到 Hadoop 和 Hadoop 到 SQL
3.2 對於資料流的理解
站在Apache立場上看待資料流問題,可以分為資料的匯入和匯出
- Import 資料匯入:RDBMS ----> Hadoop
- Export 資料匯出:Hadoop ----> RDBMS
四、sqoop 使用要點
4.1 sqoop 依賴
-
依賴java
-
依賴hadoop
-
sqoop 依賴與hadoop
- 資料的一方,儲存在hdfs
- 底層的資料傳輸的實現是mapreduce/yarn
- 所以,我們需要對針對hadoop的版本對sqoop進行編譯
- 由於編譯sqoop十分的麻煩,所以在企業實際情況中,Hadoop都使用Cloudera 或者 Hortonworks
-
實際使用版本下載地址 CDH
-
CDH 5.3.x 版本,非常穩定,好用 cdh-5.3.6 各個版本之間的依賴和相容性,我們不需要考慮
- hadoop-2.5.0-cdh5.3.6.tar.gz
- hive-0.13.1-cdh5.3.6.tar.gz
- zookeeper-3.4.5-cdh5.3.6.tar.gz
- sqoop-1.4.5-cdh5.3.6.tar.gz
4.2 部署Sqoop
-
部署HDFS
-
部署Yarn
-
部署Hive
-
安裝Sqoop
-
修改配置
-
將 sqoop-env-template.sh 重新命名為 sqoop-env.sh
-
修改 sqoop-env.sh 配置檔案
#Set path to where bin/hadoop is available export HADOOP_COMMON_HOME=/opt/models/hadoop-2.5.0 #Set path to where hadoop-*-core.jar is available export HADOOP_MAPRED_HOME=/opt/models/hadoop-2.5.0 #Set the path to where bin/hive is available export HIVE_HOME=/opt/models/hive-0.13.1
-
-
使用 sqoop
bin/sqoop
-
4.3 sqoop 使用 五 要素
-
rdbms/mysql
- jdbcurl \ username \ password \ tablename
-
轉換
- import / export
-
HDFS
- path
-
RDBMS以Mysql資料庫為例講解,拷貝jdbc驅動包到 $SQOOP_HOME/lib 目錄下
4.4 常用命令
-
檢視有哪些資料庫
bin/sqoop list-database \ --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306 \ --username root \ --password 123456
五、Import 將資料匯入到HDFS中
5.1 Import 概覽
5.2 Import 命令 -- 僅僅使用 map
5.2.1 匯入資料
-
檢視幫助命令:bin/sqoop help import
-
準備mysql資料
-
在mysql中建立表格
create table `my_user` ( `id` tinyint(4) not null auto_increment, `account` varchar(255) default null, `password` varchar(255) default null, primary key (`id`) )
-
向 mysql 中的表中插入資料
insert into `my_user` values ('1', 'admin', 'admin'); insert into `my_user` values ('2', 'pu', '123456'); insert into `my_user` values ('3', 'system', 'system'); insert into `my_user` values ('4', 'zxh', 'zxh'); insert into `my_user` values ('5', 'test', 'test'); insert into `my_user` values ('6', 'pudong', 'pudong'); insert into `my_user` values ('7', 'qiqi', 'qiqi');
-
-
匯入資料到Hadoop
# 當沒有指定匯入的目錄的時候,系統預設會在當前使用者主目錄下建立一個以表名為名稱的資料夾,並將資料存入 bin/sqoop import \ --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \ --username root \ --password 123456 \ --table my_user # 指定目錄匯入 bin/sqoop import \ --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \ --username root \ --password 123456 \ --table my_user --target-dir /user/beifeng/sqoop/import/im_my_user \ # 設定 map 的個數 在hadoop中一個 map 一份資料 --num-mappers 1
-
日誌分析
-
Beginning code generation 生成程式碼
- 會以該表名(my_user)為名新建一個 my_user.java 的類,並存放在 $SQOOP_HOME 目錄中
-
生成完類之後,才開始執行sql語句
select t.* from `my_user` as t limit 1 select t.* from `my_user` as t limit 1
-
開始執行hadoop相關的東西
- ...../my_user.java uses or overrides a deprecated API
- Note: Recompile with -xlint: deprecation for details
-
開始寫入
- INFO orm.CompilationManager: Writing jar file : /tmp/sqoop-beifeng/compi......
- 在yarn中顯示為 my_user.jar
- Beginning import of my_user
- 執行mapreduce -- sqoop底層實現
-
-
5.2.2 匯入資料的子集 where
bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user
--target-dir /user/beifeng/sqoop/import/im_my_user \
--where "country = 'USA' "
5.2.3 匯入格式設定 -- 存在於 import control arguments 中
-
hive 中資料儲存檔案
- textfile
- orcfile
- parquet
-
匯入hadoop格式設定為 parquet (常用的格式)
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_parquet \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile
-
在hive中建立一張表:hive_user
drop table if exists default.hive_user;create table default.hive_user ( id int, username string, password string)# 分隔符需要去讀取匯入的檔案,才能確定row format delimited fileds terinated by ','stored as parquet;
-
將資料載入到hive
hive (default) > load data inpath '/user/beifeng/sqoop/imp_my_user_parquet' into table default.hive_user ;
- 匯入後,hive中hive_user中的資料全是NULL ???
5.2.4 子查詢 --columns
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_column \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile \# 匯入某些列--columns id,username
5.2.5 過濾/查詢 --query
- 在實際的專案中,要處理的資料,需要進行初步的清洗和過濾
- 某些欄位的過濾
- 條件
- join
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \# 報錯# --query 'select id, username from my_user' \# 正確--query 'select id, username from my_user where $CONDITIONS ' \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_query \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile \# 匯入某些列--columns id,username
5.2.6 壓縮 --compress
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_snappy \--num-mappers 1 \# 開啟壓縮--compress \# 設定壓縮方式--compression-codec org.apache.hadoop.io.compress.SnappyCodec \# 設定分隔符--fields-terminated-by '\t'
-
建立一個snappy壓縮的表,並匯入上面的資料,然後查詢
drop table if exists default.hive_user_snappy;create table default.hive_user_snappy ( id int, username string, password string)# 分隔符需要去讀取匯入的檔案,才能確定row format delimited fileds terinated by ',' ;load data inpath 'user/beifeng/sqoop/imp_my_user_snappy' into table default.hive_user_snappy ; # queryselect * from default.hive_user_sanppy ;
5.2.7 增量匯入 increment 兩種方式
5.2.7.1 query
-
有一個唯一的識別符號,在企業中,通常這個表都有一個欄位,類似於插入時間 createtime
where createtime => xxxx and createtime < xxxx
5.2.7.2 sqoop 引數
- --check-colunm<colunm> Source column to check for incremental change
- --incremental <import-type> Define an incremental import of type 'append' or 'lastmodified'
- append : 增加
- lastmodified
- --last-value <value> Last imported value in the incrremental check column
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_incr \--num-mappers 1 \--incremental append \--check-column id \# 上一次最後的值,這一次的開始值--last-value 4
5.2.8 --direct
走mysql直接匯入的路徑,不走mapreduce,速度很快
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_incr \--num-mappers 1 \--delete-target-dir \--direct
5.2.9 custom boundary query import
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--query 'SELECT normcities.id, ' \countries.country, \normcities.city \from normcities \join counties using(country_id) \where $CONDITIONS ' \--split-by id \--target-dir cities \--boundary-query "select min(id), max(id) from normcities"
六、Export 匯出資料RDBMS
-
hive table
- table
- hiveserver2 進行 jdbc 方式查詢資料
- hdfs file 儲存到hdfs中
- export ---> mysql / orcale /db2
- table
6.1 export to mysql table
bin/sqoop export \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \# hdfs 的路徑--export-dir /user/beifeng/sqoop/exp/user/ \--num-mappers 1
七. Hive 資料匯入匯出
hive 的資料本身就儲存在hdfs中,僅僅是將檔案加上了scheam
7.1 import to hive table
-
在 hive 中建立一張表
use default ;drop table if exists user_hive ;create table user_hive(id int,account string,password string)row format delimited filesds terminated by '\t' ;
-
匯入資料
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \--fields-terminated-by '\t' \# 刪除快取目錄--delete-target-dir \# hdfs 的路徑--target-dir /user/beifeng/sqoop/imp_my_user \--num-mappers 1 \# 指定匯入到 hive 中的哪張表中--hive-database default \--hive-table user_hive \
7.2 export to mysql table 和之前的匯出是一樣的
bin/sqoop export \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \# hdfs 的路徑--export-dir /user/beifeng/sqoop/exp/user/ \--num-mappers 1 \--hive-fields-terminated-by '\t'
注意
八. sqoop 使用 --option-files
-
例子 實際企業中使用的
- 指令碼 shell scripts
## step 1load data ...## step 2bin/hive -f## step 3bin/sqoop --options-file file_path/file_name
import--connectjdbc:mysql://hadoop-senior.ibeifeng.com:3306/test--usernameroot--password123456--tablemy_user--target-dir/user/beifeng/sqoop/imp_my_user--num-mapper1
- 執行檔案
bin/sqoop --options-file file_path/file_name# 傳遞引數bin/sqoop --options-file file_paht/file_name --num-mapper 2
九. 總結
- 資料轉換工具 sqoop
- Sqoop 功能、使用原則
- 將RDBMS資料帶入到HIve表中(全量、增量)
- 將HDFS上檔案匯出到RDBMS表中
- 在實際的開發過程中
- import
- oracle/db2 --> hdfs/ hive/ hbase
- import
- 作業
- 將Oracle 11g中的表資料 匯入到 hdfs和hive中
- Oracle --> hdfs(textfile)
- Oracle --> hdfs(snappy, parquet), load hive table ,select
- Oracle --> hive table
- 將Oracle 11g中的表資料 匯入到 hdfs和hive中
- export
- hive /hdfs --> mysql/oracle
- hiveserver2 jdbc
5 大資料協作框架之Flume
- 實時檔案收集框架
- Cloudera 開發的框架
- kafka/flume + storm(實時計算)/spark 整合
一、flume概述
1.1 flume 概述
- 分散式的:每臺需要收集資料的伺服器上都有一個flume客戶端
- 穩定的
- 可用的
- 基於流式的資料
- 線上的資料分析應用,實時性要求特別高
1.2 flume架構設計 & 說明
- 源端:日誌在某個伺服器上面(web server)
- 中間層:Agent(client):收集
- 其實就是Flume,Flume-ng只有一個角色節點:agent。
- 拿到元資料source,每隔一段時間去獲取資料
- source拿到資料之後,將資料封裝成event,並放到管道當中
- Channel:管道,保證了資料的安全性
- 管道將資料放入到sink
- sink:慢慢的從Channel中拿資料,並寫入到HDFS中
- agent由三個角色組成
- source
- 用於採集資料,source是產生資料流的地方
- source會將產生的資料流傳輸到channel
- channle
- 連結sources和sinks,這個有點像一個佇列
- 將資料存放到記憶體中。速度比較快
- sink
- 從channel收集到資料,將資料寫到目標源,可以是下一個source也可以是HDFS或者HBase
- sink是用於前端展示的
- source --> channel:中間可以進行一些其他的操作,例如資料清洗
- channel --> sink:中間可以進行一些其他的操作,例如資料清洗
- source
- Events
- Event是flume資料傳輸的基本單元,從source ---> hdfs
- Flume以事件的形式將資料從源頭傳送到最終的目的
- Event由可選的header和載有有資料的一個byte array構成
- header:裡面可以有一些對於資料的操作,例如filter、引數傳遞
- 載有的資料對flumet是不透明的
- Header是容納了key-value字串對的無序集合,key在集合內是唯一的
- Header可以在上下文路由中使用擴充套件
- 其實就是Flume,Flume-ng只有一個角色節點:agent。
- 目的地:HDFS
1.3 flume 三要素解釋
- source -- push -- > channel
- source 是自己將資料push給channel
- channel -- pull --> sink
- sink 是自己主動到 channel 中去拉取
二、flume 使用
2.1 執行環境
- 執行在有logs的地方
- 系統:Linux
- JVM/JDK 1.7
- 如何選擇jdk版本:archive.cloudera.com/cm5中可以找到相關的
- 輕量級的服務(eg:zk,jn,zkfc,sqoop),對於記憶體要求不高
- Disk Space:10G左右就行了
- Directory Permission:Read/Write
2.2 解壓 & 配置
2.1.1 解壓 & 安裝
2.1.2 配置
2.1.2.1 配置flume-env.sh
- 將 flume-env.sh.template 複製並重命名為 flume-env.sh
- 修改其中的配置
export JAVA_HOME=/opt/models/jdk1.8.0_281
2.1.2.2 放入 hdfs jar包
2.2 具體使用
# 檢視具體使用方式bin/flume-ng# 檢視版本bin/flume-ng version
三、使用方式 & 命令
3.1 global options
3.1.1 --conf,-c <conf>
- use configs in <conf> directory
- 指定配置檔案路徑
3.1.2 --classpath, -C <cp>
- append to the classpath
- 依賴的一些包,例如hive、HBase等
- 現在我們不用,直接放到 lib 目錄中就可以了
3.1.3 -Dproperty=value
- sets a java system property value
- 日誌是在後端的,如果我們需要在前端監控或者使用的話,就需要我們去設定這個值了
3.2 agent options
3.2.1 --name, -n <name>
- the name of this agent(required)
3.2.2 --conf-file, -f <file>
- specify a config file(required if -z missing)
3.3 案例
bin/flume-ng agent --conf conf --name agent-test --conf-file test.confbin/flume-ng agent -c conf -n agent-test --conf-file test.conf
四、Agent 應用編寫
4.1 修改配置檔案 flume-conf.properties
-
將 flume-conf.properties.template 複製並命名為 a1.conf
-
修改 a1.conf
### define agent### agent name : a1a1.sources = r1a1.channels = c1a1.sinks = k1### define sourcesa1.sources.r1.type = netcata1.sources.r1.bind = hadoop-senior.ibeifeng.coma1.sources.r1.port = 444### define channelsa1.channels.c1.type = memory# 每個channel 中儲存的 event 個數a1.channels.c1.capacity = 1000# sink 每次獲取多少個event = 每次提交多少個事務給 sinka1.channels.c1.transactionCapacity = 100### define sinka1.sinks.k1.type = logger a1.sinks.k1.channel = c1## 每次顯示的最大的位元組長度a1.sinks.k1.maxBytesToLog = 1024### bind the sources and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
4.2 安裝軟體
-
安裝 xinetd
# 啟動 xinetd 服務/etc/rc.d/initd/xinetd restart
-
安裝 telnet
-
安裝 telnet-server
-
安裝 netcat
4.3 執行 Agent a1.conf
bin/flume-ng agent \-c conf \-n a1 \-f conf/a1.conf \-Dflume.root.logger=DEBUG,console
4.4 使用 telnet 向埠傳送資料 測試
telnet hadoop-senior.ibeifeng.com 444flume hello
五、三大元件常用屬性
5.1 sources
-
Exec Source 執行檔案/指令碼/命令,通常用於測試
-
Spooling Directory Source 會監控這個目錄
-
kafka Source
-
Syslog Sources 系統日誌
-
HTTP Source 內網和外網之間的資料
5.2 channels
-
memory channel
-
File Channle 將資料放到一個檔案中,sink直接從檔案中獲取
5.3 Sinks 核心
-
HDFS Sink
-
Hive Sink
-
HBase Sinks
-
MorphlineSolrSink 往Solr中抽取
-
ElasticSearchSink 檢索
六、綜合案例
6.1 需求
- 收集log
- hive執行日誌 /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
- tail -f 讀取的方式收集
- memory
- hdfs
- 儲存到 /user/beifeng/flume/hive-logs/ 目錄中
6.2 編寫Agent程式 a2 配置檔案 flume-tail.conf
### define agent### agent name : a2a2.sources = r2a2.channels = c2a2.sinks = k2### define sourcesa2.sources.r2.type = execa2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.loga2.sources.r2.sources.shell = /bin/bash -c### define channelsa2.channels.c2.type = memory# 每個channel 中儲存的 event 個數a2.channels.c2.capacity = 2000# sink 每次獲取多少個event = 每次提交多少個事務給 sinka2.channels.c2.transactionCapacity = 200### define sinka2.sinks.k2.type = hdfs ## HDFS directory path (eg hdfs://namenode/flume/webdata/)a2.sinks.k2.hdfs.path = hdfs:hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/a2.sinks.k2.hdfs.fileType = DataStreama2.sinks.k2.hdfs.writeFormat = Text# number of events written to file before it is flushed to HDFSa2.sinks.k2.hdfs.batchSize = 10### bind the sources and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2
6.3 放入 hadoop 依賴包
- 將上面的 jar 包放入到 flume 的 lib 目錄中
6.3 執行 Agent 程式 a2
bin/flume-ng agent \-c conf \-n a2 \-f conf/flume-tail.conf \-Dfilum.root.logger = DEBUG,console
6.4 啟動 Hive 並進行一些操作
- 操作
hive (default) > show databases;hive (default) > show tables;hive (default) > select count(1) from user;
- 實時產生的日誌
6.5 關於 hadoop 做了 HA
配置檔案中
a2.sinks.k2.hdfs.path = hdfs:hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/
修改為
a2.sinks.k2.hdfs.path = hdfs://ns1/user/beifeng/flume/hive-logs/
其中的ns1是在hadoop配置檔案配置的,flume需要去讀取並解析ns1相關的東西
解決方式:將hadoop中相關的檔案放到 flume 的conf 目錄中就可以了
- core-site.xml
- hdfs-site.xml
6.5 作業
- 在 flume抽取資料時,如何在hdfs sink 如何設定檔案的大小
七、專案架構(企業中)
7.1 資料倉庫架構(根據Hadoop來的)
- 抽取
- RDBMS :Sqoop
- Nginx :Flume
- 分析
- MapReduce
- Hive
- Pig
- Impala:底層是記憶體
- 對於分析任務的排程
- OozieI
- 結果
- Query
- Reporting
- Analyze
7.2 flume 在資料倉庫中的架構
7.2.1 flume
7.2.2 企業大資料倉庫之資料收集架構
-
所有的需要收集日誌的伺服器全部是Linux
- 每個伺服器上都執行一個hive,然後將資料收集到HDFS中
-
所有的伺服器全部是windows
- 先將資料通過FTP收集到Linux伺服器上
- 然後將Linux伺服器上是資料通過flume收集到HDFS中
八、企業案例 flume收集資料儲存到HDFS中
8.1 HDFS Sink
-
依據伺服器上的時間抽取日誌 server logs
-
0926
- /user/beifeng/flume/applogs
- /20150926
- xxx.log
- yyy.log
- zzz.log
- /20150926
- /user/beifeng/flume/applogs
-
上面的所有目錄都可以自動創建出來的(按照伺服器的時間建立目錄)
-
hdfs.useLocalTimestamp = truehdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/applogs/%Y%m%d
-
8.2 案例要求
- 監控某個日誌檔案的目錄:
- /app/logs/2014-12-20
- 。。。。
- /app/logs/2014-12-29
- zz.log 變化的,未完成的
- xxx.log.comp 200MB 已經完成的
- yy.log.comp 200MB 已經完成的
- 要求:只收集已經完成的日誌
- 200MB太大了,實時性差了
- 變通:將檔案大小設定小一點
- 200MB太大了,實時性差了
8.3 Spooling Directory Source
8.3.1 相關引數
8.3.2 VS exec
- 在使用 exec 來監聽資料來源雖然實時性較高,但是可靠性較差,當 source 程式執行一場或者Linux明令終端都會造成資料丟失,在恢復正常執行之前資料的完整性無法得到保障
- Spooling Directory Paths 通過監聽某個目錄下的新增檔案,將檔案的內容讀取出來,實現日誌資訊的收集。實際生產中會結合 log4j 來使用。被傳輸結束的檔案會修改後綴名,新增 .completed 字尾(可修改)
8.4 監控目錄 實時抽取資料
-
監控目錄
- 日誌目錄,抽取完整的日誌檔案,正在寫的日誌檔案不抽取
-
使用 FIleChannel
- 本地檔案系統緩衝,比記憶體安全性更高
-
資料儲存在HDFS
- 儲存對應hive表的目錄或者hdfs目錄
-
思考
- 每天抽取檔案到一個目錄中,自動生成目錄,如何做?
8.5 Agent 程式編寫 a3
- 編寫 a3 的配置檔案 flume-app.conf
### define agent### agent name : a3a3.sources = r3a3.channels = c3a3.sinks = k3### define sourcesa3.sources.r3.type = spooldir## 需要監控的目錄a3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs## 設定抽取時忽略的檔案 ## Regular expression specifying which files ignore(skip)a3.sources.r3.ignorePattern = ^(.)*\\.log$## 設定抽取已完成的檔案的字尾a3.channels.c3.fileSuffix = .delete### define channelsa3.channels.c3.type = filea3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/checkpointa3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/data## 每個channel 中儲存的 event 個數a3.channels.c3.capacity = 3000## sink 每次獲取多少個event = 每次提交多少個事務給 sinka3.channels.c3.transactionCapacity = 300### define sinka3.sinks.k3.type = hdfs ## 自動按照server時間建立對應的目錄# a3.sinks.k3.hdfs.hdfs.useLocalTimestamp = true# a3.sinks.k3.hdfs.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/applogs/%Y%m%da3.sinks.k3.hdfs.path = /user/beifeng/flume/spoollogs/a3.sinks.k3.hdfs.fileType = DataStreama3.sinks.k3.hdfs.writeFormat = Text# number of events written to file before it is flushed to HDFSa3.sinks.k3.hdfs.batchSize = 10### bind the sources and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3
8.6 執行 a3
bin/flume-ng agent \-conf conf \-name a3 \-conf-file conf/flume-app.conf \-Dflume.root.logger=DEBUG,console