1. 程式人生 > 程式設計 >使用 Redis 實現 Feed 流

使用 Redis 實現 Feed 流

背景

最近接到一個需求,用一句話來說就是:展示關注人釋出的動態,這個涉及到 feed 流系統的設計。本文主要介紹一個一般企業可用的 Feed 流解決方案。

相關概念

下面先介紹一下關於 Feed 流的簡單概念。

什麼是 feed 流

  • Feed:Feed 流中的每一條狀態或者訊息都是 Feed,比如微博中的一條微博就是一個 Feed。
  • Feed流:持續更新並呈現給使用者內容的資訊流。每個人的朋友圈,微博關注頁等等都是一個 Feed 流。

feed 流分類

Feed 流常見的分類有兩種:

  • Timeline:按釋出的時間順序排序,產品如果選擇 Timeline 型別,那麼就是認為 Feed 流中的 Feed 不多,但是每個 Feed 都很重要,都需要使用者看到。類似於微信朋友圈,微博等。
  • Rank:按某個非時間的因子排序,一般是按照使用者的喜好度排序,一般用於新聞推薦類、商品推薦等。

設計

設計一個 Feed 流系統,兩個關鍵步驟,一個是 Feed 流的 初始化,一個是 推送。關於 Feed 流的儲存其實也是一個核心的點,但是筆主持久化使用的還是 MySQL,後續可以考慮優化。

Feed 流初始化

Feed 流【關注頁 Feed 流】的初始化指的是,當使用者的 Feed 流還不存在的時候,為該使用者建立一個屬於他自己的關注頁 Feed 流,具體怎麼做呢?其實很簡單,遍歷一遍關注列表,取出所有關注使用者的 feed,將 feedId 存放到 redis 的 sortSet

中即可。這裡面有幾個關鍵點:

  • 初始化資料:初始化的資料需要從資料庫中載入出來。
  • key 值:sortSet 的 key 值需要使用當前使用者的 id 做標識。
  • score 值:如果是 Timeline 型別,直接取 feed 建立的時間戳即可。如果是 rank 型別,則把你的業務對應的權重值設進去。

推送

經過上面的初始化,已經把 feed 流放在了 redis 快取中了。接下來就是需要更新 feed 流了,在下面四種情況需要進行更新:

  1. 關注的使用者釋出新的 feed:
  2. 關注的使用者刪除 feed。
  3. 使用者新增關注。
  4. 使用者取消關注。

釋出/刪除 Feed 流程

上面四步具體怎麼操作,會在下面的實現步驟中詳細描述,在這裡先我們重點討論一下第一、二種情況。因為在處理 大V 【千萬級別粉絲】的時候,我們是需要對 大V 的所有粉絲的 feed 流進行處理的,這時候涉及到的量就會非常巨大,需要多加斟酌。關於推送,一般有兩種 推/拉。

  • :A使用者釋出新的動態時,要往 A使用者所有的粉絲 feed 流中推。
  • :A使用者釋出新的動態時,先不進行推送,而是等 粉絲進來的時候,才主動到 A使用者的個人頁TimeLine 拉取最新的 feed,然後進行一個 merge。如果關注了多個大V,可以併發的向多個大V 個人頁TimeLine 中拉取。
推拉結合模式

當使用者釋出一條新的 Feed 時,處理流程如下:

  1. 先從關注列表中讀取到自己的粉絲列表,以及判斷自己是否是大V。
  2. 將自己的Feed訊息寫入個人頁Timeline。如果是大V,寫入流程到此就結束了。
  3. 如果是普通使用者,還需要將自己的Feed訊息寫給自己的粉絲,如果有100個粉絲,那麼就要寫給100個使用者。

當重新整理自己的Feed流的時候,處理流程如下:

  1. 先去讀取自己關注的大V列表
  2. 去讀取自己的 Feed 流。
  3. 如果有關注的大V,則再次併發讀取每一個大V的個人頁Timeline,如果關注了10個大V,那麼則需要10次訪問。
  4. 合併2和3步的結果,然後按時間排序,返回給使用者。

至此,使用推拉結合方式的釋出,讀取Feed流的流程都結束了。

推模式

如果只是用推模式了,則會變的比較簡單:

  • 釋出Feed:
    • 不用區分是否大V,所有使用者的流程都一樣,都是三步。
  • 讀取Feed流:
    • 不需要第一步,也不需要第三步,只需要第二步即可,將之前的2 + N(N是關注的大V個數) 次網路開銷減少為 1 次網路開銷。讀取延時大幅降級。
兩種模式總結:

推拉結合存在一個弊端,就是重新整理自己的Feed流時,大V的個人頁Timeline 的讀壓力會很大。

如何解決:

  1. 不使用大V/普通使用者的優化方式,使用對活躍粉絲採用推模式,非活躍粉絲採用拉模式。
  2. 完全使用推模式就可以徹底解決這個問題,但是會帶來儲存量增大,大V Feed 傳送總時間增大,從發給第一個粉絲到發給最後一個粉絲可能要幾分鐘時間。

實現

筆主主要採用純推模式實現了一個普通企業基本可用的 Feed 流系統,下面介紹一下具體的實現程式碼,主要包括3大個部分:

  1. 初始化 Feed 流。
  2. 關注的使用者釋出/刪除 feed,該使用者的粉絲更新自己的Feed流。
  3. 使用者新增/取消關注,更新自己的Feed流。

初始化 Feed 流

當使用者第一進來重新整理Feed 流,且 Feed 流還不存在時,我們需要進行初始化,初始化的具體程式碼如下:核心思想就是從資料庫中load出 feed 資訊,塞到 zSet 中,然後分頁返回。

    /**
     * 獲取關注的人的資訊流
     */
    public List<FeedDto> listFocusFeed(Long userId,Integer page,Integer size) {
        String focusFeedKey = "focusFeedKey" + userId;

        // 如果 zset 為空,先初始化
        if (!zSetRedisTemplate.exists(focusFeedKey)) {
            initFocusIdeaSet(userId);
        }

        // 如果 zset 存在,但是存在 0 值
        Double zscore = zSetRedisTemplate.zscore(focusFeedKey,"0");
        if (zscore != null && zscore > 0) {
            return null;
        }

        //分頁
        int offset = (page - 1) * size;

        long score = System.currentTimeMillis();
        // 按 score 值從大到小從 zSet 中取出 FeedId 集合
        List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey,score,0,offset,size);

        List<FeedDto> result = new ArrayList<>();
        if (QlchatUtil.isNotEmpty(list)) {
            for (String s : list) {
                // 根據 feedId 從快取中 load 出 feed
                FeedDto feedDto = this.loadCache(Long.valueOf(s));
                if (feedDto != null) {
                    result.add(feedDto);
                }
            }
        }
        return result;
    }

    /**
     * 初始化關注的人的資訊流 zSet
     */
    private void initFocusFeedSet( Long userId) {
        String focusFeedKey = "focusFeedKey" + userId;
        zSetRedisTemplate.del(focusIdeaKey);

        // 從資料庫中載入當前使用者關注的人釋出過的 Feed
        List<Feed> list = this.feedMapper.listFocusFeed(userId);

        if (QlchatUtil.isEmpty(list)) {
            //儲存0,避免空資料頻繁查庫
            zSetRedisTemplate.zadd(focusFeedKey,1,"0");
            zSetRedisTemplate.expire(focusFeedKey,RedisKeyConstants.ONE_MINUTE * 5);
            return;
        }

        // 遍歷 FeedList,把 FeedId 存到 zSet 中
        for (Feed feed : list) {
            zSetRedisTemplate.zadd(focusFeedKey,feed.getCreateTime().getTime(),feed.getId().toString());
        }

        zSetRedisTemplate.expire(focusFeedKey,60 * 60 * 60);
    }
複製程式碼

關注的使用者釋出/刪除新的 feed

每當使用者釋出/刪除新的 feed,我們需要更新該使用者所有的粉絲的 Feed流,該步驟一般比較耗時,所以建議非同步處理,為了避免一次性load出太多的粉絲資料,這裡採用迴圈分頁查詢。為了避免粉絲的 Feed流過大,我們會限制 Feed 流的長度為1000,當Feed流長度超過1000時,會移除最舊的 Feed。

    /**
     * 新增/刪除 feed時,處理粉絲 feed 流
     *
     * @param userId 新增/刪除 feed的使用者id
     * @param feedId 新增/刪除 的feedId
     * @param type   feed_add = 新增feed feed_sub = 刪除feed
     */
    public void handleFeed(Long userId,Long feedId,String type) {

        Integer currentPage = 1;
        Integer size = 1000;
        List<FansDto> fansDtos;

        while (true) {
            Page page = new Page();
            page.setSize(size);
            page.setPage(currentPage);
            fansDtos = this.fansService.listFans(userId,page);

            for (FansDto fansDto : fansDtos) {
                String focusFeedKey = "focusFeedKey" + userId;

                // 如果粉絲 zSet 不存在,退出
                if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
                    continue;
                }

                // 新增Feed
                if ("feed_add".equals(type)) {
                    this.removeOldestZset(focusFeedKey);
                    zSetRedisTemplate.zadd(focusFeedKey,System.currentTimeMillis(),feedId);
                }
                // 刪除Feed
                else if ("feed_sub".equals(type)) {
                    zSetRedisTemplate.zrem(focusFeedKey,feedId);
                }

            }

            if (fansDtos.size() < size) {
                break;
            }
            currentPage++;
        }

    }

    /**
     * 刪除 zSet 中最舊的資料
     */
    private void removeOldestZset(String focusFeedKey){
        // 如果當前 zSet 大於1000,刪除最舊的資料
        if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
            // 獲取當前 zSet 中 score 值最小的
            List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey,-1,String.class);
            if (QlchatUtil.isNotEmpty(zrevrange)) {
                this.zSetRedisTemplate.zrem(focusFeedKey,zrevrange.get(0));
            }
        }
    }
複製程式碼

使用者新增關注/取消關注

這裡比較簡單,新增/取消關注,把新關注的 Feed 往自己的 Feed流中增加/刪除 即可,但是同樣需要非同步處理。

    /**
     * 關注/取關 時,處理followerId的zSet
     *
     * @param followId   被關注的人
     * @param followerId 當前使用者
     * @param type       focus = 關注 unfocus = 取關
     */
    public void handleFocus( Long followId,Long followerId,String type) {

        String focusFeedKey = "focusFeedKey" + userId;

        // 如果粉絲 zSet 不存在,退出
        if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
            return;
        }
        List<FeedDto> FeedDtos = this.listFeedByFollowId(source,followId);
        for (FeedDto feedDto : FeedDtos) {

            // 關注
            if ("focus".equals(type)) {
                this.removeOldestZset(focusFeedKey);
                this.zSetRedisTemplate.zadd(focusFeedKey,feedDto.getCreateTime().getTime(),feedDto.getId());
            }
            // 取關
            else if ("unfocus".equals(type)) {
                this.zSetRedisTemplate.zrem(focusFeedKey,feedDto.getId());
            }


        }
    }
複製程式碼

上面展示的是核心程式碼,僅僅是為大家提供一個實現思路,並不是直接可執行的程式碼,畢竟真正實現還會涉及到很多其他的無關要緊的類。

最後

在這裡已經介紹完一個簡單可用的 Feed流系統,歡迎各路大神指出錯誤,多提意見!

參考文章: