# Backup of a usage example of `apply` in PyODPS (ODPS 2.0).
############################################
import json
from odps.df import DataFrame
from odps import options
# options.verbose = True
# Per-job SQL setting forwarded to MaxCompute; per the MaxCompute docs this
# caps the input size handled by one mapper (smaller value => more mappers).
options.sql.settings = {'odps.sql.mapper.split.size': 1}

# NOTE(review): `args` and `o` are not defined in this file; presumably they
# are injected by the PyODPS runtime (e.g. a DataWorks node) -- confirm.
module = args['module']
plan_id = args['plan_id']
calctime = args['calctime']

############ Input table: bid / realday / tasklist ############
table_input = 'temp_qbb_btime_parentingtask3_bid_grouplist_tasklist_' + module + '_' + plan_id + '_' + calctime
# Result table. Original note: for now its first 3 columns are copied directly
# from temp_qbb_btime_parentingtask3_plan_bid_taskgroup_.
table_result = 'temp_qbb_btime_parentingtask3_plan_bid_itemdata_' + module + '_' + plan_id + '_' + calctime

# Empty the result table: overwrite it with a self-select that matches no rows.
sql = "INSERT OVERWRITE TABLE {0} SELECT * FROM {0} WHERE null;".format(table_result)
print(sql)
o.execute_sql(sql)

df1 = o.get_table(table_input).to_df()
df2 = o.get_table(table_result).to_df()

# When DEBUG is on, work on the first 100 rows only, pulled through pandas.
DEBUG = False
if DEBUG:
    df1 = df1[:100].to_pandas(wrap=True)
    df2 = df2[:100].to_pandas(wrap=True)

############ Load the default recommended-task table (2 columns: planday, grouptask) ############
table_defitemdata = 'temp_qbb_btime_parentingtask3_plan_itemdata_default_' + module + '_' + plan_id + '_' + calctime
defitemdata = o.get_table(table_defitemdata)

# Use the reader as a context manager so it is closed even if parsing fails.
with defitemdata.open_reader() as reader:
    length = reader.count
    print(length)
    # Map the first column of each record to the JSON-decoded second column.
    itemdata = {record[0]: json.loads(record[1]) for record in reader}

print(len(itemdata))
# print(itemdata)
# Debug sample of the entry stored under key 0; prints None instead of
# aborting the job with KeyError when that key is absent.
print(itemdata.get(0))
#for group in list(itemdata[0]):
# td = group['td']
# g = group['g']
# print(td)
# print(g)
###################### Main section ##############################
# Processing logic:
# For each bid, use realday to look up the matching groups and tasks in defitemdata.
# For each bid, extend realday forward by 60 or 90 plan days (planday).
# dictitem = json.loads(table_defitemdata['itemdata'])
#from odps import options
from odps.df import output
#options.verbose = True
#options.sql.settings = {'odps.sql.mapper.split.size': 1}
@output(['bid', 'itemdata'], ['int64', 'string'])  # itemdata: JSON list of {"days", "gt"} entries
def ftasklimit(row):
    r"""Build the JSON plan payload for one input row.

    Walks 61 consecutive plan days starting at ``row.totaldayreal`` (each
    capped at 2190), looks every day up in the module-level ``itemdata``
    mapping and keeps the groups whose ``'td'`` value occurs in
    ``row.tipidlist``.

    Args:
        row: Input record exposing ``bid``, ``totaldayreal`` and
            ``tipidlist`` (a ``"\002"``-separated string).

    Returns:
        A ``(bid, data)`` tuple. ``data`` is the JSON encoding of a list of
        ``{"days": planday, "gt": [matching groups]}`` dicts, one per plan
        day that could be resolved, or ``None`` when no day was resolved.
    """
    print('ftasklimit ------------')
    # NOTE(review): these are strings; if 'td' is decoded from JSON as a
    # number the membership test below can never match -- confirm the
    # type of 'td' in the default item data.
    taskidlistpre = row.tipidlist.split("\002")
    outputdatas = []
    print(str(row.bid) + 'start')
    for offset in range(61):
        # Plan days never exceed 2190.
        # NOTE(review): once the cap is reached every remaining iteration
        # yields the same day, so the output may contain repeated entries
        # for day 2190 -- confirm whether that is intended.
        planday = min(row.totaldayreal + offset, 2190)
        try:
            listgroup = itemdata[planday]
            matched = [group for group in listgroup if group['td'] in taskidlistpre]
        except (KeyError, TypeError):
            # Best effort: skip days that are missing from itemdata or
            # whose stored value does not have the expected shape.
            continue
        # A fresh dict per day; reusing one dict across iterations would
        # make every list entry alias the values of the last day.
        outputdatas.append({'days': planday, 'gt': matched})
    data = json.dumps(outputdatas) if outputdatas else None
    print(str(row.bid) + 'end')
    return row.bid, data
# Apply ftasklimit to every row of df1 (axis=1) and persist the resulting
# (bid, itemdata) frame into the result table.
plan_itemdata_df = df1.apply(ftasklimit, axis=1)
plan_itemdata_df.persist(table_result)
##df2['itemdata'] = str(df1.apply(ftasklimit, axis=1))
# df2.persist(table_result)