Logstash從資料庫一次同步多張表
一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。
博友前天線上提問,說明這塊理解的還不夠透徹。
我整理下,
一是為了儘快解決博友問題,
二是加深記憶,便於未來產品開發中快速上手。
1、同步原理
原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄:
深入詳解Elasticsearch
以下是通過ES5.4.0, logstash5.4.1 驗證成功。
可以確認的是2.X版本同樣可以驗證成功。
2、核心配置檔案
input {
stdin {
}
jdbc {
type => "cxx_article_info"
# mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "xxxxx"
record_last_run => "true"
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info"
clean_run => "false"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
statement => "select * from cxx_article_info where id > :sql_last_value"
#定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
#設定ES索引型別
}
jdbc {
type => "cxx_user"
# mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "xxxxxx"
record_last_run => "true"
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_user_info"
clean_run => "false"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
statement => "select * from cxx_user_info where id > :sql_last_value"
#以下對應著要執行的sql的絕對路徑。
#statement_filepath => "/opt/logstash/bin/logstash_mysql2es/department.sql"
#定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
#設定ES索引型別
}
}
filter {
mutate {
convert => [ "publish_time", "string" ]
}
date {
timezone => "Europe/Berlin"
match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"]
}
#date {
# match => [ "publish_time", "yyyy-MM-dd HH:mm:ss,SSS" ]
# remove_field => [ "publish_time" ]
# }
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="cxxarticle_info" {
elasticsearch {
#ESIP地址與埠
hosts => "10.100.11.231:9200"
#ES索引名稱(自己定義的)
index => "cxx_info_index"
#自增ID編號
# document_id => "%{id}"
}
}
if [type]=="cxx_user" {
elasticsearch {
#ESIP地址與埠
hosts => "10.100.11.231:9200"
#ES索引名稱(自己定義的)
index => "cxx_user_index"
#自增ID編號
# document_id => "%{id}"
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
3、同步成功結果
[2017-07-19T15:08:05,438][INFO ][logstash.pipeline ] Pipeline main started
The stdin plugin is now waiting for input:
[2017-07-19T15:08:05,491][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,730][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:09:00,731][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT * FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:10:00,173][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:10:00,174][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.001000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
4、擴充套件
1)多個表無非就是在input裡面多加幾個型別,在output中多加基礎
型別判定。
舉例:
if [type]=="cxx_user"
- 1
2)input裡的type和output if判定的type**保持一致**,該type對應ES中的type。
後記
死磕ES,有問題歡迎大家提問探討!
——————————————————————————————————
更多ES相關實戰乾貨經驗分享,請掃描下方【銘毅天下】微信公眾號二維碼關注。
(每週至少更新一篇!)
2017年07月19日 23:32 於家中床前
相關推薦
Logstash從資料庫一次同步多張表
一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參
mysql 從相同類型的多張表中提取到一張表中
tab cursor 同時 sql and ble 有時 eat sch 蝸牛背著沈重的殼,貼著地面一步步艱難地向前爬行,不回頭,也不左顧右盼,只是朝著自己想到達的地方行進。 有時候需要從多張相同類型的表中提取數據,這些表有一些相同的列或者表結構完全相同,同時表名存在一定的
使用JDBC一次插入多個表、多條記錄
程式碼如下: public static void insertBatch() { int count[]; int count1[]; Boolean isinsert = false; Connection con = null; PreparedS
在JDBC一次插入多個表、多條記錄
{ con = getCon(); con.setAutoCommit(false); // 需要用到事務,不能讓他自動提交,需要手動提交 pst = con.prepareStatement(INSERT_SQL);
一個表一次關聯兩張表 desc降序 asc升序
public function wocaotuijian(){ $list=Db::table('xc_tuijiansum')->alias('a') ->join('xc_member me','a.ui
25.logstash一次同步Mysql多張表到ES(ES與關係型資料庫同步)
題記一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。1、同步原理原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 深
基於ELK5.1(ElasticSearch, Logstash, Kibana)的一次整合測試
success move maven issues ats call using env proto 前言開源實時日誌分析ELK平臺(ElasticSearch, Logstash, Kibana組成),能很方便的幫我們收集日誌,進行集中化的管理,並且能很方便的進行日誌的統
dtd + 復雜元素的子元素出現次數 一次或多次
ges block 珍惜 html XML 1.0 條件 sch version 禮悟: 好好學習多思考,尊師重道存感恩。葉見尋根三二一,江河湖海同一體。 虛懷若谷良心主,願行無悔給最苦。讀書鍛煉強身心,誠勸且行且珍惜。 xml:1.0
一次下載多個文件
itl cnblogs script log logs html 文件 download nload 一次下載多個文件 JavaScript多文件下載一次下載多個文件
分離式編譯時 鏈接器工具錯誤 (一個變量被定義一次或多次)
效果 include private 可讀性 con lnk2005 可能 ring 生成 在編寫程序時,將類中的函數成員的聲明和定義分開,在頭文件(.h)中進行聲明,在源文件(.cpp)中進行定義 以及具體功能的實現。達到分離式編譯的效果,提高代碼的可讀性。 自己在編寫是
一次針對多臺服務器交互式主機命令采集Python腳本編寫
.py 數據庫版本 toad 監控主機 pro efault 分享 多臺 linux 【環境介紹】 系統環境:Linux + Python 2.7.10(監控主機) 【背景描述】 需求:每次節假日或者重要時間時,需要對數據庫主機信息進行檢查,比如主機空間使用率之類。
(轉)Spring文件上傳,包括一次選中多個文件
bmi while .html span cto input 獲取文件 dex asn 背景: http://www.cnblogs.com/lixuwu/p/8495275.html已經實現了單文件的上傳和下載,多文件的上傳是另一種情景,這裏記錄下來 實現過程
記一次安裝多版本php的四個雷區,你踩著了嗎
path start cgi 命令執行 mysq -c tool port 一鍵 記一次安裝多版本的php的四個雷區,你踩著了嗎 需求:公司需要在同一臺服務器上安裝不同版本的php,而這一臺的服務上已經安裝了php.7.1,現需要同
oracle Insert 一次插入多條記錄
pan rac ora 方法 tab where ble code 兩種方法 oracle Insert 一次插入多條記錄有兩種方法: 1)Insert All Into table_name values ... insert all into table_name v
一次下載多個檔案的解決思路-JS
一次下載多個檔案的解決思路(iframe) - Eric 真實經歷 最近開發專案需要做檔案下載,想想挺簡單的,之前也做過,後臺提供下載介面,前端使用window.location.href就行了唄。不過開發的時候發現,有些檔案有附屬檔案,點選 下載按鈕 需要下載兩個檔案,而且不能使用壓縮包的形式。想想
記一次800多萬XML文字檔案預處理經歷
一.背景 由於某些需求,現需對系統在最近幾個月生成的xml檔案進行預處理,提取<text>標籤內的資料進行分析。這些需要預處理的資料大概有280GB左右880多萬,存放在gysl目錄下,gysl的下一層按天命名,分為若干個目錄,接下來一層目錄下又有多個目錄,我們所需的xml目錄就在這一層。我們現
記一次800多萬XML文本文件預處理經歷
超過 random while 表達式 range utf-8 test 現在 其他 一.背景 由於某些需求,現需對系統在最近幾個月生成的xml文件進行預處理,提取<text>標簽內的數據進行分析。這些需要預處理的數據大概有280GB左右880多萬,存放在gys
maven 一次打包多個maven專案
maven 一次打包多個maven專案。 使用場景 一個專案由多個子專案組成,每個子專案也是一個maven專案。每次打包需要打包每個子專案,很麻煩,其實可以通過配置一個頂級的pom.xml檔案來解決這個問題,只需要打包頂層的maven專案,即可。如果一個專案有多個子專案的pom.
Laravel一次更新多條記錄,批量更新的方法
在我們實際應用中,免不了這樣的情況——例如我們同時錄入多條資訊,可能三條五條還好說,但量一旦變大,就會增加讀寫資料庫的次數,會降低效率,那麼,我們該如何實現,做到一次讀寫資料庫,批量更新呢? 例如這種情況: HTML程式碼: <!doctype html> &l
使用JDBC一次插入多條記錄(以MySQL為例)
閱讀本文需要的先修知識: 最基本的SQL語句 最基本的JDBC操作(如插入單條記錄) 如急需使用請直接看最後一段程式碼。 在JDBC中,插入記錄最簡單的方法是使用executeUpdate()方法,但該方法中的引數只能是單條SQL語句,其實對於需要INSERT或者UPDA