mysql與es同步的其他方案logstash
阿新 • • 發佈:2021-07-21
商品搜尋與商品詳情
課程內容
- mysql與es同步的其他方案
- laravel封裝elasticsearchService
1. mysql與es同步的其他方案
1.1 mysql與es同步的其他方案
logstash介紹
Logstash 是免費且開放的伺服器端資料處理管道,能夠從多個來源採集資料,轉換資料,然後將資料傳送到您最喜歡的“儲存庫”中。
Logstash是一個功能強大的工具,可與各種部署整合。 它提供了大量外掛,可幫助你解析,豐富,轉換和緩衝來自各種來源的資料。 如果你的資料需要 Beats 中沒有的其他處理,則需要將 Logstash 新增到部署中。
應用場景
- 日誌搜尋器: logstash採集、處理、轉發到elasticsearch儲存,在kibana進行展示
- Elk日誌分析(elasticsearch+logstash+kibana)
- logstash同步mysql資料庫資料到es
logstash安裝
- 拉取logstash映象
docker pull logstash:7.12.1(需要與es版本對應)
- 構建logstash容器
mkdir /docker/logstash --建立一個用於儲存logstash配置以及外掛的目錄 docker run -p 9900:9900 -d --name logstash -v /docker/logstash:/etc/logstash/pipeline --privileged=true logstash:7.12.1
- 進入容器內部安裝 jdbc 和 elasticsearch 外掛
#進入logstash容器內部
docker exec -it logstash bash
#使用logstash-plugin安裝器安裝logstash-input-jdbc外掛,改安裝器在bin目錄下 (此外掛映象新版本自帶)
logstash-plugin install logstash-input-jdbc
#安裝資料輸出到es的外掛
logstash-plugin install logstash-output-elasticsearch
- 下載jdbc的mysql-connection.jar包
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.24/mysql-connector-java-8.0.24.jar
- 修改容器內部配置
vi config/logstash.yml 更改logstash.yml檔案
內容如下:
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.7:9200" ]
vi config/pipelines.yml 更改pipelines.yml檔案
內容如下:
- pipeline.id: table1
path.config: "/etc/logstash/pipeline/logstash.conf"
需要注意的是自己目錄是不是存在這些檔案,不要找錯地方了,一般進入容器就會是logstash的安裝目錄,ls檢視就能夠看到config目錄的
- 退出容器,配置檔案建立與編輯 (此處屬於全量的配置檔案)
touch /docker/logstash/logstash.conf
input {
stdin { }
jdbc {
#注意mysql連線地址一定要用ip,不能使用localhost等
jdbc_connection_string => "jdbc:mysql://172.17.0.4:3306/lmrs_2008_shops"
jdbc_user => "root"
jdbc_password => "root"
#這個jar包的地址是容器內的地址
jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement => "select a.`name`,a.long_name,a.brand_id,a.three_category_id as category_id,a.shop_id,a.price,a.status,a.sold_count,a.review_count,a.create_time,a.last_time,b.`name` as category,b.path from lmrs_products as a LEFT JOIN lmrs_product_categorys as b on a.three_category_id = b.id"
schedule => "* * * * *"
}
}
output {
elasticsearch {
#注意mysql連線地址一定要用ip,不能使用localhost等
hosts => "172.17.0.7:9200"
index => "products"
document_type => "_doc"
document_id => "_id"
}
stdout {
codec => json_lines
}
}
增量配置檔案如下
input {
stdin { }
jdbc {
#注意mysql連線地址一定要用ip,不能使用localhost等
jdbc_connection_string => "jdbc:mysql://192.168.63.1:3306/starsky"
jdbc_user => "starsky"
jdbc_password => "root"
#資料庫重連嘗試
connection_retry_attempts => "3"
#資料庫連線可用校驗超時時間,預設為3600s
jdbc_validation_timeout => "3600"
#這個jar包的地址是容器內的地址
jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#開啟分頁查詢(預設是false)
jdbc_paging_enabled => "true"
#單次分頁查詢條數(預設100000,欄位較多的話,可以適當調整這個數值)
jdbc_page_size => "50000"
#執行的sql語句
statement => "SELECT a.id,a.`name`,a.long_name,a.brand_id,a.three_category_id AS category_id,a.shop_id,a.price,a.`STATUS`,a.sold_count,a.review_count,a.create_time,a.last_time FROM lmrs_products AS a where a.id > :sql_last_value"
#需要記錄查詢結果某欄位的值時,此欄位為true,否則預設tracking_colum為timestamp的值
use_column_value => true
#是否將欄位名轉為小寫,預設為true(如果具備序列化或者反序列化,建議設定為false)
lowercase_column_names => false
#需要記錄的欄位,同於增量同步,需要是資料庫欄位
tracking_column => id
#記錄欄位的資料型別
tracking_column_type => numeric
#上次資料存放位置
record_last_run => true
#上一個sql_last_value的存放路徑,必須在檔案中指定欄位的初始值
last_run_metadata_path => "/etc/logstash/pipeline/products.txt"
#是否清除last_run_metadata_path的記錄,需要增量同步這個欄位的值必須為false
clean_run => false
#同步的頻率(分 時 天 月 年)預設為每分鐘同步一次
schedule => "* * * * *"
}
}
output {
elasticsearch {
#注意mysql連線地址一定要用ip,不能使用localhost等
hosts => "172.17.0.7:9200"
index => "products"
document_type => "_doc"
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
last_run_metadata_path => "/etc/logstash/pipeline/products.txt":因為需要記錄下上次同步的資料id,所以這裡會有一個檔案進行儲存這個id,需要在logstash目錄下去建立一個txt檔案,用於儲存這個id,同時需要給予許可權。不給會出現許可權異常問題
2. laravel封裝elasticsearchService
es中的商品索引資訊如下:
PUT /products/
{
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "ik_smart"
},
"long_name":{
"type": "text",
"analyzer": "ik_smart"
},
"brand_id":{
"type": "integer"
},
"category_id":{
"type":"integer"
},
"category":{
"type": "keyword"
},
"category_path":{
"type": "keyword"
},
"shop_id":{
"type":"integer"
},
"price":{
"type":"scaled_float",
"scaling_factor":100
},
"sold_count":{
"type":"integer"
},
"review_count":{
"type":"integer"
},
"status":{
"type":"integer"
},
"create_time" : {
"type" : "date"
},
"last_time" : {
"type" : "date"
}
}
}
}
- 建立一個Service,用於提供控制器使用es服務
php artisan make:service ElasticsearchService.php
App/Service/ElasticsearchService.php
<?php
namespace App\Services;
use App\Models\ProductCategory;
class ElasticsearchService
{
protected $params = [
'index' => 'products',//索引
'type' => '_doc',//型別
'body' => [
"query" => [
"bool" => [
'filter' => [],
'must' => []
]
]
]
];
/**
* @param $size 資料量
* @param $page 索引起始
* @return $this
* 搜尋分頁構建
*/
public function paginate($size,$page)
{
$this->params['body']['from'] = ($page - 1) * $size;
$this->params['body']['size'] = $size;
return $this;
}
/**
* @return $this
* 判斷商品是否已上架並經過稽核
*/
public function IsStatus()
{
$this->params['body']['query']['bool']['filter'][] = ['term' => ['status' => 1]];
return $this;
}
/**
* @param ProductCategory $category 使用者傳遞過來的分類物件或者id
* @return $this
* 分類篩選
*/
public function category(ProductCategory $category)
{
if ($category->is_directory){
$this->params['body']['query']['bool']['filter'] = [
'prefix' => ['category_path' => $category->path.$category->id.'-']
];
}else{
$this->params['body']['query']['bool']['filter'][] = ['term' => ['category_id' => $category->id]];
}
return $this;
}
/**
* @param $keywords 關鍵詞陣列
* @return $this
* 關鍵詞按照權重進行搜尋
*/
public function keywords($keywords)
{
//如果不是陣列需要轉為陣列
$keywords = is_array($keywords) ? $keywords : [$keywords];
foreach ($keywords as $keyword){
$this->params['body']['query']['bool']['must'][] = [
'multi_match' => [
'query' => $keyword,
'fields' => [
'long_name^3',
'category^2'
]
]
];
}
return $this;
}
/*
* 排序
*/
public function orderBy($filed,$direction)
{
if (!isset($this->params['body']['sort'])){
$this->params['body']['sort'] = [];
}
$this->params['body']['sort'][] = [$filed => $direction];
return $this;
}
/*
* 返回結構體
*/
public function getParams()
{
return $this->params;
}
}
?>
app/Http/Controllers/Api/V1/ProductController.php
<?php
namespace App\Http\Controllers\Api\V1;
use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use App\Models\ProductCategory;
use App\Models\Product;
use App\Services\ElasticsearchService;
class ProductController extends Controller
{
/**
* [index description]
* @method index
* @param {[type]} Request [description]
* @return {[type]} [description]
* 商品列表按照輸入條件搜尋
*/
public function index(Request $request)
{
//分頁的起始
$page = $request->input('page',1);
//分頁的資料數量
$perPage = 20;
//呼叫es封裝類,增加商品狀態為上架條件與分頁查詢結構
$builder = (new ElasticsearchService())->IsStatus()->paginate($perPage,$page);
//分類搜尋
if ($request->input('category_id') && $category = $this->category($request->input('category_id')) ){
$builder->category($category);
}
//具備關鍵詞,按照關鍵詞進行搜尋(可以是多個)
if ($search = $request->input('search','')){
$keywords = array_filter(explode(' ',$search));
$builder->keywords($keywords);
}
//根據銷量,價格,評論數量進行排序
if ($order = $request->input('order','')){
if (preg_match('/^(.+)_(asc|desc)$/',$order,$m)){
if (in_array($m[1],['price','sold_count','review_count'])){
$builder->orderBy($m[1],$m[2]);
}
}
}
//通過容器註冊的es單例呼叫search方法到es搜尋資料,條件為上面構建的結構體
$restful = app('es')->search($builder->getParams());
return response()->json([
"data" => $restful
]);
}
/**
* [category description]
* @method category
* @param {[type]} $category [description]
* @return {[type]} [description]
* 查詢分類資料
*/
public function category($category)
{
$category_array = explode(',',$category);
$category_id = array_pop($category_array);
return ProductCategory::query()->where('id',$category_id)->first();
}
}
?>