python 之pyhdfs 對hdfs 進行操作
阿新 • • 發佈:2018-12-29
# !coding:utf-8
#################################################################
# 615 明輝科技 — common utilities shared by all sub-projects;
# contains no business logic.
# Author: 曹明傑
# ----------------------- HDFS helpers --------------------------
# pyhdfs API reference:
# https://hdfscli.readthedocs.io/en/latest/api.html
import os

from pyhdfs import HdfsClient, HdfsFileNotFoundException

from brushtickes.com.mh.brush.brushutils \
    import brush_util as bu


def read_hdfs_file(filename, local_path='', **kwargs):
    """Read the contents of an HDFS file and return them.

    If *local_path* is non-empty the data is also appended to that
    local file.

    :param filename: HDFS path to read.
    :param local_path: optional local file the data is appended to.
    :param kwargs: forwarded to ``HdfsClient.open`` — ``offset``
        (long, starting byte), ``length`` (long, bytes to read),
        ``buffersize`` (int, transfer buffer size).
    :return: the file contents (``.data`` of the HTTP response,
        normally ``bytes``).
    """
    response = get_client().open(filename, **kwargs)
    data = response.data
    print(data)
    if local_path != '':
        # Pick binary mode when the payload is bytes (the usual case
        # for urllib3 responses), text mode otherwise.  The context
        # manager fixes the original leaked file handles.
        mode = 'ab+' if isinstance(data, bytes) else 'a+'
        with open(local_path, mode) as fh:
            fh.write(data)
    return data


def mkdirs(hdfs_path):
    """Create a directory (and any missing parents) on HDFS."""
    get_client().mkdirs(hdfs_path)


def delete_hdfs_file(hdfs_path):
    """Delete a file from HDFS."""
    get_client().delete(hdfs_path)


def put_to_hdfs_no_flag(local_path, hdfs_path):
    """Upload a local file to HDFS without renaming the local copy.

    BUG FIX: the original passed the arguments to ``copy_from_local``
    in the wrong order (HDFS path first), asking pyhdfs to read the
    HDFS path from the local disk; it also had a stuttered
    ``upload = upload = ...`` assignment and logged the message of
    the ``_flag`` variant.
    """
    upload = get_client().copy_from_local(local_path, hdfs_path)
    print_base_log(upload, '上傳檔案到hdfs 已經完成', 'put_to_hdfs_no_flag')


def put_to_hdfs_flag(local_path, hdfs_path):
    """Upload a local file to HDFS, then rename the local file with a
    ``success_`` prefix plus a timestamp to mark it as processed.

    NOTE(review): prefixing the whole path (``"success_" + local_path``)
    only produces a valid name for bare file names in the current
    directory — confirm callers never pass paths with directories.
    """
    upload = get_client().copy_from_local(local_path, hdfs_path)
    os.rename(local_path, "success_" + local_path + str(bu.get_new_time()))
    print_base_log(upload, '上傳檔案到hdfs,並把本地上傳的檔案標誌位 success 已經完成', 'put_to_hdfs_flag')


def get_from_hdfs(hdfs_path, local_path):
    """Download an HDFS file to the local filesystem."""
    get_client().copy_to_local(hdfs_path, local_path)


def append_to_hdfs(hdfs_path, data):
    """Append *data* to an existing HDFS file."""
    get_client().append(hdfs_path, data)


def write_to_hdfs(hdfs_path, data):
    """Overwrite the HDFS file at *hdfs_path* with *data*.

    BUG FIX: the original deleted the file and then called
    ``append``, but WebHDFS APPEND fails when the target does not
    exist; ``create(..., overwrite=True)`` is the documented way to
    replace a file's contents.
    """
    get_client().create(hdfs_path, data, overwrite=True)


def move_or_rename(hdfs_src_path, hdfs_dst_path):
    """Move or rename a file/directory on HDFS."""
    get_client().rename(hdfs_src_path, hdfs_dst_path)


def list(hdfs_path):
    """Return the entry names under *hdfs_path*.

    Returns ``None`` when the path is not a directory or does not
    exist — the error is logged rather than raised.

    NOTE: the name shadows the builtin ``list``; kept unchanged for
    backward compatibility with existing callers.
    """
    resp = None  # BUG FIX: was unbound when an exception branch ran
    try:
        resp = get_client().listdir(hdfs_path)
        print_base_log(str(resp), hdfs_path + '下的目錄檔案資訊', 'list')
    except NotADirectoryError:
        print_base_log("該" + hdfs_path + "不是資料夾", hdfs_path + '下的目錄檔案資訊', 'list')
    except HdfsFileNotFoundException:
        print_base_log("該" + hdfs_path + "不存在", hdfs_path + '下的目錄檔案資訊', 'list')
    return resp


def get_client(hdfs_url='hdfs://node1:50070'):
    """Build an ``HdfsClient`` for the NameNode web endpoint (50070).

    BUG FIX: the original ignored *hdfs_url* entirely; it is now
    honoured — the ``hdfs://`` scheme is stripped to obtain the
    ``host:port`` string pyhdfs expects.  The default still resolves
    to the previous hard-coded ``node1:50070``.
    """
    hosts = hdfs_url.replace('hdfs://', '', 1)
    return HdfsClient(hosts=hosts, user_name='root')


def print_base_log(obj, item='hdfs', option=''):
    """Route *obj* through the shared brush_util custom logger."""
    bu.print_custom_masg(obj, item, option, 'base_utils.py')


def put_to_hdfs_flag2(local_path, hdfs_path):
    """Upload a local file to HDFS.

    *local_path* is an absolute local file path; the HDFS target
    directory must not already exist.
    """
    client = HdfsClient(hosts='node1:50070', user_name='root')
    client.copy_from_local(local_path, hdfs_path)
親測可行。就發出來給大家參考