1. 程式人生 > 實用技巧 > python對檔案進行平行計算初探(二)

python對檔案進行平行計算初探(二)

上次的平行計算是先把大檔案實際分割成多個小檔案再分別處理。其實更有效的方法是不產生小檔案,而是按位元組偏移量在邏輯上把檔案劃分成若干區塊,由各個程序直接讀取自己負責的區塊並分別計算

最後將各區塊的返回結果直接寫入目標檔案,省去了分割小檔案、合併小檔案、刪除小檔案的過程

程式碼如下:

import locale
import math
from multiprocessing import Pool

"""
不分割檔案,直接起多個程序對檔案進行讀寫
apply_async的callback接收的引數是被呼叫函式的返回值,error_callback接收的引數是丟擲來的異常
"""


# 使用者業務邏輯
def business(line):
    """Hook for per-line user logic.

    Args:
        line: One line read from the input file.

    Returns:
        The value to emit for this line. This default implementation is the
        identity: it hands *line* back unchanged.
    """
    transformed = line
    return transformed


def my_callback(lines, output_file='output'):
    """Append one processed chunk to *output_file*.

    Args:
        lines: The processed text of one block (a ``str`` as returned by
            ``Reader.execute``), or an iterable of strings.
        output_file: Path to append to. Defaults to ``'output'`` (the path
            that used to be hard-coded) so single-argument callers still work.
    """
    with open(output_file, 'a') as f:
        if isinstance(lines, str):
            # writelines() on a str would write it one character at a time.
            f.write(lines)
        else:
            f.writelines(lines)


class Reader(object):
    """Process the lines of *file_name* whose first byte lies in a byte range.

    A line belongs to the block containing its first byte. Adjacent ranges
    produced by ``FileBlock`` (``next start == end + 1``) therefore process
    every line exactly once between them.
    """

    def __init__(self, file_name, start_pos, end_pos, business_func,
                 encoding=None):
        """
        Args:
            file_name: Path of the input file.
            start_pos: First byte offset of the block (inclusive).
            end_pos: Last byte offset of the block (inclusive).
            business_func: Callable applied to every decoded line (``str``).
            encoding: Text encoding of the file; ``None`` means the locale
                default, i.e. the same default ``open()`` uses.
        """
        self.file_name = file_name
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.business_func = business_func
        self.encoding = encoding

    def execute(self):
        """Apply ``business_func`` to every line starting inside the block.

        The file is read in binary mode because the positions are byte
        offsets: seeking a text-mode file to an arbitrary offset is undefined
        and may land inside a multi-byte character.

        Returns:
            The results concatenated, each terminated by ``'\\n'`` (added if
            ``business_func`` stripped it); ``''`` if no line starts in the
            block.
        """
        encoding = self.encoding or locale.getpreferredencoding(False)
        results = []
        with open(self.file_name, 'rb') as f:
            pos = self.start_pos
            if pos != 0:
                f.seek(pos - 1)
                if f.read(1) != b'\n':
                    # We landed mid-line; that line is owned by the previous
                    # block, so skip its remainder.
                    f.readline()
                pos = f.tell()
            while pos <= self.end_pos:
                raw = f.readline()
                if not raw:
                    # EOF. Without this check the last block (end_pos equal
                    # to the file size) would loop forever on empty reads.
                    break
                pos = f.tell()
                line = raw.decode(encoding)
                if line.endswith('\r\n'):
                    # Mirror text-mode universal newline translation.
                    line = line[:-2] + '\n'
                new_line = self.business_func(line)
                if not new_line.endswith('\n'):
                    new_line += '\n'
                results.append(new_line)
        # Lines already carry their '\n'; joining with '\n' would double it.
        return ''.join(results)


class FileBlock(object):
    """Split a file into about ``block_num`` inclusive byte ranges."""

    def __init__(self, file_name, block_num):
        self.file_name = file_name
        self.block_num = block_num

    def block_file(self):
        """Return ``[(start, end), ...]`` inclusive byte ranges over the file.

        Consecutive ranges satisfy ``next_start == end + 1`` and the last
        range ends at the file size; an empty file yields ``[(0, 0)]``.

        Raises:
            ValueError: if ``block_num`` is not positive (0 used to raise
                ZeroDivisionError and negative values looped forever).
        """
        if self.block_num <= 0:
            raise ValueError('block_num must be positive, got %r'
                             % (self.block_num,))
        with open(self.file_name, 'rb') as f:
            f.seek(0, 2)
            file_size = f.tell()
        block_size = math.ceil(file_size / self.block_num)
        pos_list = []
        start_pos = 0
        while start_pos <= file_size:
            pos_list.append((start_pos, min(start_pos + block_size, file_size)))
            start_pos += block_size + 1
        return pos_list


if __name__ == '__main__':
    concurrency = 8
    input_file = '/opt/test/target.txt'
    output_file = '/opt/test/target2.txt'

    fb = FileBlock(input_file, concurrency)
    # my_callback appends, so start from an empty file rather than extending
    # the output of a previous run.
    open(output_file, 'w').close()
    with Pool(concurrency) as p:
        results = [p.apply_async(Reader(input_file, s, e, business).execute)
                   for s, e in fb.block_file()]
        # Write in submission order so the output keeps the input's line
        # order; get() also re-raises worker exceptions instead of dropping
        # them as a bare callback would.
        for result in results:
            my_callback(result.get(), output_file)