1. 程式人生 > >python實例pyspark

python實例pyspark

python 實例 pyspark

%pyspark


#查詢認證用戶


import sys
#import MySQLdb
import mysql.connector
import pandas as pd
import datetime
import time

# Connection settings read by sql_select() when it opens a MySQL connection.
# NOTE(review): the credentials are hard-coded in the source; consider loading
# them from the environment or a secrets store instead.
optmap = {
    'dbuser': 'haoren',
    'dbpass': 'G4d',
    'dbhost': '172.12.112.5',
    'dbport': 3306,
    'dbname': 'GMDB',
}



def sql_select(reqsql):
    """Execute a SQL statement and return all rows of its result.

    A new connection is opened for every call using the settings in the
    module-level ``optmap`` and is closed again before returning.

    Args:
        reqsql: Complete SQL statement to execute.

    Returns:
        The rows returned by ``cursor.fetchall()``. If a
        ``mysql.connector.Error`` is raised, the error is printed and the
        empty string ``''`` is returned instead (iterating it yields nothing).
    """
    ret = ''
    # Bind both handles up front so the cleanup in ``finally`` cannot hit an
    # unbound name when connect() or cursor() itself fails.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(
            user=optmap['dbuser'],
            password=optmap['dbpass'],
            host=optmap['dbhost'],
            port=optmap['dbport'],
            database=optmap['dbname'],
        )
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            # close must be *called*; a bare ``db_conn.close`` attribute
            # reference leaves the connection open.
            db_conn.close()
    return ret


# Accumulates the raw result of every per-table query issued by
# renzhengsingger(); it grows by ten entries on each call.
userlist = []


def renzhengsingger(startday, endday):
    """Print ``PERFORMERID`` and ``sum(DURATION)/3600`` per performer.

    Queries the ten tables ``PERFORMERSHOWTIMERECORD0`` through
    ``PERFORMERSHOWTIMERECORD9`` for rows with
    ``startday <= STARTTIME < endday``, grouped by ``PERFORMERID``, and
    prints one ``"<id> <value>"`` line per group after all queries finished.

    Args:
        startday: Inclusive window start as ``'%Y-%m-%d %H:%M:%S'``; it is
            converted to a Unix timestamp with ``time.mktime`` (local time).
        endday: Exclusive window end in the same format.

    Side effects:
        Appends each table's result to the module-level ``userlist``.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    t1 = int(time.mktime(time.strptime(startday, time_format)))
    t2 = int(time.mktime(time.strptime(endday, time_format)))

    # Track this call's results separately: ``userlist`` keeps growing across
    # calls, so indexing its first ten entries would re-print stale rows from
    # an earlier call.
    fetched = []
    for n in range(10):
        reqsql = "select PERFORMERID,sum(DURATION)/3600 from PERFORMERSHOWTIMERECORD%d where STARTTIME >=%s and STARTTIME <%s group by PERFORMERID ;" % (n, t1, t2)
        ret = sql_select(reqsql)
        userlist.append(ret)
        fetched.append(ret)

    for rows in fetched:
        for p in rows:
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('%s %s' % (p[0], p[1]))

# Run the report for the window 2017-08-01 (inclusive) to 2017-09-01 (exclusive).
renzhengsingger('2017-08-01 00:00:00', '2017-09-01 00:00:00')




======================================================================================================================

%pyspark

#查詢認證用戶

import sys
#import MySQLdb
import mysql.connector
import pandas as pd
import datetime
import time

# Connection settings read by sql_select() (the IMDB database).
# NOTE(review): the credentials are hard-coded in the source; consider loading
# them from the environment or a secrets store instead.
optmap = {
    'dbuser': 'haoren',
    'dbpass': 'G4d',
    'dbhost': '172.12.112.8',
    'dbport': 3306,
    'dbname': 'IMDB',
}

# Connection settings read by sql_select1() (the GMDB database).
# NOTE(review): the credentials are hard-coded in the source; consider loading
# them from the environment or a secrets store instead.
optmap1 = {
    'dbuser': 'haoren',
    'dbpass': 'G4d',
    'dbhost': '172.12.112.5',
    'dbport': 3306,
    'dbname': 'GMDB',
}


def sql_select(reqsql):
    """Execute a SQL statement against the ``optmap`` database.

    A new connection is opened for every call using the settings in the
    module-level ``optmap`` and is closed again before returning.

    Args:
        reqsql: Complete SQL statement to execute.

    Returns:
        The rows returned by ``cursor.fetchall()``. If a
        ``mysql.connector.Error`` is raised, the error is printed and the
        empty string ``''`` is returned instead (iterating it yields nothing).
    """
    ret = ''
    # Bind both handles up front so the cleanup in ``finally`` cannot hit an
    # unbound name when connect() or cursor() itself fails.
    db_conn = None
    db_cursor = None
    try:
        db_conn = mysql.connector.connect(
            user=optmap['dbuser'],
            password=optmap['dbpass'],
            host=optmap['dbhost'],
            port=optmap['dbport'],
            database=optmap['dbname'],
        )
        db_cursor = db_conn.cursor()
        db_cursor.execute(reqsql)
        ret = db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor is not None:
            db_cursor.close()
        if db_conn is not None:
            # close must be *called*; a bare ``db_conn.close`` attribute
            # reference leaves the connection open.
            db_conn.close()
    return ret

def sql_select1(reqsql1):
    """Execute a SQL statement against the ``optmap1`` database.

    A new connection is opened for every call using the settings in the
    module-level ``optmap1`` and is closed again before returning.

    Args:
        reqsql1: Complete SQL statement to execute.

    Returns:
        The rows returned by ``cursor.fetchall()``. If a
        ``mysql.connector.Error`` is raised, the error is printed and the
        empty string ``''`` is returned instead (iterating it yields nothing).
    """
    # Initialise the variable that is actually returned; previously a
    # different name was initialised, so any database error ended in an
    # UnboundLocalError at the return statement.
    ret1 = ''
    # Bind both handles up front so the cleanup in ``finally`` cannot hit an
    # unbound name when connect() or cursor() itself fails.
    db_conn1 = None
    db_cursor1 = None
    try:
        db_conn1 = mysql.connector.connect(
            user=optmap1['dbuser'],
            password=optmap1['dbpass'],
            host=optmap1['dbhost'],
            port=optmap1['dbport'],
            database=optmap1['dbname'],
        )
        db_cursor1 = db_conn1.cursor()
        db_cursor1.execute(reqsql1)
        ret1 = db_cursor1.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
    finally:
        if db_cursor1 is not None:
            db_cursor1.close()
        if db_conn1 is not None:
            # close must be *called*; a bare ``db_conn1.close`` attribute
            # reference leaves the connection open.
            db_conn1.close()
    return ret1


#定義查詢認證用戶函數
def renzhengsingger(startday, endday):
    """Print each performer added in a time window with their show-time sum.

    Selects ``PERFORMERID`` and ``from_unixtime(ADDTIME)`` from
    ``PERFORMERINFO`` (via ``sql_select``) for rows with
    ``startday <= ADDTIME < endday``. For every such performer it then
    queries ``sum(DURATION)/3600`` over the same window from the table
    ``PERFORMERSHOWTIMERECORD<PERFORMERID % 10>`` (via ``sql_select1``) and
    prints ``"<id> , <add time> , <sum>"``.

    Args:
        startday: Inclusive window start as ``'%Y-%m-%d %H:%M:%S'``; it is
            converted to a Unix timestamp with ``time.mktime`` (local time).
        endday: Exclusive window end in the same format.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    t1 = int(time.mktime(time.strptime(startday, time_format)))
    t2 = int(time.mktime(time.strptime(endday, time_format)))
    reqsql = "select PERFORMERID,from_unixtime(ADDTIME) from PERFORMERINFO where ADDTIME >=%s and ADDTIME < %s" % (t1, t2)
    ret = sql_select(reqsql)
    for i in ret:
        # Convert once and reuse for both the table suffix and the filter, so
        # the ``%d`` placeholder always receives an integer.
        performer_id = int(i[0])
        shard = performer_id % 10
        reqsql1 = "select sum(DURATION)/3600 from PERFORMERSHOWTIMERECORD%d where STARTTIME >=%s and STARTTIME <%s and PERFORMERID=%d" % (shard, t1, t2, performer_id)
        ret1 = sql_select1(reqsql1)
        # sql_select1 yields '' when the query failed; report None for that
        # performer instead of aborting the whole run with an IndexError.
        total = ret1[0][0] if ret1 else None
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('%s , %s , %s' % (i[0], i[1], total))

# Run the report for the window 2017-08-01 (inclusive) to 2017-09-01 (exclusive).
renzhengsingger('2017-08-01 00:00:00', '2017-09-01 00:00:00')

python實例pyspark