How to import MySQL tables into Elasticsearch
阿新 • Published: 2022-05-02
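This article is excerpted from 《Netkiller Database 手札》 (Netkiller Database Notes).
There are many ways to import MySQL data into Elasticsearch. The usual approach is an ETL tool, but I find that too cumbersome, so I turned to Logstash instead.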
23.8. Migrating MySQL Data into Elasticsearch using logstash
23.8.1. Installing logstash
Install the JDBC driver and Logstash:
curl -s https://raw.githubusercontent.com/oscm/shell/master/database/mysql/5.7/mysql-connector-java.sh | bash
curl -s https://raw.githubusercontent.com/oscm/shell/master/log/kibana/logstash-5.x.sh | bash
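The MySQL driver is installed at /usr/share/java/mysql-connector-java.jar.
23.8.2. Configuring logstash
Create the configuration file /etc/logstash/conf.d/jdbc-mysql.conf. It reads from the article table, which has this structure: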
mysql> desc article;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| id          | int(11)      | NO   |     | 0       |       |
| title       | mediumtext   | NO   |     | NULL    |       |
| description | mediumtext   | YES  |     | NULL    |       |
| author      | varchar(100) | YES  |     | NULL    |       |
| source      | varchar(100) | YES  |     | NULL    |       |
| ctime       | datetime     | NO   |     | NULL    |       |
| content     | longtext     | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
7 rows in set (0.00 sec)
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
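The document_id => "%{id}" line uses Logstash's %{field} reference syntax: each event's id value is substituted into the string, so every article row always maps to the same Elasticsearch _id. The Python sketch below imitates that substitution for flat field names only. render_field_ref is a hypothetical helper, not part of Logstash, and nested references such as %{[a][b]} or date patterns are deliberately left out.

```python
import re

# Matches flat field references such as %{id} or %{type}.
# Assumption: nested references like %{[a][b]} and date patterns like
# %{+YYYY.MM.dd}, which real Logstash supports, are not handled here.
FIELD_REF = re.compile(r"%\{([A-Za-z0-9_@]+)\}")


def render_field_ref(template: str, event: dict) -> str:
    """Replace each %{field} in `template` with the matching value from `event`.

    A reference to a field the event does not have is left as literal text.
    """
    def substitute(match):
        name = match.group(1)
        if name not in event:
            return match.group(0)  # keep "%{name}" unchanged
        return str(event[name])

    return FIELD_REF.sub(substitute, template)


# One row of the article table becomes one event; its id becomes the _id.
row = {"id": 42, "title": "Hello", "author": "neo"}
print(render_field_ref("%{id}", row))          # 42
print(render_field_ref("%{article_id}", row))  # %{article_id}
```

Leaving an unresolved reference as literal text mirrors what Logstash does, which is why a misspelled field name tends to show up verbatim as an _id or index name.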
23.8.3. Starting Logstash
root@netkiller /var/log/logstash % systemctl restart logstash
root@netkiller /var/log/logstash % systemctl status logstash
● logstash.service - logstash
Loaded: loaded (/etc/systemd/system/logstash.service; enabled; vendor preset: disabled)
Active: active (running) since Mon 2017-07-31 09:35:00 CST; 11s ago
Main PID: 10434 (java)
CGroup: /system.slice/logstash.service
└─10434 /usr/bin/java -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -Djava.awt.headless=true -Dfi...
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Started logstash.
Jul 31 09:35:00 VM_3_2_centos systemd[1]: Starting logstash...
root@netkiller /var/log/logstash % cat logstash-plain.log
[2017-07-31T09:35:28,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-07-31T09:35:28,172][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-07-31T09:35:28,298][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<Java::JavaNet::URI:0x453a18e9>}
[2017-07-31T09:35:28,299][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-07-31T09:35:28,337][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-07-31T09:35:28,344][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2017-07-31T09:35:28,465][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<Java::JavaNet::URI:0x66df34ae>]}
[2017-07-31T09:35:28,483][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>1000}
[2017-07-31T09:35:29,562][INFO ][logstash.pipeline ] Pipeline main started
[2017-07-31T09:35:29,700][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-31T09:36:01,019][INFO ][logstash.inputs.jdbc ] (0.006000s) select * from article
23.8.4. Verification
% curl -XGET 'http://localhost:9200/_all/_search?pretty'
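The search response is JSON, and the number of matching documents sits under hits.total. Below is a small sketch for checking it, assuming Elasticsearch 5.x-style output where hits.total is a plain number; Elasticsearch 7.x and later wrap it in an object, which the function also accepts. The sample response is abbreviated, made-up data in the shape of a real one. In practice you would pass in the body returned by the curl command above, ideally scoped to the information index rather than _all.

```python
import json

# Abbreviated, made-up response in the shape of an Elasticsearch 5.x _search result.
SAMPLE = """
{"hits": {"total": 2, "hits": [
  {"_index": "information", "_type": "article", "_id": "1", "_source": {"id": 1, "title": "first"}},
  {"_index": "information", "_type": "article", "_id": "2", "_source": {"id": 2, "title": "second"}}
]}}
"""


def total_hits(body: str) -> int:
    """Return hits.total, accepting both the 5.x integer and the 7.x+ object form."""
    total = json.loads(body)["hits"]["total"]
    return int(total["value"]) if isinstance(total, dict) else int(total)


def hit_ids(body: str) -> list:
    """Return the _id of every hit on this page of results."""
    return [hit["_id"] for hit in json.loads(body)["hits"]["hits"]]


print(total_hits(SAMPLE), hit_ids(SAMPLE))  # 2 ['1', '2']
```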
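23.8.5. Configuration templates
23.8.5.1. Full import
Suitable for archived data that never changes, or for data that is only ever appended and never modified.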
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
  }
}
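A full import re-runs select * from article on every tick of schedule, so correctness depends on document_id => "%{id}": each run overwrites the same documents instead of adding new ones. The simulation below models an index as a Python dict. run_full_import and the "auto-N" IDs are hypothetical stand-ins for Elasticsearch's behaviour (it generates random IDs when none is given), not its actual implementation.

```python
import itertools

# Rows returned by "select * from article" (sample data, hypothetical).
ROWS = [
    {"id": 1, "title": "first"},
    {"id": 2, "title": "second"},
]


def run_full_import(rows, index, use_document_id, auto_id):
    """Simulate one scheduled run that writes every row into `index`.

    With use_document_id=True the row's id becomes the key (like
    document_id => "%{id}"), so a rerun overwrites existing entries.
    Otherwise each write gets a fresh key drawn from `auto_id`.
    """
    for row in rows:
        doc_id = str(row["id"]) if use_document_id else f"auto-{next(auto_id)}"
        index[doc_id] = dict(row)


with_id, without_id = {}, {}
auto_id = itertools.count(1)
for _ in range(3):  # three ticks of schedule => "* * * * *"
    run_full_import(ROWS, with_id, use_document_id=True, auto_id=auto_id)
    run_full_import(ROWS, without_id, use_document_id=False, auto_id=auto_id)

print(len(with_id))     # 2
print(len(without_id))  # 6
```

Note that rows deleted from MySQL are never removed from the index by this setup, since every run only writes documents.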
23.8.5.2. Multi-table import
Import several tables into Elasticsearch:
# multiple inputs on logstash jdbc
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from article"
    type => "article"
  }
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"
    statement => "select * from comment"
    type => "comment"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "%{type}"
    document_id => "%{id}"
  }
}
Add a type setting to each jdbc block, then set document_type => "%{type}" in the elasticsearch output so that each table's rows are stored under their own document type.
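The type setting matters because both tables have an id column: article 1 and comment 1 would otherwise compete for the same document address. The sketch models the index as a dict keyed either by (type, id) or by id alone; tag and index_events are hypothetical helpers and the rows are sample data. Keep in mind that this layout relies on Elasticsearch 5.x. From Elasticsearch 6.0 an index can hold only one mapping type, and document_type is deprecated in the Logstash elasticsearch output, so on newer versions a separate index per table (for example index => "%{type}") is a common alternative.

```python
# Sample rows from the two tables (hypothetical data).
ARTICLES = [{"id": 1, "title": "article one"}]
COMMENTS = [{"id": 1, "body": "comment one"}]


def tag(rows, type_name):
    """Mimic `type => "..."` on a jdbc input: add a type field to each event."""
    return [dict(row, type=type_name) for row in rows]


def index_events(events, key_fields):
    """Store events in a dict keyed by the given fields, like an index keyed by address."""
    index = {}
    for event in events:
        key = tuple(event[field] for field in key_fields)
        index[key] = event  # a later event with the same key replaces the earlier one
    return index


events = tag(ARTICLES, "article") + tag(COMMENTS, "comment")

# Keyed by (type, id): article 1 and comment 1 stay separate documents.
print(len(index_events(events, ["type", "id"])))  # 2
# Keyed by id only: comment 1 silently overwrites article 1.
print(len(index_events(events, ["id"])))          # 1
```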
23.8.5.3. Incremental replication by the ID primary key column
input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    # ... other configuration bits
  }
}
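tracking_column_type => "numeric" declares that the tracking column id is numeric. Declaring it explicitly avoids :sql_last_value being compared as a date, as happened in this log: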
[2017-07-31T11:08:00,193][INFO ][logstash.inputs.jdbc ] (0.020000s) select * from article where id > '2017-07-31 02:47:00'
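If the replicated data gets out of sync, add clean_run => true. It discards the saved :sql_last_value, so the next run starts over from the beginning.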
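Conceptually, each scheduled run binds the saved value into :sql_last_value, fetches only newer rows, and then remembers the tracking column of the last row returned. The sketch below replays that loop with Python's built-in sqlite3 standing in for MySQL (sqlite3 accepts the same :name placeholder style). run_once is a hypothetical helper, and the starting value 0 assumes a numeric tracking column after clean_run => true.

```python
import sqlite3

# In-memory SQLite database standing in for the MySQL table (hypothetical rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE article (id INTEGER, title TEXT)")
conn.executemany("INSERT INTO article VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

STATEMENT = "SELECT id, title FROM article WHERE id > :sql_last_value ORDER BY id"


def run_once(conn, last_value):
    """One scheduled run: fetch rows past last_value, return them and the new last_value."""
    rows = conn.execute(STATEMENT, {"sql_last_value": last_value}).fetchall()
    # use_column_value => true: remember the tracking column of the last row seen.
    new_last = rows[-1][0] if rows else last_value
    return rows, new_last


last = 0
rows, last = run_once(conn, last)
print(len(rows), last)  # 3 3

conn.execute("INSERT INTO article VALUES (4, 'd')")
rows, last = run_once(conn, last)
print(rows, last)  # [(4, 'd')] 4

rows, last = run_once(conn, last)
print(rows, last)  # [] 4
```

Ordering by the tracking column matters in this sketch, because it takes the value from the last row rather than computing a maximum.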
23.8.5.4. Incremental replication by a date column
input {
  jdbc {
    statement => "SELECT * FROM my_table WHERE create_date > :sql_last_value"
    use_column_value => true
    tracking_column => "create_date"
    tracking_column_type => "timestamp"
    # ... other configuration bits
  }
}
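If the replicated data gets out of sync, add clean_run => true. It discards the saved :sql_last_value, so the next run starts over from the beginning.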
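Timestamp tracking follows the same loop, but the strict > comparison has an edge case worth knowing: a row that commits late with the same create_date as the last value seen is skipped for good. The sketch demonstrates this with sqlite3 and ISO-formatted date strings, which sort correctly as text. The table, rows and run_once helper are hypothetical, and the 1970-01-01 starting value mirrors what the plugin uses for timestamp columns after clean_run => true.

```python
import sqlite3

# Hypothetical table with two rows, standing in for my_table in MySQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (id INTEGER, create_date TEXT)")
conn.executemany(
    "INSERT INTO my_table VALUES (?, ?)",
    [(1, "2017-07-31 10:00:00"), (2, "2017-07-31 10:05:00")],
)

STATEMENT = (
    "SELECT id, create_date FROM my_table "
    "WHERE create_date > :sql_last_value ORDER BY create_date, id"
)


def run_once(last_value):
    """Fetch rows newer than last_value and return them with the new last_value."""
    rows = conn.execute(STATEMENT, {"sql_last_value": last_value}).fetchall()
    return rows, (rows[-1][1] if rows else last_value)


last = "1970-01-01 00:00:00"
rows, last = run_once(last)
print([r[0] for r in rows], last)  # [1, 2] 2017-07-31 10:05:00

# A row committed late but stamped with the same second as the last one seen.
conn.execute("INSERT INTO my_table VALUES (3, '2017-07-31 10:05:00')")
rows, last = run_once(last)
print([r[0] for r in rows])  # [] (row 3 is never picked up)
```

One possible mitigation, offered as a suggestion rather than part of the original setup, is to compare with >= instead. Boundary rows are then read again on every run, which is harmless when document_id => "%{id}" makes the repeated writes overwrite the same documents.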
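23.8.5.5. Specifying an SQL file
statement_filepath points to an SQL file. When a query is too complex to maintain comfortably inside the statement option, write it to a text file and reference that file with statement_filepath.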
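input {
  jdbc {
    jdbc_driver_library => "/path/to/driver.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/mydb"  # host, port and database name are placeholders
    jdbc_user => "neo"
    jdbc_password => "password"
    statement_filepath => "query.sql"  # an absolute path avoids depending on Logstash's working directory
  }
}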
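23.8.5.6. Passing parameters
Put the condition values for the copy into the parameters option; the statement refers to each one as a :name placeholder.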
input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}
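The :favorite_artist placeholder in statement is filled from the parameters map. Python's sqlite3 module uses the same :name style, so it can illustrate the idea outside Logstash: the value is bound as data rather than pasted into the SQL text. The songs table and its rows are made-up sample data.

```python
import sqlite3

# Made-up songs table in an in-memory SQLite database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE songs (title TEXT, artist TEXT)")
conn.executemany(
    "INSERT INTO songs VALUES (?, ?)",
    [("Symphony No. 5", "Beethoven"), ("The Magic Flute", "Mozart")],
)

# Same shape as the Logstash config: a statement with a :name placeholder
# plus a separate map of values, bound by the driver.
statement = "SELECT * from songs where artist = :favorite_artist"
parameters = {"favorite_artist": "Beethoven"}

print(conn.execute(statement, parameters).fetchall())
# [('Symphony No. 5', 'Beethoven')]

# A hostile value stays a plain string and simply matches nothing.
print(conn.execute(statement, {"favorite_artist": "x' OR '1'='1"}).fetchall())  # []
```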
23.8.5.7. Controlling how much data JDBC returns
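jdbc_fetch_size => 1000       # number of rows the JDBC driver fetches per round trip
jdbc_page_size => 1000        # number of rows per page; only used when paging is enabled
jdbc_paging_enabled => true   # with jdbc_page_size, splits the statement into several queries, e.g. SELECT * FROM table LIMIT 1000 OFFSET 4000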
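jdbc_fetch_size is only a hint to the JDBC driver about how many rows to pull per round trip; the query itself stays the same. jdbc_paging_enabled instead splits the work into a series of LIMIT/OFFSET queries. The sketch below imitates that paging with sqlite3. fetch_paged is a hypothetical helper that stops when a page comes back short; the real plugin builds its paged queries differently (it may, for example, wrap the statement in a subquery).

```python
import sqlite3

# Seven hypothetical rows in an in-memory SQLite table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE article (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO article VALUES (?)", [(i,) for i in range(1, 8)])


def fetch_paged(conn, statement, page_size):
    """Run `statement` one page at a time; stop when a page comes back short.

    Returns all rows plus the number of queries issued.
    """
    rows, queries, offset = [], 0, 0
    while True:
        page = conn.execute(
            f"{statement} LIMIT ? OFFSET ?", (page_size, offset)
        ).fetchall()
        queries += 1
        rows.extend(page)
        if len(page) < page_size:
            return rows, queries
        offset += page_size


# A stable ORDER BY keeps pages from overlapping or skipping rows.
rows, queries = fetch_paged(conn, "SELECT id FROM article ORDER BY id", page_size=3)
print(len(rows), queries)  # 7 3
```

The ORDER BY is what keeps consecutive pages consistent. Large OFFSET values also get slower, since the database still has to step over every skipped row.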
23.8.6. Example
input {
  jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule => "* * * * *"  # cron expression; here the query runs once a minute
    statement => "select id, title, description, author, source, ctime, content from article where id > :sql_last_value"
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article.last"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "information"
    document_type => "article"
    document_id => "%{id}"
    action => "update"     # write action; one of "index", "delete", "create", "update"
    doc_as_upsert => true  # with update, create the document if it does not exist yet
  }
}
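With action => "update" each event is sent as a partial document that is merged into the existing one, and doc_as_upsert => true lets the same document be created when it does not exist yet. The sketch contrasts the write actions on a dict-based index. apply_action is hypothetical: it raises KeyError where Elasticsearch would return an error response, and it merges only top-level fields, whereas Elasticsearch also merges nested objects. The views field is a made-up example of data added to a document outside Logstash.

```python
def apply_action(index, doc_id, doc, action, doc_as_upsert=False):
    """Simulate how each Elasticsearch write action treats `index[doc_id]`."""
    if action == "index":
        index[doc_id] = dict(doc)  # replace the whole document
    elif action == "create":
        if doc_id in index:
            raise KeyError(f"document {doc_id} already exists")
        index[doc_id] = dict(doc)
    elif action == "update":
        if doc_id in index:
            index[doc_id] = {**index[doc_id], **doc}  # merge top-level fields
        elif doc_as_upsert:
            index[doc_id] = dict(doc)  # create it from the partial document
        else:
            raise KeyError(f"document {doc_id} is missing")
    elif action == "delete":
        index.pop(doc_id, None)
    else:
        raise ValueError(f"unknown action {action!r}")


index = {"1": {"id": 1, "title": "old", "views": 10}}
apply_action(index, "1", {"id": 1, "title": "new"}, "update", doc_as_upsert=True)
apply_action(index, "2", {"id": 2, "title": "fresh"}, "update", doc_as_upsert=True)
print(index["1"])  # {'id': 1, 'title': 'new', 'views': 10}
print(index["2"])  # {'id': 2, 'title': 'fresh'}
```

With action => "index" the same write would replace document 1 entirely and drop views, which is one reason to choose update plus doc_as_upsert when other processes also write to the index.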