Python threadpool 多執行緒傳入多個引數示例
阿新 • 發佈:2019-02-12
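主要有兩種方法:一種是將引數構造成 list 傳入;另一種是將引數構造成 dict 傳入。傳給 threadpool.makeRequests() 的引數列表中,每個元素都是一個 (args, kwds) 元組:用 list 時寫成 (引數list, None),用 dict 時寫成 (None, 引數dict)。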
樣例程式碼:
/Users/nisj/PycharmProjects/EsDataProc/bi-static/ThreadPool_multiPar.py
此例項也是文章《Python控制資料(留存及支付資訊)按周進行跑批處理》(http://blog.csdn.net/babyfish13/article/details/54096306)的後續。
樣例程式碼:
/Users/nisj/PycharmProjects/EsDataProc/bi-static/ThreadPool_multiPar.py
# -*- coding=utf-8 -*-
"""Demo: passing several arguments to a threadpool worker function.

Each element of the list given to threadpool.makeRequests() is an
(args, kwds) pair describing one call of the worker:

* method 1 passes positional arguments: (args_list, None)
* method 2 passes keyword arguments:    (None, kwds_dict)
"""
import threadpool
import time


def Main_Def(par1, par2, par3):
    """Print the three arguments one work request was called with."""
    print("par1 = %s, par2 = %s, par3 = %s" % (par1, par2, par3))


if __name__ == '__main__':
    # Method 1: positional arguments -> (args_list, None)
    list_var1 = ['1', '2', '3']
    list_var2 = ['4', '5', '6']
    par_list = [(list_var1, None), (list_var2, None)]
    # Method 2: keyword arguments -> (None, kwds_dict)
    # dict_var1 = {'par1': '1', 'par2': '2', 'par3': '3'}
    # dict_var2 = {'par1': '4', 'par2': '5', 'par3': '6'}
    # par_list = [(None, dict_var1), (None, dict_var2)]
    pool = threadpool.ThreadPool(2)
    requests = threadpool.makeRequests(Main_Def, par_list)
    # A plain loop: putRequest() is called for its side effect, so there is
    # no reason to build and discard a list of its return values.
    for req in requests:
        pool.putRequest(req)
    time.sleep(1)
    pool.wait()
實際生產環境中使用的例項(按週批次計算留存及支付資料):
/Users/nisj/PycharmProjects/EsDataProc/bi-static/Hive_remain_pay_byWeek_runing.py
# -*- coding=utf-8 -*-
"""Weekly retention / payment batch for funnyai_data.bi_user_remain_pay_byweek.

curr_week is last ISO week; his_week ranges over every ISO week from
2015-07-01 up to and including curr_week.  For each (curr_week, his_week)
pair a Hive query counts the users found in bi_all_access_log_of_new during
his_week that also appear in bi_all_access_log during curr_week, and sums
their curr_week amounts from data_chushou_pay_info.  The MySQL rows for that
pair are then replaced.  Pairs are processed by a 13-thread
threadpool.ThreadPool.

Week keys are strings 'YYYY#W': ISO year, '#', ISO week number without zero
padding (e.g. '2016#1', '2016#52').
"""
import warnings
import datetime
import time
import os
import re
import subprocess

import threadpool

warnings.filterwarnings("ignore")

today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)
now_time = time.strftime('%Y-%m-%d %X', time.localtime())
print("當前時間是: " + now_time)

# mysql client command line.  SQL is fed on stdin, never spliced into a shell
# command, so quotes or shell metacharacters in data values cannot break it.
# --force keeps executing the remaining statements after one fails, as the
# previous one-process-per-statement version effectively did.
# NOTE(review): a password on the command line is visible in `ps`; consider
# an option file (--defaults-extra-file) instead.
_MYSQL_CMD = ['/usr/bin/mysql', '-hMysqlHost', '-P6603', '-uhadoop',
              '-pMysqlPass', '--force']

# Hive expression giving pt_day's 'YYYY#W' key, matching the keys weekRang()
# builds from date.isocalendar().  Hive's weekofyear() uses ISO-8601 week
# numbering, so early-January days can fall in the previous year's week 52/53
# and late-December days in the next year's week 1.  The original CASE only
# handled the January case, so Dec 29-31 days in ISO week 1 got the wrong year.
_HIVE_WEEK_KEY = (
    "case when weekofyear(pt_day)>=52 and month(pt_day)=1 "
    "then concat(year(pt_day)-1,'#',weekofyear(pt_day)) "
    "when weekofyear(pt_day)=1 and month(pt_day)=12 "
    "then concat(year(pt_day)+1,'#',weekofyear(pt_day)) "
    "else concat(year(pt_day),'#',weekofyear(pt_day)) end"
)

# Shell command running the retention/payment query; filled in with
# %-formatting using the keys week_key, his_week and curr_week.  Each
# backslash-newline is a Python line continuation, so the command is one line.
_HIVE_QUERY_TEMPLATE = """source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log_of_new \
where %(week_key)s = '%(his_week)s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
from bi_all_access_log \
where %(week_key)s = '%(curr_week)s' \
group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
curr_week_pay as (select uid,sum(amount) amount \
from data_chushou_pay_info \
where %(week_key)s = '%(curr_week)s' \
group by uid) \
select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,sum(a3.amount) pay_amount \
from his_new_user a1 \
inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
left join curr_week_pay a3 on a1.uid=a3.uid \
group by a1.appkey,a1.appsource \
;" \
"""


def dateRange(beginDate, endDate):
    """Return every day from beginDate to endDate inclusive as 'YYYY-MM-DD'.

    Both bounds are 'YYYY-MM-DD' strings; the loop relies on that format
    sorting lexicographically in date order.  Returns [] when
    beginDate > endDate.
    """
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def weekRang(beginDate, endDate):
    """Return the sorted, distinct 'YYYY#W' ISO weeks touched by the range."""
    weeks = set()
    for date in dateRange(beginDate, endDate):
        day = datetime.date(int(date[0:4]), int(date[5:7]), int(date[8:10]))
        weeks.add(day.isocalendar()[0:2])
    # Sort the (year, week) int tuples before formatting so the order is numeric.
    return [str(year) + '#' + str(week) for year, week in sorted(weeks)]


def _week_tuple(week):
    """Split a 'YYYY#W' key into a (year, week) int tuple for ordering."""
    year, num = week.split('#')
    return int(year), int(num)


def hisWeekList(curr_week):
    """Return the 'YYYY#W' weeks from 2015-07-01 up to and including curr_week.

    Weeks are generated up to the day one week before now, then limited to
    those not later than curr_week; the result is in ascending order.
    """
    end_day = str(datetime.datetime.now() - datetime.timedelta(days=7))[0:10]
    curr = _week_tuple(curr_week)
    return [week for week in weekRang('2015-07-01', end_day)
            if _week_tuple(week) <= curr]


def getLastWeek(d):
    """Return (Monday, Sunday) of the ISO week before d's, as 'YYYY-MM-DD'."""
    # Stepping back isoweekday() days always lands on the previous Sunday.
    dayscount = datetime.timedelta(days=d.isoweekday())
    dayto = d - dayscount
    sixdays = datetime.timedelta(days=6)
    dayfrom = dayto - sixdays
    return str(dayfrom)[0:10], str(dayto)[0:10]


def hisRunWeekList():
    """Build the threadpool argument list: ([curr_week, his_week], None) pairs.

    curr_week is last ISO week.  There is one entry per week returned by
    hisWeekList(curr_week), which is already limited to weeks <= curr_week.
    """
    dayfrom, dayto = getLastWeek(datetime.datetime.now())
    curr_week = weekRang(dayfrom, dayto)[0]
    return [([curr_week, his_week], None) for his_week in hisWeekList(curr_week)]


def _sql_quote(value):
    """Return value as a single-quoted MySQL string literal.

    Escapes backslashes and single quotes.  Assumes the server does not run
    with NO_BACKSLASH_ESCAPES in sql_mode.
    """
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def _run_mysql(sql):
    """Run sql in a single mysql client session (SQL on stdin); return the exit code."""
    proc = subprocess.Popen(_MYSQL_CMD, stdin=subprocess.PIPE,
                            universal_newlines=True)
    proc.communicate(sql)
    return proc.returncode


def user_remain_proc(curr_week, his_week):
    """Recompute the (data_week=his_week, remain_week=curr_week) rows.

    Runs the Hive query first.  Then, in one MySQL session, it deletes the
    pair's existing rows and inserts one row per (appkey, appsource) line
    that Hive printed.
    """
    hive_cmd = _HIVE_QUERY_TEMPLATE % {'week_key': _HIVE_WEEK_KEY,
                                       'his_week': his_week,
                                       'curr_week': curr_week}
    newuser_remain_pay_data = os.popen(hive_cmd).readlines()

    statements = [
        "use funnyai_data;",
        "delete from bi_user_remain_pay_byweek where data_week=%s and remain_week=%s;"
        % (_sql_quote(his_week), _sql_quote(curr_week)),
    ]
    for line in newuser_remain_pay_data:
        fields = re.split('\t', line.replace('\n', ''))
        if len(fields) < 4:
            # Blank or malformed output line: skip it rather than raising
            # IndexError and losing the remaining rows of this week pair.
            continue
        # Column order follows the Hive select: appkey, appsource, remain_cnt, pay_amount.
        # NOTE(review): pay_amount is Hive's literal "NULL" when no matched
        # user paid; it is inserted as the string 'NULL', as before.
        appkey, appsource, remain_cnt, pay_amount = fields[:4]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())
        values = (his_week, appsource, appkey, curr_week,
                  remain_cnt, pay_amount, etl_time)
        statements.append(
            "insert into bi_user_remain_pay_byweek"
            "(data_week,appsource,appkey,remain_week,remain_cnt,pay_amount,etl_time) "
            "select " + ",".join(_sql_quote(v) for v in values) + ";")

    if _run_mysql("\n".join(statements) + "\n") != 0:
        print("mysql reported errors for data_week=%s remain_week=%s"
              % (his_week, curr_week))


if __name__ == '__main__':
    # Dispatch only when run as a script, so importing the module does not
    # start production jobs.
    batch_week_list = hisRunWeekList()
    requests = threadpool.makeRequests(user_remain_proc, batch_week_list)
    main_pool = threadpool.ThreadPool(13)
    for req in requests:
        main_pool.putRequest(req)
    while True:
        try:
            time.sleep(9)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break
    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()
    now_time = time.strftime('%Y-%m-%d %X', time.localtime())
    print("當前時間是: " + now_time)
此例項也是文章《Python控制資料(留存及支付資訊)按周進行跑批處理》(http://blog.csdn.net/babyfish13/article/details/54096306)的後續。