Logstash:使用 aggregate filter 處理 N:N 關係
使用Logstash從mysql同步使用者和使用者所有的寵物到ES中。
"register_name": "孟林潔", "id": 80469531, "pets": [ { "breed_name": "萬能梗", "birthday": null, "pet_id": 999044, "name": "一隻狗", "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}","breed_id": 130 }, { "breed_name": "萬能梗", "birthday": null, "pet_id": 999097, "name": "一隻狗2", "images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}", "breed_id": 130 } ],"mobile": "*******", "avatar": null, "pet_list": [ 999044, 999097 ]
問題:
- logstash同步nested巢狀型別到ES中。
- logstash同步巢狀陣列物件時,聚合過程中資料丟失(使用者寵物會隨機丟失,偶而資料不丟失)。
- logstash同步時少同步一條資料,在停止logstash服務時才進行同步
- (更新) mysql的多條資料同步到es只有一條
解決:
1、解決logstash同步nested巢狀型別到ES中
先建立索引,並且修改索引型別為nested
建立索引:PUT /user 修改索引對映: PUT/user/_mapping/doc { "doc": { "properties": { "avatar": { "type": "text" }, "id": { "type": "long" }, "mobile": { "type": "text" }, "pets": { "type": "nested", "properties": { "birthday": { "type": "date" }, "breed_id": { "type": "long" }, "breed_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "images": { "type": "text" }, "name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" }, "pet_id": { "type": "long" } } }, "register_name": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" } } } }
使用logstash的過濾器中aggregate外掛進行資料聚合。
配置檔案jdbc.conf
input { stdin {} jdbc { jdbc_driver_library => "../mysql-connector-java-6.0.6.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev" jdbc_user => "****" jdbc_password => "****" #jdbc_paging_enabled => "true" #jdbc_page_size => "50" clean_run => true use_column_value => true record_last_run => "true" tracking_column => "id" schedule => "*/1 * * * *" #last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text" statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc" } } filter { #這裡做聚合 aggregate { task_id => "%{id}" code => " map['id'] = event.get('id') map['register_name'] = event.get('register_name') map['mobile'] = event.get('mobile') map['avatar'] = event.get('avatar') map['pet_list'] ||=[] map['pets'] ||=[] if (event.get('pet_id') != nil) if !(map['pet_list'].include? event.get('pet_id')) map['pet_list'] << event.get('pet_id') map['pets'] << { 'pet_id' => event.get('pet_id'), 'name' => event.get('name'), 'images' => event.get('images'), 'breed_id' => event.get('breed_id'), 'breed_name' => event.get('breed_name'), 'birthday' => event.get('birthday') } end end event.cancel() " push_previous_map_as_event => true timeout => 5 } json { source => "message" remove_field => ["message"] #remove_field => ["message", "type", "@timestamp", "@version"] } mutate { #將不需要的JSON欄位過濾,且不會被存入 ES 中 remove_field => ["tags", "@timestamp", "@version"] } } output { stdout { #codec => json_lines } elasticsearch { hosts => ["127.0.0.1:9200"] index => "user" document_id => "%{id}" } }
2、解決聚合過程中子陣列物件丟失
問題定位:多執行緒跑聚合過程中,同一個使用者的多個寵物可能被分配到不通過的執行緒,分別做不同的聚合,導致一個使用者存在多條資料,分別擁有不同的寵物,然後多執行緒的進行輸出到ES,ES儲存過程中會把存在的資料給更新掉,這就是我的寵物丟失的原因,多執行緒分配的隨機性導致資料也隨機丟失。
驗證後確認猜想正確。
回想剛才把執行緒數設定為1,這樣肯定會影響效能的吧,萬一以後我有不需要聚合的的資料時完全可以多執行緒跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任務繫結不同管道配置。
這裡pipeline.workers: 4,pipeline.output.workers: 3,那麼執行聚合的filter就是1,這樣可以單執行緒聚合,多執行緒輸出。
多個任務可以配置多個管道,pipeline.id標示管道唯一性。
- pipeline.id: user_pipeline pipeline.workers: 4 pipeline.batch.size: 1000 # 輸出 pipeline.output.workers: 3 # 配置檔案位置 path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf" # 對基於磁碟的排隊進行“持久化”。預設值是記憶體 queue.type: persisted
更新:
影響聚合結果的還有sql語句!!
sql語句必須根據聚合task_id排序,也就是需要聚合的資料必須排在一起。否則map['pets']會被覆蓋掉,導致資料丟失。
3、logstash同步時少同步一條資料,在停止logstash服務時才進行同步
在filter 聚合配置中新增:
timeout => 3
filter aggregate 建立中 event map 並不知道我、這次事件是不是應該結束,也就是它也不知道到那一條才是最後一條, 因此設定一個 timeout 告訴它這個時間執行多少秒就結束繼續執行第二個。但這樣並不是很嚴謹,因為你也不確定你的 event map 到底要執行多久 。最好的方式是 我們應該給定一個 task end 的條件ES官網關於 aggregate 的說明
4、es 配置id的問題,必須有唯一性,否則被覆蓋
參考連結:
https://segmentfault.com/a/1190000016592277
https://segmentfault.com/q/1010000016861266
https://blog.csdn.net/weixin_33910460/article/details/88719101