1. 程式人生 > >Elasticsearch更新重複提交和版本控制(併發問題)

Elasticsearch更新重複提交和版本控制(併發問題)

1.樂觀鎖

Elasticsearch 使用這個 _version 號來確保變更以正確順序得到執行。如果舊版本的文件在新版本之後到達,它可以被簡單的忽略。

我們可以利用 _version 號來確保 應用中相互衝突的變更不會導致資料丟失。我們通過指定想要修改文件的 version 號來達到這個目的。 如果該版本不是當前版本號,我們的請求將會失敗。 (1)Java程式碼控制使用upsert()方法

class Thread1 implements Runnable {

    public void run() {
        System.out.println("*************" + Thread.currentThread().getName() + " *************");
        // 設定查詢條件, 查詢不到則新增
        IndexRequest indexRequest = null;
        try {
            indexRequest = new IndexRequest("website", "blog", "1")
                    .source(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("id", "1")
                            .endObject());
            // 設定更新, 查詢到更新下面的設定
            UpdateRequest upsert = new UpdateRequest("website", "blog", "1")
                    .doc(XContentFactory.jsonBuilder()
                            .startObject()
                            .field("process_id", Thread.currentThread().getId())
                            .endObject())
                    .upsert(indexRequest);

            client.update(upsert).get();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

(2)在kibana中 在url上加上version

PUT /website/blog/1?version=1
{
  "title" : "this is title" , 
  "txt" : "just do it"
}

2悲觀鎖

(1)只允許一個執行緒進行執行更新操作,這樣能夠避免併發性問題,在es中,全域性鎖是將一份文件是否存在作為依據

@Test
public void upsertDocument2() throws InterruptedException {

//執行緒數為1是全域性鎖
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    for (int i = 0; i < 10; i++){
        executorService.execute(new Thread1());

    }
    Thread.sleep(10000);
    executorService.shutdown();

}

(2)document文件鎖 當前傳入的process_id和設定的process_id不一致,就拋異常assert false 如果一致的,返回’noop’

PUT website/blog/1
{
  "id" : 1,
  "process_id" : 234
}

修改是process_id 必須一致,否則報錯

POST /website/blog/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "documentLock",
    "params": {
    "process_id": 123
  }
  }
}