1. 程式人生 > 其它 >java基於mongodb實現分散式鎖

java基於mongodb實現分散式鎖

基於mongodb實現分散式鎖

原理

通過執行緒安全findAndModify 實現鎖

實現

定義鎖儲存物件:

/**
 * mongodb 分散式鎖
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
    @Id
    private String id;
    private long expireAt;
    private String token;
}

定義Lock API:

public interface LockService {

    String acquire(String key, long expiration);

    boolean release(String key, String token);

    boolean refresh(String key, String token, long expiration);
}

獲取鎖:

 	@Override
    public String acquire(String key, long expiration) {
        Query query = Query.query(Criteria.where("_id").is(key));
        String token = this.generateToken();
        Update update = new Update()
            .setOnInsert("_id", key)
            .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
            .setOnInsert("token", token);

        FindAndModifyOptions options = new FindAndModifyOptions().upsert(true)
                                                                 .returnNew(true);
        LockDocument doc = mongoTemplate.findAndModify(query, update, options,
                                                       LockDocument.class);
        boolean locked = doc.getToken() != null && doc.getToken().equals(token);

        // 如果已過期
        if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
            DeleteResult deleted = this.mongoTemplate.remove(
                Query.query(Criteria.where("_id").is(key)
                                    .and("token").is(doc.getToken())
                                    .and("expireAt").is(doc.getExpireAt())),
                LockDocument.class);
            if (deleted.getDeletedCount() >= 1) {
                // 成功釋放鎖, 再次嘗試獲取鎖
                return this.acquire(key, expiration);
            }
        }

        log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
                  key, token, locked);
        return locked ? token : null;
    }

原理:

  • 先嚐試upsert鎖物件,如果成功且token一致,說明拿到鎖
  • 否則加鎖失敗
  • 如果未拿到鎖,但是鎖已過期,嘗試刪除鎖
    • 如果刪除成功,再次嘗試拿鎖
    • 如果失敗,說明鎖可能已經續期了

釋放和續期鎖:


   @Override
   public boolean release(String key, String token) {
       Query query = Query.query(Criteria.where("_id").is(key)
                                         .and("token").is(token));
       DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
       boolean released = deleted.getDeletedCount() == 1;
       if (released) {
           log.debug("Remove query successfully affected 1 record for key {} with token {}",
                     key, token);
       } else if (deleted.getDeletedCount() > 0) {
           log.error("Unexpected result from release for key {} with token {}, released {}",
                     key, token, deleted);
       } else {
           log.error("Remove query did not affect any records for key {} with token {}",
                     key, token);
       }

       return released;
   }

   @Override
   public boolean refresh(String key, String token,
                          long expiration) {
       Query query = Query.query(Criteria.where("_id").is(key)
                                         .and("token").is(token));
       Update update = Update.update("expireAt",
                                     System.currentTimeMillis() + expiration);
       UpdateResult updated =
           mongoTemplate.updateFirst(query, update, LockDocument.class);

       final boolean refreshed = updated.getModifiedCount() == 1;
       if (refreshed) {
           log.debug("Refresh query successfully affected 1 record for key {} " +
                     "with token {}", key, token);
       } else if (updated.getModifiedCount() > 0) {
           log.error("Unexpected result from refresh for key {} with token {}, " +
                     "released {}", key, token, updated);
       } else {
           log.warn("Refresh query did not affect any records for key {} with token {}. " +
                    "This is possible when refresh interval fires for the final time " +
                    "after the lock has been released",
                    key, token);
       }

       return refreshed;
   }

使用


private  LockService lockService;

private void tryAcquireLockAndSchedule() {
        while (!this.stopSchedule) {
            // 嘗試拿鎖
            this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
            if (this.token != null) {
				// 拿到鎖
            } else {
                // 等待LOCK_EXPIRATION, 再次嘗試
                Thread.sleep(LOCK_EXPIRATION);
            }
        }
    }
  • 先嚐試拿鎖,如果獲取到token,說明拿鎖成功
  • 否則可以sleep一段時間後再拿鎖

完整程式碼,可到github檢視 https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java


感謝您的認真閱讀。

如果你覺得有幫助,歡迎點贊支援!

不定期分享軟體開發經驗,歡迎關注作者, 一起交流軟體開發: