
Scraping a site's dishonest-debtor ("laolai") data with Python coroutines
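The script below walks a province's listing pages of dishonest-debtor records, pulls each record's detail JSON concurrently with aiohttp coroutines throttled by a semaphore, and writes the parsed rows to MySQL from task done-callbacks. It expects two things to exist up front. The first is a url.json file mapping province names to their listing URLs; the real URLs are site-specific, so this is only a sketch of the expected shape with a placeholder path:

import json

# Hypothetical url.json content: province name -> listing url.
# The path below is a placeholder, not the site's actual route.
with open('url.json', 'w', encoding='utf-8') as f:
    json.dump({'廣東': 'https://shixin.tianyancha.com/<province-listing-path>'},
              f, ensure_ascii=False)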

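The second is the tianyancha table that save_data inserts into. Its columns can be read off the INSERT statement; the types below are assumptions chosen to be wide enough for scraped text, not the author's original schema:

import pymysql

# Hypothetical DDL inferred from the INSERT in save_data; all types are assumed.
conn = pymysql.connect(host='localhost', port=3306, user='user',
                       password='password', charset='utf8', database='db')
with conn.cursor() as cursor:
    cursor.execute("""
        create table if not exists tianyancha (
            id         int auto_increment primary key,
            name       varchar(64),
            birth      char(4),
            sex        char(4),
            idcard     varchar(32),
            court      varchar(128),
            createtime varchar(16),
            caseno     varchar(64),
            base       varchar(128),
            duty       text,
            status     text,
            detail     varchar(128),
            addr       varchar(128)
        ) default charset = utf8mb4""")
conn.commit()
conn.close()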
import re
import json
import aiohttp
import asyncio
import time
import pymysql
from asyncio.locks import Semaphore
from functools import partial


headers = {
    'Cookie': 'auth_token=your_token_here',  # a valid login cookie is required
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                  ' (KHTML, like Gecko) Chrome/84.0.4147.135 Safari/537.36',
}


def save_data(cursor, addr_dic, obj):
    # Done-callback: obj is the finished task whose result is the detail JSON
    try:
        data = obj.result()['data']
        name = data['iname']
        addr = addr_dic.get(name, '')
        idcard = data['cardnum']
        # 18-digit ID number with the middle four digits possibly masked as '*'
        assert re.match(r'\d{10}[\d*]{4}\d{3}[\dxX]', idcard)
        birth = idcard[6:10]  # characters 7-10 hold the birth year
        assert birth.isdigit()
        sex = data.get('sex')
        if not sex:
            # The second-to-last digit encodes sex: odd means male
            n = int(idcard[-2])
            sex = '男' if (n % 2) == 1 else '女'
        tm = time.localtime(data.get('regdate', 0) / 1000)
        createtime = f'{tm.tm_year}-{tm.tm_mon}-{tm.tm_mday}'
        # Parameterised query: pymysql does the quoting and escaping
        cursor.execute(
            "insert into tianyancha(name, birth, sex, idcard, court, createtime,"
            " caseno, base, duty, status, detail, addr)"
            " values(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
            (name, birth, sex, idcard, data['courtname'], createtime,
             data['casecode'], data['gistunit'], data['duty'],
             data['performance'], data['disrupttypename'], addr))
    except Exception as e:
        print('insert failed:', e.args)


async def parse_case_data(sem, session, cid):
    # Fetch the detail record of one case
    async with sem:  # cap the number of concurrent requests
        async with session.get(
                'https://shixin.tianyancha.com/shixin/getDishonestinfoDetailWeb.json'
                f'?bussinessId={cid}') as rsp:
            return await rsp.json()


async def parse_province(sem, session, cursor, url):
    page = 1
    while True:  # walk through the listing page by page
        page_url = f'{url}/p{page}'
        async with session.get(page_url) as rsp:
            try:
                txt = await rsp.text()
                # Map each name to its address, parsed from the summary line
                addr_dic = {}
                pps = [i.strip() for i in re.findall(
                    'dishonest_base_info_detail">(.*?)</', txt, re.S)]
                for itm in pps:
                    try:
                        name, _, _, addr = itm.split('，')
                        assert addr.endswith('人。')
                        addr = addr.rstrip('人。')
                        addr_dic[name] = addr
                    except Exception:
                        pass
                # Extract the 32-character id of every record on this page
                cid_lis = re.findall(r'data-id="([\da-z]{32})"', txt)
                tasks = []
                for cid in cid_lis:
                    # One coroutine per record fetches and parses the detail
                    task = asyncio.create_task(parse_case_data(sem, session, cid))
                    # The callback writes the result into MySQL
                    task.add_done_callback(partial(save_data, cursor, addr_dic))
                    tasks.append(task)
                if tasks:
                    await asyncio.wait(tasks)
                print(f'page {page} done')
                if 'tic-icon-arrow-right' not in txt:  # no "next page" arrow left
                    break
                page += 1
            except Exception:
                print(f'failed while crawling page {page}')
                break


async def main():
    province = '廣東'
    # url.json maps each province name to its listing url
    url_data = json.load(open('url.json', 'r', encoding='utf-8'))
    # Crawling every province at once is supported; only 廣東 is crawled here
    url_lis = [url_data.get(province)]
    sem = Semaphore(4)
    conn = pymysql.connect(host='localhost', port=3306, user='user',
                           password='password', charset='utf8',
                           database='db', autocommit=True)
    cursor = conn.cursor()
    async with aiohttp.ClientSession(headers=headers) as session:
        for url in url_lis:
            await parse_province(sem, session, cursor, url)
    cursor.close()
    conn.close()


if __name__ == '__main__':
    asyncio.run(main())