Crawling Twitter data by location and keyword and generating a word cloud
阿新 · Published 2019-01-02
Crawl Twitter data by location and keyword, store it in MongoDB, and generate a word cloud.
Please credit the source when reposting.
- Fetching data with tweepy
- Generating the word cloud
Fetching data with tweepy
1. Create the model (model.py)
from mongoengine import (Document, ObjectIdField, StringField, IntField,
                         DateTimeField)

class twitter_post(Document):
    _id = ObjectIdField(primary_key=True)
    screen_name = StringField(max_length=128)
    text = StringField(required=True, max_length=2048)
    text_id = IntField(required=True)
    created_at = DateTimeField(required=True)
    in_reply_to_screen_name = StringField(max_length=64)
    retweet_count = IntField()
    favorite_count = IntField()
    source = StringField(max_length=1024)
    longitude = StringField(max_length=32)
    latitude = StringField(max_length=32)
    location = StringField(max_length=256)
    country_code = StringField(max_length=64)
    lang = StringField(max_length=4)
    time_zone = StringField(max_length=64)
    province = StringField(max_length=64)
    city = StringField(max_length=64)
    district = StringField(max_length=64)
    street = StringField(max_length=64)
    street_number = StringField(max_length=64)

    meta = {
        'ordering': ['created_at', 'screen_name'],
        'collection': 'twitter_posts'
    }
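For reference, a minimal sketch of saving one document with this model (the connect() call with the same database name appears again in the crawler below):

import datetime
from bson import ObjectId
from mongoengine import connect

connect('ANXIETY', host='localhost', port=27017)
twitter_post(_id=ObjectId(), text_id=1, text='hello world',
             created_at=datetime.datetime.utcnow()).save()
print(twitter_post.objects.count())  # number of documents in twitter_posts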
2. Query the Baidu Maps reverse-geocoding API to get province/city/street information from coordinates
import requests

def GetAddress(lat, lng):
    # Baidu's geocoder expects 'location' as "latitude,longitude".
    url = 'http://api.map.baidu.com/geocoder/v2/'
    header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36'}
    payload = {'output': 'json', 'ak': 'pAjezQsQBe8v1c1Lel87r4vprwXiGCEn'}
    payload['location'] = '{0},{1}'.format(lat, lng)
    print(lat, lng)
    try:
        content = requests.get(url, params=payload, headers=header).json()
        content = content['result']['addressComponent']
        if not content['street']:  # street info is missing for some locations
            content['street'] = 'NULL'
        if not content['street_number']:
            content['street_number'] = 'NULL'
    except Exception:
        # Request failed or the response contained no address component.
        content = {'province': 'NULL', 'city': 'NULL', 'district': 'NULL',
                   'street': 'NULL', 'street_number': 'NULL'}
    return content
print(GetAddress(40.07571952, 116.60609467))
Below is the location information obtained for three pairs of coordinates.
3. Crawl data through the tweepy API
import datetime
import tweepy
from tweepy.parsers import JSONParser
from bson import ObjectId
from mongoengine import connect

consumer_key = 'I1XowkiAc72fEp2CXPv0'
consumer_secret = 'drfnZHVUQrq1dyeqepCrbKyGWeYJCeTFQZpkLcXkgKFw3P'
access_key = '936432882482143235-jNLGPsCpZaSqR1D2WarSEshgQcyi'
access_secret = 'YF4ddleSgGxj8BsfmH2DELr7TsNNKAp08ZvqC'
# consumer_key = 'qEgHKHnL55g7k4U9xih'
# consumer_secret = 'QcUDHJS04wK5hrmlxV5C4gweiRPDca9JQoc4gp7ft'
# access_key = '863573499436122112-LA60oJLBzwVnhZjGOUPzRsJc'
# access_secret = '8CKFpp6qyxkAk1KfjWJPoHKloppPrvd7Tjiwllyk'

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth_handler=auth, parser=JSONParser(), proxy='127.0.0.1:1080',
                 wait_on_rate_limit=True)
conn = connect('ANXIETY', alias='default', host='localhost', port=27017,
               username='', password='')  # connect to the local MongoDB
The Twitter search API only returns tweets from the past week, each call to api.search() returns at most 100 tweets, and the official rate limit allows roughly 450 consecutive requests per window, so we can collect at most around 40,000-odd tweets (450 × 100 = 45,000). To gather as much data as possible we set MAX_QUERIES and issue repeated queries. This program only crawls data for Beijing, Shanghai, Hong Kong, Macau and Taiwan; the topics to crawl are listed in the line.csv file:
GMT_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'  # format of Twitter's created_at field
MAX_QUERIES = 450  # value not shown in the original post; 450 matches the request budget above
# 'until' is used below but never defined in the original post; it must be a
# YYYY-MM-DD string. Setting it to tomorrow keeps the whole 7-day window.
until = (datetime.date.today() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')

regions = ["beijing", "shanghai", "hongkong", "macau", "taiwan"]
for line in open("normal_user.csv"):
    # u = api.get_user(line)
    # ms = myStream.filter(track=[line])
    for r in regions:  # for each region
        places = api.geo_search(query=r)["result"]["places"][0]  # first get the place ID
        print(places)
        place_id = places["id"]
        tweet_id = []
        i = MAX_QUERIES
        MAX_ID = 10  # sentinel: 10 means "no page fetched yet"
        while i > 0:
            if MAX_ID == 10:
                # First page: bound the search by date only. count=100 is the
                # per-request maximum, which keeps the data volume as high as possible.
                # To filter by place and keyword at the same time, the query could be
                # q="place:%s AND %s" % (place_id, line).
                statuses = api.search(q="place:%s" % place_id, count=100,
                                      until=until)["statuses"]
            else:
                # Later pages: ask only for tweets older than everything seen so far.
                statuses = api.search(q="place:%s" % place_id, count=100,
                                      max_id=MAX_ID - 1)["statuses"]
            for tweet in statuses:
                tweet_id.append(tweet["id"])
                # Twitter returns GeoJSON coordinates as [longitude, latitude];
                # Baidu's geocoder wants "latitude,longitude", hence the swap below.
                if tweet["coordinates"] is not None:
                    lng, lat = tweet["coordinates"]["coordinates"]
                    addr = GetAddress(lat, lng)
                else:
                    lng = lat = 'NULL'
                    addr = {'province': 'NULL', 'city': 'NULL', 'district': 'NULL',
                            'street': 'NULL', 'street_number': 'NULL'}
                tweet_item = twitter_post(  # build one MongoDB document
                    _id=ObjectId(),
                    text_id=tweet["id"],
                    created_at=datetime.datetime.strptime(tweet["created_at"], GMT_FORMAT),
                    screen_name=tweet["user"]["screen_name"],
                    favorite_count=tweet["favorite_count"],
                    retweet_count=tweet["retweet_count"],
                    text=tweet["text"],
                    source=tweet["source"],
                    country_code=(tweet["place"]["country_code"]
                                  if tweet["place"] is not None else 'NULL'),
                    location=tweet["user"]["location"],
                    latitude=str(lat),
                    longitude=str(lng),
                    time_zone=tweet["user"]["time_zone"],
                    lang=tweet["lang"],
                    province=addr['province'],
                    city=addr['city'],
                    district=addr['district'],
                    street=addr['street'],
                    street_number=addr['street_number']
                )
                try:
                    tweet_item.save()  # write to the database
                except Exception:
                    continue
            if tweet_id:
                MAX_ID = min(tweet_id)  # next page starts below the oldest tweet seen
            i -= 1
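For reference, the same seven-day, 100-per-request pagination can be expressed more compactly with tweepy.Cursor. This is only a sketch: it assumes the auth object and place_id from above, and it uses an API object built with tweepy's default parser, because Cursor works with tweepy's model objects rather than the raw JSON returned by JSONParser.

plain_api = tweepy.API(auth, wait_on_rate_limit=True)  # proxy omitted for brevity
for status in tweepy.Cursor(plain_api.search,
                            q="place:%s" % place_id,
                            count=100).items(45000):  # ~450 requests x 100 tweets
    print(status.id, status.text[:50])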
The resulting data looks as follows.
Generating the word cloud (input: a location and the number of words to return; output: the word cloud)
(The hard part: handling Chinese and English text at the same time.)
Approach: use the lang field to separate texts by language, build a word cloud for each language, then sort and merge the results.
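Once each language yields a ranked list of (word, count) pairs, the merge is just concatenation followed by a sort on the counts. A minimal sketch, with made-up counts purely for illustration:

en_cloud = [("anxiety", 12), ("beijing", 9)]  # hypothetical English counts
ch_cloud = [("焦慮", 11), ("北京", 8)]          # hypothetical Chinese counts
merged = sorted(en_cloud + ch_cloud, key=lambda t: t[1], reverse=True)[:15]
print(merged)  # [('anxiety', 12), ('焦慮', 11), ('beijing', 9), ('北京', 8)]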
1. Accessing the database
import pymongo
import traceback

class MongoConn():
    def __init__(self, db_name):
        try:
            url = '127.0.0.1:27017'
            self.client = pymongo.MongoClient(url, connect=True)
            self.db = self.client[db_name]
        except Exception as e:
            print('Failed to connect to MongoDB!')
            traceback.print_exc()

    def destroy(self):
        self.client.close()

    def getDb(self):
        return self.db

    def __del__(self):
        self.client.close()
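A quick usage example (a sketch; 'ANXIETY' and the twitter_posts collection are the names used by the crawler above, and count_documents requires pymongo 3.7+):

conn = MongoConn('ANXIETY')
db = conn.getDb()
print(db.twitter_posts.count_documents({"lang": "en"}))  # number of English tweets stored
conn.destroy()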
2. English text preprocessing
from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords

# _STOP_WORDS is referenced below but not defined in the original post;
# NLTK's English stop-word list is assumed here.
_STOP_WORDS = set(stopwords.words('english'))

def docs_preprocessor(docs):
    tokenizer = RegexpTokenizer(r'\w+')
    for idx in range(len(docs)):
        docs[idx] = docs[idx].lower()              # convert to lowercase
        docs[idx] = tokenizer.tokenize(docs[idx])  # split into words
    # Remove numbers, but not words that contain numbers.
    docs = [[token for token in doc if not token.isdigit()] for doc in docs]
    # Remove stop words.
    docs = [[token for token in doc if token not in _STOP_WORDS] for doc in docs]
    return docs
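A quick check of the preprocessor (illustrative only; the exact output depends on the stop-word list bound to _STOP_WORDS):

sample = ["Feeling anxious in Beijing 2019", "Shanghai is calm today"]
print(docs_preprocessor(sample))
# e.g. [['feeling', 'anxious', 'beijing'], ['shanghai', 'calm', 'today']]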
3. Chinese text preprocessing
import jieba
import stop  # the author's module; load_stopwords() returns a Chinese stop-word list

def nltk_tokenize(text):
    features = []
    stop_words = stop.load_stopwords()
    try:
        # Segment the text with jieba (Chinese cannot simply be split on spaces).
        tokens_cut = jieba.cut(text)
        for word in tokens_cut:
            # Keep a word as a feature only if it is not a stop word, is 2-4
            # characters long, and is not a number (is_number() is a small
            # helper defined elsewhere in the author's code).
            if word not in stop_words and 1 < len(word) < 5 and not is_number(word):
                features.append(word)
    except Exception:
        pass
    return features
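An illustrative call (the exact tokens depend on jieba's dictionary and on the stop-word list returned by stop.load_stopwords()):

print(nltk_tokenize("今天北京的天氣讓人很焦慮"))
# e.g. ['今天', '北京', '天氣', '焦慮']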
4. The word cloud generator
import collections

class cloudProducer():
    def __init__(self):
        self.mon = MongoConn('ANXIETY')
        self.db = self.mon.getDb()

    def getMainData(self, region_type, region):
        # Fetch the stored tweets for this region, split by language.
        en_docs = []
        ch_docs = []
        twitter_in_english = self.db.twitter_posts.find({region_type: region, "lang": "en"})
        twitter_in_chinese = self.db.twitter_posts.find({region_type: region, "lang": "zh"})
        for x in twitter_in_english:
            en_docs.append(x["text"])
        print(len(en_docs))
        for x in twitter_in_chinese:
            ch_docs.append(x["text"])
        return [en_docs, ch_docs]

    def produce_en_Cloud(self, region_type, region, num):
        docs = self.getMainData(region_type, region)[0]
        print(len(docs))
        words_dump = []
        docs = docs_preprocessor(docs)
        for text in docs:
            words_dump = words_dump + text
        cloud = collections.Counter(words_dump).most_common(num)
        print(cloud)
        return cloud

    def produce_ch_Cloud(self, region_type, region, num):
        docs = self.getMainData(region_type, region)[1]
        print(len(docs))
        words_dump = []
        for text in docs:
            features = nltk_tokenize(text)
            words_dump = words_dump + features
        cloud = collections.Counter(words_dump).most_common(num)  # a list of (word, count) tuples
        print(cloud)
        return cloud

    def produce_cloud(self, region_type, region, num):
        en_cloud = self.produce_en_Cloud(region_type, region, num)
        ch_cloud = self.produce_ch_Cloud(region_type, region, num)
        cloud = en_cloud + ch_cloud
        cloud = sorted(cloud, key=lambda t: t[1], reverse=True)  # merge and sort the two clouds
        return cloud[0:num]
cp = cloudProducer()
cloud = cp.produce_cloud("province", "北京市", 15)  # top 15 words for tweets located in Beijing
print(cloud)
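produce_cloud returns a ranked list of (word, count) pairs rather than a picture. If an actual image is wanted, one option, not part of the original code, is the third-party wordcloud package; font_path must point to a font that contains CJK glyphs (the file name below is only an example):

from wordcloud import WordCloud

wc = WordCloud(font_path='simhei.ttf', width=800, height=400,
               background_color='white')
wc.generate_from_frequencies(dict(cloud))  # cloud is the merged (word, count) list above
wc.to_file('wordcloud_beijing.png')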