python 結合redis 佇列 做一個例子
阿新 • • 發佈:2018-12-24
結合redis 佇列 做了一個例子
#!/usr/bin/env python # coding: utf-8 # @Time : 2018/12/21 0021 13:57 # @Site : # @File : demos.py # @Software: PyCharm import MySQLdb import redis import json import os, time import threading from multiprocessing import Pool, Process import os, time, random import sys reload(sys) sys.setdefaultencoding('utf8') class InsertData(): def __init__(self): # 去掉一些無用資訊 self.__list_industry = [] self.__has_many = [] self.__list_xczx = [] self.__list_cxcy = [] self.__list_industry_dict = {'test': self.__list_xczx } self.__dict_industry = {'test': 212} self.db = MySQLdb.connect(host="127.0.0.1", port=3306, user="root", passwd="123456", db="ww", charset='utf8') redisPool = redis.ConnectionPool(host='localhost', port=6379) self.re_queue = redis.Redis(connection_pool=redisPool) self.re_queue2 = redis.Redis(connection_pool=redisPool) def __get_dict_industry(self): industry_name = self.__list_industry_dict.keys() if len(industry_name) == 1: industry_name = str(tuple(industry_name)).replace(",","") elif len(industry_name) > 1: industry_name = str(tuple(industry_name)) else: return sql_industry = "select industry_name,industry_id from zzh_industry where industry_name in {}".format(industry_name) cursor3 = self.db.cursor() cursor3.execute(sql_industry) result_list = cursor3.fetchall() for result in result_list: self.__dict_industry[result[0]] = result[1] cursor3.close() def inser_industry(self): dta = """xx、xxx""" data = dta.split("、") for index, da in enumerate(data): industry_code = 100001 + index sqlStr = """insert into xx(industry_name,industry_pid,industry_code,industry_sort,is_lock) VALUES('{industry_name}',211,'{industry_code}',{industry_sort},1) ;""".format( industry_name=da, industry_code=industry_code, industry_sort=index + 1) print sqlStr def put_redis(self): cursor = self.db.cursor() item_sql = """SELECT item_title,item_id from xxx""" cursor.execute(item_sql) result_list = cursor.fetchall() num = 1 for result in result_list: data = {"itemTitle": result[0], "itemId": result[1]} self.re_queue.lpush("item", json.dumps(data)) num += 1 print ("put over", num) def get_redis(self): nums = 1 resultNum = 0 cursor_get = self.db.cursor() while True: result = self.re_queue.rpop("item") if not result: time.sleep(1) if resultNum == 10: break else: print "resultNum", resultNum resultNum += 1 continue try: resultNum = 0 result = json.loads(result) value_list = [] for strkey in self.__list_industry_dict.keys(): if strkey in self.__has_many: for __strkey in self.__list_industry_dict[strkey]: if __strkey in result["itemTitle"]: value_list.append(strkey) break if strkey in result["itemTitle"]: value_list.append(strkey) value_list = set(value_list) item_id = result["itemId"] if value_list: print result["itemTitle"] for value in value_list: nums += 1 # select_sql = "select id from zzh_industry_item where item_id={} and industry_id={} limit 1".format(item_id,self.__dict_industry[value]) # cursor_get.execute(select_sql) # if cursor_get.fetchone(): # print ("reseat",select_sql) # continue sql_insert = "insert into zzh_industry_item(item_id,industry_id)values ({item_id},{industry_id})".format( item_id=item_id, industry_id=self.__dict_industry[value]) self.re_queue2.lpush("sqls", str(sql_insert)) except Exception as e: print e cursor_get.close() print ("put over") def test(self): cursor2 = self.db.cursor() count = 0 breakNum = 0 num = 0 try: while True: sql = self.re_queue2.rpop("sqls") if sql: num += 1 breakNum = 0 print sql try: cursor2.execute(sql) if count == 500: self.db.commit() count = 0 else: count += 1 except Exception as e: print e if not sql: time.sleep(1) if breakNum == 10: break else: print "breakNum", breakNum breakNum += 1 finally: print ("insertSql", num) self.db.commit() self.db.close() if __name__ == '__main__': items = InsertData() print('Parent process %s.' % os.getpid()) t1 = threading.Thread(target=items.put_redis) t2 = threading.Thread(target=items.get_redis) t3 = threading.Thread(target=items.test) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()