Elasticsearch JDBC的使用-MySQL 資料來源匯入和增量索引、更新
在使用 Elasticsearch 的時候,經常會涉及到要將其它資料來源的資料匯入到 Elasticsearch 中,今天就來介紹一下關於 Elasticsearch 從 MySQL 匯入資料和增量索引的實現
這裡要用到一個 Elasticsearch 的外掛 elasticsearch-jdbc
需要的資源和版本
Elasticsearch 版本:2.2.0 CSDN下載
elasticsearch-jdbc 版本 : 2.2 CSDN下載
一、安裝 jdbc
jdbc 的壓縮包我已經放在了 /usr/local/src/ 目錄下,可以去它的 GitHub地址 獲取對應版本的壓縮包
cd /usr/local /src/
unzip ./elasticsearch-jdbc-2.2.0.0-dist.zip
cp -r ./elasticsearch-jdbc-2.2.0.0 /usr/local/elasticsearch-2.2.0/jdbc2.2
這樣就可以使用啦,jdbc 還提供了一些常用的例子,在 【ES安裝目錄/jdbc2.2/bin/ 】這個資料夾下,改一改就可以用,都是bash 檔案,記得加執行許可權哦
二、使用jdbc
我們先在 MySQL中建立一個用於測試的資料表 article ,並新增幾條資料
(注意, update_time 欄位我加了ON UPDATE CURRENT_TIMESTAMP,資料發生改變就會更新此欄位)
DROP TABLE IF EXISTS `article`;
CREATE TABLE `article` (
`id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,
`subject` varchar(150) NOT NULL,
`author` varchar(15) DEFAULT NULL,
`create_time` timestamp NULL DEFAULT NULL,
`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP ,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
# 資料
INSERT INTO `article` VALUES ('1', '"閨蜜"崔順實被韓檢方傳喚 韓總統府促徹查真相', 'jam', '2016-10-31 17:49:21', '2016-10-31 17:50:21');
INSERT INTO `article` VALUES ('2', '韓舉行"護國訓練" 青瓦臺:決不許國家安全出問題', 'jam00', '2016-10-31 17:50:39', '2016-10-31 17:50:51');
INSERT INTO `article` VALUES ('3', '媒體稱FBI已經取得搜查令 檢視希拉里電郵', 'tomi', '2016-10-31 17:51:03', '2016-10-31 17:51:08');
INSERT INTO `article` VALUES ('4', '村上春樹獲安徒生獎 演講中談及歐洲排外問題', 'jason', '2016-10-31 17:51:38', '2016-10-31 17:51:41');
INSERT INTO `article` VALUES ('5', '希拉里團隊炮轟FBI 參院民主黨領袖批其“違法”', 'tommy', '2016-10-31 17:52:07', '2016-10-31 17:52:09');
1、資料來源匯入
首先執行全部資料匯入(注:ES 使用的是預設配置)
我們寫一個名叫 mysql-article.sh 的bash指令碼,並放在 /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 下面,指令碼內容如下(內容註釋會在後面給出)
#執行
/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
#檔案內容如下
#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib
echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "123456",
"sql" : "select *, id as _id from article",
"index" : "jdbctest",
"type" : "article",
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
執行後會自動建立 jdbctest 索引(若不存在) ,article 型別 和幾個對應的欄位,這裡因為有中文,我使用了 ik 分詞器(如何使用?)
若執行失敗,請檢視日誌檔案,jdbc 的日誌存放在 /usr/local/elasticsearch-2.2.0/logs/jdbc.log
檢視是否匯入成功
curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty'
#返回
{
"took" : 33,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 5,
"max_score" : 1.0,
"hits" : [ {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "5",
"_score" : 1.0,
"_source" : {
"id" : 5,
"subject" : "希拉里團隊炮轟FBI 參院民主黨領袖批其“違法”",
"author" : "tommy",
"create_time" : "2016-10-31T17:52:07.000+08:00",
"update_time" : "2016-10-31T17:52:09.000+08:00"
}
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"subject" : "韓舉行"護國訓練" 青瓦臺:決不許國家安全出問題",
"author" : "jam00",
"create_time" : "2016-10-31T17:50:39.000+08:00",
"update_time" : "2016-10-31T17:50:51.000+08:00"
}
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"id" : 4,
"subject" : "村上春樹獲安徒生獎 演講中談及歐洲排外問題",
"author" : "jason",
"create_time" : "2016-10-31T17:51:38.000+08:00",
"update_time" : "2016-10-31T17:51:41.000+08:00"
}
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"subject" : ""閨蜜"崔順實被韓檢方傳喚 韓總統府促徹查真相",
"author" : "jam",
"create_time" : "2016-10-31T17:49:21.000+08:00",
"update_time" : "2016-10-31T17:50:21.000+08:00"
}
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"id" : 3,
"subject" : "媒體稱FBI已經取得搜查令 檢視希拉里電郵",
"author" : "tomi",
"create_time" : "2016-10-31T17:51:03.000+08:00",
"update_time" : "2016-10-31T17:51:08.000+08:00"
}
} ]
}
}
內容已成功匯入到 Elasticsearch 中
2、增量索引、更新
如果我們對資料做了更改或是有新資料加入,若再執行全部匯入,就有點得不償失了
這裡我們就要用到jdbc 的兩個屬性 statefile(狀態檔案) 和 schedule(計劃任務時間),並且 sql 語句也要改成動態的
改動如下
"statefile" : "statefile-article.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
改動後的完整檔案 mysql-article.sh
#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib
echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "123456",
"statefile" : "statefile-article.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
"index" : "jdbctest",
"type" : "article",
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
執行該檔案 :/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
可以看到 命令列端 被佔用,一直在執行,並且在 mysql-article.sh 的同級目錄下生成了一個 statefile-article.json 的檔案,sql 語句中需要的資料 lastexecutionstart 就儲存在該檔案中
現在我們來改動一下MySQL 中的資料,增加一條資料,並修改一條 id 等於 5 的資料
INSERT INTO article() VALUES(NULL,'測試JDBC','jam00','2016-11-01 13:34:15','2016-11-01 13:34:15');
UPDATE article SET `subject`='測試JDBC-改動' WHERE id=5;
最多等一分鐘,再看看ES 中的資料
curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty' -d '{
"sort": {
"id": { "order": "desc" }
}
}'
# 返回
...
"hits" : [ {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "6",
"_score" : null,
"_source" : {
"id" : 6,
"subject" : "測試JDBC",
"author" : "jam00",
"create_time" : "2016-11-01T13:34:15.000+08:00",
"update_time" : "2016-11-01T13:34:15.000+08:00"
},
"sort" : [ 6 ]
}, {
"_index" : "jdbctest",
"_type" : "article",
"_id" : "5",
"_score" : null,
"_source" : {
"id" : 5,
"subject" : "測試JDBC-改動",
"author" : "tommy",
"create_time" : "2016-10-31T17:52:07.000+08:00",
"update_time" : "2016-11-01T13:35:41.000+08:00"
},
"sort" : [ 5 ]
}
...
測試成功。
為了讓 mysql-article.sh 後臺執行,我們可以使用 nohup 命令
nohup /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh &
當我們想停止執行的時候。
ps aux |grep jdbc2.2
#返回
root 26118 0.0 0.1 106092 1212 pts/0 S 14:03 0:00 /bin/sh /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
root 26123 11.0 4.4 1079192 44932 pts/0 Sl 14:03 0:00 java -cp /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../lib/* -Dlog4j.configurationFile=/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../bin/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter
# 使用 kill 命令關閉程序, 26123 就是上面一句返回的程序號,不用殺掉 26118 ,殺掉26123 這個程序,26118 程序會自動關閉
kill -9 26123
至此,MySQL 資料來源的 增量索引和更新就完成了。
3、bash 檔案釋義
增量索引的bash檔案註釋如下,更多詳細配置請查閱官方文件
#!/bin/sh
# 當前指令碼的絕對路徑
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib
echo '
{
"type" : "jdbc",
"jdbc" : {
# 連結 mysql 的 test 資料庫
"url" : "jdbc:mysql://localhost:3306/test",
# mysql 使用者
"user" : "root",
# mysql 密碼
"password" : "123456",
# 計劃任務狀態檔案
"statefile" : "statefile-article.json",
# 計劃任務時間 這裡是每分鐘執行一次
"schedule" : "0 0-59 0-23 ? * *",
# 執行匯入的sql 語句
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
# 索引名稱 jdbctest
"index" : "jdbctest",
# 型別名稱 article
"type" : "article",
# 型別設定
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
# 涉及到中文使用ik 分詞
"tokenizer" : "ik"
}
}
}
},
# 型別中的欄位對映
"type_mapping": {
# 型別名稱
"article" : {
"properties" : {
# 對應的欄位
"id" : {
# 欄位型別
"type" : "integer",
# 當成一個準確的值進行索引(全匹配)
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
這裡選幾個屬性來介紹一下
url:資料庫連結串,所以把這個連結串改成其它資料來源,這個指令碼也可以使用(前提是那個資料來源中有對應的 article 表)
statefile :計劃任務狀態檔名稱。它長這樣:
{
"type" : "jdbc",
"jdbc" : {
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"index" : "jdbctest",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [ {
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
} ],
"metrics" : {
"lastexecutionend" : "2016-11-01T06:01:01.441Z",
"lastexecutionstart" : "2016-11-01T06:01:01.125Z",
"counter" : "23"
},
"type" : "article",
"statefile" : "statefile-article.json",
"user" : "root",
"password" : "123456",
"url" : "jdbc:mysql://localhost:3306/test",
"type_mapping" : {
"article" : {
"properties" : {
"create_time" : {
"type" : "date"
},
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"update_time" : {
"type" : "date"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
}
}
}
}
}
}
其實 jdbc 每次執行的就是這個檔案,執行完成後就覆蓋此檔案,改變的只是 metrics 屬性內的時間,而 lastexecutionstart 這個時間就是我們下面 sql 語句要用到的最後更新時間
schedule : 計劃任務時間表。表示多久執行一次更新。下面有幾個例子
0 0-59 0-23 ? * *:每分鐘執行一次
0 0/5 0-23 ? * * :每五分鐘執行一次;當分鐘等於 0,5,10,15…55的時候執行
我還是貼一個官方的欄位描述
欄位名稱 | 允許的值 | 允許的特殊字元 |
---|---|---|
Seconds | 0-59 | , - * / |
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Day-of-month | 1-31 | , - * ? / L W |
Month | 1-12 or JAN-DEC | , - * / |
Day-of-Week | 1-7 or SUN-SAT | , - * ? / L # |
Year (Optional) | empty, 1970-2199 | , - * / |
詳細註釋請點選檢視
sql:支援兩種方式,一種是直接寫sql語句,一種是有條件的sql語句。一般我們會在sql語句中使用”field as _id “這樣的方式來指定這條資料在ES 中的唯一標識(field欄位為唯一標識)
parameter 屬性中的可選的動態引數有
$now - the current timestamp
$state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION
$metrics.counter - a counter
$lastrowcount - number of rows from last statement
$lastexceptiondate - SQL timestamp of last exception
$lastexception - full stack trace of last exception
$metrics.lastexecutionstart - SQL timestamp of the time when last execution started
$metrics.lastexecutionend - SQL timestamp of the time when last execution ended
$metrics.totalrows - total number of rows fetched
$metrics.totalbytes - total number of bytes fetched
$metrics.failed - total number of failed SQL executions
$metrics.succeeded - total number of succeeded SQL executions
在上面例子中的 sql
select *, id as _id from article where update_time > ?
表示獲取更新時間(update_time)大於 最後執行時間($metrics.lastexecutionstart)的所有資料
其它如 index、type_mapping 之類的屬性就不一一介紹了,很容易理解
趕快動手試一下吧!