1. 程式人生 > 其它 >mysql與es同步的其他方案logstash

mysql與es同步的其他方案logstash

商品搜尋與商品詳情

課程內容

  1. mysql與es同步的其他方案
  2. laravel封裝elasticsearchService

1. mysql與es同步的其他方案

1.1 mysql與es同步的其他方案

logstash介紹

Logstash 是免費且開放的伺服器端資料處理管道,能夠從多個來源採集資料,轉換資料,然後將資料傳送到您最喜歡的“儲存庫”中。

Logstash是一個功能強大的工具,可與各種部署整合。 它提供了大量外掛,可幫助你解析,豐富,轉換和緩衝來自各種來源的資料。 如果你的資料需要 Beats 中沒有的其他處理,則需要將 Logstash 新增到部署中。

應用場景

  1. 日誌搜尋器: logstash採集、處理、轉發到elasticsearch儲存,在kibana進行展示
  2. Elk日誌分析(elasticsearch+logstash+kibana)
  3. logstash同步mysql資料庫資料到es

logstash安裝

  1. 拉取logstash映象
docker pull logstash:7.12.1(需要與es版本對應)
  1. 構建logstash容器
mkdir /docker/logstash        --建立一個用於儲存logstash配置以及外掛的目錄
docker run -p 9900:9900 -d --name logstash -v /docker/logstash:/etc/logstash/pipeline --privileged=true logstash:7.12.1
  1. 進入容器內部安裝 jdbc 和 elasticsearch 外掛
#進入logstash容器內部
docker exec -it logstash bash

#使用logstash-plugin安裝器安裝logstash-input-jdbc外掛,改安裝器在bin目錄下 (此外掛映象新版本自帶)
logstash-plugin install logstash-input-jdbc

#安裝資料輸出到es的外掛
logstash-plugin install logstash-output-elasticsearch

  1. 下載jdbc的mysql-connection.jar包

https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.24/mysql-connector-java-8.0.24.jar

  1. 修改容器內部配置
vi config/logstash.yml       更改logstash.yml檔案

內容如下:
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://172.17.0.7:9200" ]

vi config/pipelines.yml  更改pipelines.yml檔案

內容如下:
- pipeline.id: table1
  path.config: "/etc/logstash/pipeline/logstash.conf"

需要注意的是自己目錄是不是存在這些檔案,不要找錯地方了,一般進入容器就會是logstash的安裝目錄,ls檢視就能夠看到config目錄的

  1. 退出容器,配置檔案建立與編輯 (此處屬於全量的配置檔案)
touch /docker/logstash/logstash.conf

input {
 stdin { }
    jdbc {
        #注意mysql連線地址一定要用ip,不能使用localhost等
        jdbc_connection_string => "jdbc:mysql://172.17.0.4:3306/lmrs_2008_shops"
        jdbc_user => "root"
        jdbc_password => "root"
        #這個jar包的地址是容器內的地址
        jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        statement => "select a.`name`,a.long_name,a.brand_id,a.three_category_id as category_id,a.shop_id,a.price,a.status,a.sold_count,a.review_count,a.create_time,a.last_time,b.`name` as category,b.path from lmrs_products as a LEFT JOIN lmrs_product_categorys as b on a.three_category_id = b.id"
        schedule => "* * * * *"
    }
 }
 output {
     elasticsearch {
        #注意mysql連線地址一定要用ip,不能使用localhost等
        hosts => "172.17.0.7:9200"
        index => "products"
        document_type => "_doc"
        document_id => "_id"
     }
     stdout {
        codec => json_lines
    }
}

增量配置檔案如下

input {
 stdin { }
    jdbc {
        #注意mysql連線地址一定要用ip,不能使用localhost等
        jdbc_connection_string => "jdbc:mysql://192.168.63.1:3306/starsky"
        jdbc_user => "starsky"
        jdbc_password => "root"
        #資料庫重連嘗試
        connection_retry_attempts => "3"
        #資料庫連線可用校驗超時時間,預設為3600s
        jdbc_validation_timeout => "3600"
        #這個jar包的地址是容器內的地址
        jdbc_driver_library => "/etc/logstash/pipeline/mysql-connector-java-8.0.24.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        #開啟分頁查詢(預設是false)
        jdbc_paging_enabled => "true"
        #單次分頁查詢條數(預設100000,欄位較多的話,可以適當調整這個數值)
        jdbc_page_size => "50000"
        #執行的sql語句
        statement => "SELECT a.id,a.`name`,a.long_name,a.brand_id,a.three_category_id AS category_id,a.shop_id,a.price,a.`STATUS`,a.sold_count,a.review_count,a.create_time,a.last_time FROM lmrs_products AS a where a.id > :sql_last_value"
        #需要記錄查詢結果某欄位的值時,此欄位為true,否則預設tracking_colum為timestamp的值
        use_column_value => true
        #是否將欄位名轉為小寫,預設為true(如果具備序列化或者反序列化,建議設定為false)
        lowercase_column_names => false
        #需要記錄的欄位,同於增量同步,需要是資料庫欄位
        tracking_column => id
        #記錄欄位的資料型別
        tracking_column_type => numeric
        #上次資料存放位置
        record_last_run => true
        #上一個sql_last_value的存放路徑,必須在檔案中指定欄位的初始值
        last_run_metadata_path => "/etc/logstash/pipeline/products.txt"
        #是否清除last_run_metadata_path的記錄,需要增量同步這個欄位的值必須為false
        clean_run => false
        #同步的頻率(分 時 天 月 年)預設為每分鐘同步一次
        schedule => "* * * * *"
    }
 }
 output {
     elasticsearch {
        #注意mysql連線地址一定要用ip,不能使用localhost等
        hosts => "172.17.0.7:9200"
        index => "products"
        document_type => "_doc"
        document_id => "%{id}"
     }
     stdout {
        codec => json_lines
    }
}

last_run_metadata_path => "/etc/logstash/pipeline/products.txt":因為需要記錄下上次同步的資料id,所以這裡會有一個檔案進行儲存這個id,需要在logstash目錄下去建立一個txt檔案,用於儲存這個id,同時需要給予許可權。不給會出現許可權異常問題

2. laravel封裝elasticsearchService

es中的商品索引資訊如下:

PUT /products/
{
  "mappings": {
    "properties": {
      "name":{
        "type": "text",
        "analyzer": "ik_smart"
      },
      "long_name":{
        "type": "text",
        "analyzer": "ik_smart"
      },
      "brand_id":{
        "type": "integer"
      },
      "category_id":{
        "type":"integer"
      },
      "category":{
        "type": "keyword"
      },
      "category_path":{
        "type": "keyword"
      },
      "shop_id":{
        "type":"integer"
      },
      "price":{
        "type":"scaled_float",
        "scaling_factor":100
      },
      "sold_count":{
        "type":"integer"
      },
      "review_count":{
        "type":"integer"
      },
      "status":{
        "type":"integer"
      },
      "create_time" : {
          "type" : "date"
      },
      "last_time" : {
          "type" : "date"
      }
    }
  }
}
  1. 建立一個Service,用於提供控制器使用es服務
php artisan make:service ElasticsearchService.php

App/Service/ElasticsearchService.php

<?php
namespace App\Services;

use App\Models\ProductCategory;

class ElasticsearchService
{
    protected $params = [
        'index' => 'products',//索引
        'type'  => '_doc',//型別
        'body' => [
            "query" => [
                "bool" => [
                    'filter' => [],
                    'must'  => []
                ]
            ]
        ]
    ];

    /**
     * @param $size  資料量
     * @param $page   索引起始
     * @return $this
     * 搜尋分頁構建
     */
    public function paginate($size,$page)
    {
        $this->params['body']['from'] = ($page - 1) * $size;
        $this->params['body']['size'] = $size;
        return $this;
    }

    /**
     * @return $this
     * 判斷商品是否已上架並經過稽核
     */
    public function IsStatus()
    {
        $this->params['body']['query']['bool']['filter'][] = ['term' => ['status' => 1]];
        return $this;
    }

    /**
     * @param ProductCategory $category 使用者傳遞過來的分類物件或者id
     * @return $this
     * 分類篩選
     */
    public function category(ProductCategory $category)
    {
        if ($category->is_directory){
            $this->params['body']['query']['bool']['filter'] = [
                'prefix' => ['category_path' => $category->path.$category->id.'-']
            ];
        }else{
            $this->params['body']['query']['bool']['filter'][] = ['term' => ['category_id' => $category->id]];
        }

        return $this;
    }

    /**
     * @param $keywords 關鍵詞陣列
     * @return $this
     * 關鍵詞按照權重進行搜尋
     */
    public function keywords($keywords)
    {
        //如果不是陣列需要轉為陣列
        $keywords = is_array($keywords) ? $keywords : [$keywords];

        foreach ($keywords as $keyword){
            $this->params['body']['query']['bool']['must'][] = [
                'multi_match' => [
                    'query' => $keyword,
                    'fields' => [
                        'long_name^3',
                        'category^2'
                    ]
                ]
            ];
        }

        return $this;
    }

    /*
     * 排序
     */
    public function orderBy($filed,$direction)
    {
        if (!isset($this->params['body']['sort'])){
            $this->params['body']['sort'] = [];
        }
        $this->params['body']['sort'][] = [$filed => $direction];
        return $this;
    }

    /*
     * 返回結構體
     */
    public function getParams()
    {
        return $this->params;
    }
}
?>

app/Http/Controllers/Api/V1/ProductController.php

<?php

namespace App\Http\Controllers\Api\V1;

use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
use App\Models\ProductCategory;
use App\Models\Product;
use App\Services\ElasticsearchService;

class ProductController extends Controller
{
    /**
     * [index description]
     * @method index
     * @param  {[type]} Request [description]
     * @return {[type]}         [description]
     * 商品列表按照輸入條件搜尋
     */
    public function index(Request $request)
    {
        //分頁的起始
        $page = $request->input('page',1);
        //分頁的資料數量
        $perPage = 20;

        //呼叫es封裝類,增加商品狀態為上架條件與分頁查詢結構
        $builder = (new ElasticsearchService())->IsStatus()->paginate($perPage,$page);

        //分類搜尋
        if ($request->input('category_id') && $category = $this->category($request->input('category_id')) ){
            $builder->category($category);
        }

        //具備關鍵詞,按照關鍵詞進行搜尋(可以是多個)
        if ($search = $request->input('search','')){
            $keywords = array_filter(explode(' ',$search));
            $builder->keywords($keywords);
        }

        //根據銷量,價格,評論數量進行排序
        if ($order = $request->input('order','')){
            if (preg_match('/^(.+)_(asc|desc)$/',$order,$m)){
                if (in_array($m[1],['price','sold_count','review_count'])){
                    $builder->orderBy($m[1],$m[2]);
                }
            }
        }

        //通過容器註冊的es單例呼叫search方法到es搜尋資料,條件為上面構建的結構體
        $restful = app('es')->search($builder->getParams());

        return response()->json([
            "data" => $restful
        ]);
    }

    /**
     * [category description]
     * @method category
     * @param  {[type]} $category [description]
     * @return {[type]}           [description]
     * 查詢分類資料
     */
    public function category($category)
    {
        $category_array = explode(',',$category);
        $category_id = array_pop($category_array);
        return ProductCategory::query()->where('id',$category_id)->first();
    }
}

?>