
Crawling with a process pool and saving the results to MongoDB

Set up a process pool to crawl Lagou (m.lagou.com):

# coding = utf-8
import json
import pymongo
import pandas as pd
import requests
from lxml import etree
import time
from multiprocessing import Pool


# MongoDB connection
client = pymongo.MongoClient('localhost')
db = client['lagou']
# Position name to search for ('資料探勘' = data mining)
POSITION_NAME = '資料探勘'
# Total number of pages to crawl
PAGE_SUM = 200
# Number of positions returned per page
PAGE_SIZE = 15
# Collection name
DATA_NAME = "DataMiningPosition"

base_url = 'https://m.lagou.com/search.json?city=%E5%85%A8%E5%9B%BD&positionName={positionName}' \
           '&pageNo={pageNo}&pageSize={pageSize}'


def page_index(pageno):
    headers = {
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        # Avoid sending cookies when you can; this endpoint returns data without one
        # "Cookie": "...",
        "Host": "m.lagou.com",
        "Proxy-Connection": "keep-alive",
        "Referer": "http://m.lagou.com/search.html",
        "X-Requested-With": "XMLHttpRequest",
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/45.0.2454.85 Safari/537.36 115Browser/6.0.3',
    }
    url = base_url.format(positionName=POSITION_NAME, pageNo=pageno, pageSize=PAGE_SIZE)
    response = requests.get(url, headers=headers)
    html = response.text
    content = json.loads(html)
    print(content)
    if content.get("content"):
        return content
    else:
        # No data returned (likely rate-limited): wait and retry the same page
        time.sleep(30)
        return page_index(pageno)


def parse_page_index(content):
    for i in range(15):
        try:
            item = content['content']['data']['page']['result'][i]
            # print(item)
            yield {
                'positionId': item.get('positionId'),
                'positionName': item.get('positionName'),
                'city': item.get('city'),
                'createTime': item.get('createTime'),
                'salary': item.get('salary'),
                'companyId': item.get('companyId'),
                'companyFullName': item.get('companyFullName')
            }
        except IndexError as e:
            print('This page may contain fewer results than expected', e)


def save_to_mongo(data):
    # Upsert keyed on positionId (the third argument True requests an upsert)
    if db[DATA_NAME].update({'positionId': data['positionId']}, {'$set': data}, True):
        print('Saved to Mongo', data['positionId'])
    else:
        print('Saved to Mongo Failed', data['positionId'])


def parse_detail(url):
    # url = "http://m.lagou.com/jobs/4593934.html"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/64.0.3282.140 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Cache-Control": "max-age=0",
        "Connection": "keep-alive",
        # "Cookie": "...",
    }
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            print("Request succeeded")
            text = response.content.decode()
            # print(text)
            html = etree.HTML(text)
            workyear = html.xpath('//span[@class="item workyear"]/span/text()')
            if workyear:
                workyear = workyear[0]
            else:
                # Page probably not fully rendered or blocked: wait and retry
                time.sleep(5)
                return parse_detail(url)
            positiondesc = html.xpath('//div[@class="positiondesc"]//p/text()')
            # print(workyear, positiondesc)
            return workyear, positiondesc
    except Exception as e:
        print(e)


# Save the crawled data to MongoDB (single process)
def to_mongo(page_sum):
    # Lagou displays at most 334 pages
    for page in range(page_sum):
        html = page_index(page)
        items = parse_page_index(html)
        # print(items)
        for item in items:
            print(item)
            save_to_mongo(item)


# Worker used by the process pool: crawl one page and save it to MongoDB
def to_mongo_pool(page):
    # Lagou displays at most 334 pages
    content = page_index(page)
    items = parse_page_index(content)
    # print(items)
    for item in items:
        print(item)
        save_to_mongo(item)


# Parse crawled items so the data can be turned into a DataFrame
def parse_items(page_sum):
    for page in range(page_sum):
        html = page_index(page)
        items = parse_page_index(html)
        for item in items:
            positionId = item["positionId"]
            detail_url = "http://m.lagou.com/jobs/{}.html".format(positionId)
            workyear, positiondesc = parse_detail(detail_url)
            print(positionId, positiondesc)
            yield [
                item["positionId"],
                item["positionName"],
                item["city"],
                item["createTime"],
                item["salary"],
                item["companyId"],
                item["companyFullName"],
                workyear,
                positiondesc
            ]


# Save the data in CSV format
def to_csv(page_sum):
    item_lists = []
    # print(parse_items())
    for item in parse_items(page_sum):
        item_lists.append(item)
    # print(item_lists)
    data = pd.DataFrame(item_lists, columns=["positionId", "positionName", "city", "createTime",
                                             "salary", "companyId", "companyFullName",
                                             "workyear", "positiondesc"])
    data.to_csv("python_position.csv")


if __name__ == '__main__':
    # to_csv(PAGE_SUM)
    # to_mongo(200)  # storing in MongoDB is recommended
    start_time = time.time()
    pool = Pool()
    # Pool() argument: number of worker processes; defaults to the number of CPU cores.
    # If you set it explicitly, keep it <= the number of cores.
    # The first argument to map() is the function itself (no parentheses, no arguments);
    # the second is an iterable whose elements are passed to that function one at a time.
    pool.map(to_mongo_pool, [i for i in range(PAGE_SUM)])
    # close() only stops the pool from accepting new tasks
    pool.close()
    # join() blocks: the main process waits for the workers to finish before continuing
    pool.join()
    end_time = time.time()
    print("Total time: %.2f seconds" % (end_time - start_time))