
Best practices for using Elasticsearch (ES) with PHP

Install the official Elasticsearch PHP client with Composer:

composer require elasticsearch/elasticsearch

Composer automatically selects a compatible release. On PHP 7.1, for example, it installed version 7.9 of the elasticsearch/elasticsearch client.

Installing Elasticsearch:

https://www.cnblogs.com/-mrl/p/13854210.html

Basic usage of Elasticsearch:

Example 1:

<?php

require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;

// https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
//$client = ClientBuilder::create()->build();
// If no host is set, the client defaults to 127.0.0.1:9200
$client = Elasticsearch\ClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();
//var_dump($client);

// Index a document
// Note: mapping types ('type') are deprecated in Elasticsearch 7.x and removed in 8.x.
echo 'Index a document' . PHP_EOL;
$params = [
    'index' => 'my_index',
    'type' => 'my_type',
    'id' => 'my_id',
    'body' => ['testField' => 'abc']
];
$response = $client->index($params);
pr($response);

// Get a document
echo 'Get a document' . PHP_EOL;
$params = [
    'index' => 'my_index',
    'type' => 'my_type',
    'id' => 'my_id'
];
$response = $client->get($params);
pr($response);

// Search for a document
echo 'Search for a document' . PHP_EOL;
$params = [
    'index' => 'my_index',
    'type' => 'my_type',
    'body' => [
        'query' => [
            'match' => [
                'testField' => 'abc'
            ]
        ]
    ]
];
$response = $client->search($params);
pr($response);
//DIE;

// Delete a document
echo 'Delete a document' . PHP_EOL;
$params = [
    'index' => 'my_index',
    'type' => 'my_type',
    'id' => 'my_id'
];
$response = $client->delete($params);
pr($response);

// Delete an index
echo 'Delete an index' . PHP_EOL;
$deleteParams = [
    'index' => 'my_index'
];
$response = $client->indices()->delete($deleteParams);
pr($response);

// Create an index
echo 'Create an index' . PHP_EOL;
$params = [
    'index' => 'my_index',
    'body' => [
        'settings' => [
            'number_of_shards' => 2,
            'number_of_replicas' => 0
        ]
    ]
];
$response = $client->indices()->create($params);
pr($response);

// Pretty-print a response inside <pre> tags
function pr($response)
{
    echo '<pre>';
    print_r($response);
    echo '</pre>';
}

Example 2:

First create a table named product in MySQL with the following statement:

CREATE TABLE `product` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Product ID',
  `title` varchar(64) NOT NULL DEFAULT '' COMMENT 'Product name',
  `long_title` varchar(64) NOT NULL DEFAULT '' COMMENT 'Long product name',
  `sku` varchar(32) NOT NULL DEFAULT '' COMMENT 'Product SKU',
  `money` int(10) NOT NULL DEFAULT '0' COMMENT 'Product price',
  `sales` int(11) NOT NULL DEFAULT '0' COMMENT 'Units sold',
  `created_at` datetime DEFAULT NULL COMMENT 'Creation time',
  `updated_at` datetime DEFAULT NULL COMMENT 'Last modification time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8mb4;

Create test.php:

<?php

require 'vendor/autoload.php';
require 'Db.php';

use Elasticsearch\ClientBuilder;

const ES_INDEX = 'product_index';
const ES_TYPE = 'product';


$nowTime = date("Y-m-d H:i:s");
//https://www.elastic.co/guide/cn/elasticsearch/php/current/_quickstart.html#_quickstart
//$client = ClientBuilder::create()->build();
// If no host is set, the client defaults to 127.0.0.1:9200
$client = Elasticsearch\ClientBuilder::create()->setHosts(['127.0.0.1:9200'])->build();
//var_dump($client);
$db = new Db();
//print_r($db);die;
//http://localhost/es/test.php?do=create
if (isset($_GET['do']) && $_GET['do'] == 'create') {    // an index can only be created once
    // Create an index with an explicit mapping (roughly like creating a database in MySQL)
    $params = [
        'index' => ES_INDEX,  // index name (roughly a MySQL database)
        /*'client' => [
            'ignore' => 404
        ],*/
        'body' => [
            'settings' => [
                'number_of_shards' => 2,  // number of primary shards
                'number_of_replicas' => 0,
            ],
            'mappings' => [
                'properties' => [ // field type definitions (comparable to MySQL column types)
                    'product_id' => [
                        'type' => 'integer',
                    ],
                    'title' => [
                        'type' => 'text'
                    ],
                    'long_title' => [
                        'type' => 'text'
                    ],
                    'sku' => [
                        'type' => 'text'
                    ],
                    'money' => [
                        'type' => 'integer'
                    ],
                    'sales' => [
                        'type' => 'integer'
                    ],
                    'created_at' => [
                        'type' => 'date',
                        'format' => 'yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis',
                    ],
                    'updated_at' => [
                        'type' => 'date',
                        'format' => 'yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis',
                    ],
                ]
            ]
        ]
    ];
    try {
        echo 'Create an index' . PHP_EOL;
        // Create the index
        $response = $client->indices()->create($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=index
if (isset($_GET['do']) && $_GET['do'] == 'index') {
    /**
     * Create product data
     */
    // Write the product to the database
    $title = ['蘋果', '香蕉', '橙子', '馬蹄', '草莓']; // apple, banana, orange, water chestnut, strawberry
    $data = [
        'title' => $title[array_rand($title, 1)],
        'long_title' => 'long_title' . $nowTime,
        'sku' => rand(1, 3),
        'money' => rand(1, 10000000),
        'sales' => rand(1, 1000),
        'created_at' => $nowTime,
        'updated_at' => $nowTime
    ];
    $productId = $db->table('product')->insert($data);
    if ($productId) {
        $body = array_merge($data, [
            'product_id' => $productId,
        ]);
        $params = [
            'body' => $body,
            'id' => $productId, // with an id, index() creates or replaces the document with that id; without one, ES generates an id
            'index' => ES_INDEX,
        ];
        try {
            // Write the product to ES
            $response = $client->index($params);
            pr($response);
        } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
}
//http://localhost/es/test.php?do=delete&id=14
if (isset($_GET['do']) && $_GET['do'] == 'delete' && isset($_GET['id'])) {
    $id = (int) $_GET['id']; // cast to int so the value is safe to embed in SQL
    $res = $db->table('product')->where("id = {$id}")->delete();
    if ($res) {
        // Delete a document
        $params = [
            'index' => ES_INDEX,
            'id' => $id
        ];
        try {
            echo 'Delete a document' . PHP_EOL;
            // Delete the product from ES
            $response = $client->delete($params);
            pr($response);
        } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
}
//http://localhost/es/test.php?do=deleteindex
if (isset($_GET['do']) && $_GET['do'] == 'deleteindex') {
    // Delete an index
    $params = [
        'index' => ES_INDEX,
        //'index' => 'test_index*', // delete every index whose name starts with test_index
    ];
    try {
        echo 'Delete an index' . PHP_EOL;
        // Delete the index from ES
        $response = $client->indices()->delete($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=update&id=14
if (isset($_GET['do']) && $_GET['do'] == 'update' && isset($_GET['id'])) {
    $id = (int) $_GET['id']; // cast to int so the value is safe to embed in SQL
    $title = ['蘋果', '香蕉', '橙子', '馬蹄', '草莓']; // apple, banana, orange, water chestnut, strawberry
    $data = [
        'title' => $title[array_rand($title, 1)],
        'long_title' => 'long_title' . $nowTime,
        'sku' => rand(1, 3),
        'money' => rand(1, 10000000),
        'sales' => rand(1, 1000),
        'created_at' => $nowTime,
        'updated_at' => $nowTime
    ];
    $res = $db->table('product')->where("id = {$id}")->update($data);
    if ($res) {
        // Update a document
        $body = array_merge($data, [
            'product_id' => $id,
        ]);
        $params = [
            'body' => [
                'doc' => $body
            ],
            'id' => $id,
            'index' => ES_INDEX,
        ];
        try {
            echo 'Update a document' . PHP_EOL;
            // Apply the partial update in ES
            $response = $client->update($params);
            pr($response);
        } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
            $msg = $e->getMessage();
            $msg = json_decode($msg, true);
            pr($msg);
        }
    }
}
//http://localhost/es/test.php?do=get&id=13
if (isset($_GET['do']) && $_GET['do'] == 'get' && isset($_GET['id'])) {
    $params = [
        'index' => ES_INDEX,
        'id' => $_GET['id']
    ];
    try {
        echo 'Get a document' . PHP_EOL;
        $response = $client->get($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=mget&ids=24,25
if (isset($_GET['do']) && $_GET['do'] == 'mget' && isset($_GET['ids'])) {
    $params = [
        'index' => ES_INDEX,
        'body' => ['ids' => explode(',', $_GET['ids'])],
        '_source' => ['money', 'title'], // return only these fields
    ];
    try {
        echo 'Get multiple documents' . PHP_EOL;
        $response = $client->mget($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=search
if (isset($_GET['do']) && $_GET['do'] == 'search') {
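    // $query1 is roughly equivalent to the SQL: select * from product where title='香蕉' limit 0,10;
    // (match is a full-text query, so it matches analyzed terms rather than testing exact equality)
    $query1 = [
        'match' => [
            'title' => '香蕉' // banana
        ]
    ];
    $params = [
        'index' => [ES_INDEX],      // pass several names, e.g. ['my_index1', 'my_index2'], to search across indices
        '_source' => ['money', 'title', 'sku'], // return only these fields
        'body' => [
            'query' => $query1,
            'sort' => [['money' => ['order' => 'desc']]],     // sort order
            'from' => 0,
            'size' => 10
        ]
    ];
    try {
        echo 'Search documents' . PHP_EOL;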
        $response = $client->search($params);
        pr($response);
        $data = $response['hits']['hits'];
        pr($data);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=putSettings
if (isset($_GET['do']) && $_GET['do'] == 'putSettings') {
    $params = [
        'index' => ES_INDEX,
        'body' => [
            'settings' => [
                'number_of_replicas' => 2,
            ]
        ]
    ];
    try {
        echo 'Update index settings' . PHP_EOL;
        $response = $client->indices()->putSettings($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=getSource&id=24
if (isset($_GET['do']) && $_GET['do'] == 'getSource' && isset($_GET['id'])) {
    $params = [
        'index' => ES_INDEX,
        'id' => $_GET['id']
    ];
    try {
        echo 'Get the _source (field values) of a document' . PHP_EOL;
        $response = $client->getSource($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=getSettings
if (isset($_GET['do']) && $_GET['do'] == 'getSettings') {
    $params = [
        'index' => [ES_INDEX],
    ];
    try {
        echo 'Get index settings' . PHP_EOL;
        $response = $client->indices()->getSettings($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    } catch (Exception $e) {
        pr($e);
    }
}
//http://localhost/es/test.php?do=exists&id=13
if (isset($_GET['do']) && $_GET['do'] == 'exists' && isset($_GET['id'])) {
    $params = [
        'index' => ES_INDEX,
        'id' => $_GET['id']
    ];
    try {
        echo 'Check whether a document exists' . PHP_EOL;
        $response = $client->exists($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}
//http://localhost/es/test.php?do=getMapping
if (isset($_GET['do']) && $_GET['do'] == 'getMapping') {
    $params = [
        'index' => ES_INDEX,
    ];
    try {
        echo 'Get the index mapping' . PHP_EOL;
        $response = $client->indices()->getMapping($params);
        pr($response);
    } catch (Elasticsearch\Common\Exceptions\BadRequest400Exception $e) {
        $msg = $e->getMessage();
        $msg = json_decode($msg, true);
        pr($msg);
    }
}

function pr($response)
{
    echo '<pre>';
    print_r($response);
    echo '</pre>';
}
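
The search branch in test.php hard-codes `from` and `size`. Below is a minimal sketch of how they could be derived from a page number. `buildSearchParams()` is a hypothetical helper, not part of elasticsearch-php; it only assembles the array that would be passed to `$client->search()`, so it can be tried without a running cluster.

```php
<?php
// Hypothetical helper: builds the parameter array for $client->search().
// It sends no request itself.
function buildSearchParams(string $index, string $title, int $page = 1, int $perPage = 10): array
{
    $page = max(1, $page);        // clamp to the first page
    $perPage = max(1, $perPage);  // at least one hit per page

    return [
        'index' => $index,
        '_source' => ['money', 'title', 'sku'],  // fields to return
        'body' => [
            'query' => ['match' => ['title' => $title]],
            'sort' => [['money' => ['order' => 'desc']]],
            'from' => ($page - 1) * $perPage,    // offset of the first hit
            'size' => $perPage,
        ],
    ];
}

$params = buildSearchParams('product_index', '香蕉', 3, 10);
echo $params['body']['from'], PHP_EOL; // 20

// With a client built as in test.php, the request would be:
// $response = $client->search($params);
```

Note that `from` + `size` paging is limited by the `index.max_result_window` setting (10,000 by default), so this approach suits shallow paging only.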

Create Db.php:

<?php
header("Content-Type:text/html;charset=utf-8");

/**
 * Utility class for working with MySQL from PHP
 */
class Db
{
    private $_db = null;    // database connection handle
    private $_table = null; // table name
    private $_where = null; // WHERE clause
    private $_order = null; // ORDER BY clause
    private $_limit = null; // LIMIT clause
    private $_group = null; // GROUP BY clause
    private $_configs = array(
        'hostname' => 'localhost',
        'dbname' => 'test',
        'username' => 'root',
        'password' => 'root'
    ); // database configuration

    /**
     * Constructor: connects to the database
     */
    public function __construct()
    {
        $link = $this->_db;
        if (!$link) {
            $db = mysqli_connect($this->_configs['hostname'], $this->_configs['username'], $this->_configs['password'], $this->_configs['dbname']);
            // Check the connection before using it
            if (!$db) {
                $this->ShowException("Connection error: " . mysqli_connect_error());
                return;
            }
            // Match the utf8mb4 charset used by the product table
            mysqli_set_charset($db, 'utf8mb4');
            $this->_db = $db;
        }
    }

    /**
     * Fetch all rows of a table
     *
     * @param string $table The table name
     *
     * @return array|false All rows, or false when there is no connection
     */
    public function getAll($table = null)
    {
        $link = $this->_db;
        if (!$link) return false;
        $sql = "SELECT * FROM {$table}";
        $data = mysqli_fetch_all($this->execute($sql));
        return $data;
    }

    public function table($table)
    {
        $this->_table = $table;
        return $this;
    }

    /**
     * Run a SELECT query
     *
     * @param string|array $fields The columns to select
     *
     * @return array|false The result rows, or false when there is no connection
     */
    public function select($fields = "*")
    {
        $fieldsStr = '*';
        $link = $this->_db;
        if (!$link) return false;
        if (is_array($fields)) {
            $fieldsStr = implode(',', $fields);
        } elseif (is_string($fields) && !empty($fields)) {
            $fieldsStr = $fields;
        }
        // Use the normalized column list; GROUP BY goes between WHERE and ORDER BY
        $sql = "SELECT {$fieldsStr} FROM {$this->_table} {$this->_where} {$this->_group} {$this->_order} {$this->_limit}";
        $data = mysqli_fetch_all($this->execute($sql));
        return $data;
    }

    /**
     * Set the ORDER BY clause
     *
     * @param string $order The ordering expression
     *
     * @return Db|false $this for chaining, or false when there is no connection
     */
    public function order($order = '')
    {
        $orderStr = '';
        $link = $this->_db;
        if (!$link) return false;
        if (is_string($order) && !empty($order)) {
            $orderStr = "ORDER BY " . $order;
        }
        $this->_order = $orderStr;
        return $this;
    }

    /**
     * Set the WHERE clause
     *
     * @param string|array $where A raw condition string, or column => value pairs joined with AND
     *
     * @return Db|null $this for chaining, or null when there is no connection
     */
    public function where($where = '')
    {
        $whereStr = '';
        $link = $this->_db;
        if (!$link) return $link;
        if (is_array($where) && !empty($where)) {
            $conditions = [];
            foreach ($where as $key => $value) {
                // Escape values so that quotes cannot break out of the literal
                $conditions[] = "`" . $key . "` = '" . mysqli_real_escape_string($link, (string) $value) . "'";
            }
            $whereStr = "WHERE " . implode(' AND ', $conditions);
        } elseif (is_string($where) && !empty($where)) {
            $whereStr = "WHERE " . $where;
        }
        $this->_where = $whereStr;
        return $this;
    }

    /**
     * Set the GROUP BY clause
     *
     * @param string|array $group The column or columns to group by
     *
     * @return Db|false $this for chaining, or false when there is no connection
     */
    public function group($group = '')
    {
        $groupStr = '';
        $link = $this->_db;
        if (!$link) return false;
        if (is_array($group)) {
            $groupStr = "GROUP BY " . implode(',', $group);
        } elseif (is_string($group) && !empty($group)) {
            $groupStr = "GROUP BY " . $group;
        }
        $this->_group = $groupStr;
        return $this;
    }

    /**
     * Set the LIMIT clause
     *
     * @param string|int $limit A row count, or an "offset,count" string
     *
     * @return Db|null $this for chaining, or null when there is no connection
     */
    public function limit($limit = '')
    {
        $limitStr = '';
        $link = $this->_db;
        if (!$link) return $link;
        // Accept a non-empty string such as "0,10" or a plain number
        if ((is_string($limit) && $limit !== '') || is_numeric($limit)) {
            $limitStr = "LIMIT " . $limit;
        }
        $this->_limit = $limitStr;
        return $this;
    }

    /**
     * Execute an SQL statement
     *
     * @param string $sql The SQL statement
     *
     * @return mysqli_result|bool The query result, or false when there is no connection
     */
    public function execute($sql = null)
    {
        $link = $this->_db;
        if (!$link) return false;
        $res = mysqli_query($this->_db, $sql);
        if (!$res) {
            $errors = mysqli_error_list($this->_db);
            $this->ShowException("Query failed!<br/>Error number: " . $errors[0]['errno'] . "<br/>SQLSTATE: " . $errors[0]['sqlstate'] . "<br/>Error message: " . $errors[0]['error']);
            die();
        }
        return $res;
    }

    /**
     * Insert a row
     *
     * @param array $data Column => value pairs
     *
     * @return int|string|false The new auto-increment id, or false on invalid input or no connection
     */
    public function insert($data)
    {
        $link = $this->_db;
        if (!$link) return false;
        if (!is_array($data) || empty($data)) return false;
        $keys = '';
        $values = '';
        foreach ($data as $key => $value) {
            $keys .= "`" . $key . "`,";
            // Escape values so that quotes cannot break out of the literal
            $values .= "'" . mysqli_real_escape_string($link, (string) $value) . "',";
        }
        $keys = rtrim($keys, ',');
        $values = rtrim($values, ',');
        $sql = "INSERT INTO `{$this->_table}`({$keys}) VALUES({$values})";
        mysqli_query($this->_db, $sql);
        $insertId = mysqli_insert_id($this->_db);
        return $insertId;
    }

    /**
     * Update rows matching the current WHERE clause
     *
     * @param array $data Column => value pairs
     *
     * @return bool|null The query result, false on invalid input, or null when there is no connection
     */
    public function update($data)
    {
        $link = $this->_db;
        if (!$link) return $link;
        if (!is_array($data) || empty($data)) return false;
        $dataStr = '';
        foreach ($data as $key => $value) {
            // Escape values so that quotes cannot break out of the literal
            $dataStr .= "`" . $key . "`='" . mysqli_real_escape_string($link, (string) $value) . "',";
        }
        $dataStr = rtrim($dataStr, ',');
        $sql = "UPDATE `{$this->_table}` SET {$dataStr} {$this->_where} {$this->_order} {$this->_limit}";
        $res = $this->execute($sql);
        return $res;
    }

    /**
     * Delete rows matching the current WHERE clause
     *
     * @return bool|null The query result, or null when there is no connection
     */
    public function delete()
    {
        $link = $this->_db;
        if (!$link) return $link;
        $sql = "DELETE FROM `{$this->_table}` {$this->_where}";
        $res = $this->execute($sql);
        return $res;
    }

    /**
     * Print an error or debug value
     *
     * @param mixed $var The value to print
     */
    private function ShowException($var)
    {
        if (is_bool($var)) {
            var_dump($var);
        } else if (is_null($var)) {
            var_dump(NULL);
        } else {
            echo "<pre style='position:relative;z-index:1000;padding:10px;border-radius:5px;background:#F5F5F5;border:1px solid #aaa;font-size:14px;line-height:18px;opacity:0.9;'>" . print_r($var, true) . "</pre>";
        }
    }

}
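
The Db class assembles SQL by string concatenation. As an alternative, here is a minimal sketch of building a parameterized INSERT for mysqli prepared statements. `buildInsertStatement()` is a hypothetical helper that is not part of Db.php, and it assumes column names come from trusted code rather than user input.

```php
<?php
// Hypothetical helper: returns the SQL text, the mysqli type string and the
// values for a prepared INSERT. It does not touch the database itself.
function buildInsertStatement(string $table, array $data): array
{
    $columns = array_map(function ($column) {
        return '`' . $column . '`';
    }, array_keys($data));
    $placeholders = implode(',', array_fill(0, count($data), '?'));

    // One type character per value: i = integer, d = double, s = string
    $types = '';
    foreach ($data as $value) {
        $types .= is_int($value) ? 'i' : (is_float($value) ? 'd' : 's');
    }

    $sql = "INSERT INTO `{$table}` (" . implode(',', $columns) . ") VALUES ({$placeholders})";
    return [$sql, $types, array_values($data)];
}

list($sql, $types, $values) = buildInsertStatement('product', ['title' => 'apple', 'money' => 100]);
echo $sql, PHP_EOL;   // INSERT INTO `product` (`title`,`money`) VALUES (?,?)
echo $types, PHP_EOL; // si

// With an open mysqli connection $db, the statement could then be run as:
// $stmt = mysqli_prepare($db, $sql);
// mysqli_stmt_bind_param($stmt, $types, ...$values);
// mysqli_stmt_execute($stmt);
```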

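Notes:

Once the index has been created, the data has to be synchronized from MySQL to ES. There are three common approaches:

1. Write to ES directly, right after the row has been saved to MySQL.

2. Use Logstash to pull data from MySQL on a schedule and sync it to ES.

3. Use third-party middleware (for example canal or go-mysql-elasticsearch) that reads the MySQL binlog, parses it, and syncs the changes to ES.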
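
For the first approach, writing rows one at a time is fine for single inserts, but a backfill of many existing rows is usually sent through the bulk API. Below is a minimal sketch of assembling such a request. `buildBulkParams()` is a hypothetical helper, and the row layout (an `id` column that becomes `product_id`) is an assumption that mirrors test.php.

```php
<?php
// Hypothetical helper: turns product rows into the body for $client->bulk().
// Each document contributes two entries: an action line and the document source.
function buildBulkParams(string $index, array $rows): array
{
    $body = [];
    foreach ($rows as $row) {
        // Action line: index the document under the MySQL primary key
        $body[] = ['index' => ['_index' => $index, '_id' => $row['id']]];

        // Source line: the row itself, with id renamed to product_id
        $doc = $row;
        $doc['product_id'] = (int) $row['id'];
        unset($doc['id']);
        $body[] = $doc;
    }
    return ['body' => $body];
}

$rows = [
    ['id' => 1, 'title' => 'apple', 'money' => 100],
    ['id' => 2, 'title' => 'banana', 'money' => 200],
];
$params = buildBulkParams('product_index', $rows);
echo count($params['body']), PHP_EOL; // 4

// With a client built as in test.php, the request would be:
// $response = $client->bulk($params);
```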

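As the business matures, the number of fields tends to grow, and at some point you may want to remove some of them. However, a field cannot be removed directly from an existing ES mapping; the index has to be rebuilt. In most cases we advise against removing fields, because doing so adds unnecessary cost and risk. If the boss insists on removing fields to save storage space, it can be done with the following steps: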

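1. Create a new index.

2. Define the new mapping for it.

3. Import the data from the original index into the new index.

4. Give the new index an alias with the same name as the original index.

5. Delete the original index.

An alias cannot share a name with an existing index, so steps 4 and 5 have to be carried out in a single alias request, or the original index has to be deleted before the alias is added.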
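
Steps 3 to 5 can be sketched as two request arrays, one for `$client->reindex()` and one for `$client->indices()->updateAliases()`. The helpers `buildReindexParams()` and `buildAliasSwapParams()` are hypothetical, as is the target name `product_index_v2`, and the sketch assumes the new index was already created with its reduced mapping as in the `create` branch of test.php.

```php
<?php
// Hypothetical helpers; they only build request arrays and send nothing.

// Parameters for $client->reindex(): copy documents, keeping only $keepFields.
function buildReindexParams(string $oldIndex, string $newIndex, array $keepFields): array
{
    return [
        'body' => [
            // array_values() keeps the list a plain JSON array after array_diff()
            'source' => ['index' => $oldIndex, '_source' => array_values($keepFields)],
            'dest' => ['index' => $newIndex],
        ],
    ];
}

// Parameters for $client->indices()->updateAliases(): in one request, remove the
// old index and point an alias carrying its name at the new index.
function buildAliasSwapParams(string $oldIndex, string $newIndex): array
{
    return [
        'body' => [
            'actions' => [
                ['add' => ['index' => $newIndex, 'alias' => $oldIndex]],
                ['remove_index' => ['index' => $oldIndex]],
            ],
        ],
    ];
}

$allFields = ['product_id', 'title', 'long_title', 'sku', 'money', 'sales', 'created_at', 'updated_at'];
$keep = array_diff($allFields, ['long_title']); // the field being dropped

$reindex = buildReindexParams('product_index', 'product_index_v2', $keep);
$swap = buildAliasSwapParams('product_index', 'product_index_v2');
echo implode(',', $reindex['body']['source']['_source']), PHP_EOL;

// With a client built as in test.php:
// $client->reindex($reindex);
// $client->indices()->updateAliases($swap);
```

Because the old name survives as an alias, code that reads and writes `product_index` should keep working after the swap.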