1. 程式人生 > 其它 >Redis實戰5 - 構建簡單的社交網站

Redis實戰5 - 構建簡單的社交網站

構建簡單的社交網站

使用者和狀態

使用者資訊

使用hash儲存

def create_user(conn, login, name):
    llogin = login.lower()
    # 鎖住小寫使用者名稱,防止多使用者同時申請一個名字
    lock = acquire_lock_with_timeout(conn, 'user:' + llogin, 1) #A
    if not lock:                            #B
        return None                         #B
    # users: 雜湊結構使用者儲存使用者名稱和使用者ID間對映,已存在則不能分配
    if conn.hget('users:', llogin):         #C
        release_lock(conn, 'user:' + llogin, lock)  #C
        return None                         #C
    # 通過計數器生成獨一無二ID
    id = conn.incr('user:id:')              #D
    pipeline = conn.pipeline(True)
    pipeline.hset('users:', llogin, id)     # 將小寫使用者名稱對映到ID
    pipeline.hmset('user:%s'%id, {          #F
        'login': login,                     #F
        'id': id,                           #F
        'name': name,                       #F
        'followers': 0,                     #F
        'following': 0,                     #F
        'posts': 0,                         #F
        'signup': time.time(),              #F
    })
    pipeline.execute()
    release_lock(conn, 'user:' + llogin, lock)  # 釋放之前的鎖
    return id                               #H

狀態訊息

將使用者所說的話記錄到狀態訊息裡面,也用hash。

def create_status(conn, uid, message, **data):
    pipeline = conn.pipeline(True)
    pipeline.hget('user:%s'%uid, 'login')   # 根據使用者ID獲取使用者名稱
    pipeline.incr('status:id:')             # 為訊息建立唯一id
    login, id = pipeline.execute()

    if not login:                           # 釋出訊息前驗證賬號是否存在
        return None                         #C

    data.update({
        'message': message,                 #D
        'posted': time.time(),              #D
        'id': id,                           #D
        'uid': uid,                         #D
        'login': login,                     #D
    })
    pipeline.hmset('status:%s'%id, data)    #D
    pipeline.hincrby('user:%s'%uid, 'posts')# 更新使用者已傳送狀態訊息數量
    pipeline.execute()
    return id                               # 返回新建立的狀態訊息ID

主頁時間線

使用者以及使用者正在關注的人所釋出的狀態訊息組成。使用zset,成員為訊息ID,分值為釋出時間戳。

# 三個可選引數:哪條時間線、獲取多少頁、每頁多少條狀態資訊
def get_status_messages(conn, uid, timeline='home:', page=1, count=30):#A
    # 獲取時間線上最小狀態訊息ID
    statuses = conn.zrevrange(                                  #B
        '%s%s'%(timeline, uid), (page-1)*count, page*count-1)   #B

    pipeline = conn.pipeline(True)
    # 獲取狀態訊息本身
    for id in statuses:                                         #C
        pipeline.hgetall('status:%s'%id)                        #C
    # 使用過濾器移除那些被刪除了的狀態訊息
    return filter(None, pipeline.execute())                     #D

另一個重要時間線是個人時間線,區別是隻展示自己釋出的,只要將timeline引數設定為profile:

關注者列表和正在關注列表

用zset,成員為yonghuID,分值為關注的時間戳。

關注和取消關注時,修改關注hash、被關注hash、使用者資訊hash中關注數被關注數、個人時間線。

HOME_TIMELINE_SIZE = 1000
def follow_user(conn, uid, other_uid):  # uid 關注 other_uid
    fkey1 = 'following:%s'%uid          #A
    fkey2 = 'followers:%s'%other_uid    #A

    if conn.zscore(fkey1, other_uid):   #B
        return None                     #B

    now = time.time()

    pipeline = conn.pipeline(True)
    pipeline.zadd(fkey1, other_uid, now)    #C
    pipeline.zadd(fkey2, uid, now)          #C
    pipeline.zrevrange('profile:%s'%other_uid, 0, HOME_TIMELINE_SIZE-1, withscores=True)   # 從被關注者的個人時間線裡去最新狀態訊息
    following, followers, status_and_score = pipeline.execute()[-3:]
    # 更新他們資訊hash
    pipeline.hincrby('user:%s'%uid, 'following', int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))  #F
    # 更新使用者主頁時間線
    if status_and_score:
        pipeline.zadd('home:%s'%uid, **dict(status_and_score))  #G
    pipeline.zremrangebyrank('home:%s'%uid, 0, -HOME_TIMELINE_SIZE-1)#G

    pipeline.execute()
    return True                         #H



def unfollow_user(conn, uid, other_uid):
    fkey1 = 'following:%s'%uid          #A
    fkey2 = 'followers:%s'%other_uid    #A

    if not conn.zscore(fkey1, other_uid):   #B
        return None                         #B
    # 從正在關注和被關注中移除雙方ID
    pipeline = conn.pipeline(True)
    pipeline.zrem(fkey1, other_uid)                 #C
    pipeline.zrem(fkey2, uid)                       #C
    # 獲取被取消關注使用者最近釋出狀態訊息
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1)                    #E
    following, followers, statuses = pipeline.execute()[-3:]

    pipeline.hincrby('user:%s'%uid, 'following', -int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', -int(followers))  #F
    if statuses:
        # 移除使用者主頁時間線中相應訊息
        pipeline.zrem('home:%s'%uid, *statuses)                 #G

    pipeline.execute()
    return True                         #H


# 使用者取消關注後,主頁時間線上訊息減少,此函式將其填滿
def refill_timeline(conn, incoming, timeline, start=0):
    if not start and conn.zcard(timeline) >= 750:               #A
        return                                                  #A

    users = conn.zrangebyscore(incoming, start, 'inf',          #B
        start=0, num=REFILL_USERS_STEP, withscores=True)        #B

    pipeline = conn.pipeline(False)
    for uid, start in users:
        pipeline.zrevrange('profile:%s'%uid,                    #C
            0, HOME_TIMELINE_SIZE-1, withscores=True)           #C

    messages = []
    for results in pipeline.execute():
        messages.extend(results)                            #D

    messages.sort(key=lambda x:-x[1])                       #E
    del messages[HOME_TIMELINE_SIZE:]                       #E

    pipeline = conn.pipeline(True)
    if messages:
        pipeline.zadd(timeline, **dict(messages))           #F
    pipeline.zremrangebyrank(                               #G
        timeline, 0, -HOME_TIMELINE_SIZE-1)                 #G
    pipeline.execute()

    if len(users) >= REFILL_USERS_STEP:
        execute_later(conn, 'default', 'refill_timeline',       #H
            [conn, incoming, timeline, start])                  #H

狀態訊息的釋出與刪除

訊息釋出後要更新要新增到使用者個人時間線和關注者主頁時間線。當關注者非常多時,更新所有關注者主頁會很慢。為了讓釋出操作可以儘快地返回,程式需要做兩件事情。首先,在釋出狀態訊息的時候,程式會將狀態訊息的ID新增到前1000個關注者的主頁時間線裡面。根據Twitter的一項統計表明,關注者數量在1000人以上的使用者只有10萬~25萬,而這10萬~25萬用戶只佔了活躍使用者數量的0.1%,這意味著99.9%的訊息釋出人在這一階段就可以完成自己的釋出操作,而剩下的0.1%則需要接著執行下一個步驟。其次,對於那些關注者數量超過1000人的使用者來說,程式會使用類似於6.4節中介紹的系統來開始一項延遲任務。程式碼清單8-6展示了程式是如何將狀態更新推送給各個關注者的。

def post_status(conn, uid, message, **data):
    id = create_status(conn, uid, message, **data)  # 建立新的狀態訊息
    if not id:              #B
        return None         #B

    posted = conn.hget('status:%s'%id, 'posted')    # 獲取釋出時間
    if not posted:                                  #D
        return None                                 #D

    post = {str(id): float(posted)}
    conn.zadd('profile:%s'%uid, **post)             # 新增到個人時間線

    syndicate_status(conn, uid, post)       # 推送給關注者
    return id


POSTS_PER_PASS = 1000           # 每次最多發給1000給關注者
def syndicate_status(conn, uid, post, start=0):
    # 獲取上傳被更新的最後一個關注者為啟動,獲取接下來1000個關注者
    followers = conn.zrangebyscore('followers:%s'%uid, start, 'inf',
        start=0, num=POSTS_PER_PASS, withscores=True)   #B

    pipeline = conn.pipeline(False)
    for follower, start in followers:                    # 遍歷時更新start,可用於下一次syndicate_status()呼叫
        # 修改主頁時間線
        pipeline.zadd('home:%s'%follower, **post)        #C
        pipeline.zremrangebyrank(                        #C
            'home:%s'%follower, 0, -HOME_TIMELINE_SIZE-1)#C
    pipeline.execute()
    # 超過1000人在延遲任務中繼續操作
    if len(followers) >= POSTS_PER_PASS:                    #D
        execute_later(conn, 'default', 'syndicate_status',  #D
            [conn, uid, post, start])                       #D
        
        
def delete_status(conn, uid, status_id):
    key = 'status:%s'%status_id
    lock = acquire_lock_with_timeout(conn, key, 1)  # 防止多個程式同時刪除
    if not lock:                #B
        return None             #B
    # 許可權檢測
    if conn.hget(key, 'uid') != str(uid):   #C
        release_lock(conn, key, lock)       #C
        return None                         #C

    pipeline = conn.pipeline(True)
    pipeline.delete(key)                            # 刪除指定狀態訊息
    pipeline.zrem('profile:%s'%uid, status_id)      # 從使用者個人時間線中刪除
    pipeline.zrem('home:%s'%uid, status_id)         # 從使用者主頁時間線中刪除訊息id
    pipeline.hincrby('user:%s'%uid, 'posts', -1)    # 減少已釋出訊息數量
    pipeline.execute()

    release_lock(conn, key, lock)
    return True


# 清理在關注者的主頁時間線中被刪除的訊息ID
def clean_timelines(conn, uid, status_id, start=0, on_lists=False):
    key = 'followers:%s'%uid            #A
    base = 'home:%s'                    #A
    if on_lists:                        #A
        key = 'list:out:%s'%uid         #A
        base = 'list:statuses:%s'       #A
    followers = conn.zrangebyscore(key, start, 'inf',   #B
        start=0, num=POSTS_PER_PASS, withscores=True)   #B

    pipeline = conn.pipeline(False)
    for follower, start in followers:                    #C
        pipeline.zrem(base%follower, status_id)          #C
    pipeline.execute()

    if len(followers) >= POSTS_PER_PASS:                    #D
        execute_later(conn, 'default', 'clean_timelines' ,  #D
            [conn, uid, status_id, start, on_lists])        #D

    elif not on_lists:
        execute_later(conn, 'default', 'clean_timelines',   #E
            [conn, uid, status_id, 0, True])                #E