
Crawling Twitter Data by Location and Keyword, Storing It in MongoDB, and Generating a Word Cloud

Please credit the source when reposting.

  • Fetching data with tweepy
  • Generating the word cloud

Fetching data with tweepy

1. Define the document model (model.py)

from mongoengine import (Document, ObjectIdField, StringField,
                         IntField, DateTimeField)

class twitter_post(Document):
    _id = ObjectIdField(primary_key=True)
    screen_name = StringField(max_length=128)
    text = StringField(required=True, max_length=2048)
    text_id = IntField(required=True)
    created_at = DateTimeField(required=True)
    in_reply_to_screen_name = StringField(max_length=64)
    retweet_count = IntField()
    favorite_count = IntField()
    source = StringField(max_length=1024)
    longitude = StringField(max_length=32)
    latitude = StringField(max_length=32)
    location = StringField(max_length=256)
    country_code = StringField(max_length=64)
    lang = StringField(max_length=4)
    time_zone = StringField(max_length=64)
    province = StringField(max_length=64)
    city = StringField(max_length=64)
    district = StringField(max_length=64)
    street = StringField(max_length=64)
    street_number = StringField(max_length=64)

    meta = {
        'ordering': ['created_at', 'screen_name'],
        'collection': 'twitter_posts'
    }
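
As a quick sanity check, the model can be exercised against a local MongoDB instance before wiring up the crawler. This is a minimal sketch, assuming mongoengine is installed and MongoDB is listening on the default port:

import datetime
from bson import ObjectId
from mongoengine import connect

connect('ANXIETY', host='localhost', port=27017)  # same database the crawler uses

post = twitter_post(
    _id=ObjectId(),
    text_id=1,
    text='hello world',
    created_at=datetime.datetime.utcnow(),
)
post.save()
print(twitter_post.objects.count())  # should print at least 1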

2. Query the Baidu Maps API to get province, city, and street information from coordinates

import requests

def GetAddress(lat, lon):
    """Reverse-geocode via the Baidu Maps geocoder; Baidu expects 'latitude,longitude'."""
    url = 'http://api.map.baidu.com/geocoder/v2/'
    header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36'}
    payload = {
        'output': 'json',
        'ak': 'pAjezQsQBe8v1c1Lel87r4vprwXiGCEn',  # Baidu Maps API key
        'location': '{0},{1}'.format(lat, lon),
    }
    print(lat, lon)
    try:
        content = requests.get(url, params=payload, headers=header).json()
        content = content['result']['addressComponent']
        # Street-level information is unavailable for some coordinates.
        if content['street'] is None:
            content['street'] = 'NULL'
        if content['street_number'] is None:
            content['street_number'] = 'NULL'
    except Exception:
        content = {'province': 'NULL', 'city': 'NULL', 'district': 'NULL',
                   'street': 'NULL', 'street_number': 'NULL'}
    return content

print(GetAddress(40.07571952, 116.60609467))

Below is the location information returned for three sample coordinate pairs (screenshot of the three results omitted).
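
For reference, the addressComponent object that GetAddress returns has roughly the following shape (the values here are illustrative, not real API output):

# Illustrative only -- not a real Baidu response.
{
    'country': '中国',
    'province': '北京市',
    'city': '北京市',
    'district': '顺义区',
    'street': 'NULL',          # normalized by GetAddress when Baidu returns null
    'street_number': 'NULL'
}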

3. Crawl data through the Twitter API via tweepy

# Twitter API credentials -- replace with your own application's keys.
consumer_key = 'I1XowkiAc72fEp2CXPv0'
consumer_secret = 'drfnZHVUQrq1dyeqepCrbKyGWeYJCeTFQZpkLcXkgKFw3P'
access_key = '936432882482143235-jNLGPsCpZaSqR1D2WarSEshgQcyi'
access_secret = 'YF4ddleSgGxj8BsfmH2DELr7TsNNKAp08ZvqC'

# A spare set of credentials, kept commented out:
# consumer_key = 'qEgHKHnL55g7k4U9xih'
# consumer_secret= 'QcUDHJS04wK5hrmlxV5C4gweiRPDca9JQoc4gp7ft'
# access_key= '863573499436122112-LA60oJLBzwVnhZjGOUPzRsJc'
# access_secret= '8CKFpp6qyxkAk1KfjWJPoHKloppPrvd7Tjiwllyk'

import tweepy
from tweepy.parsers import JSONParser
from bson import ObjectId
from mongoengine import connect

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth_handler=auth, parser=JSONParser(), proxy='127.0.0.1:1080', wait_on_rate_limit=True)

conn = connect('ANXIETY', alias='default', host='localhost', port=27017, username='', password='')  # connect to local MongoDB

Twitter's search API only serves tweets from the past week, each call to api.search() returns at most 100 tweets, and Twitter officially states that roughly 450 consecutive requests are allowed per rate-limit window, so at most we can collect around 40,000 tweets. To gather as much data as possible we set MAX_QUERIES and issue repeated queries, paging backwards through the results. This program only crawls data for Beijing, Shanghai, Hong Kong, Macau, and Taiwan; the topics to crawl are listed in line.csv:
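The loop below relies on MAX_QUERIES, until, and GMT_FORMAT, none of which are defined in the original listing; the following is a plausible setup consistent with the surrounding text (the concrete values are assumptions):

import datetime

MAX_QUERIES = 400  # assumed; stay under Twitter's ~450-requests-per-window search limit
# The search API only covers the past week, so a date safely in the future
# effectively means "no upper bound" (the text says to push the deadline out a week).
until = (datetime.date.today() + datetime.timedelta(days=7)).strftime('%Y-%m-%d')
# Twitter timestamps look like 'Wed Aug 27 13:08:45 +0000 2008'
GMT_FORMAT = '%a %b %d %H:%M:%S +0000 %Y'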

regions = ["beijing","shanghai","hongkong","macau","taiwan"]
for line in open("normal_user.csv"):
    #try:
        #u = api.get_user(line)
    #ms = myStream.filter(track=[line])
    #print(results[0])
    for r in regions:#對每一個地理位置
        places = api.geo_search(query=r)["result"]["places"][0]#首先獲取地理位置ID
        print(places)
        place_id = places["id"]
        tweet_id = []
        i = MAX_QUERIES
        MAX_ID = 10
        while i > 0:

            if MAX_ID == 10:
                #tweets = api.search(q="place:%s AND line" % place_id)#根據地理位置和關鍵詞同時過去爬取
                for tweet in api.search(q="place:%s" % place_id,count = 100, until=until)["statuses"]:#把截止日期放到七日後同時設定沒次爬取最多數目一百,保證資料量
                    #print(tweet)
                    #j.write(json.dumps(tweet)+'\n')
                    tweet_id.append(tweet["id"])
                    Obj_id = ObjectId()
                    tweet_item = twitter_post(#生成一條mongo中的資料
                        _id = Obj_id,
                        text_id = tweet["id"],
                        created_at = datetime.datetime.strptime(tweet["created_at"], GMT_FORMAT),
                        screen_name = tweet["user"]["screen_name"],
                        favorite_count = tweet["favorite_count"],
                        retweet_count = tweet["retweet_count"],
                        text = tweet["text"],
                        source = tweet["source"],
                        country_code = (tweet["place"]["country_code"] if tweet['place'] != None else 'NULL'),
                        location = tweet["user"]["location"],
                        latitude = str(tweet["coordinates"]["coordinates"][0] if tweet["coordinates"] != None else 'NULL'),#根據返回的json檔案拿到維度,注意返回時緯度在前,但是訪問百度介面時,經度在前
                        longitude = str(tweet["coordinates"]["coordinates"][1] if tweet["coordinates"] != None else 'NULL'),
                        time_zone = tweet["user"]["time_zone"],
                        lang = tweet["lang"],
                        province = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['province'] if tweet["coordinates"] != None else 'NULL'),
                        city = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['city'] if tweet["coordinates"] != None else 'NULL'),
                        district = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['district'] if tweet["coordinates"] != None else 'NULL'),
                        street = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['street'] if tweet["coordinates"] != None else 'NULL'),
                        street_number = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['street_number'] if tweet["coordinates"] != None else 'NULL')
                        )
                    try:
                        tweet_item.save()#存入資料庫
                    except:
                        continue
                MAX_ID = min(tweet_id)
                #print(MAX_ID)

            else:
                for tweet in api.search(q="place:%s",count = 100, max_id = MAX_ID-1)["statuses"]:
                    #print(tweet)
                    #j.write(json.dumps(tweet)+'\n')
                    tweet_id.append(tweet["id"])
                    Obj_id = ObjectId()
                    tweet_item = twitter_post(
                        _id = Obj_id,
                        text_id = tweet["id"],
                        created_at = datetime.datetime.strptime(tweet["created_at"], GMT_FORMAT),
                        screen_name = tweet["user"]["screen_name"],
                        favorite_count = tweet["favorite_count"],
                        retweet_count = tweet["retweet_count"],
                        text = tweet["text"],
                        source = tweet["source"],
                        country_code = (tweet["place"]["country_code"] if tweet['place'] != None else 'NULL'),
                        location = tweet["user"]["location"],
                        latitude = str(tweet["coordinates"]["coordinates"][0] if tweet["coordinates"] != None else 'NULL'),
                        longitude = str(tweet["coordinates"]["coordinates"][1] if tweet["coordinates"] != None else 'NULL'),
                        time_zone = tweet["user"]["time_zone"],
                        lang = tweet["lang"],
                        province = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['province'] if tweet["coordinates"] != None else 'NULL'),
                        city = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['city'] if tweet["coordinates"] != None else 'NULL'),
                        district = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['district'] if tweet["coordinates"] != None else 'NULL'),
                        street = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['street'] if tweet["coordinates"] != None else 'NULL'),
                        street_number = (GetAddress(tweet["coordinates"]["coordinates"][1],tweet["coordinates"]["coordinates"][0])['street_number'] if tweet["coordinates"] != None else 'NULL')
                        )
                    try:
                        tweet_item.save()
                    except:
                        continue
                MAX_ID = min(tweet_id)
                #print(MAX_ID)
            i -= 1

The resulting data looks like the following (the original screenshots are omitted).
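
In place of the screenshots, the stored documents can be inspected directly with pymongo; a minimal sketch:

import pymongo

client = pymongo.MongoClient('127.0.0.1', 27017)
db = client['ANXIETY']
print(db.twitter_posts.count_documents({}))  # total number of tweets stored
for doc in db.twitter_posts.find({'lang': 'zh'}).limit(3):
    print(doc['screen_name'], doc['province'], doc['text'][:40])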

Generating the word cloud (input: a region and the number of words; output: the word cloud)

(The tricky part: processing Chinese and English text at the same time.)
Approach: use the lang field to process the texts of each language separately into word counts, then sort and merge them.

1. Connect to the database

import pymongo
import traceback

class MongoConn():
    def __init__(self, db_name):
        try:
            url = '127.0.0.1:27017'
            self.client = pymongo.MongoClient(url, connect=True)
            self.db = self.client[db_name]
        except Exception as e:
            print('Failed to connect to MongoDB!')
            traceback.print_exc()

    def destroy(self):
        self.client.close()

    def getDb(self):
        return self.db

    def __del__(self):
        self.client.close()
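
Typical usage of the helper:

conn = MongoConn('ANXIETY')
db = conn.getDb()
print(db.twitter_posts.count_documents({}))  # sanity check
conn.destroy()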

2. English text preprocessing

from nltk.tokenize import RegexpTokenizer

def docs_preprocessor(docs):
    tokenizer = RegexpTokenizer(r'\w+')
    for idx in range(len(docs)):
        docs[idx] = docs[idx].lower()  # Convert to lowercase.
        docs[idx] = tokenizer.tokenize(docs[idx])  # Split into words.

    # Remove numbers, but not words that contain numbers.
    docs = [[token for token in doc if not token.isdigit()] for doc in docs]

    # Remove stop words (_STOP_WORDS is assumed to be defined elsewhere).
    docs = [[token for token in doc if token not in _STOP_WORDS] for doc in docs]

    # (Lemmatization could be added here as a further step.)

    return docs
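
A quick demonstration, with a toy stop-word set standing in for the _STOP_WORDS the original never shows:

_STOP_WORDS = {'a', 'the', 'is', 'in', 'to'}  # assumed; use a real stop list in practice
docs = ["The weather in Beijing is great today, 2018!"]
print(docs_preprocessor(docs))
# [['weather', 'beijing', 'great', 'today']]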

3. Chinese text preprocessing

import jieba

def nltk_tokenize(text):
    """Segment Chinese text with jieba and keep informative tokens as features."""
    features = []
    stop_words = stop.load_stopwords()  # project helper that loads a Chinese stop-word list

    try:
        # Segment the text; jieba.cut yields tokens lazily.
        tokens_cut = jieba.cut(text)
        for word in tokens_cut:
            # Keep the word if it is not a stop word, is 2-4 characters long,
            # and is not a number (is_number is another project helper).
            if word not in stop_words and 1 < len(word) < 5 and not is_number(word):
                features.append(word)
    except Exception:
        pass
    return features
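
A quick usage sketch (the exact segmentation depends on jieba's dictionary and the loaded stop-word list):

print(nltk_tokenize('今天北京的天气真好'))
# e.g. ['今天', '北京', '天气', '真好']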

4. The word cloud generator

import collections

class cloudProducer():

    def __init__(self):

        self.mon = MongoConn('ANXIETY')
        self.db = self.mon.getDb()

    def getMainData(self, region_type, region):
        # fetch the last week's data for the given region
        en_docs = []
        ch_docs = []

        twitter_in_english = self.db.twitter_posts.find({region_type:region,"lang":"en"})
        twitter_in_chinese = self.db.twitter_posts.find({region_type:region,"lang":"zh"})

        for x in twitter_in_english:
            #print(x)
            en_docs.append(x["text"])
        print(len(en_docs))
        for x in twitter_in_chinese:
            ch_docs.append(x["text"])

        return [en_docs,ch_docs]

    def produce_en_Cloud(self,region_type,region,num):
        #main page
        docs = self.getMainData(region_type,region)[0]
        print(len(docs))
        words_dump = []

        docs = docs_preprocessor(docs)

        for text in docs:
            #print(text)
            #features = text
            #print(features)
            words_dump = words_dump + text
        cloud = collections.Counter(words_dump).most_common(num)
        print(cloud)
        #json.dump(cloud,open("wordCloud.json","w",encoding="utf-8"))

        return cloud

    def produce_ch_Cloud(self,region_type,region,num):
        #main page
        docs = self.getMainData(region_type,region)[1]
        print(len(docs))
        words_dump = []

        for text in docs:
            features = nltk_tokenize(text)
            #print(features)
            words_dump = words_dump + features
        cloud = collections.Counter(words_dump).most_common(num)  # most_common returns a list of (word, count) tuples
        print(cloud)
        #json.dump(cloud,open("wordCloud.json","w",encoding="utf-8"))

        return cloud

    def produce_cloud(self,region_type,region,num):
        en_cloud = self.produce_en_Cloud(region_type,region,num)
        ch_cloud = self.produce_ch_Cloud(region_type,region,num)
        cloud = en_cloud + ch_cloud
        cloud = sorted(cloud,key=lambda t: t[1],reverse=True)  # merge and sort the English and Chinese counts by frequency
        return cloud[0:num]

cp = cloudProducer()
cloud = cp.produce_cloud("province","北京市",15)
print(cloud)
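
produce_cloud returns a list of (word, count) tuples rather than an image. To render an actual picture, the third-party wordcloud package can consume these frequencies; a minimal sketch, assuming the package is installed and a CJK-capable font file is available at the (hypothetical) path below:

from wordcloud import WordCloud

freqs = dict(cloud)  # [(word, count), ...] -> {word: count}
wc = WordCloud(font_path='msyh.ttf',  # hypothetical path to a font with CJK glyphs
               width=800, height=400, background_color='white')
wc.generate_from_frequencies(freqs)
wc.to_file('wordcloud.png')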