Redis實戰5 - 構建簡單的社交網站
阿新 • • 發佈:2022-11-29
構建簡單的社交網站
使用者和狀態
使用者資訊
使用hash儲存
def create_user(conn, login, name): llogin = login.lower() # 鎖住小寫使用者名稱,防止多使用者同時申請一個名字 lock = acquire_lock_with_timeout(conn, 'user:' + llogin, 1) #A if not lock: #B return None #B # users: 雜湊結構使用者儲存使用者名稱和使用者ID間對映,已存在則不能分配 if conn.hget('users:', llogin): #C release_lock(conn, 'user:' + llogin, lock) #C return None #C # 通過計數器生成獨一無二ID id = conn.incr('user:id:') #D pipeline = conn.pipeline(True) pipeline.hset('users:', llogin, id) # 將小寫使用者名稱對映到ID pipeline.hmset('user:%s'%id, { #F 'login': login, #F 'id': id, #F 'name': name, #F 'followers': 0, #F 'following': 0, #F 'posts': 0, #F 'signup': time.time(), #F }) pipeline.execute() release_lock(conn, 'user:' + llogin, lock) # 釋放之前的鎖 return id #H
狀態訊息
將使用者所說的話記錄到狀態訊息裡面,也用hash。
def create_status(conn, uid, message, **data): pipeline = conn.pipeline(True) pipeline.hget('user:%s'%uid, 'login') # 根據使用者ID獲取使用者名稱 pipeline.incr('status:id:') # 為訊息建立唯一id login, id = pipeline.execute() if not login: # 釋出訊息前驗證賬號是否存在 return None #C data.update({ 'message': message, #D 'posted': time.time(), #D 'id': id, #D 'uid': uid, #D 'login': login, #D }) pipeline.hmset('status:%s'%id, data) #D pipeline.hincrby('user:%s'%uid, 'posts')# 更新使用者已傳送狀態訊息數量 pipeline.execute() return id # 返回新建立的狀態訊息ID
主頁時間線
使用者以及使用者正在關注的人所釋出的狀態訊息組成。使用zset,成員為訊息ID,分值為釋出時間戳。
# 三個可選引數:哪條時間線、獲取多少頁、每頁多少條狀態資訊 def get_status_messages(conn, uid, timeline='home:', page=1, count=30):#A # 獲取時間線上最小狀態訊息ID statuses = conn.zrevrange( #B '%s%s'%(timeline, uid), (page-1)*count, page*count-1) #B pipeline = conn.pipeline(True) # 獲取狀態訊息本身 for id in statuses: #C pipeline.hgetall('status:%s'%id) #C # 使用過濾器移除那些被刪除了的狀態訊息 return filter(None, pipeline.execute()) #D
另一個重要時間線是個人時間線,區別是隻展示自己釋出的,只要將timeline引數設定為profile:
關注者列表和正在關注列表
用zset,成員為yonghuID,分值為關注的時間戳。
關注和取消關注時,修改關注hash、被關注hash、使用者資訊hash中關注數被關注數、個人時間線。
HOME_TIMELINE_SIZE = 1000
def follow_user(conn, uid, other_uid): # uid 關注 other_uid
fkey1 = 'following:%s'%uid #A
fkey2 = 'followers:%s'%other_uid #A
if conn.zscore(fkey1, other_uid): #B
return None #B
now = time.time()
pipeline = conn.pipeline(True)
pipeline.zadd(fkey1, other_uid, now) #C
pipeline.zadd(fkey2, uid, now) #C
pipeline.zrevrange('profile:%s'%other_uid, 0, HOME_TIMELINE_SIZE-1, withscores=True) # 從被關注者的個人時間線裡去最新狀態訊息
following, followers, status_and_score = pipeline.execute()[-3:]
# 更新他們資訊hash
pipeline.hincrby('user:%s'%uid, 'following', int(following)) #F
pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers)) #F
# 更新使用者主頁時間線
if status_and_score:
pipeline.zadd('home:%s'%uid, **dict(status_and_score)) #G
pipeline.zremrangebyrank('home:%s'%uid, 0, -HOME_TIMELINE_SIZE-1)#G
pipeline.execute()
return True #H
def unfollow_user(conn, uid, other_uid):
fkey1 = 'following:%s'%uid #A
fkey2 = 'followers:%s'%other_uid #A
if not conn.zscore(fkey1, other_uid): #B
return None #B
# 從正在關注和被關注中移除雙方ID
pipeline = conn.pipeline(True)
pipeline.zrem(fkey1, other_uid) #C
pipeline.zrem(fkey2, uid) #C
# 獲取被取消關注使用者最近釋出狀態訊息
pipeline.zrevrange('profile:%s'%other_uid, #E
0, HOME_TIMELINE_SIZE-1) #E
following, followers, statuses = pipeline.execute()[-3:]
pipeline.hincrby('user:%s'%uid, 'following', -int(following)) #F
pipeline.hincrby('user:%s'%other_uid, 'followers', -int(followers)) #F
if statuses:
# 移除使用者主頁時間線中相應訊息
pipeline.zrem('home:%s'%uid, *statuses) #G
pipeline.execute()
return True #H
# 使用者取消關注後,主頁時間線上訊息減少,此函式將其填滿
def refill_timeline(conn, incoming, timeline, start=0):
if not start and conn.zcard(timeline) >= 750: #A
return #A
users = conn.zrangebyscore(incoming, start, 'inf', #B
start=0, num=REFILL_USERS_STEP, withscores=True) #B
pipeline = conn.pipeline(False)
for uid, start in users:
pipeline.zrevrange('profile:%s'%uid, #C
0, HOME_TIMELINE_SIZE-1, withscores=True) #C
messages = []
for results in pipeline.execute():
messages.extend(results) #D
messages.sort(key=lambda x:-x[1]) #E
del messages[HOME_TIMELINE_SIZE:] #E
pipeline = conn.pipeline(True)
if messages:
pipeline.zadd(timeline, **dict(messages)) #F
pipeline.zremrangebyrank( #G
timeline, 0, -HOME_TIMELINE_SIZE-1) #G
pipeline.execute()
if len(users) >= REFILL_USERS_STEP:
execute_later(conn, 'default', 'refill_timeline', #H
[conn, incoming, timeline, start]) #H
狀態訊息的釋出與刪除
訊息釋出後要更新要新增到使用者個人時間線和關注者主頁時間線。當關注者非常多時,更新所有關注者主頁會很慢。為了讓釋出操作可以儘快地返回,程式需要做兩件事情。首先,在釋出狀態訊息的時候,程式會將狀態訊息的ID新增到前1000個關注者的主頁時間線裡面。根據Twitter的一項統計表明,關注者數量在1000人以上的使用者只有10萬~25萬,而這10萬~25萬用戶只佔了活躍使用者數量的0.1%,這意味著99.9%的訊息釋出人在這一階段就可以完成自己的釋出操作,而剩下的0.1%則需要接著執行下一個步驟。其次,對於那些關注者數量超過1000人的使用者來說,程式會使用類似於6.4節中介紹的系統來開始一項延遲任務。程式碼清單8-6展示了程式是如何將狀態更新推送給各個關注者的。
def post_status(conn, uid, message, **data):
id = create_status(conn, uid, message, **data) # 建立新的狀態訊息
if not id: #B
return None #B
posted = conn.hget('status:%s'%id, 'posted') # 獲取釋出時間
if not posted: #D
return None #D
post = {str(id): float(posted)}
conn.zadd('profile:%s'%uid, **post) # 新增到個人時間線
syndicate_status(conn, uid, post) # 推送給關注者
return id
POSTS_PER_PASS = 1000 # 每次最多發給1000給關注者
def syndicate_status(conn, uid, post, start=0):
# 獲取上傳被更新的最後一個關注者為啟動,獲取接下來1000個關注者
followers = conn.zrangebyscore('followers:%s'%uid, start, 'inf',
start=0, num=POSTS_PER_PASS, withscores=True) #B
pipeline = conn.pipeline(False)
for follower, start in followers: # 遍歷時更新start,可用於下一次syndicate_status()呼叫
# 修改主頁時間線
pipeline.zadd('home:%s'%follower, **post) #C
pipeline.zremrangebyrank( #C
'home:%s'%follower, 0, -HOME_TIMELINE_SIZE-1)#C
pipeline.execute()
# 超過1000人在延遲任務中繼續操作
if len(followers) >= POSTS_PER_PASS: #D
execute_later(conn, 'default', 'syndicate_status', #D
[conn, uid, post, start]) #D
def delete_status(conn, uid, status_id):
key = 'status:%s'%status_id
lock = acquire_lock_with_timeout(conn, key, 1) # 防止多個程式同時刪除
if not lock: #B
return None #B
# 許可權檢測
if conn.hget(key, 'uid') != str(uid): #C
release_lock(conn, key, lock) #C
return None #C
pipeline = conn.pipeline(True)
pipeline.delete(key) # 刪除指定狀態訊息
pipeline.zrem('profile:%s'%uid, status_id) # 從使用者個人時間線中刪除
pipeline.zrem('home:%s'%uid, status_id) # 從使用者主頁時間線中刪除訊息id
pipeline.hincrby('user:%s'%uid, 'posts', -1) # 減少已釋出訊息數量
pipeline.execute()
release_lock(conn, key, lock)
return True
# 清理在關注者的主頁時間線中被刪除的訊息ID
def clean_timelines(conn, uid, status_id, start=0, on_lists=False):
key = 'followers:%s'%uid #A
base = 'home:%s' #A
if on_lists: #A
key = 'list:out:%s'%uid #A
base = 'list:statuses:%s' #A
followers = conn.zrangebyscore(key, start, 'inf', #B
start=0, num=POSTS_PER_PASS, withscores=True) #B
pipeline = conn.pipeline(False)
for follower, start in followers: #C
pipeline.zrem(base%follower, status_id) #C
pipeline.execute()
if len(followers) >= POSTS_PER_PASS: #D
execute_later(conn, 'default', 'clean_timelines' , #D
[conn, uid, status_id, start, on_lists]) #D
elif not on_lists:
execute_later(conn, 'default', 'clean_timelines', #E
[conn, uid, status_id, 0, True]) #E