1. 程式人生 > 實用技巧 >Logstash:使用 aggregate filter 處理 N:N 關係

Logstash:使用 aggregate filter 處理 N:N 關係

使用Logstash從mysql同步使用者和使用者所有的寵物到ES中。

"register_name": "孟林潔",
    "id": 80469531,
    "pets": [
      {
        "breed_name": "萬能梗",
        "birthday": null,
        "pet_id": 999044,
        "name": "一隻狗",
        "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}",
        
"breed_id": 130 }, { "breed_name": "萬能梗", "birthday": null, "pet_id": 999097, "name": "一隻狗2", "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}", "breed_id": 130 } ],
"mobile": "*******", "avatar": null, "pet_list": [ 999044, 999097 ]

問題:

  1. logstash同步nested巢狀型別到ES中。
  2. logstash同步巢狀陣列物件時,聚合過程中資料丟失(使用者寵物會隨機丟失,偶而資料不丟失)。
  3. logstash同步時少同步一條資料,在停止logstash服務時才進行同步
  4. (更新) mysql的多條資料同步到es只有一條

解決:

1、解決logstash同步nested巢狀型別到ES中

先建立索引,並且修改索引型別為nested

建立索引:PUT /user
修改索引對映:
PUT 
/user/_mapping/doc { "doc": { "properties": { "avatar": { "type": "text" }, "id": { "type": "long" }, "mobile": { "type": "text" }, "pets": { "type": "nested", "properties": { "birthday": { "type": "date" }, "breed_id": { "type": "long" }, "breed_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "images": { "type": "text" }, "name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "pet_id": { "type": "long" } } }, "register_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" } } } }

使用logstash的過濾器中aggregate外掛進行資料聚合。

配置檔案jdbc.conf

input {
    stdin {}
    jdbc {
        jdbc_driver_library => "../mysql-connector-java-6.0.6.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev"
        jdbc_user => "****"
        jdbc_password => "****"
        #jdbc_paging_enabled => "true"
        #jdbc_page_size => "50"
        clean_run => true
        use_column_value => true
        record_last_run => "true"
        tracking_column => "id"
        schedule => "*/1 * * * *"
        #last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text"
        statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id  as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc"
   }
 
}
 
filter {
#這裡做聚合
     aggregate {
        task_id => "%{id}"
        code => "
            map['id'] = event.get('id')
            map['register_name'] = event.get('register_name')
            map['mobile'] = event.get('mobile')
            map['avatar'] = event.get('avatar')
            map['pet_list'] ||=[]
            map['pets'] ||=[]
            if (event.get('pet_id') != nil)
                if !(map['pet_list'].include? event.get('pet_id'))  
                    map['pet_list'] << event.get('pet_id')        
                    map['pets'] << {
                        'pet_id' => event.get('pet_id'),
                        'name' => event.get('name'),
                        'images' => event.get('images'),
                        'breed_id' => event.get('breed_id'),
                        'breed_name' => event.get('breed_name'),
                        'birthday' => event.get('birthday')
                    }
                end
            end
            event.cancel()
        "
        
        push_previous_map_as_event => true
        timeout => 5
    }
    json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
    mutate  {
        #將不需要的JSON欄位過濾,且不會被存入 ES 中
        remove_field => ["tags", "@timestamp", "@version"]
    }
}
 
output {
   stdout {
        #codec => json_lines
   }
        elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "user"
        document_id => "%{id}"
   }
}

2、解決聚合過程中子陣列物件丟失

問題定位:多執行緒跑聚合過程中,同一個使用者的多個寵物可能被分配到不通過的執行緒,分別做不同的聚合,導致一個使用者存在多條資料,分別擁有不同的寵物,然後多執行緒的進行輸出到ES,ES儲存過程中會把存在的資料給更新掉,這就是我的寵物丟失的原因,多執行緒分配的隨機性導致資料也隨機丟失。

驗證後確認猜想正確。

回想剛才把執行緒數設定為1,這樣肯定會影響效能的吧,萬一以後我有不需要聚合的的資料時完全可以多執行緒跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任務繫結不同管道配置。

這裡pipeline.workers: 4,pipeline.output.workers: 3,那麼執行聚合的filter就是1,這樣可以單執行緒聚合,多執行緒輸出。

多個任務可以配置多個管道,pipeline.id標示管道唯一性。

- pipeline.id: user_pipeline
  pipeline.workers: 4
  pipeline.batch.size: 1000
  # 輸出
  pipeline.output.workers: 3
  # 配置檔案位置
  path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf"
  # 對基於磁碟的排隊進行“持久化”。預設值是記憶體
  queue.type: persisted

更新:

影響聚合結果的還有sql語句!!

sql語句必須根據聚合task_id排序,也就是需要聚合的資料必須排在一起。否則map['pets']會被覆蓋掉,導致資料丟失。

3、logstash同步時少同步一條資料,在停止logstash服務時才進行同步

在filter 聚合配置中新增:

timeout => 3

filter aggregate 建立中 event map 並不知道我、這次事件是不是應該結束,也就是它也不知道到那一條才是最後一條, 因此設定一個 timeout 告訴它這個時間執行多少秒就結束繼續執行第二個。但這樣並不是很嚴謹,因為你也不確定你的 event map 到底要執行多久 。最好的方式是 我們應該給定一個 task end 的條件ES官網關於 aggregate 的說明

4、es 配置id的問題,必須有唯一性,否則被覆蓋

參考連結:

https://segmentfault.com/a/1190000016592277

https://segmentfault.com/q/1010000016861266

https://blog.csdn.net/weixin_33910460/article/details/88719101

https://elasticsearch.cn/question/6648