1. 程式人生 > 程式設計 >pandas apply多執行緒實現程式碼

pandas apply多執行緒實現程式碼

一、多執行緒化選擇

並行化一個程式碼有兩大選擇:multithread 和 multiprocess。

Multithread,多執行緒,同一個程序(process)可以開啟多個執行緒執行計算。每個執行緒代表了一個 CPU 核心,這麼多執行緒可以訪問同樣的記憶體地址(所謂共享記憶體),實現了執行緒之間的通訊,算是最簡單的並行模型。

Multiprocess,多程序,則相當於同時開啟多個 Python 直譯器,每個直譯器有自己獨有的資料,自然不會有資料衝突。

二、並行化思想

並行化的基本思路是把 dataframe 用 np.array_split 方法切割成多個子 dataframe。再呼叫 Pool.map 函式並行地執行。注意到順序執行的 pandas.DataFrame.apply 是如何轉化成 Pool.map 然後並行執行的。

Pool 物件是一組並行的程序,開源Pool類

開源Pool類定義

 def Pool(self,processes=None,initializer=None,initargs=(),maxtasksperchild=None):
    '''Returns a process pool object'''
    from .pool import Pool
    return Pool(processes,initializer,initargs,maxtasksperchild,context=self.get_context())

設定程序初始化函式

def init_process(global_vars):
  global a
  a = global_vars

設定程序初始化函式

Pool(processes=8,initializer=init_process,initargs=(a,))

其中,指定產生 8 個程序,每個程序的初始化需執行 init_process函式,其引數為一個 singleton tuple a. 利用 init_process 和 initargs,我們可以方便的設定需要在程序間共享的全域性變數(這裡是 a)。

with 關鍵詞是 context manager,避免寫很繁瑣的處理開關程序的邏輯。

 with Pool(processes=8,)) as pool:    
    result_parts = pool.map(apply_f,df_parts)

三、多執行緒化應用

多執行緒時間比較和多執行緒的幾種apply應用

import numpy as np
import pandas as pd
import time
from multiprocessing import Pool

def f(row):
  #直接對某列進行操作
  return sum(row)+a

def f1_1(row):
  #對某一列進行操作,我這裡的columns=range(0,2),此處是對第0列進行操作
  return row[0]**2

def f1_2(row1):
  #對某一列進行操作,我這裡的columns=range(0,2),此處是對第0列進行操作
  return row1**2

def f2_1(row):
  #對某兩列進行操作,我這裡的columns=range(0,2),此處是對第0,2列進行操作
  return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])

def f2_2(row1,row2):
  #對某兩列進行操作,我這裡的columns=range(0,2),此處是對第0,2列進行操作
  return pd.Series([row1**2,row2**2],index=['2_1','2_2'])

def apply_f(df):
  return df.apply(f,axis=1)

def apply_f1_1(df):
  return df.apply(f1_1,axis=1)

def apply_f1_2(df):
  return df[0].apply(f1_2)

def apply_f2_1(df):
  return df.apply(f2_1,axis=1)

def apply_f2_2(df):
  return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)
 
def init_process(global_vars):
  global a
  a = global_vars
  
def time_compare():
  '''直接呼叫和多執行緒呼叫時間對比'''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
  print(df.columns)
   
  t1= time.time()
  result_serial = df.apply(f,axis=1)
  t2 = time.time()
  print("Serial time =",t2-t1)
  print(result_serial.head())

  
  df_parts=np.array_split(df,20)
  print(len(df_parts),type(df_parts[0]))
  with Pool(processes=8,)) as pool: 
  #with Pool(processes=8) as pool:    
    result_parts = pool.map(apply_f,df_parts)
  result_parallel= pd.concat(result_parts)
  t3 = time.time()
  print("Parallel time =",t3-t2)
  print(result_parallel.head())


def apply_fun():
  '''多種apply函式的呼叫'''
  a = 2
  np.random.seed(0)
  df = pd.DataFrame(np.random.rand(10**5,2))
  print(df.columns)
  df_parts=np.array_split(df,)) as pool: 
  #with Pool(processes=8) as pool:    
    res_part0 = pool.map(apply_f,df_parts)
    res_part1 = pool.map(apply_f1_1,df_parts)
    res_part2 = pool.map(apply_f1_2,df_parts)
    res_part3 = pool.map(apply_f2_1,df_parts)
    res_part4 = pool.map(apply_f2_2,df_parts)

  res_parallel0 = pd.concat(res_part0)
  res_parallel1 = pd.concat(res_part1)
  res_parallel2 = pd.concat(res_part2)
  res_parallel3 = pd.concat(res_part3)
  res_parallel4 = pd.concat(res_part4)
  
  print("f:\n",res_parallel0.head())
  print("f1:\n",res_parallel1.head())
  print("f2:\n",res_parallel2.head())
  print("f3:\n",res_parallel3.head())
  print("f4:\n",res_parallel4.head())

  df=pd.concat([df,res_parallel0],axis=1)
  df=pd.concat([df,res_parallel1],res_parallel2],res_parallel3],res_parallel4],axis=1)

  print(df.head())
      
  
if __name__ == '__main__':
  time_compare()
  apply_fun()

參考網址

https://blog.fangzhou.me/posts/20170702-python-parallelism/

https://docs.python.org/3.7/library/multiprocessing.html

到此這篇關於pandas apply多執行緒實現程式碼的文章就介紹到這了,更多相關pandas apply多執行緒內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!