python實現tail -f功能
這篇文章最初是因為reboot的群裏,有人去面試,筆試題有這個題,不知道怎麽做,什麽思路,就發群裏大家討論
我想了一下,簡單說一下我的想法吧,當然,也有很好用的pyinotify模塊專門監聽文件變化,不過我更想介紹的,是解決的思路,畢竟作為面試官,還是想看到一下解決問題的思路,而且我覺得這一題的難點不在於監控文件增量,而在於怎麽打印最後面10行
希望大家讀這篇文章前,對python基礎、處理文件和常用模塊有一個簡單的了解,知道下面幾個名詞是啥
open(‘a.txt‘)
file.seek
file.tell
time.sleep()
下面思路限於我個人知識,免不了有錯誤和考慮不周的,希望大家有更好的方法提出來,我隨時優化代碼,題目的感覺沒啥太多的坑,下面讓天真爛漫的蝸牛教大家用python實現
怎麽用python實現
其實思路也不難啦
- 打開這個文件,指針移到最後
- 每隔一秒就嘗試readline一下,有內容就打印出來,沒內容就sleep
- 大概就是這麽個意思
監聽文件
思路如下:
- 用open打開文件
- 用seek文件指針,給大爺把跳到文件最後面
- while True進行循環
- 持續不停的readline,如果能讀到內容,打印出來即可
代碼呼之欲出
with open(‘test.txt‘) as f:
f.seek(0,2)
while True:
last_pos = f.tell()
line = f.readline()
if line:
print line
效果圖如下
代碼說明
- seek第二個參數2,意思就是從文件結尾處開始seek,更標準的寫法使用os模塊下面的SEEK_END,可讀性更好
- 只寫出了簡單的邏輯,代碼簡單粗暴,如果這個題目是10分的話,最多也就拿4分吧,不能再多了
優化點
- print有缺陷,每次都是新的一行,換成sys.stdout.write(line)更和諧
- 文件名傳參,不能寫死
- 直接打印可以當成默認行為,具體要做什麽,可以寫成函數處理,這樣我們就可以把新行做其他處理,比如展示在瀏覽器裏
- 加上容錯處理,比如文件不存在會報錯
- while True一直都文件,比較耗性能,每讀一次,間隔一秒比較靠譜
- 調用time.sleep(1)
- 用類來組織代碼
實例代碼如下
# coding=utf-8
import sys
import time
class Tail():
def __init__(self,file_name,callback=sys.stdout.write):
self.file_name = file_name
self.callback = callback
def follow(self):
try:
with open(self.file_name) as f:
f.seek(0,2)
while True:
line = f.readline()
if line:
self.callback(line)
time.sleep(1)
except Exception,e:
print ‘打開文件失敗,囧,看看文件是不是不存在,或者權限有問題‘
print e
使用方法:
# 使用默認的sys.stdout.write打印到屏幕
py_tail = Tail(‘test.txt‘)
py_tail.follow()
# 自己定義處理函數
def test_tail(line):
print ‘xx‘+line+‘xx‘
py_tail1 = Tail(‘test.txt‘, test_tail)
py_tail1.follow()
咦 等等,tail -f 默認還會打印最後10行,這個好像才是這個題目的難點所在,眾所周知,python裏讀文件指針,只能移動到固定位置,不能判斷是哪一行,囧,先實現簡單的,逐漸加強吧
默認打印最後10行
現在這個代碼,大概能拿6分啦,我們還有一個功能沒做,那就是打印最後n行,默認是10行,現在把這個功能加上,加一個函數就行啦
文件小的時候
我們知道,readlines可以獲取所有內容,並且分行,代碼呼之欲出,獲取list最後10行很簡單有麽有,切片妥妥的
# 演示代碼,說明核心邏輯,完整代碼在下面
last_lines = f.readlines()[-10:]
for line in last_lines:
self.callback(line)
此時代碼變成這樣了
import sys
import time
class Tail():
def __init__(self,file_name,callback=sys.stdout.write):
self.file_name = file_name
self.callback = callback
def follow(self,n=10):
try:
with open(self.file_name) as f:
self._file = f
self.showLastLine(n)
self._file.seek(0,2)
while True:
line = self._file.readline()
if line:
self.callback(line)
except Exception,e:
print ‘打開文件失敗,囧,看看文件是不是不存在,或者權限有問題‘
print e
def showLastLine(self, n):
last_lines = self._file.readlines()[-n:]
for line in last_lines:
self.callback(line)
更進一步:大的日誌怎麽辦
但是如果文件特別大呢,特別是日誌文件,很容易幾個G,我們只需要最後幾行,全部讀出來內存受不了,所以我們要繼續優化showLastLine函數,我覺得這才是這題的難點所在
大概的思路如下
- 我先估計日誌一行大概是100個字符,註意,我只是估計一個大概,多了也沒關系,我們只是需要一個初始值,後面會修正
- 我要讀十行,所以一開始就seek到離結尾1000的位置seek(-1000,2),把最後1000個字符讀出來,然後有一個判斷
- 如果這1000個字符長度大於文件長度,不用管了 屬於級別1的情況,直接read然後split
- 如果1000個字符裏面的換行符大於10,說明1000個字符裏,大於10行,那也好辦,處理思路類似級別1,直接split取後面十個
- 問題就在於 如果小於10怎麽處理
- 比如1000個裏,只有四個換行符,說明一行大概是1000/4=250個字符,我們還缺6行,所以我們再讀250*5=1500,一共有2500的大概比較靠譜,然後在對2500個字符進行相同的邏輯判斷,直到讀出的字符裏,換行符大於10,讀取結束
邏輯清晰以後,代碼就呼之欲出啦
# coding=utf-8
import sys
import time
class Tail():
def __init__(self,file_name,callback=sys.stdout.write):
self.file_name = file_name
self.callback = callback
def follow(self,n=10):
try:
with open(self.file_name) as f:
self._file = f
self._file.seek(0,2)
self.file_length = self._file.tell()
self.showLastLine(n)
while True:
line = self._file.readline()
if line:
self.callback(line)
time.sleep(1)
except Exception,e:
print ‘打開文件失敗,囧,看看文件是不是不存在,或者權限有問題‘
print e
def showLastLine(self, n):
len_line = 100
read_len = len_line*n
while True:
if read_len>self.file_length:
self._file.seek(0)
last_lines = self._file.read().split(‘\n‘)[-n:]
break
self._file.seek(-read_len, 2)
last_words = self._file.read(read_len)
count = last_words.count(‘\n‘)
if count>=n:
last_lines = last_words.split(‘\n‘)[-n:]
break
else:
if count==0:
len_perline = read_len
else:
len_perline = read_len/count
read_len = len_perline * n
for line in last_lines:
self.callback(line+‘\n‘)
if __name__ == ‘__main__‘:
py_tail = Tail(‘test.txt‘)
py_tail.follow()
加上註釋的版本
# coding=utf-8
import sys
import time
class Tail():
def __init__(self,file_name,callback=sys.stdout.write):
self.file_name = file_name
self.callback = callback
def follow(self,n=10):
try:
# 打開文件
with open(self.file_name) as f:
self._file = f
self._file.seek(0,2)
# 存儲文件的字符長度
self.file_length = self._file.tell()
# 打印最後10行
self.showLastLine(n)
# 持續讀文件 打印增量
while True:
line = self._file.readline()
if line:
self.callback(line)
time.sleep(1)
except Exception,e:
print ‘打開文件失敗,囧,看看文件是不是不存在,或者權限有問題‘
print e
def showLastLine(self, n):
# 一行大概100個吧 這個數改成1或者1000都行
len_line = 100
# n默認是10,也可以follow的參數傳進來
read_len = len_line*n
# 用last_lines存儲最後要處理的內容
while True:
# 如果要讀取的1000個字符,大於之前存儲的文件長度
# 讀完文件,直接break
if read_len>self.file_length:
self._file.seek(0)
last_lines = self._file.read().split(‘\n‘)[-n:]
break
# 先讀1000個 然後判斷1000個字符裏換行符的數量
self._file.seek(-read_len, 2)
last_words = self._file.read(read_len)
# count是換行符的數量
count = last_words.count(‘\n‘)
if count>=n:
# 換行符數量大於10 很好處理,直接讀取
last_lines = last_words.split(‘\n‘)[-n:]
break
# 換行符不夠10個
else:
# break
#不夠十行
# 如果一個換行符也沒有,那麽我們就認為一行大概是100個
if count==0:
len_perline = read_len
# 如果有4個換行符,我們認為每行大概有250個字符
else:
len_perline = read_len/count
# 要讀取的長度變為2500,繼續重新判斷
read_len = len_perline * n
for line in last_lines:
self.callback(line+‘\n‘)
if __name__ == ‘__main__‘:
py_tail = Tail(‘test.txt‘)
py_tail.follow(20)
效果如下,終於可以打印最後幾行了,大家可以試一下,無論日誌每行只有1個,還是每行有300個字符,都是可以打印最後10行的
能做到這個地步,一般的面試官就直接被你搞定了,這個代碼大概8分吧,如果還想再進一步,還有一些可以優化的地方,代碼放github上了,有興趣的可以拿去研究
待優化:留給大家作為擴展
- 如果你tail -f的過程中,日誌被備份清空或者切割了,怎麽處理
- 其實很簡單,你維護一個指針位置,如果下次循環發現文件指針位置變了,從最新的指針位置開始讀就行
- 具體每種錯誤的類型
- 我這裏只是簡單的try 可以寫個函數,把文件不存在,文件沒權限這些報錯信息分開
- 性能小優化,比如我第一次讀了1000,只有4行,我大概估計每行250個字符,總體需要2500行,下次沒必要直接讀2500個,而是讀1500行和之前1000行拼起來即可
最後大殺器 如果寫出來這個,基本面試官會直接
import os
def tail(file_name):
os.system(‘tail -f ‘+file_name)
tail(‘log.log‘)
打死你
閱讀原文
python實現tail -f功能