1. 程式人生 > >分散式重建快取的併發衝突 詳解

分散式重建快取的併發衝突 詳解

在分散式系統中,如果快取服務在本地的 Ehcache 中都讀取不到資料,此時需要重新到源頭的服務中去拉去資料,拉取到資料之後,趕緊先給 Nginx 的請求返回,同時將資料寫入 Ehcache 和 Redis中。此時會出現分散式重建快取的併發衝突問題重建快取 : 比如資料在所有的快取中都不存在 (如使用 LRU演算法 給清理掉),就需要重新查詢資料寫入快取,重建快取分散式的重建快取 : 在不同的機器上,不同的服務例項中,去做上面的事情,就會出現多個機器分散式重建去讀取相同的資料,然後寫入快取中1> 流量均勻分佈到所有快取服務例項上 : 應用層Nginx 將請求流量均勻地打到各個快取服務例項中的,相同的服務可能部署到不同的機器上2> 應用層Nginx 通過 商品id 的 hash計算,走固定的快取服務例項;分發層的 Nginx 的 Lua指令碼,通過對 商品id做一個hash,然後對應用nginx數量取模,最終找到固定在 Nginx的地址列表 中的地址。將每個商品的請求固定分發到同一個 應用層Nginx 上面去。在 應用層Nginx 裡,發現自己本地 lua shared dict 快取中沒有資料的時候,就採取一樣的方式,對 product id 取模,然後將請求固定分發到同一個快取服務例項中去,這樣的話,就不會出現說多個快取服務例項分散式的去更新那個快取3> 源資訊服務傳送的變更訊息,需要按照 商品id 去分割槽,固定的商品變更走固定的 Kafka分割槽,也就是固定的一個快取服務例項獲取到。快取服務,是監聽 Kafka topic 的,一個快取服務例項,作為一個 kafka consumer,就消費 topic 中的一個 partition,所以有多個快取服務例項的話,每個快取服務例項就消費一個 kafka partition。所以這裡,一般來說,源頭資訊服務,在傳送訊息到 kafka topic 的時候,都需要按照 product id 去分割槽,也就時說,同一個 product id變更的訊息一定是到同一個 kafka partition 中去的,也就是說同一個 product id的變更訊息,一定是同一個快取服務例項消費到的。實現方式很簡單就是通過 kafka producer api,裡面 send message 的時候,多加一個引數就即可,product id 傳遞進去4> 問題 : 自己寫的簡易的 hash分發,與 kafka的分割槽,可能並不一致。自己寫的簡易的 hash分發策略,是按照 crc32 去取 hash值,然後再取模的。但是,不知道 kafka producer 的 hash策略是什麼,很可能說跟我們的策略是不一樣的,這就可能導致說,資料變更的訊息所到的快取服務例項,跟應用層 Nginx分發到的那個快取服務例項也許就不在一臺機器上了,這樣的話,在高併發,極端的情況下,可能就會出現衝突5> 分散式的快取重建併發衝突問題發生 : 比如說同意服務部署在多臺機器上,都需要修改快取資料,此時就會出現,快取被覆蓋即使用並非最新的原始資料區覆蓋快取中已經是最新資料,此時將會導致快取資料不是最新資料,從而發生資料不一致的情況基於 zookeeper 分散式鎖的解決 分散式重建快取的併發衝突分散式鎖 : 如果有多個機器在訪問同一個共享資源,那麼這個時候,如果需要加個鎖,讓多個分散式的機器在訪問共享資源的時候序列起來,這個時候,多個不同機器上的服務共享這個鎖,這就是分散式鎖分散式鎖當然有很多種不同的實現方案,redis分散式鎖,zookeeper分散式鎖。採用 zookeeper 做分散式協調這一塊,還是很流行的,大資料應用裡面,hadoop,storm,都是基於zk去做分散式協調zk分散式鎖的解決併發衝突的方案 :1> 變更快取重建以及空快取請求重建,更新 Redis 之前,都需要先獲取對應 商品id 的分散式鎖2> 拿到分散式鎖之後,需要根據時間版本去比較一下,如果自己的版本新於 Redis 中的版本,那麼就更新,否則就不更新3> 如果拿不到分散式鎖,那麼就等待,不斷輪詢等待,直到獲取到分散式的鎖zk分散式鎖的原理 : 通過去建立 zk 的一個臨時node,來模擬給一個 商品id 加鎖。zk會保證只會建立一個臨時node,其他請求過來如果再要建立臨時node,就會報錯丟擲 NodeExistsException。所以說,所謂上鎖,其實就是去建立某個 product id 對應的一個 臨時node,如果 臨時node 建立成功了,那麼說明成功加鎖,此時就可以去執行對 Redis 裡面資料的操作。如果臨時node 建立失敗,說明鎖已經被某個服務拿到,在操作 Reids 中的資料,那麼就不斷的等待,直到可以獲取到鎖為止zk分散式鎖的程式碼封裝 : zookeeper java client api去封裝連線zk,以及獲取分散式鎖,還有釋放分散式鎖的程式碼。釋放一個分散式鎖,去刪除掉那個 臨時node 即可,就代表釋放一個鎖,那麼此時其他的機器就可以建立臨時node,獲取到鎖。使用 zk 去實現一個分散式鎖,也有很多種做法,有複雜的,也有簡單的,此處使用的是一種最簡單有效的分散式鎖,能滿足大多數情況的使用引入 zookeeper 庫<dependency>
    <groupId>org.apache.zookeeper</groupId>    <artifactId>zookeeper</artifactId>    <version>3.4.12</version></dependency>建立 ZookeeperSession,來在 zookeeper 上建立、刪除臨時節點public class ZookeeperSession {    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private static CountDownLatch countDownLatch = new CountDownLatch(1);    private ZooKeeper zooKeeper;    private ZookeeperSession() {        try {            this.zooKeeper = new ZooKeeper("ci-server:2181", 5000, new ZookeeperWatcher());            // 給一個狀態 CONNECTING,連線中            logger.debug("Zookeeper session State => ", zooKeeper.getState());
            countDownLatch.await();            logger.debug("ZooKeeper session established......");        } catch (IOException | InterruptedException e) {            e.printStackTrace();        }    }    /**     * 獲取分散式鎖     *     * @param productId     */    public void acquireDistributedLock(Long productId) {        String path = "/product-lock-" + productId;        try {            zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);            logger.debug("success to acquire lock for product[id={}]", productId);        } catch (KeeperException | InterruptedException e) {            logger.debug("fail[1] to acquire lock for product[id={}]", productId);            // 如果對應那個商品的鎖的 node 已經存在,就是已經被別人加鎖,那麼就會報錯,丟擲 NodeExistsException            int count = 0;            while (true) {                try {                    Thread.sleep(20);                    zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);                } catch (InterruptedException | KeeperException e1) {                    e1.printStackTrace();                    count++;                    continue;                }                logger.debug("success to acquire lock for product[id={}] after {} times try......", productId, count);                break;            }        }    }    /**     * 釋放掉一個分散式鎖     *     * @param productId     */    public void releaseDistributedLock(Long productId) {        try {            // 刪除掉所有匹配節點版本的 node            zooKeeper.delete("/product-lock-" + productId, -1);        } catch (InterruptedException | KeeperException e) {            e.printStackTrace();        }    }    /**     * 建立 zookeeper session 的 Watcher     */    private class ZookeeperWatcher implements Watcher {        @Override        public void process(WatchedEvent event) {            logger.debug("Receive watched event: {}", event.getState());            if (Event.KeeperState.SyncConnected == event.getState()) {                countDownLatch.countDown();            }        }    }    private static class Singleton {        private static ZookeeperSession instance;        static {            instance = new ZookeeperSession();        }        public static ZookeeperSession getInstance() {            return instance;        }    }    /**     * 獲取 ZookeeperSession 例項     *     * @return     */    public static ZookeeperSession getInstance() {        return Singleton.getInstance();    }    /**     * 初始化單例的便捷方式     */    public static void init() {        getInstance();    }}業務程式碼1> 主動更新 : 監聽 kafka訊息佇列,獲取到一個商品變更的訊息之後,去那個源服務中呼叫介面拉取資料,更新到 Ehcache和 Redis中。先獲取分散式鎖,然後才能更新 Redis,同時更新時要比較時間版本2> 被動重建 : 直接讀取源頭資料,直接返回給 Nginx,同時推送一條訊息到一個佇列,後臺執行緒非同步消費,後臺現成負責先獲取分散式鎖,然後才能更新 Redis,同時要比較時間版本