flume學習(四):Flume Interceptors的使用
對於flume攔截器,我的理解是:在app(應用程式日誌)和 source 之間的,對app日誌進行攔截處理的。也即在日誌進入到source之前,對日誌進行一些包裝、清新過濾等等動作。
官方上提供的已有的攔截器有:
Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor
像很多java的開源專案如springmvc中的攔截器一樣,flume的攔截器也是chain形式的,可以對一個source指定多個攔截器,按先後順序依次處理。
Timestamp Interceptor :在event的header中新增一個key叫:timestamp,value為當前的時間戳。這個攔截器在sink為hdfs 時很有用,後面會舉例說到
Host Interceptor:在event的header中新增一個key叫:host,value為當前機器的hostname或者ip。
Static Interceptor:可以在event的header中新增自定義的key和value。
Regex Filtering Interceptor:通過正則來清洗或包含匹配的events。
Regex Extractor Interceptor:通過正則表示式來在header中新增指定的key,value則為正則匹配的部分
下面舉例說明這些攔截器的用法,首先我們調整一下第一篇文章中的那個WriteLog類:又多輸出了一行日誌資訊,現在每次迴圈都會輸出兩行日誌資訊,第一行是一個時間戳資訊,第二行是一行JSON格式的字串資訊。 接下來我們用regex_filter和 timestamp這兩個攔截器來實現這樣一個功能: 1 過濾掉LOG4J輸出的第一行那個時間戳日誌資訊,只收集JSON格式的日誌資訊 2 將收集的日誌資訊儲存到HDFS上,每天的日誌儲存到以該天命名的目錄下面,如2014-7-25號的日誌,儲存到/flume/events/14-07-25目錄下面。 修改後的flume.conf如下:public class WriteLog { protected static final Log logger = LogFactory.getLog(WriteLog.class); /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // TODO Auto-generated method stub while (true) { logger.info(new Date().getTime()); logger.info("{\"requestTime\":" + System.currentTimeMillis() + ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"測試商家名稱\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}"); Thread.sleep(2000); } } }
tier1.sources=source1 tier1.channels=channel1 tier1.sinks=sink1 tier1.sources.source1.type=avro tier1.sources.source1.bind=0.0.0.0 tier1.sources.source1.port=44444 tier1.sources.source1.channels=channel1 tier1.sources.source1.interceptors=i1 i2 tier1.sources.source1.interceptors.i1.type=regex_filter tier1.sources.source1.interceptors.i1.regex=\\{.*\\} tier1.sources.source1.interceptors.i2.type=timestamp tier1.channels.channel1.type=memory tier1.channels.channel1.capacity=10000 tier1.channels.channel1.transactionCapacity=1000 tier1.channels.channel1.keep-alive=30 tier1.sinks.sink1.type=hdfs tier1.sinks.sink1.channel=channel1 tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d tier1.sinks.sink1.hdfs.fileType=DataStream tier1.sinks.sink1.hdfs.writeFormat=Text tier1.sinks.sink1.hdfs.rollInterval=0 tier1.sinks.sink1.hdfs.rollSize=10240 tier1.sinks.sink1.hdfs.rollCount=0 tier1.sinks.sink1.hdfs.idleTimeout=60
我們對source1添加了兩個攔截器i1和i2,i1為regex_filter,過濾的正則為\\{.*\\},注意正則的寫法用到了轉義字元,不然source1無法啟動,會報錯。 i2為timestamp,在header中添加了一個timestamp的key,然後我們修改了sink1.hdfs.path在後面加上了/%y-%m-%d這一串字元,這一串字元要求event的header中必須有timestamp這個key,這就是為什麼我們需要新增一個timestamp攔截器的原因,如果不新增這個攔截器,無法使用這樣的佔位符,會報錯。還有很多佔位符,請參考官方文件。 然後執行WriteLog,去hdfs上檢視對應目錄下面的檔案,會發現內容只有JSON字串的日誌,與我們的功能描述一致。
相關推薦
flume學習(四):Flume Interceptors的使用
對於flume攔截器,我的理解是:在app(應用程式日誌)和 source 之間的,對app日誌進行攔截處理的。也即在日誌進入到source之前,對日誌進行一些包裝、清新過濾等等動作。 官方上提供的已有的攔截器有: Timestamp Interceptor Host
flume學習(六):使用hive來分析flume收集的日誌資料
前面已經講過如何將log4j的日誌輸出到指定的hdfs目錄,我們前面的指定目錄為/flume/events。 如果想用hive來分析採集來的日誌,我們可以將/flume/events下面的日誌資料都load到hive中的表當中去。 如果瞭解hive的load data原理
flume學習(八):自定義source
按照以往的慣例,還是需求驅動學習,有位網友在我的flume學習五中留言提了一個問題如下: 我想實現一個功能,就在讀一個檔案的時候,將檔案的名字和檔案生成的日期作為event的header傳到hdfs上時,不同的event存到不同的目錄下,如一個檔案是a.log.2014-0
Petri網學習(四):Petri網的結構性質
一、結構有界性&守恆性 1. 結構有界性 定義:設N=(P,T;F)為一個網。對N賦予任意的初始標識M0,網(N,M0)都是有界的,則稱N為結構有界網; 再回憶一下什麼是有界petri網:在PN=(P,T;F,M0)中,,庫所p都有界,則稱PN為有界petri
PE檔案格式學習(四):匯入表
UPDATE: 在文章的末尾更新了一張圖,在網上找的,有助於理解匯入表的結構 1.概述 匯入表是逆向和病毒分析中比較重要的一個表,在分析病毒時幾乎第一時間都要看一下程式的匯入表的內容,判斷程式大概用了哪些功能。 匯入表是資料目錄表中的第2個元素,排在匯出表的
ionic學習(四):Tab控制元件 學習二
實現功能: 1.新增tabs頁面:下部新增一個新聞按鈕 2.去掉二級頁面tabs選單: 3.修改返回按鈕:上圖的左上方箭頭 步驟 1. 將news頁面放在下面 在tabs.ts和tabs.html中引入並顯示news元件 圖示在這
rabbitmq學習(四):利用rabbitmq實現遠端rpc呼叫
一、rabbitmq實現rpc呼叫的原理 ·rabbitmq實現rpc的原理是:客戶端向一個佇列中傳送訊息,並註冊一個回撥的佇列用於接收服務端返回的訊息,該訊息需要宣告一個叫做correaltionId的屬性,該屬性將是該次請求的唯一標識。服務端在接受到訊息(在需要時可以驗證correaltionId)後,
rabbitmq學習(四):利用rabbitmq實現遠程rpc調用
ext new urn trace cat ued 創建 exc false 一、rabbitmq實現rpc調用的原理 ·rabbitmq實現rpc的原理是:客戶端向一個隊列中發送消息,並註冊一個回調的隊列用於接收服務端返回的消息,該消息需要聲明一個叫做correaltio
python學習(四):python變數和函式
python用下劃線作為變數字首和字尾指定特殊變數 _xxx 不能用’from module import *’匯入 __xxx__ 系統定義名字 __xxx 類中的私有變數名 核心風格:避免用下劃線作為變數名的開始。 因為下劃線對直譯器有特殊的意義,而且是內建
Spring Boot學習(四):使用@SpringBootTest註解進行單元測試
一、簡介 專案中經常會遇到需要單元測試的情況,那麼SpringBoot如何實現這種需求,使用@SpringBootTest註解可以執行環境,測試後臺程式碼。 二、環境準備 eclipse + maven + Spring Boot 三、程式碼示例 pom.xml
webpack學習(四):配置CleanWebpackPlugin
demo地址: https://github.com/Lkkkkkkg/webpack-demo 上次配置HtmlWebpackPlugin: https://blog.csdn.net/qq593249106/article/details/84900169 繼上次配置完 HtmlWe
Spring的學習(四):Web中的Spring
Spring通常用來開發Web應用。 SpringMVC的執行過程: 我們可以從以下的圖來分析SpringMVC的的執行過程。 1、客戶端在傳送請求的時候,會呼叫DispatcherServlet,Dispatch是SpringMVC的入口,Dispatche
SpringBoot+Shiro學習(四):Realm授權
上一節我們講了自定義Realm中的認證(doGetAuthenticationInfo),這節我們繼續講另一個方法doGetAuthorizationInfo授權 授權流程 流程如下: 首先呼叫Subject.isPermitted/hasRole介面,其會委託給Security
Python+OGR庫學習(四):重投影shp檔案並另存,屬性表保持不變
程式碼關鍵點 1、首先要定義好轉換引數 2、主要操作物件是要素,需要提前建立好輸出檔案,然後遍歷所有要素,對每一個幾何物件進行座標轉換 3、輸出檔案的欄位屬性定義需要從輸入檔案讀取 程式碼思路 1、匯入相關包,切換路徑,註冊驅動 2、定義轉換關係 3、開啟輸入檔案,讀取到圖層
caffe學習(四):py-faster-rcnn配置,執行測試程式(Ubuntu)
上一篇部落格中講了在Ubuntu下安裝caffe的經驗總結(各種問題,簡直懷疑人生了)。部落格連結:點我開啟 faster-rcnn有兩個版本,分別是Python的和MATLAB的。這裡介紹python版本的faster-rcnn的配置。 網上有很多相關的教程,起初我在配置
GitHub學習(四):Phpstorm中的git使用(2)--拉取工程與composer使用
之前我在一臺電腦上將一份不完整的工程儲存在github上,現在我回到家中,換了一臺電腦,接下來就是要用另一臺電腦拉取github中的工程,並用composer把整個工程的依賴檔案什麼亂七八糟的檔案都下下好。 1.首先開啟phpstorm,按圖
vue學習(四):子元件向父元件傳參
子元件向父元件傳參主要依靠 v-on 和 $.emit 這個是vue官網上給的方法呼叫,我們看看頁面上怎麼使用。 子元件 main_Header.vue <template> <div> <div>{{count}}</
thinkphp5.0學習(四):入口檔案、路由模式、路由設定和url生成
一、路由的作用 簡化URL地址,方便記憶 有利於搜尋引擎的優化 二、入口檔案 前後臺分離 在網站public目錄下(專案\public)新建admin.php 開啟admin.ph
多執行緒學習(四):停止執行緒
停止執行緒 停止一個執行緒可以使用Thread.stop()方法,但最好不用它,因為這個方法是不安全的,而且已被棄用。 大多數停止一個執行緒的操作使用Thread.interrupt()方法,但是這個方法不會終止一個正在執行的執行緒,還需要加入一個判斷才可以完成執行緒的停止。 Jav
機器學習(四):BP神經網路_手寫數字識別_Python
機器學習演算法Python實現 三、BP神經網路 全部程式碼 1、神經網路model 先介紹個三層的神經網路,如下圖所示 輸入層(input layer)有三個units(為