Python: unzipping zip files and uploading to HDFS
Compressing and decompressing zip files
Decompressing and compressing with zipfile
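A minimal sketch of both directions using the standard-library zipfile module; the archive and directory names here are placeholders:

import zipfile

# Compress: write a file into a new archive.
with zipfile.ZipFile("data.zip", mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write("report.txt")

# Decompress: extract every entry into a target directory.
with zipfile.ZipFile("data.zip", mode="r") as zf:
    zf.extractall("unzipped")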
Using Linux's built-in zip tools
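Alternatively, shell out to the unzip binary that most Linux distributions ship. A sketch via subprocess, assuming unzip is installed; the paths are placeholders:

import subprocess

# Equivalent to the shell command: unzip -o archive.zip -d /data/unzipped
# -o overwrites existing files; -d sets the extraction directory.
subprocess.run(["unzip", "-o", "archive.zip", "-d", "/data/unzipped"], check=True)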
Example
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
import zipfile
import os
import os.path
import pandas as pd


def unzip_file(path_pair):
    """Extract a single archive; path_pair is (zip_path, target_dir)."""
    file, target = path_pair
    print("Unzipping %s into %s" % (file, target))
    try:
        with zipfile.ZipFile(file, mode="r") as f:
            f.extractall(target)  # extract everything into the target directory
    except Exception as e:
        print("Exception: %s" % e)


def unzip_file_gbk(zip_path, target_dir):
    """Extract an archive whose entry names are GBK-encoded (common for zips
    created by Chinese-locale Windows tools). zipfile decodes non-UTF-8 entry
    names as cp437, so re-encoding to bytes and decoding as GBK recovers the
    real filename; the extracted entry is then renamed accordingly."""
    print("Unzipping %s into %s" % (zip_path, target_dir))
    try:
        with zipfile.ZipFile(zip_path, mode="r") as f:
            for num, f_name in enumerate(f.namelist()):
                new_f_name = f_name.encode("cp437").decode("gbk")
                f.extract(f_name, path=target_dir)
                os.rename(os.path.join(target_dir, f_name),
                          os.path.join(target_dir, new_f_name))
    except Exception as e:
        print("Exception: %s" % e)


def unzip_all_in_dir(src_leaf_dir, target_dir):
    """Find every .zip directly under src_leaf_dir and extract it into target_dir."""
    zip_file_names = [os.path.join(src_leaf_dir, file_name)
                      for file_name in os.listdir(src_leaf_dir)
                      if file_name.lower().endswith(".zip")]
    for num, zip_file in enumerate(zip_file_names):
        dataset_name = os.path.split(zip_file)[-1].split(".")[0]
        print(num, dataset_name)
        unzip_file_gbk(zip_file, target_dir)


def get_dir_stats(file_dir):
    """Walk file_dir and map each sub-directory to its file count."""
    result = {}
    for dirpath, dirnames, filenames in os.walk(file_dir):
        result[dirpath] = len(filenames)
    return result


def get_satisfy_stats_dir(unzip_target_dir):
    """For each extracted dataset, report the sub-directory holding the most
    files, as [dataset_name, directory, file_count]."""
    result_list = []
    for num, zip_file in enumerate(os.listdir(unzip_target_dir)):
        new_dir = os.path.join(unzip_target_dir, zip_file)
        file_dir_nm = get_dir_stats(new_dir)
        # max() iterates over the dict's keys; key=file_dir_nm.get compares
        # them by their values, so it returns the key with the largest value.
        max_key = max(file_dir_nm, key=file_dir_nm.get)
        result_list.append([zip_file, max_key, file_dir_nm.get(max_key)])
    return result_list


if __name__ == "__main__":
    # Step 1: decompress
    # zip_src_leaf_dir = r"D:\data\test\01"
    # unzip_target_dir = r'D:\data\test\data_unzip'
    # unzip_all_in_dir(zip_src_leaf_dir, unzip_target_dir)

    # Step 2: join the directory stats with the metadata table
    unzip_dir = r'D:\data\test\data_unzip'
    src_dest = get_satisfy_stats_dir(unzip_dir)
    meta_file_nm = r"D:\data\test\group_result_01.txt"
    meta_df = pd.read_csv(meta_file_nm, sep="\t", encoding="utf8")
    file_res_ls = []
    for set_data in src_dest:
        # "集" is the dataset-name column in the metadata file
        satisfy_df = meta_df[meta_df["集"] == set_data[0]]
        satisfy_result = (set_data[0], set_data[1], set_data[2],
                          satisfy_df["hdfs_dir"].values[0],
                          satisfy_df["new_label"].values[0])
        print(satisfy_result)
        file_res_ls.append(satisfy_result)
    res_df = pd.DataFrame(file_res_ls,
                          columns=["data_nm", "src_dir", "cnt", "hdfs_dir", "new_label"])
    res_df.to_csv(r"D:\data\test\group_result_hdfs.txt",
                  index=False, header=True, sep="\t")
Uploading files to HDFS
Using the built-in tools
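If a Hadoop client is configured on the machine, the stock hdfs CLI can do the upload; from Python you can simply shell out to it. A sketch with placeholder paths:

import subprocess

# Equivalent to the shell command: hdfs dfs -put -f local.txt /user/test/
# -f overwrites the destination file if it already exists.
subprocess.run(["hdfs", "dfs", "-put", "-f", "local.txt", "/user/test/"], check=True)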
Using pyhdfs
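pyhdfs talks to the NameNode's WebHDFS REST endpoint. A minimal connectivity check before running the full script below; "namenode:9870" and user_name="test" are placeholder values (9870 is the default WebHDFS port on Hadoop 3):

import pyhdfs

# hosts takes the NameNode's "host:port"; these values are placeholders.
client = pyhdfs.HdfsClient(hosts="namenode:9870", user_name="test")
print(client.listdir("/"))          # list the HDFS root directory
print(client.exists("/user/test"))  # check a path before writing to it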
Code example
import pyhdfs
import os.path

if __name__ == "__main__":
    client = pyhdfs.HdfsClient(hosts="test", user_name="test")
    file_nm = r"G:\data\test\group_result_hdfs.txt"
    with open(file=file_nm, mode='r', encoding="utf8") as f:
        first_line = next(f)  # use next() to skip the header row
        for file_num, data in enumerate(f):
            # strip() already removes the trailing \r\n before splitting
            fields = data.strip().split("\t")
            local_dir = fields[1]
            hdfs_dir = fields[3]
            if not client.exists(hdfs_dir):
                print("creating directory", hdfs_dir)
                client.mkdirs(hdfs_dir)
            if client.exists(hdfs_dir):
                print("upload", local_dir, hdfs_dir)
                for num, local_file in enumerate(os.listdir(local_dir)):
                    local_src_jpg = os.path.join(local_dir, local_file)
                    hdfs_src_jpg = hdfs_dir + "/" + local_file
                    client.copy_from_local(local_src_jpg, hdfs_src_jpg)
                    print(file_num, num, local_src_jpg, hdfs_src_jpg)
            else:
                print(client.exists(hdfs_dir))
    print("done")
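Note that this script reads the group_result_hdfs.txt produced by the decompression script above, so the two are meant to run in order: unzip and summarize first, then upload.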
References
pyhdfs API documentation: https://pyhdfs.readthedocs.io/en/latest/pyhdfs.html
Operating HDFS with Python: https://www.cnblogs.com/wangbin2188/p/14591230.html