flume+hive處理日誌
原創文章,轉載請註明: 轉載自始終不夠
現在的情況是,你被告知需要設計一套方案,用來處理公司所有的日誌檔案。當然,業務使用者希望他們可以通過可以想象到的方式對這些資料進行查詢,但是他們不會定義任何用例。這聽起來像是你需要弄清楚的問題嗎?如果是,那麼你來對地方了。
你首先要考慮的是,日誌檔案會不斷的增長、增長。所以你需要採用一個經濟可行的方式。對資料量和不是很明確的需求,資料倉庫並不是最好的選擇。當然,你會選擇hadoop。非常好,這是第一步。那麼現在,你要如何把那些日誌檔案寫入hadoop呢。
當然,對於目前的場景來說,flume是一個完美的選擇。畢竟,flume是圍繞收集日誌而設計的。所以,你開始在網上搜索一些方案,你會發現一些資訊,但問題是,你找到的這些示例,並不是你所需要的。有一個推特的over-used示例,並且有一些其他的json示例,但是你所需要的只是簡單明瞭的日誌檔案。那些對於我們來說並不是很好的方案。希望,這篇文章能夠幫助到你。
如何用flume和hive管理日誌檔案?這就是這篇文建將要講述的。這裡會降到收集日誌到HDFS,並且用HIVE管理他們。這個示例將使用Juniper Netscreen Firewalls的日誌檔案,但是這個一般的方式調整後可以處理任何你需要收集和處理的日誌。
原始碼
下面討論的所有程式碼,都可以在github上找到:
先決條件
我希望這篇文章不會長的不能忍受,所以我將做一些假設:
你已經有了一個已經安裝並配置好的叢集。我正在使用CDH4.5,但是這並沒有什麼特殊的,它應該可以在任何hadoop發行版中使用。
你已經對flume有了一個基本的瞭解。我不會講一些一般的概念和設定。有很多很好的文章,在那裡學習這些更合適。
你已經對hive有了一個基本的瞭解。和flume類似,我想跟你說一些你還不知道的。
你已經瞭解正則表示式。
解決方案概述
這個方案以flume收集事件開始。在這個示例中,為了簡單我將使用netcat,但是在現實生活中,你可能會使用系統日誌的一些變種。時間將會被source接收,並且我們需要做的唯一一件事就是保證在header有一個時間戳,這樣我們將使用時間攔截器。
下一步,我們將使用memory channel。同樣的,這也是為了簡單。在生產環境中,你可能會根據你的需要和情況作出不同的選擇。否則,在channel中不會發生什麼特別的事。
最後,我們開始配置sink,從這裡開始,事情開始變得有趣。我們已經知道我們希望把資料寫入到HDFS中,所以我們需要一個HDFS sink。然而,我們也知道我們會使用到Hive使資料可以訪問。因此,在Hive中,我們會對我們的資料進行分割槽。
分割槽最明顯的方式就是使用日期和時間,這將是有意義的,因為將分開的日誌條目寫入到基於時間的桶中,你可以使用HiveQL提取資訊特定的列。這將影響到我們如何寫入資料。在flume中,我們需要使用基於時間的轉義序列寫入條目,在hive中,我們需要定義匹配的分割槽。
然而,複雜度更深一點,因為Hive要求我們使用模式。所以重要的是,我們需要從日誌條目中提取資料並且將他們替換到列中,這樣Hive才可以讀取他們。這裡,有若干個可以使用的方案,但是這個實力,我們將提取一些準系統資訊。
到這裡,我們清楚的知道,有一些可能的方案。不同的firewalls可能有不同的格式等等。在這個簡單的示例中,我們需要提取資料替換成列格式,並且輸出他們。但是,還有可能是,我們需要使用列和模式重新排列已經提取的資料。例如,你可能發現,你提取到第一塊資料是作為模式的最後一列,所以你需要儲存這一塊資料,在最後輸出。
而在Hive這邊需要考慮的是,寫入HDFS的方式。預設情況下,flume將資料作為一個序列化的檔案寫入,這可能對你的場景來說不能正常工作。在我的示例中,我需要保證這些日誌檔案是簡單的,並且訪問和讀取是可能的,也就是他們需要被儲存成CSV檔案。因此,建立一個自定義的序列化器是必要的,它將提取,重排和輸出CSV格式資料。
flume配置
flume配置在下面會被展示出來,並且它同樣在github專案的conf目錄下。有幾點值得提一下。netcat source並不是特別有趣和複雜的。它只是一個基本的source。
sink同樣有幾點需要指出。第一是它假設你已經為flume user建立了(/user/flume/logs)目錄,你還需要放寬這個目錄的許可權,以允許訪問。
在需要的寫入HDFS的地方寫上wirteFormat=Text和fileType=DataStream。此外,使用custom serializer,CSVSerializer,它將使用正則表示式提取資料,而在這種情況下,僅僅是基本的連線資訊中指定的正則表示式提取資料。regexorder屬性用於重新排序正則表示式組來匹配Hive模式期望的輸出。請注意,有必要使用完全限定類名來定義的序列器及指定嵌入生成器類,這將在原始碼段來解釋。
最後,rolling相關的屬性,旨在一小時在HDFS上建立一個檔案,用來減少生成檔案的數量。當然,這些都需要根據條件被改變,或粒度降低,然後轉到每日分割槽。flume已經建立了很多檔案,如果你不妥善管理這些配置,這絕對不是一個好習慣,那絕對是一個需要注意和警惕的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
tier1.sources
= source1
tier1.channels
= channel1
tier1.sinks
= sink1
tier1.sources.source1.type
= netcat
tier1.sources.source1.bind
= 127.0.0.1
tier1.sources.source1.port
= 9999
tier1.sources.source1.interceptors
= i1
tier1.sources.source1.interceptors.i1.type
= timestamp
tier1.sources.source1.interceptors.i1.preserveExisting
= true
tier1.sources.source1.channels
= channel1
tier1.channels.channel1.type
= memory
tier1.sinks.sink1.type
= hdfs
tier1.sinks.sink1.hdfs.writeFormat
= Text
tier1.sinks.sink1.hdfs. fileType
= DataStream
tier1.sinks.sink1.hdfs.path
= /user/flume/logs/year=%Y/month=%m/day=%d/hour=%H
tier1.sinks.sink1.hdfs.rollInterval
= 3600
tier1.sinks.sink1.hdfs.rollCount
= 10000
tier1.sinks.sink1.hdfs.batchSize
= 10000
tier1.sinks.sink1.serializer
= com.freitas.flume.serializer.CSVSerializer $Builder
tier1.sinks.sink1.serializer.format
= CSV
tier1.sinks.sink1.serializer.regex
= .* proto=(\\d+) .* src=(.*) dst=(.*) src_port=(\\d+) dst_port=(\\d+).*
tier1.sinks.sink1.serializer.regexorder
= 5 1 2 3 4
tier1.sinks.sink1.channel
= channel1
tier1.channels.channel1.capacity
= 100
|
自定義Serializer
自定義Serializer真的不是什麼大不了的事。咋一看,好像很複雜,但只要你使用它,就會發現其實它很容易。我能很容易的跟隨flume的原始碼和其他東西。這個類需要實現EventSerializer介面。我真的只需要定義whrite方法,它可以做大量工作。我也需要定義一個內聯構造器類,並且建立父類的例項物件。flume就是這樣做的,我們實在無法跟它爭辯。
在邏輯上,它其實是非常簡單的。建構函式將檢索配置屬性和設定和建立將順序的正則表示式組必須寫出來索引的雜湊正則表示式。然後再write方法中,它將會在每一行使用正則表示式,提取組,把他們的期望的輸出順序索引的雜湊,最後,通過雜湊讀取,並且用逗號分隔符分隔後輸出。
開發Serializer
開發自定義Serializer並不是完全明顯,值得進行一些討論。它需要被編譯成一個Jar檔案,然後這個Jar檔案需要放到一個特定的位置,最後,flume將會找到和使用它。你需要考慮類的個數。這裡只有一個雷,但是需要執行同樣的步驟。我選擇Moven專案,是的生成Jar檔案很容易,但是你可能用的Ant或者其他生成方式。一旦你有了Jar檔案,你需要做一些事情,例如:
1 2 3 4 5 |
cd
/ var /lib/flume-ng/plugins.d
mkdir
-p plugins.d/flume-logs/lib
chown
-R flume plugins.d
chgrp
-R flume plugins.d
cp
flume-logs-1.0.0.jar / var /lib/flume-ng/plugins.d/flume-logs/lib
|
Hive配置
在Hive這邊,你需要執行Hive Shell並且使用與輸出匹配的模式建立一個外部表。事實上,你大概會先確定所需要的模式開始,但本辦法只是為了使我更容易組織文章。請注意,分割槽使用日期和時間分割槽,分割槽期望flume建立的目錄。使用外部表的工作已經好了,因為你不需要做任何耗時的載入資料的工作。他只是在你需要的地方,並且Hive可以開始使用它了。
1 2 3 4 5 6 7 8 9 10 11 12 |
CREATE
EXTERNAL TABLE networkData (
action_time
BIGINT,
src_ip
STRING,
dest_ip
STRING,
src_port
STRING,
dest_port
STRING,
protocol
STRING
)
PARTITIONED
BY (year int, month int, day int, hour int)
ROW
FORMAT DELIMITED
FIELDS
TERMINATED BY ','
LOCATION '/user/flume/logs/' ;
|
現在,在這個方案中,這是我最喜歡的一件事。執行Alter Table新增一個新的分割槽是必要的,目的是為了讓Hive可以看見資料。這個動作並不是做完之後就不需要了。它必須被定時任務或其他類似的東西執行。這只是Hive的工作方式。
相關推薦flume+hive處理日誌原創文章,轉載請註明: 轉載自始終不夠 現在的情況是,你被告知需要設計一套方案,用來處理公司所有的日誌檔案。當然,業務使用者希望他們可以通過可以想象到的方式對這些資料進行查詢,但是他們不會定義任何用例。這聽起來像是你需要弄清楚的問題嗎?如果是,那麼你來對地方 Flume採集處理日誌檔案http://www.cnblogs.com/windcarp/p/3872578.html Flume簡介 Flume是Cloudera提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統,Flume支援在日誌系統中定製各類資料傳送方,用於收集 flume使用之flume+hive 實現日誌離線收集、分析在如今網際網路行業中,資料的收集特別是日誌資料的收集已經成為了系統的標配。將使用者行為日誌或者線上系統生產的資料通過flume收集起來,存放到資料倉庫(hive)中,然後離線通過sql進行統計分析,這一套資料流的建設對系統有非常重要的意義。 1、思路: 1)線上系統通 從Hive處理日誌到視覺化到實時寫在前面 由於公司業務每日的pv和uv需要統計,所以公司需要一套機制來去儲存資料,並且週期性地處理資料。 Hive 資料倉庫,建立在HDFS上,通過SQL語句處理海量資料。它會將SQL語句轉化成MapReduce過程,然後分發給叢集處理。元資料一般儲存在M hive 的日誌處理統計網站的 PV 、UV案例 與 給合 python的數據清洗數據案例大數據 hadoop hive 數據清洗 一:hive 清理日誌處理 統計PV、UV 訪問量 二: hive 數據python 的數據清洗 一: 日誌處理 統計每個時段網站的訪問量: 1.1 在hive 上面創建表結構: 在創建表時不能直接導入問題 create table db_b flume學習(六):使用hive來分析flume收集的日誌資料前面已經講過如何將log4j的日誌輸出到指定的hdfs目錄,我們前面的指定目錄為/flume/events。 如果想用hive來分析採集來的日誌,我們可以將/flume/events下面的日誌資料都load到hive中的表當中去。 如果瞭解hive的load data原理 使用flume+hive採集Web伺服器的access日誌1、配置伺服器格式 博主這裡用的是tomcat的combined預設格式,格式如下 127.0.0.1 - - [28/Mar/2017:09:23:10 +0800] "GET /manager/html HTTP/1.1" 401 2536 "-" "Mozilla DB處理大量數據處理日誌報錯問題ons primary 相關配置 pda rim sin 默認 start ont 因為當插入、更新或刪除大批量數據的時候,有時候會出現事務日誌滿的問題,所以解決步驟 1.連接到當前數據庫 db2 connect to uppdb 2.查看數據庫配置文件 db shell習題-處理日誌shell寫一個腳本查找/data/log目錄下,最後創建時間是3天前,後綴是*.log的文件,打包後發送至192.168.1.2服務上的/data/log下,並刪除原始.log文件,僅保留打包後的文件#!/bin/bash find /data/log -name “*.log” -mtime +3 &g 如何處理日誌文件丟失日誌文件 ACTIVE current INACTIVE select group#,members from v$log;查看日誌文件的狀態select group#,status from v$log;有狀態來決定執行步驟status: active al 使用Java佇列來處理日誌資訊(執行緒池的使用)阿里的規範是使用new ThreadPoolExecutor()來建立執行緒池,二不是使用Excutor的靜態工具類來建立執行緒池,具體可以檢視部落格(兩篇): https://blog.csdn.net/angus_Lucky/article/details/798 python記錄_day019 類的約束 異常處理 日誌一 、約束 python中約束有兩種 第一種,通過拋異常進行約束,這種是子類不按我要求的來,我就給你拋異常(推薦) 操作:提取一個父類. 在父類中給出一個方法。但在方法中不給出任何程式碼,直接拋異常 1 # 貼吧 2 # 專案經理(級別高一點兒) 3 class Ba Flume各種採集日誌方式與輸出目錄1、從網路埠採集資料輸出到控制檯 一個簡單的socket 到 console配置 # 定義這個agent中各元件的名字 a1.sources = r1 a1.sinks = k1 a1. spring-boot使用AOP統一處理日誌轉載地址:https://blog.csdn.net/w05980598/article/details/79053209 AOP我想大家都很清楚,有時候我們需要處理一些請求日誌,或者對某些方法進行一些監控,如果出現例外情況應該進行怎麼樣的處理,現在,我們從spring boot中引入 hive處理小檔案(進行map、reduce、壓縮、歸檔優化解決)背景 Hive query將運算好的資料寫回hdfs(比如insert into語句),有時候會產生大量的小檔案,如果不採用CombineHiveInputFormat就對這些小檔案進行操作的話會產生大量的map task,耗費大量叢集資源,而且小檔案過多會對namenode造成很 Springboot AOP處理日誌資訊錄入現在凡是企業級的或者稍微大點專案,基本都需要日誌管理. 我這邊在springboot基礎上做了個日誌資訊記錄到資料庫的功能,在這裡備份一下,以後有需要就省的再重寫了. 首先我們得準備好所需要的jar,當然了這裡是pom.xml: Flume hive sink採坑記錄一、hive sink概述hive sink與hdfs sink 想對比,hive sink可以近實時的把資料採集到hive表中,hdfs sink要構建hive外部表去關聯hdfs路徑,並且實時性沒辣麼高。二、注意事項1、Hive表必須設定bucket並且 stored a 基於flume+kafka+storm日誌收集系統搭建基於flume+kafka+storm日誌收集系統搭建 1. 環境 192.168.0.2 hadoop1 192.168.0.3 hadoop2 192.168.0.4 hadoop3 已經 log4j+flume+HDFS實現日誌儲存參考:http://blog.csdn.net/sum__mer/article/details/52474443 376 hadoop dfs -chown -R hadoop:hadoop /flume 377 hdfs dfs -chown flume的導日誌資料到hdfs1.更改配置檔案 #agent名, source、channel、sink的名稱 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #具體定義source a1.sources.r1.type = spooldir a1.sour |