1. 程式人生 > >Falcon(三)——Falcon資料匯入和匯出

Falcon(三)——Falcon資料匯入和匯出

綜述

Falcon提供了週期性的將源資料從外部資料庫(資料庫,drop boxes etc)匯入到Hadoop上以及將講過Hadoop運算的資料匯出到外部的資料庫。

在至今所有的版本中,Falcon僅僅支援關係型資料庫(Mysql,Oracle等)講過JDBC作為外部的資料來源。將來的版本可能增加支援其他型別的資料庫。

先決條件

要將資料匯入和資料匯出需要滿足以下的條件:
- Sqoop 1.4.6+
- Oozie 4.2.0+
- 合適的資料庫聯結器

Note
Falcon通過Sqoop進行資料匯入和資料匯出操作,Sqoop將需要合適的資料庫驅動來連線相應的資料庫。必須確保資料庫的jar包被複制到Oozie中的Sqoop的share lib中:

For example, in order to import and export with MySQL, please make sure the latest MySQL connector
*mysql-connector-java-5.1.31.jar+* is copied into oozie's Sqoop share lib

/user/oozie/share/lib/{lib-dir}/sqoop/mysql-connector-java-5.1.31.jar+

where {lib-dir} value varies in oozie deployments.

用法

實體定義和設定

  • 資料來源實體

資料來源實體實現了資料庫連線和資料庫憑據驗證的高度抽象。資料來源實體支援了資料庫特定憑據驗證的讀和寫的介面,如果讀和寫的介面不能使用使用的話,它會呼叫預設的憑據進行驗證。通常情況下,資料來源實體將被系統管理員定義。

接下來的例子定義了一個Mysql的資料來源的實體,這個匯入操作將使用Mysql的讀介面(URI:jdbc:mysql://dbhost/test)使用者名稱:import_usr,密碼:sqoop。匯出操作將使用帶有(URI:jdbc:mysql://dbhost/test)的寫介面採用使用者名稱:export_usr和密碼指定在HDFS的 “/user/ambari-qa/password-store/password_write_user”.這個檔案中。

當讀介面和寫介面的憑據不能使用時,將使用預設的憑據The default credential specifies the password using password alias feature available via hadoop credential functionality. 使用者可以建立一個祕鑰使用如下命令:

hadoop credential -create <alias> -provider <provider-path>

where is a string and is a HDFS jceks file,在執行期間 ,the specified alias will be used to look up the password stored encrypted in the jceks hdfs file specified under the providerPath element.

讀和寫的介面可以使管理員分開讀和寫的工作量:

File: mysql-database.xml

  <?xml version="1.0" encoding="UTF-8"?>
      <datasource colo="west-coast" description="MySQL database on west coast" type="mysql" name="mysql-db" xmlns="uri:falcon:datasource:0.1">
          <tags>[email protected], [email protected]</tags>
          <interfaces>
              <!-- ***** read interface ***** -->
              <interface type="readonly" endpoint="jdbc:mysql://dbhost/test">
                  <credential type="password-text">
                      <userName>import_usr</userName>
                      <passwordText>sqoop</passwordFile>
                  </credential>
              </interface>

              <!-- ***** write interface ***** -->
              <interface type="write"  endpoint="jdbc:mysql://dbhost/test">
                  <credential type="password-file">
                      <userName>export_usr</userName>
                     <passwordFile>/user/ambari-qa/password-store/password_write_user</passwordFile>
                  </credential>
              </interface>

              <!-- *** default credential *** -->
              <credential type="password-alias">
                <userName>sqoop2_user</userName>
                <passwordAlias>
                    <alias>sqoop.password.alias</alias>
                    <providerPath>hdfs://namenode:8020/user/ambari-qa/sqoop_password.jceks</providerPath>
                </passwordAlias>
              </credential>

          </interfaces>

          <driver>
              <clazz>com.mysql.jdbc.Driver</clazz>
              <jar>/user/oozie/share/lib/lib_20150721010816/sqoop/mysql-connector-java-5.1.31</jar>
          </driver>
      </datasource>
  • 資料集實體

Feed實體可以使使用者在指定去保持資料生命週期和複製資料的同時定義資料匯入和資料匯出的策略,匯入和匯出策略將引用已經定義的資料庫實體去連線和指定特定的資料表。接下來的例子定義了一個具有匯入和匯出策略的feed實體,匯入和匯出都將引用“mysql-db”實體,匯入操作將使用讀介面和匯出操作將使用寫介面,一個Feed例項將每一個小時被建立一次和由於保留策略的設定這個feed策略將被刪除在90天之後。

File: customer_email_feed.xml

 <?xml version="1.0" encoding="UTF-8"?>
      <!--
       A feed representing Hourly customer email data retained for 90 days
       -->
      <feed description="Raw customer email feed" name="customer_feed" xmlns="uri:falcon:feed:0.1">
          <tags>externalSystem=USWestEmailServers,classification=secure</tags>
          <groups>DataImportPipeline</groups>
          <frequency>hours(1)</frequency>
          <late-arrival cut-off="hours(4)"/>
          <clusters>
              <cluster name="primaryCluster" type="source">
                  <validity start="2015-12-15T00:00Z" end="2016-03-31T00:00Z"/>
                  <retention limit="days(90)" action="delete"/>
                  <import>
                      <source name="mysql-db" tableName="simple">
                          <extract type="full">
                              <mergepolicy>snapshot</mergepolicy>
                          </extract>
                          <fields>
                              <includes>
                                  <field>id</field>
                                  <field>name</field>
                              </includes>
                          </fields>
                      </source>
                      <arguments>
                          <argument name="--split-by" value="id"/>
                          <argument name="--num-mappers" value="2"/>
                      </arguments>
                  </import>
                  <export>
                        <target name="mysql-db" tableName="simple_export">
                            <load type="insert"/>
                            <fields>
                              <includes>
                                <field>id</field>
                                <field>name</field>
                              </includes>
                            </fields>
                        </target>
                        <arguments>
                             <argument name="--update-key" value="id"/>
                        </arguments>
                    </export>
              </cluster>
          </clusters>

          <locations>
              <location type="data" path="/user/ambari-qa/falcon/demo/primary/importfeed/${YEAR}-${MONTH}-${DAY}-${HOUR}-${MINUTE}"/>
              <location type="stats" path="/none"/>
              <location type="meta" path="/none"/>
          </locations>

          <ACL owner="ambari-qa" group="users" permission="0755"/>
          <schema location="/none" provider="none"/>

      </feed>
  • 匯入策略(Import policy)

匯入策略將使用datasource實體指定特定連線特定的資料來源,表名應該在存在引用的資料庫中,匯入過程可以指定匯入特定的欄位並且可以指定是每次都是全部一次性倒入或者增量匯入,
The merge policy specifies how to organize (snapshot or append, i.e time series partiitons) the data on hadoop.有效的組合是:

  1. [full,snapshot] :資料每次被全部抽取並且被儲存在Feed指定的位置。

  2. [incremental, append] :data is extracted incrementally using the key specified in the deltacolumn and added as a partition to the feed instance location.

  3. [incremental, snapshot]. :資料被增量的抽取和已經在Hadoop中存在的資料進行融合產生一個最新的Feed例項,.這個特性現在並不支援. 這個操作對於一個具有大量的資料的表一以及更新後可以插入到Hadoop的資料裡,對於使用者來說,它可以作為一個整體。

接下來的例子定義了[incremental, append]的策略:

 <import>
                <source name="mysql-db" tableName="simple">
                    <extract type="incremental">
                        <deltacolumn>modified_time</deltacolumn>
                        <mergepolicy>append</mergepolicy>
                    </extract>
                    <fields>
                        <includes>
                            <field>id</field>
                            <field>name</field>
                        </includes>
                    </fields>
                </source>
                <arguments>
                    <argument name="--split-by" value="id"/>
                    <argument name="--num-mappers" value="2"/>
                </arguments>
            </import>

Filed標籤將一定哪一個列將被匯入,預設情況下,所有的列將被匯入,Includes標籤定義了其中需要匯入的列的欄位,Excludes正好與Includes相反,

arguments標籤將可以呼叫在Sqoop中任何額外的引數。

  • 匯出策略(Export policy)

匯出策略和匯入策略同理,使用Datasource去連線資料庫,載入特定列的資料對資料表進行插入或者更新資料,Fields操作的選項和Import具有同樣的策略。表名應該存在於資料庫中。

操作步驟:

    ## submit the mysql-db datasource defined in the file mysql_datasource.xml
    falcon entity -submit -type datasource -file mysql_datasource.xml

    ## submit the customer_feed specified in the customer_email_feed.xml
    falcon entity -submit -type feed -file customer_email_feed.xml

    ## schedule the customer_feed
    falcon entity -schedule -type feed -name customer_feed

相關推薦

Falcon——Falcon資料匯入匯出

綜述 Falcon提供了週期性的將源資料從外部資料庫(資料庫,drop boxes etc)匯入到Hadoop上以及將講過Hadoop運算的資料匯出到外部的資料庫。 在至今所有的版本中,Falcon僅僅支援關係型資料庫(Mysql,Oracle等)講過JDB

zigbee 之ZStack-2.5.1a原始碼分析無線資料傳送接收

前面說過SampleApp_Init和SampleApp_ProcessEvent是我們重點關注的函式,接下來分析無線傳送和接收相關的程式碼: 在SampleApp_ProcessEvent函式中: if ( events & SYS_EVENT_MSG ) {  &nbs

[原創]分散式系統之快取的微觀應用經驗談資料分片叢集篇】

分散式系統之快取的微觀應用經驗談(三)【資料分片和叢集篇】 前言   近幾個月一直在忙些瑣事,幾乎年後都沒怎麼閒過。忙忙碌碌中就進入了2018年的秋天了,不得不感嘆時間總是如白駒過隙,也不知道收穫了什麼和失去了什麼。最近稍微休息,買了兩本與技術無關的書,其一是 Yann Martel 寫的《The

Vue學習——屬性繫結雙向資料繫結

<!DOCTYPE html> <html> <head> <meta charset="utf-8" /> <title>屬性繫結和雙向資料繫結</title> <script src="./v

《設計資料密集型應用/DDIA》精要翻譯資料的儲存獲取之底層資料結構

看了這三章,我最大的收貨並不是說學到了什麼新的知識,而是對以前通過各種渠道所掌握的知識重新進行了梳理和歸納,使它們有種脈絡相通的感覺。 一、驅動你的資料庫的資料結構 這也許是世界上最簡單的資料庫實現: db_set () {

Linux下實現視訊讀取---Buffer的準備資料讀取

前面主要介紹的是:V4L2 的一些設定介面,如亮度,飽和度,曝光時間,幀數,增益,白平衡等。今天看看V4L2 得到資料的幾個關鍵ioctl,Buffer的申請和資料的抓取。 1. 初始化 Memory Mapping 或 User Pointer I/O. int ioct

Hive總結Hive資料匯入種方式

零.Hive的幾種常見的資料匯入方式 常用的的有三種: 1.從本地檔案系統中匯入資料到Hive表; 2.從HDFS上匯入資料到Hive表; 3.在建立表的時候通過從別的表中查詢出相應的記錄並插入到所建立的表中。 Hive配置: HDFS中Hive資料

mysql基礎存儲引擎

mysql存儲引擎的概念: 關系型數據庫表是用於存儲和組織信息的數據結構,可以將表理解為由行和列組成的表格,各種各樣,不同的表結構意味著存儲不同類型的數據,在數據的處理上也會存在著差異,對於mysql來說,它提供了多種類型的存儲引擎,可以根據對數據處理的需求,選擇不同的存儲引擎,從而最大

ASP.NET MVC5:表單HTML輔助方法

http get 暴露 sta 選擇 .text 響應 pos 多行文本 二進制 表單的使用 Action和Method特性   Action特性用以告知瀏覽器信息發往何處,因此,Action特性後面需要包含一個Url地址。這裏的Url地址可以是相對的,也可以是絕對的。如

【20171115】BugFree使用手冊Bugfree界面後臺管理

結果 所有 idt 重新 case 系統 切換 cal define 三、Bugfree界面 1. 登錄界面   如圖所示:    2. 主界面   輸入系統提供的默認管理員用戶名:admin,密碼(原始):123456;語言選擇默認“簡體中文”。點擊“登錄”按鈕,來到

elastic search&logstash&kibana 學習歷程Logstash使用場景安裝部署

download ssa 技術 tar.gz 我認 搬運 OS last 文檔 Logstash基本介紹和使用場景 自我認為:logstash就是日誌的采集收集,日誌的搬運工,實時去采集日誌。讀取不同的數據源,並進行過濾,開發者自定義規範輸出到目的地。日誌的來源很多,如系統

Linux 筆記 - 第十三章 Linux 系統日常管理之Linux 系統日誌服務

pac ica link tor 包含 3.1 request closed comm 博客地址:http://www.moonxy.com 一、前言 日誌文件記錄了系統每天發生的各種各樣的事情,比如監測系統狀況、排查問題等。作為系統運維人員可以通過日誌來檢查錯誤發生的原因

Python基礎文件操作處理json

load .com 修改 有一個 不存在 user 元素 lac 取出 文件操作步驟:1.有一個文件,2.打開文件,3.讀寫修改文件,4.關閉文件 一.有一個文件:新建或導入文件 二.打開文件:如果是新建的文件默認和py文件在同一個目錄;如果是打開文件,要將文件放在py同目

Spring MVC使用篇—— 處理器對映器介面卡

文章目錄 1、重溫請求流程 2、Spring MVC預設的註解配置 2.1 在Spring 3.1之前 2.2 在Spring 3.1之後 3、配置註解的處理器對映器和介面卡方式 3.1 第一種配置方式

Spring Boot + Spring Cloud 構建微服務系統:服務消費負載Feign

Spring Cloud Feign Spring Cloud Feign是一套基於Netflix Feign實現的宣告式服務呼叫客戶端。它使得編寫Web服務客戶端變得更加簡單。我們只需要通過建立介面並用註解來配置它既可完成對Web服務介面的繫結。它具備可插拔的註解支援,包括Feign註解、JAX-RS註解

搭建簡單圖片分類的卷積神經網路-- 模型的測試運用

兩個功能都在同一個檔案中 一、新建Disimage.py檔案 import tensorflow as tf from PIL import Image import os import numpy as np import matplotlib.pyplot as plt from Get

API閘道器Kong:功能梳理外掛使用-基本使用過程

作者: 李佶澳   轉載請保留:原文地址   釋出時間:2018-10-10 14:37:53 +0800   說明 Kong的Admin API Kong定義的資源之間的關聯關係

Hadoop資料離線計算與實時計算

分享一下我老師大神的人工智慧教程吧。零基礎,通俗易懂!風趣幽默!http://www.captainbed.net/ 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

EFK 配置geo-ip落地實踐經緯度資料查詢及格式化輸出

經過之前的工作,目前已經完成了資料地圖的資料格式化和錄入記錄,目前我們的資料地圖專案已經進行到最後階段,所以現在需要一個介面,進行格式化資料並輸出,其中需要用到Elasticsearch的全文檢索,檢索出資料後,使用php介面格式化資料輸出 一、全文檢索 搜尋條件(時間,空間) 輸出結果(使用者

Elasticsearch Query DSL 整理總結—— Match Phrase Query Match Phrase Prefix Query

目錄 引言 Match Phase Query slop 引數 analyzer 引數 zero terms query Match Phrase 字首查詢 max_expansions 小結 參考文件 系列文章列表