使用python監控HDFS檔案的增量【優化中】
目錄
1、需求和步驟
2、專案結構
3、專案程式碼
3.1建表語句 hdfs_Ctreate_table
3.2刪除檔案記錄 hdfs_delete_file_record.py
3.3檔案路徑的小時監控 hdfs_path_Monitor.py
3.4檔案路徑的天監控 hdfs_path_Monitor_day.py
3.5檔案大小記錄 hdfs_size.py
3.6mysql連線資訊
3.7mysql工具類 mysqlHelper.py
3.8工具類utils.py
4、結果展示
————————————————————————————-
1、需求和步驟
需求:
1、獲取HDFS每個資料夾每小時和每天的增量
2、定時刪除HDFS任務歷史記錄
需求1步驟:
1、獲取到HDFS所有檔名稱,參考:hdfs_size.py
2、採用遞迴獲取檔案大小,參考:hdfs_path_Monitor.py
3、每天計算檔案大小,參考:hdfs_path_Monitor_day.py
需求2步驟:
1、查一個月前的所有檔案儲存到hdfs_delete_file_record中,標記狀態為0
2、通過mysql查詢標記為0的資料,獲取對應路徑
3、通過hadoop fs -rm -r -skipTrash + path 進行刪除
4、刪除之後標記為1
2、專案結構
3、專案程式碼
3.1建表語句 hdfs_Ctreate_table
CREATE TABLE hdfs_delete_file_record(
date VARCHAR(25) not null,
p_name VARCHAR(255) not null,
status VARCHAR(5) not null,
primary key (date,p_name)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE hdfs_path_Monitor(
id INT not NULL,
p_id INT not NULL,
p_name VARCHAR(255) not null,
date VARCHAR(25) not null,
dir_kb float not null,
hour_increase float not null,
day_increase float not null,
primary key (id,p_id,p_name,date)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE hdfs_path_Monitor_temp(
id INT not NULL,
p_id INT not NULL,
p_name VARCHAR(255) not null,
date VARCHAR(25) not null,
dir_kb float not null,
hour_increase float not null,
day_increase float not null,
primary key (id,p_id,p_name,date)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE hdfs_size(
id INT not NULL,
p_name VARCHAR(255) not null,
date VARCHAR(25) not null,
dir_byte float not null,
primary key (id,p_name,date)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.2刪除檔案記錄 hdfs_delete_file_record.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
1、查一個月前的所有檔案儲存到hdfs_delete_file_record中,標記狀態為0
2、通過mysql查詢標記為0的資料,獲取對應路徑
3、通過hadoop fs -rm -r -skipTrash + path 進行刪除
4、刪除之後標記為1
'''
import os
import time
import utils
import mysqlHelper
'''30天前的時間'''
monthAgo = str(utils.getBeforeDay(30))[0:16]
'''歷史路徑的列表'''
L = []
# hadoop fs -rm -r -skipTrash 不進入回收站
def cleanOldLog():
values=mysqlHelper.mysqlSearch('select * from hdfs_delete_file_record where STATUS=0')
for line in values:
os.popen("hadoop fs -rm -r -skipTrash " + line[1])
print line[1]
mysqlHelper.mysqlInsert("update hdfs_delete_file_record set STATUS=1 where p_name = '%s' " % (line[1]))
def cleanApplicationHistory():
logs = os.popen('hdfs dfs -ls /user/spark/applicationHistory')
args = []
mysqlHelper.mysqlInsert("DELETE FROM hdfs_delete_file_record where status = '0'")
for line in logs:
lines = line.replace('\n', "").split(" ")
length = len(lines)
'''檔案時間'''
date = str(lines[length - 3]) + " " + str(lines[length - 2])
if date > monthAgo:
continue
'''檔案路徑'''
filePath = lines[length - 1]
L.append(filePath)
sql = "insert into hdfs_delete_file_record (`date`, p_name , status) values (%s, %s, %s)"
args.append((date, filePath, "0"))
mysqlHelper.mysqlBatchInset(sql , args)
def main():
StartTime = time.time()
cleanApplicationHistory()
cleanOldLog()
mysqlHelper.mysqlClose()
EndTime = time.time()
print "所需要的時間為 : " + str(EndTime - StartTime)
if __name__ == "__main__":
main()
3.3檔案路徑的小時監控 hdfs_path_Monitor.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
'''
備註:使用【樹】的結構記錄父子節點
1、使用遞迴查出每層目錄結構
2、使用字典記錄父節點和子節點
2.1、使用getHDFSSize()方法,去除路徑下面的檔案避免多次查詢
2.2、規定層級避免多次查詢
2.3、符合條件的路徑批量插入到mysql中
2.4、符合條件的路徑進入List中,通過遞迴的方法繼續獲取路徑
3、每小時資料進入mysql後,通過hourComputer()方法進行計算
'''
import os
import time
import mysqlHelper
import utils
'''全域性變數系統時間 LocalTime 2017-07-28 14:37:47'''
LocalTime = str(utils.getnowTime())[0:16]
BeforeOneHour = str(utils.getBeforeHour(1))[0:16]
FileNames = set()
'''獲取目錄的曾層級'''
hierarchy = 4
'''p_id的字典'''
data_dict = {}
'''堆方法'''
def digui(list):
if (len(list) == 0) == True:
return 1
else:
gethdfs(list)
'''
有一個細節,當path下面沒有檔案或者path是一個檔案,
則只會放回的引數為本身路徑,所有過濾掉這一層
'''
def gethdfs(pathSet):
print pathSet
for path in pathSet:
L = set()
lines = os.popen('hdfs dfs -du -h %s ' % path)
args = []
for line in lines:
if (line.replace('\n', "").split(' ').pop() == path) == False:
num = line.split(' ')[0]
tags = line.split(' ')[1]
# 檔案大小
fileSize = convertSize(num, tags)
# 檔案路徑
filePath = line.replace('\n', "").split(' ').pop()
# 判斷是否大於層級
if len(filePath.split("/")) > hierarchy:
continue
A = set()
A.add(filePath)
# 判斷是否是檔案
if len((A.difference(FileNames))) == 0:
continue
# print A
# 節點自增ID
Node_ID = add()
mysqlTup = (Node_ID, filePath)
# 通過字典獲取父節點路徑
data_dict[mysqlTup[1]] = Node_ID
p_list = mysqlTup[1].split('/')
# print "p_list ----->", type(p_list), p_list
length = len(p_list)
p_str = '/'.join(p_list[:length - 1]) if isinstance(p_list, list) and len(p_list) > 0 else None
# print "p_str ----->", p_str
p_id = data_dict.get(p_str, 1)
global sql
# 批量插入語句
sql = "insert into hdfs_path_Monitor (id, p_id, p_name, `date`, dir_kb, hour_increase, day_increase) values (%s,%s, %s, %s, %s, %s, %s)"
args.append((mysqlTup[0], p_id, mysqlTup[1], LocalTime, fileSize, '0', '0'))
L.add(line.replace('\n', "").split(' ').pop())
else:
return 1
# 批量語句的執行
mysqlHelper.mysqlBatchInset(sql, args)
if digui(L) == 1:
continue
'''自增函式'''
def add(x=1):
try:
add.sum += x
except AttributeError:
add.sum = x
return add.sum
def convertSize(num, tags):
totals = 0
if tags == "G":
totals = float(str(num)) * 1024 * 1024
elif tags == "M":
totals = float(num) * 1024
elif tags == "K":
totals = float(num)
return totals
'''獲取所有檔名稱列表'''
def getHDFSSize():
results = mysqlHelper.mysqlSearch("select * from hdfs_size")
for row in results:
p_name = row[1]
# 列印結果
FileNames.add(p_name)
'''每小時計算'''
def hourComputer():
''' 每個小時增長計算方法,結果資料插入到臨時表 '''
sql = '''
INSERT INTO hdfs_path_Monitor_temp(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
select h3.id as id, h3.p_id as p_id, h3.p_name as p_name,h3.date as date, h3.dir_kb as dir_kb,ABS((h3.dir_kb - h2.dir_kb)) as
hour_increase,h3.day_increase as day_increase
from hdfs_path_Monitor h2
RIGHT JOIN hdfs_path_Monitor h3 on h3.p_name = h2.p_name
where h2.date=\'%s\' and h3.date=\'%s\'
''' % (BeforeOneHour, LocalTime)
mysqlHelper.mysqlInsert(sql)
'''刪除hdfs_path_Monitor中為當前時間的資料'''
delete_Monitor_table = "DELETE from hdfs_path_Monitor where date=\'%s\'" % (LocalTime)
mysqlHelper.mysqlInsert(delete_Monitor_table)
'''將hdfs_path_Monitor_temp表中的資料插入到hdfs_path_Monitor'''
insert_Monitor_table = '''
INSERT INTO hdfs_path_Monitor(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
SELECT *
FROM hdfs_path_Monitor_temp
'''
mysqlHelper.mysqlInsert(insert_Monitor_table)
'''刪除臨時表的資料'''
insert_Temp_table = "DELETE from hdfs_path_Monitor_temp where date=\'%s\'" % (LocalTime)
mysqlHelper.mysqlInsert(insert_Temp_table)
def main():
if LocalTime[11:16] == "00:00":
exit()
StartTime = time.time()
path = "/"
pathSet = set()
pathSet.add(path)
mysqlHelper.mysqlInsert("delete from hdfs_path_Monitor where DATE =\'%s\';" % (LocalTime))
mysqlHelper.mysqlInsert("insert into hdfs_path_Monitor values(%s, %s, \'%s\', \'%s\', %s, %s, %s)" % (
add(), 0, path, LocalTime, 0.00, 0.00, 0.00))
getHDFSSize()
gethdfs(pathSet)
hourComputer()
mysqlHelper.mysqlClose()
EndTime = time.time()
print "所需要的時間為 : " + str(EndTime - StartTime) + "插入的數量為 :" + str(add() - 1)
if __name__ == "__main__":
main()
3.4檔案路徑的天監控 hdfs_path_Monitor_day.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import time
import mysqlHelper
import utils
'''全域性變數系統時間 LocalTime 2017-07-28 14:37:47'''
LocalTime = str(utils.getnowTime())[0:16]
BeforeOneDay = str(utils.getBeforeHour(24))[0:16]
def hourComputer():
sql = '''
INSERT INTO hdfs_path_Monitor_temp(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
select h3.id as id, h3.p_id as p_id, h3.p_name as p_name,h3.date as date, h3.dir_kb as dir_kb,h3.hour_increase as
hour_increase,ABS((h3.hour_increase - h2.hour_increase)) as day_increase
from hdfs_path_Monitor h2
RIGHT JOIN hdfs_path_Monitor h3 on h3.p_name = h2.p_name
where h2.date=\'%s\' and h3.date=\'%s\'
''' %( BeforeOneDay , LocalTime )
mysqlHelper.mysqlInsert(sql)
delete_Monitor_table = "DELETE from hdfs_path_Monitor where date=\'%s\'" %(LocalTime)
mysqlHelper.mysqlInsert(delete_Monitor_table)
insert_Monitor_table = '''
INSERT INTO hdfs_path_Monitor(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
SELECT *
FROM hdfs_path_Monitor_temp
'''
mysqlHelper.mysqlInsert(insert_Monitor_table)
insert_Temp_table = "DELETE from hdfs_path_Monitor_temp where date=\'%s\'" %(LocalTime)
mysqlHelper.mysqlInsert(insert_Temp_table)
def main():
StartTime = time.time()
hourComputer()
mysqlHelper.mysqlClose()
EndTime = time.time()
print "所需要的時間為 : " + str(EndTime - StartTime)
if __name__ == "__main__":
main()
3.5檔案大小記錄 hdfs_size.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import os
import time
import mysqlHelper
import utils
'''
1、通過命令(hdfs dfs -ls -R /)獲取到所有檔案的全部內容
2、將檔案按條處理,空格切分,第一個欄位的第一個字元為橫杆‘-’,則為檔案
3、通過mysql批量插入
'''
LocalTime = utils.getnowTime()
def gethdfsSize():
lines = os.popen('hdfs dfs -ls -R / ')
args = []
mysqlHelper.mysqlInsert("delete from hdfs_size")
for line in lines:
str = line.replace('\n', "").split(' ')
filePath = str.pop()
dir_byte = str[len(str) - 3]
sql = "insert into hdfs_size (id, p_name, `date`, dir_byte) values (%s, %s, %s, %s)"
if ((str[0][0] == "-") == True):
args.append((add(), filePath, LocalTime, dir_byte))
mysqlHelper.mysqlBatchInset(sql, args)
def add(x=1):
try:
add.sum += x
except AttributeError:
add.sum = x
return add.sum
def main():
StartTime = time.time()
gethdfsSize()
mysqlHelper.mysqlClose()
EndTime = time.time()
print "所需要的時間為 : " + str(EndTime - StartTime) + "插入的數量為 :" + str(add() - 1)
if __name__ == "__main__":
main()
3.6mysql連線資訊
# database source
[downdb]
host = xxx.xxx.xxx.xxx
port = 3306
user = funnel
pass = [email protected]<2wsx
dbName = user_privileges
[ondb]
host = xxx.xxx.xxx.xxx
port = 3306
user = funnel
pass = [email protected]<2wsx
dbName = bi_data
3.7mysql工具類 mysqlHelper.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import ConfigParser
import codecs
import pymysql
DBSource = "ondb"
cp = ConfigParser.SafeConfigParser()
with codecs.open('myapp.conf', 'r', encoding='utf-8') as f:
cp.readfp(f)
'''mysql的連線'''
conn = pymysql.connect(host=cp.get(DBSource, 'host'), user=cp.get(DBSource, 'user'), password=cp.get(DBSource, 'pass'),
database=cp.get(DBSource, 'dbName'), use_unicode=True)
'''mysql的遊標'''
cursor = conn.cursor()
def mysqlInsert(sql):
print "遊標 ---> ", type(cursor), cursor
print "sql ---> ", type(sql), sql
cursor.execute(sql)
conn.commit()
def mysqlSearch(sql):
cursor.execute(sql)
return cursor.fetchall()
def mysqlBatchInset(sql, args):
cursor.executemany(sql, args)
conn.commit()
def mysqlClose():
conn.close()
if __name__ == "__main__":
sql = "select * from hdfs_path_Monitor limit 10 "
# 使用 fetchone() 方法獲取一條資料庫。
values = mysqlSearch(sql)
for line in values:
print line
mysqlClose()
3.8 工具類utils.py
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
from datetime import datetime, timedelta
def getBeforeDay(num):
return str(datetime.now() + timedelta(days=-num))[0:19]
def getLastDay(num):
return str(datetime.now() + timedelta(days=num))[0:19]
def getBeforeHour(num):
return str(datetime.now() + timedelta(hours=-num))[0:19]
def getLastHour(num):
return str(datetime.now() + timedelta(hours=num))[0:19]
def getnowTime():
return str(datetime.now())[0:19]
if __name__ == "__main__":
print str(getBeforeHour(1))[0:16]
4、結果展示
如果您喜歡我寫的博文,讀後覺得收穫很大,不妨小額贊助我一下,讓我有動力繼續寫出高質量的博文,感謝您的讚賞!!!
相關推薦
使用python監控HDFS檔案的增量【優化中】
目錄 1、需求和步驟 2、專案結構 3、專案程式碼 3.1建表語句 hdfs_Ctreate_table 3.2刪除檔案記錄 hdfs_delete_file_record.py 3.3檔案路徑的小時監控
python 模擬casio復數計算器【施工中】
定義 cas 處理 施工 取模 你會 運算 問題 數字 1. 前期準備 對於括號有這樣的規則: ——只有( 可以多於) 的個數,此時在行尾補) 。 ——取模|...|不能嵌套。(在casio中沒有這個問題,因為每按一次取模鍵,你會得到兩個|,所以可以定義他們的大小) 計
【優化版】Java檔案上傳資料庫(並儲存本地)、word轉pdf並進行頁面預覽
上一篇檔案上傳【點選跳轉】,是將路徑等檔案資訊存進log_file臨時表,內容二進位制存入資料庫Test表,這種邏輯是在呼叫資料庫表Test內容展示時,判斷檔案為word(轉換成pdf)還是pdf(直接展示)。 上一篇連結:連結地址。 下面進一步優化: 具體邏輯
spark streaming監控HDFS檔案目錄
叢集環境:CDH5.8.0 / spark1.6.0 / scala2.10.4基於Scala的基本使用方式如下:package com.egridcloud.sparkstreaming import org.apache.hadoop.conf.Configuratio
python讀取HDFS檔案
浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>
斯坦福tensorflow課程 課後作業代碼【更新中】
solution lob cor exc ons build ops 代碼 eth Assignment1 Problem1: Ops excercises Problem2: Task1:Improving the accuracy of our
【20171025中】alert(1) to win 腳本渲染自建
rip doc function 最簡 turn cti fire func cape 遊戲誤人生,一下午玩了將近四個小時的三國殺,後悔不已,然後重新拾起xss challenge,突發奇想,自己構建渲染後的html。 從最簡單的開始。 自動檢測htm
CSS - 移動端 常見小bug整理與解決方法總結【更新中】
mic ros class clas 問題 像素 css strong 常見問題 常見問題總結與整理系列~ 1. border一像素在手機上看著有點粗的問題: 原理是因為:1px在手機上是使用2dp進行渲染的 換成 border: 0.5像素?是不行的!
【20171115中】nmap 使用腳本爆破telnet密碼
pts style logs images lan clas ip add bbbb 提升 今天老黑走出了低谷,設置了懲罰機制後效率提升了很多,現在寫一個使用nmap檢測目標主機漏洞和利用漏洞的文章,話不多說,直接開始! 0x01:環境介紹 主機A:系統 - k
《Linux運維架構師課程 - 門徒班》【招生中】
linux運維課程簡介 阿良的課程內容主要以企業核心技術為講解對象,避免過多在企業中很少用的技術,從而減少學習負擔,這樣就可以把精力主要花費在更重要的技術上, 而不像其他培訓機構那樣,講很多高大上的技術名詞,其中可能50%的知識在工作中都用不到,學員抓不住重點,時間長了就忘了。 所以,阿良的教學模
《從Docker到Kubernetes企業應用實戰課程 - 集訓班》【招生中】
docker課程簡介 本課程是一個Docker技術集訓班,實戰為主,幫助你快速掌握這門主流的技術,能勝任Docker相關工作,同時為簡歷添上靚麗一筆。 Docker是一個開源的應用容器引擎,將項目及依賴打包到一個容器中,然後發布到任意Linux服務器上。 Docker主要特點:開箱即用,快速
【圖(中)】小白專場: 哈利·波特的考試
題目: 這門課是用魔咒將一種動物變成另一種動物。例如將貓變成老鼠的魔咒是haha,將老鼠變成魚的魔咒是hehe,把貓變成魚,魔咒lalala。反方向變化的魔咒就是簡單地將原來的魔咒倒過來念,例如ahah可以將老鼠變成貓。 只允許帶一隻動物,考察把這隻動物變成任意一隻指定動物的本事。於是
【圖(中)】最短路徑問題
1、最短路徑問題的抽象 在網路中,求兩個不同頂點之間的所有路徑中,邊的權值之和最小的那一條路徑 這條路徑就是兩點之間的最短路徑(Shortest Path) 第一個頂點為源點(Source) 最後一個頂點為終點(Destination)
Python 基本資料型別之【編碼問題】
content 字符集 常見的字符集舉例 位元組和字串之間的轉換 編碼問題 1. 字符集 a="A" a=b"A" # 計算機中儲存只是0101二進位制程式碼 # 字符集: 一堆字元的集合,用來制定當前的字元對映成計算機中儲存的ascii規則
Python 基本資料型別之【格式化問題】
content **%**進行格式化 format進行格式化 f常量進行格式化 1. %進行格式化 # 有幾個%佔位,後面就跟著幾個變數或者值,按照順序對應 # 在python2的時候使用的比較多 # %s-----str(變數) # %d %f
android全面屏顯示不全解決方案【更新中...】
一、宣告最大螢幕縱橫比(官方適配方案) Android官方提供了適配方案,即提高App所支援的最大螢幕縱橫比,實現很簡單,在AndroidManifest.xml中可做如下配置: <meta-data android:name="android.max_aspect"
AJAX和from-上傳檔案示例【django專案】
專案簡述 本Django專案為測試例項專案,用於學習測試。 分別用三種Django檔案上傳方式(form方式、jQuery+jQuery.ajax方式、原生JS+原生ajax方式)做上傳功能示例 檔案檔案釋義 form_upload.htmlform上傳檔案靜態頁面 jquery_ajax_upl
Python&Selenium 資料驅動【unittest+ddt】
一、摘要 本博文將介紹Python和Selenium做自動化測試的時候,基於unittest框架,藉助ddt實現資料驅動 二、測試程式碼 # encoding = utf-8 """ __title__ = '' __author__ = 'davieyang' __mtime__ = '2018
人臉識別,解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取 【人臉識別】解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取
原 【人臉識別】解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取 2018年09月19日 13:11:54
ctf sql注入關鍵詞繞過【積累中】
寫在前面:這個部落格知識點來源於個人ctf練習比賽中積累的知識點及網路中各個部落格的總結點,這裡做測試和記錄0x00 sql注入理解 SQL注入能使攻擊者繞過認證機制,完全控制遠端伺服器上的資料庫。 SQL是結構化查詢語言的簡稱,它是訪問資料庫的事實標準。目前,大多數We