1. 程式人生 > 其它 >大資料之sqoop

大資料之sqoop

4 大資料協作框架之sqoop

  • Hadoop 2.x
    • HDFS
    • YARN
    • MapReduce
    • Zookeeper
  • Hive

  • 對日誌型別的海量資料
    • hdfs
    • mr, hive - hql

一、面臨的問題

第一個問題:hdfs

  • 檔案來源於哪裡?

  • 資料怎樣儲存到hdfs ? 海量

  • 現實中的資料來源於兩個方便

    • RDBMS(Oracle,MySQL,DB2....) >>>> sqoop(SQL to HADOOP)(底層都是mapreduce)
    • 檔案(apache,nginx日誌資料) >>>> Flume(實時抽取資料)

第二個問題:排程任務

  • 對資料分析的任務 job,至少都是上千(網際網路公司)的,我們怎樣去排程(排程任務)
    • 怎樣執行,多長執行一次,執行頻率
    • 某一些業務的分析,需要許多job任務共同完成,相互依賴關係,工作流怎樣去排程 ?
  • 可用框架
    • Oozie :apache的
    • 宙斯:阿里的

第三個問題:監控

  • hadoop 2.x 生態系統中重要的框架,8個,怎樣統一的監控?
  • 統一的 web ui 接面,管理框架,監控框架
  • Hue

二、回顧

  • hadoop 2.x
    • HDFS
    • yarn
    • mapreduce
  • Hive

三、Sqoop 概述

Sqoop工作機制是將匯入或到處命令翻譯成mapreduce程式來實現。在翻譯出的mapreduce

中主要是對inputformatoutputformat進行定製

3.1 Sqoop 架構

  • Hdaoop 生態系統包括:HDFS、Hive、Hbase等
  • RDBMS體系包括:Mysql、Orcale、DB2等
  • Sqoop的理解:SQL 到 Hadoop 和 Hadoop 到 SQL

3.2 對於資料流的理解

站在Apache立場上看待資料流問題,可以分為資料的匯入和匯出

  • Import 資料匯入:RDBMS ----> Hadoop
  • Export 資料匯出:Hadoop ----> RDBMS

四、sqoop 使用要點

4.1 sqoop 依賴

  • 依賴java

  • 依賴hadoop

  • sqoop 依賴與hadoop

    • 資料的一方,儲存在hdfs
    • 底層的資料傳輸的實現是mapreduce/yarn
    • 所以,我們需要對針對hadoop的版本對sqoop進行編譯
    • 由於編譯sqoop十分的麻煩,所以在企業實際情況中,Hadoop都使用Cloudera 或者 Hortonworks
  • 實際使用版本下載地址 CDH

  • CDH 5.3.x 版本,非常穩定,好用 cdh-5.3.6 各個版本之間的依賴和相容性,我們不需要考慮

    • hadoop-2.5.0-cdh5.3.6.tar.gz
    • hive-0.13.1-cdh5.3.6.tar.gz
    • zookeeper-3.4.5-cdh5.3.6.tar.gz
    • sqoop-1.4.5-cdh5.3.6.tar.gz

4.2 部署Sqoop

  • 部署HDFS

  • 部署Yarn

  • 部署Hive

  • 安裝Sqoop

    • 修改配置

      • sqoop-env-template.sh 重新命名為 sqoop-env.sh

      • 修改 sqoop-env.sh 配置檔案

        #Set path to where bin/hadoop is available
        export HADOOP_COMMON_HOME=/opt/models/hadoop-2.5.0
        
        #Set path to where hadoop-*-core.jar is available 
        export HADOOP_MAPRED_HOME=/opt/models/hadoop-2.5.0
          
        #Set the path to where bin/hive is available
        export HIVE_HOME=/opt/models/hive-0.13.1
        
    • 使用 sqoop

      bin/sqoop
      

4.3 sqoop 使用 五 要素

  1. rdbms/mysql

    1. jdbcurl \ username \ password \ tablename
  2. 轉換

    1. import / export
  3. HDFS

    1. path
  4. RDBMS以Mysql資料庫為例講解,拷貝jdbc驅動包到 $SQOOP_HOME/lib 目錄下

4.4 常用命令

  1. 檢視有哪些資料庫

    bin/sqoop list-database \
    --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306 \
    --username root \
    --password 123456
    

五、Import 將資料匯入到HDFS中

5.1 Import 概覽

5.2 Import 命令 -- 僅僅使用 map

5.2.1 匯入資料

  • 檢視幫助命令:bin/sqoop help import

  • 準備mysql資料

    • 在mysql中建立表格

      create table `my_user` (
      	`id` tinyint(4) not null auto_increment,
          `account` varchar(255) default null,
          `password` varchar(255) default null,
          primary key (`id`)
      )
      
    • 向 mysql 中的表中插入資料

      insert into `my_user` values ('1', 'admin', 'admin');
      insert into `my_user` values ('2', 'pu', '123456');
      insert into `my_user` values ('3', 'system', 'system');
      insert into `my_user` values ('4', 'zxh', 'zxh');
      insert into `my_user` values ('5', 'test', 'test');
      insert into `my_user` values ('6', 'pudong', 'pudong');
      insert into `my_user` values ('7', 'qiqi', 'qiqi');
      
  • 匯入資料到Hadoop

    # 當沒有指定匯入的目錄的時候,系統預設會在當前使用者主目錄下建立一個以表名為名稱的資料夾,並將資料存入
    bin/sqoop import \
    --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
    --username root \
    --password 123456 \
    --table my_user
    
    #  指定目錄匯入
    bin/sqoop import \
    --connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
    --username root \
    --password 123456 \
    --table my_user
    --target-dir /user/beifeng/sqoop/import/im_my_user \
    # 設定 map 的個數 在hadoop中一個 map 一份資料
    --num-mappers 1
    
    • 日誌分析

      • Beginning code generation 生成程式碼

        • 會以該表名(my_user)為名新建一個 my_user.java 的類,並存放在 $SQOOP_HOME 目錄中
      • 生成完類之後,才開始執行sql語句

        select t.* from `my_user` as t limit 1
        select t.* from `my_user` as t limit 1
        
      • 開始執行hadoop相關的東西

        • ...../my_user.java uses or overrides a deprecated API
        • Note: Recompile with -xlint: deprecation for details
      • 開始寫入

        • INFO orm.CompilationManager: Writing jar file : /tmp/sqoop-beifeng/compi......
        • 在yarn中顯示為 my_user.jar
        • Beginning import of my_user
        • 執行mapreduce -- sqoop底層實現

5.2.2 匯入資料的子集 where

bin/sqoop import \
--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \
--username root \
--password 123456 \
--table my_user
--target-dir /user/beifeng/sqoop/import/im_my_user \
--where "country = 'USA' "

5.2.3 匯入格式設定 -- 存在於 import control arguments 中

  • hive 中資料儲存檔案

    • textfile
    • orcfile
    • parquet
  • 匯入hadoop格式設定為 parquet (常用的格式)

    bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_parquet \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile
    
  • 在hive中建立一張表:hive_user

    drop table if exists default.hive_user;create table default.hive_user (	id int,    username string,    password string)# 分隔符需要去讀取匯入的檔案,才能確定row format delimited fileds terinated by ','stored as parquet;
    
  • 將資料載入到hive

    hive (default) > load data inpath '/user/beifeng/sqoop/imp_my_user_parquet' into table default.hive_user ;
    
    • 匯入後,hive中hive_user中的資料全是NULL ???

5.2.4 子查詢 --columns

bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_column \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile \# 匯入某些列--columns id,username

5.2.5 過濾/查詢 --query

  • 在實際的專案中,要處理的資料,需要進行初步的清洗和過濾
    • 某些欄位的過濾
    • 條件
    • join
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \# 報錯# --query 'select id, username from my_user' \# 正確--query 'select id, username from my_user where $CONDITIONS ' \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_query \--where "country = 'USA' " \--num-mappers 1 \--as-parquetfile \# 匯入某些列--columns id,username

5.2.6 壓縮 --compress

bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_snappy \--num-mappers 1 \# 開啟壓縮--compress \# 設定壓縮方式--compression-codec org.apache.hadoop.io.compress.SnappyCodec \# 設定分隔符--fields-terminated-by '\t' 
  • 建立一個snappy壓縮的表,並匯入上面的資料,然後查詢

    drop table if exists default.hive_user_snappy;create table default.hive_user_snappy (	id int,    username string,    password string)# 分隔符需要去讀取匯入的檔案,才能確定row format delimited fileds terinated by ',' ;load data inpath 'user/beifeng/sqoop/imp_my_user_snappy' into table default.hive_user_snappy ; # queryselect * from default.hive_user_sanppy ;
    

5.2.7 增量匯入 increment 兩種方式

5.2.7.1 query
  • 有一個唯一的識別符號,在企業中,通常這個表都有一個欄位,類似於插入時間 createtime

    where createtime => xxxx and createtime < xxxx
    
5.2.7.2 sqoop 引數
  • --check-colunm<colunm> Source column to check for incremental change
  • --incremental <import-type> Define an incremental import of type 'append' or 'lastmodified'
    • append : 增加
    • lastmodified
  • --last-value <value> Last imported value in the incrremental check column
bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_incr \--num-mappers 1 \--incremental append \--check-column id \# 上一次最後的值,這一次的開始值--last-value 4

5.2.8 --direct

走mysql直接匯入的路徑,不走mapreduce,速度很快

bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user--target-dir /user/beifeng/sqoop/import/im_my_user_incr \--num-mappers 1 \--delete-target-dir \--direct

5.2.9 custom boundary query import

bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--query 'SELECT normcities.id, ' \countries.country, \normcities.city \from normcities \join counties using(country_id) \where $CONDITIONS ' \--split-by id \--target-dir cities \--boundary-query "select min(id), max(id) from normcities"

六、Export 匯出資料RDBMS

  • hive table

    • table
      • hiveserver2 進行 jdbc 方式查詢資料
    • hdfs file 儲存到hdfs中
      • export ---> mysql / orcale /db2

6.1 export to mysql table

bin/sqoop export \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \# hdfs 的路徑--export-dir /user/beifeng/sqoop/exp/user/ \--num-mappers 1

七. Hive 資料匯入匯出

hive 的資料本身就儲存在hdfs中,僅僅是將檔案加上了scheam

7.1 import to hive table

  • 在 hive 中建立一張表

    use default ;drop table if exists user_hive ;create table user_hive(id int,account string,password string)row format delimited filesds terminated by '\t' ;
    
  • 匯入資料

    bin/sqoop import \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \--fields-terminated-by '\t' \# 刪除快取目錄--delete-target-dir \# hdfs 的路徑--target-dir /user/beifeng/sqoop/imp_my_user \--num-mappers 1 \# 指定匯入到 hive 中的哪張表中--hive-database default \--hive-table user_hive \
    

7.2 export to mysql table 和之前的匯出是一樣的

bin/sqoop export \--connect jdbc:mysql://hadoop-senior.ibeifeng.com:3306/test \--username root \--password 123456 \--table my_user \# hdfs 的路徑--export-dir /user/beifeng/sqoop/exp/user/ \--num-mappers 1 \--hive-fields-terminated-by '\t'

注意

八. sqoop 使用 --option-files

  • 例子 實際企業中使用的

    • 指令碼 shell scripts
    ## step 1load data ...## step 2bin/hive -f## step 3bin/sqoop --options-file file_path/file_name
    
    import--connectjdbc:mysql://hadoop-senior.ibeifeng.com:3306/test--usernameroot--password123456--tablemy_user--target-dir/user/beifeng/sqoop/imp_my_user--num-mapper1
    
    • 執行檔案
    bin/sqoop --options-file file_path/file_name# 傳遞引數bin/sqoop --options-file file_paht/file_name --num-mapper 2
    

九. 總結

  • 資料轉換工具 sqoop
    • Sqoop 功能、使用原則
    • 將RDBMS資料帶入到HIve表中(全量、增量)
    • 將HDFS上檔案匯出到RDBMS表中
  • 在實際的開發過程中
    • import
      • oracle/db2 --> hdfs/ hive/ hbase
  • 作業
    • 將Oracle 11g中的表資料 匯入到 hdfs和hive中
      • Oracle --> hdfs(textfile)
      • Oracle --> hdfs(snappy, parquet), load hive table ,select
      • Oracle --> hive table
  • export
    • hive /hdfs --> mysql/oracle
    • hiveserver2 jdbc

5 大資料協作框架之Flume

  • 實時檔案收集框架
  • Cloudera 開發的框架
  • kafka/flume + storm(實時計算)/spark 整合

一、flume概述

1.1 flume 概述

  • 分散式的:每臺需要收集資料的伺服器上都有一個flume客戶端
  • 穩定的
  • 可用的
  • 基於流式的資料
  • 線上的資料分析應用,實時性要求特別高

1.2 flume架構設計 & 說明

  • 源端:日誌在某個伺服器上面(web server)
  • 中間層:Agent(client):收集
    • 其實就是Flume,Flume-ng只有一個角色節點:agent。
      • 拿到元資料source,每隔一段時間去獲取資料
      • source拿到資料之後,將資料封裝成event,並放到管道當中
      • Channel:管道,保證了資料的安全性
      • 管道將資料放入到sink
      • sink:慢慢的從Channel中拿資料,並寫入到HDFS中
    • agent由三個角色組成
      • source
        • 用於採集資料,source是產生資料流的地方
        • source會將產生的資料流傳輸到channel
      • channle
        • 連結sources和sinks,這個有點像一個佇列
        • 將資料存放到記憶體中。速度比較快
      • sink
        • 從channel收集到資料,將資料寫到目標源,可以是下一個source也可以是HDFS或者HBase
        • sink是用於前端展示的
      • source --> channel:中間可以進行一些其他的操作,例如資料清洗
      • channel --> sink:中間可以進行一些其他的操作,例如資料清洗
    • Events
      • Event是flume資料傳輸的基本單元,從source ---> hdfs
      • Flume以事件的形式將資料從源頭傳送到最終的目的
      • Event由可選的header和載有有資料的一個byte array構成
        • header:裡面可以有一些對於資料的操作,例如filter、引數傳遞
        • 載有的資料對flumet是不透明的
        • Header是容納了key-value字串對的無序集合,key在集合內是唯一的
        • Header可以在上下文路由中使用擴充套件
  • 目的地:HDFS

1.3 flume 三要素解釋

  • source -- push -- > channel
    • source 是自己將資料push給channel
  • channel -- pull --> sink
    • sink 是自己主動到 channel 中去拉取

二、flume 使用

2.1 執行環境

  • 執行在有logs的地方
  • 系統:Linux
  • JVM/JDK 1.7
    • 如何選擇jdk版本:archive.cloudera.com/cm5中可以找到相關的
  • 輕量級的服務(eg:zk,jn,zkfc,sqoop),對於記憶體要求不高
  • Disk Space:10G左右就行了
  • Directory Permission:Read/Write

2.2 解壓 & 配置

2.1.1 解壓 & 安裝

2.1.2 配置

2.1.2.1 配置flume-env.sh
  • 將 flume-env.sh.template 複製並重命名為 flume-env.sh
  • 修改其中的配置
export JAVA_HOME=/opt/models/jdk1.8.0_281
2.1.2.2 放入 hdfs jar包

2.2 具體使用

# 檢視具體使用方式bin/flume-ng# 檢視版本bin/flume-ng version

三、使用方式 & 命令

3.1 global options

3.1.1 --conf,-c <conf>

  • use configs in <conf> directory
  • 指定配置檔案路徑

3.1.2 --classpath, -C <cp>

  • append to the classpath
  • 依賴的一些包,例如hive、HBase等
  • 現在我們不用,直接放到 lib 目錄中就可以了

3.1.3 -Dproperty=value

  • sets a java system property value
  • 日誌是在後端的,如果我們需要在前端監控或者使用的話,就需要我們去設定這個值了

3.2 agent options

3.2.1 --name, -n <name>

  • the name of this agent(required)

3.2.2 --conf-file, -f <file>

  • specify a config file(required if -z missing)

3.3 案例

bin/flume-ng agent --conf conf --name agent-test --conf-file test.confbin/flume-ng agent -c conf -n agent-test --conf-file test.conf

四、Agent 應用編寫

4.1 修改配置檔案 flume-conf.properties

  • 將 flume-conf.properties.template 複製並命名為 a1.conf

  • 修改 a1.conf

    ### define agent### agent name : a1a1.sources = r1a1.channels = c1a1.sinks = k1### define sourcesa1.sources.r1.type = netcata1.sources.r1.bind = hadoop-senior.ibeifeng.coma1.sources.r1.port = 444### define channelsa1.channels.c1.type = memory# 每個channel 中儲存的 event 個數a1.channels.c1.capacity = 1000# sink 每次獲取多少個event = 每次提交多少個事務給 sinka1.channels.c1.transactionCapacity = 100### define sinka1.sinks.k1.type = logger a1.sinks.k1.channel = c1## 每次顯示的最大的位元組長度a1.sinks.k1.maxBytesToLog = 1024### bind the sources and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
    

4.2 安裝軟體

  • 安裝 xinetd

    # 啟動 xinetd 服務/etc/rc.d/initd/xinetd restart
    
  • 安裝 telnet

  • 安裝 telnet-server

  • 安裝 netcat

4.3 執行 Agent a1.conf

bin/flume-ng agent \-c conf \-n a1 \-f conf/a1.conf \-Dflume.root.logger=DEBUG,console

4.4 使用 telnet 向埠傳送資料 測試

telnet hadoop-senior.ibeifeng.com 444flume hello 

五、三大元件常用屬性

5.1 sources

  • Exec Source 執行檔案/指令碼/命令,通常用於測試

  • Spooling Directory Source 會監控這個目錄

  • kafka Source

  • Syslog Sources 系統日誌

  • HTTP Source 內網和外網之間的資料

5.2 channels

  • memory channel

  • File Channle 將資料放到一個檔案中,sink直接從檔案中獲取

5.3 Sinks 核心

  • HDFS Sink

  • Hive Sink

  • HBase Sinks

  • MorphlineSolrSink 往Solr中抽取

  • ElasticSearchSink 檢索

六、綜合案例

6.1 需求

  • 收集log
    • hive執行日誌 /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.log
    • tail -f 讀取的方式收集
  • memory
  • hdfs
    • 儲存到 /user/beifeng/flume/hive-logs/ 目錄中

6.2 編寫Agent程式 a2 配置檔案 flume-tail.conf

### define agent### agent name : a2a2.sources = r2a2.channels = c2a2.sinks = k2### define sourcesa2.sources.r2.type = execa2.sources.r2.command = tail -f /opt/cdh-5.3.6/hive-0.13.1-cdh5.3.6/logs/hive.loga2.sources.r2.sources.shell = /bin/bash -c### define channelsa2.channels.c2.type = memory# 每個channel 中儲存的 event 個數a2.channels.c2.capacity = 2000# sink 每次獲取多少個event = 每次提交多少個事務給 sinka2.channels.c2.transactionCapacity = 200### define sinka2.sinks.k2.type = hdfs ## HDFS directory path (eg hdfs://namenode/flume/webdata/)a2.sinks.k2.hdfs.path = hdfs:hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/a2.sinks.k2.hdfs.fileType = DataStreama2.sinks.k2.hdfs.writeFormat = Text# number of events written to file before it is flushed to HDFSa2.sinks.k2.hdfs.batchSize = 10### bind the sources and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2

6.3 放入 hadoop 依賴包

  • 將上面的 jar 包放入到 flume 的 lib 目錄中

6.3 執行 Agent 程式 a2

bin/flume-ng agent \-c conf \-n a2 \-f conf/flume-tail.conf \-Dfilum.root.logger = DEBUG,console

6.4 啟動 Hive 並進行一些操作

  • 操作
hive (default) > show databases;hive (default) > show tables;hive (default) > select count(1) from user;
  • 實時產生的日誌

6.5 關於 hadoop 做了 HA

配置檔案中

a2.sinks.k2.hdfs.path = hdfs:hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/hive-logs/

修改為

a2.sinks.k2.hdfs.path = hdfs://ns1/user/beifeng/flume/hive-logs/

其中的ns1是在hadoop配置檔案配置的,flume需要去讀取並解析ns1相關的東西

解決方式:將hadoop中相關的檔案放到 flume 的conf 目錄中就可以了

  • core-site.xml
  • hdfs-site.xml

6.5 作業

  • 在 flume抽取資料時,如何在hdfs sink 如何設定檔案的大小

七、專案架構(企業中)

7.1 資料倉庫架構(根據Hadoop來的)

  • 抽取
    • RDBMS :Sqoop
    • Nginx :Flume
  • 分析
    • MapReduce
    • Hive
    • Pig
    • Impala:底層是記憶體
  • 對於分析任務的排程
    • OozieI
  • 結果
    • Query
    • Reporting
    • Analyze

7.2 flume 在資料倉庫中的架構

7.2.1 flume

7.2.2 企業大資料倉庫之資料收集架構

  1. 所有的需要收集日誌的伺服器全部是Linux

    • 每個伺服器上都執行一個hive,然後將資料收集到HDFS中
  2. 所有的伺服器全部是windows

    • 先將資料通過FTP收集到Linux伺服器上
    • 然後將Linux伺服器上是資料通過flume收集到HDFS中

八、企業案例 flume收集資料儲存到HDFS中

8.1 HDFS Sink

  • 依據伺服器上的時間抽取日誌 server logs

    • 0926

      • /user/beifeng/flume/applogs
        • /20150926
          • xxx.log
          • yyy.log
          • zzz.log
    • 上面的所有目錄都可以自動創建出來的(按照伺服器的時間建立目錄)

    • hdfs.useLocalTimestamp = truehdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/applogs/%Y%m%d
      

8.2 案例要求

  • 監控某個日誌檔案的目錄:
    • /app/logs/2014-12-20
    • 。。。。
    • /app/logs/2014-12-29
      • zz.log 變化的,未完成的
      • xxx.log.comp 200MB 已經完成的
      • yy.log.comp 200MB 已經完成的
  • 要求:只收集已經完成的日誌
    • 200MB太大了,實時性差了
      • 變通:將檔案大小設定小一點

8.3 Spooling Directory Source

8.3.1 相關引數

8.3.2 VS exec

  • 在使用 exec 來監聽資料來源雖然實時性較高,但是可靠性較差,當 source 程式執行一場或者Linux明令終端都會造成資料丟失,在恢復正常執行之前資料的完整性無法得到保障
  • Spooling Directory Paths 通過監聽某個目錄下的新增檔案,將檔案的內容讀取出來,實現日誌資訊的收集。實際生產中會結合 log4j 來使用。被傳輸結束的檔案會修改後綴名,新增 .completed 字尾(可修改)

8.4 監控目錄 實時抽取資料

  • 監控目錄

    • 日誌目錄,抽取完整的日誌檔案,正在寫的日誌檔案不抽取
  • 使用 FIleChannel

    • 本地檔案系統緩衝,比記憶體安全性更高
  • 資料儲存在HDFS

    • 儲存對應hive表的目錄或者hdfs目錄
  • 思考

    • 每天抽取檔案到一個目錄中,自動生成目錄,如何做?

8.5 Agent 程式編寫 a3

  • 編寫 a3 的配置檔案 flume-app.conf
### define agent### agent name : a3a3.sources = r3a3.channels = c3a3.sinks = k3### define sourcesa3.sources.r3.type = spooldir## 需要監控的目錄a3.sources.r3.spoolDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/spoollogs## 設定抽取時忽略的檔案 ## Regular expression specifying which files ignore(skip)a3.sources.r3.ignorePattern = ^(.)*\\.log$## 設定抽取已完成的檔案的字尾a3.channels.c3.fileSuffix = .delete### define channelsa3.channels.c3.type = filea3.channels.c3.checkpointDir = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/checkpointa3.channels.c3.dataDirs = /opt/cdh-5.3.6/flume-1.5.0-cdh5.3.6/filechannel/data## 每個channel 中儲存的 event 個數a3.channels.c3.capacity = 3000## sink 每次獲取多少個event = 每次提交多少個事務給 sinka3.channels.c3.transactionCapacity = 300### define sinka3.sinks.k3.type = hdfs ## 自動按照server時間建立對應的目錄# a3.sinks.k3.hdfs.hdfs.useLocalTimestamp = true# a3.sinks.k3.hdfs.hdfs.path = hdfs://hadoop-senior.ibeifeng.com:8020/user/beifeng/flume/applogs/%Y%m%da3.sinks.k3.hdfs.path = /user/beifeng/flume/spoollogs/a3.sinks.k3.hdfs.fileType = DataStreama3.sinks.k3.hdfs.writeFormat = Text# number of events written to file before it is flushed to HDFSa3.sinks.k3.hdfs.batchSize = 10### bind the sources and sink to the channela3.sources.r3.channels = c3a3.sinks.k3.channel = c3

8.6 執行 a3

bin/flume-ng agent \-conf conf \-name a3 \-conf-file conf/flume-app.conf \-Dflume.root.logger=DEBUG,console