PHP 中使用 ElasticSearch 的最佳實踐 (下)
阿新 • • 發佈:2020-08-02
引言
上一篇文章,我們使用同步的方式將資料,同步寫入到 ElasticSearch 中。接下來的這篇文章,主要介紹使用 RabbitMQ 的方式,非同步的將資料同步到 ElasticSearch 。
部分實踐程式碼
建立商品
/** * 建立商品資料 * @param Request $request * @return \Illuminate\Http\JsonResponse */ public function createProduct(Request $request) { $title = $request->request->get(ProductModel::TITLE); $longTitle = $request->request->get(ProductModel::LONG_TITLE); $description = $request->request->get(ProductModel::DESCRIPTION); $sku = $request->request->get(ProductModel::SKU); $price = $request->request->get(ProductModel::PRICE); $sales = $request->request->get(ProductModel::SALES); $nowTime = date("Y-m-d H:i:s"); // 商品資料寫入 DB $productId = DB::table(ProductModel::TABLE_NAME)->insertGetId([ ProductModel::TITLE => $title, ProductModel::LONG_TITLE => $longTitle, ProductModel::DESCRIPTION => $description, ProductModel::SKU => $sku, ProductModel::PRICE => $price, ProductModel::SALES => $sales, ProductModel::CREATED_AT => $nowTime, ProductModel::UPDATED_AT => $nowTime ]); $params = [ ProductModel::PRODUCT_ID => $productId, ProductModel::TITLE => $title, ProductModel::LONG_TITLE => $longTitle, ProductModel::DESCRIPTION => $description, ProductModel::SKU => $sku, ProductModel::PRICE => $price, ProductModel::SALES => $sales, ProductModel::CREATED_AT => $nowTime, ProductModel::UPDATED_AT => $nowTime ]; // 將資料投遞到 RabbitMQ $this->routingKey = self::PRODUCT_CREATE; $this->publishMsg($params); return Response()->json(['code' => 0, 'msg' => 'success']); }
刪除商品
/** * 刪除商品資料 * @param Request $request * @return \Illuminate\Http\JsonResponse */ public function deleteProduct(Request $request) { $productId = $request->request->get(ProductModel::PRODUCT_ID); // 刪除 DB 中的商品資料 DB::table(ProductModel::TABLE_NAME)->where(ProductModel::PRODUCT_ID, $productId)->delete(); $params = [ ProductModel::PRODUCT_ID => $productId, ]; // 將資料投遞到 RabbitMQ $this->routingKey = self::PRODUCT_DELETE; $this->publishMsg($params); return Response()->json(['code' => 0, 'msg' => 'success']); }
更新商品
/** * 更新商品資料 * @param Request $request * @return \Illuminate\Http\JsonResponse */ public function updateProduct(Request $request) { $productId = $request->request->get(ProductModel::PRODUCT_ID); $title = $request->request->get(ProductModel::TITLE); $longTitle = $request->request->get(ProductModel::LONG_TITLE); $description = $request->request->get(ProductModel::DESCRIPTION); $sku = $request->request->get(ProductModel::SKU); $price = $request->request->get(ProductModel::PRICE); $sales = $request->request->get(ProductModel::SALES); $nowTime = date("Y-m-d H:i:s"); // 商品資料更新到 DB DB::table(ProductModel::TABLE_NAME) ->where(ProductModel::PRODUCT_ID, $productId) ->update([ ProductModel::TITLE => $title, ProductModel::LONG_TITLE => $longTitle, ProductModel::DESCRIPTION => $description, ProductModel::SKU => $sku, ProductModel::PRICE => $price, ProductModel::SALES => $sales, ProductModel::UPDATED_AT => $nowTime ]); $params = [ ProductModel::PRODUCT_ID => $productId, ProductModel::TITLE => $title, ProductModel::LONG_TITLE => $longTitle, ProductModel::DESCRIPTION => $description, ProductModel::SKU => $sku, ProductModel::PRICE => $price, ProductModel::SALES => $sales, ProductModel::UPDATED_AT => $nowTime ]; // 將資料投遞到 RabbitMQ $this->routingKey = self::PRODUCT_UPDATE; $this->publishMsg($params); return Response()->json(['code' => 0, 'msg' => 'success']); }
獲取單個商品資料
/**
* 獲取單個商品資料
* @param Request $request
* @return \Illuminate\Http\JsonResponse
*/
public function getProductInfo(Request $request)
{
$productId = $request->request->get(ProductModel::PRODUCT_ID);
$params = [
'id' => $productId,
'index' => self::INDEX,
'type' => self::TYPE,
];
$this->client->get($params);
return Response()->json(['code' => 0, 'msg' => 'success']);
}
搜尋商品資料
/**
* 搜尋商品資料
* @param Request $request
* @return \Illuminate\Http\JsonResponse
*/
public function getProductList(Request $request)
{
$params = [
'index' => self::INDEX,
'type' => self::TYPE,
];
$this->client->search($params);
return Response()->json(['code' => 0, 'msg' => 'success']);
}
通過訂閱的方式,同步資料到 ElasticSearch
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$this->listen(function($msg) {
$routingKey = $msg->delivery_info['routing_key'];
$body = unserialize($msg->body);
$this->mapping[$routingKey]($body);
#當no_ack=false時, 需要寫下行程式碼,否則可能出現記憶體不足情況#$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});
}
/**
* 商品資料寫入 ES
* @param $body
*/
protected function createProduct($body)
{
$params = [
'body' => [
ProductModel::PRODUCT_ID => $body[ProductModel::PRODUCT_ID],
ProductModel::TITLE => $body[ProductModel::TITLE],
ProductModel::LONG_TITLE => $body[ProductModel::LONG_TITLE],
ProductModel::DESCRIPTION => $body[ProductModel::DESCRIPTION],
ProductModel::SKU => $body[ProductModel::SKU],
ProductModel::PRICE => $body[ProductModel::PRICE],
ProductModel::SALES => $body[ProductModel::SALES],
ProductModel::CREATED_AT => $body[ProductModel::CREATED_AT],
ProductModel::UPDATED_AT => $body[ProductModel::UPDATED_AT]
],
'id' => $body[ProductModel::PRODUCT_ID],
'index' => self::INDEX,
'type' => self::TYPE,
];
// 商品資料寫入 ES
$this->client->create($params);
}
/**
* 更新 ES 中的商品資料
* @param $body
*/
protected function updateProduct($body)
{
$params = [
'body' => [
ProductModel::PRODUCT_ID => $body[ProductModel::PRODUCT_ID],
ProductModel::TITLE => $body[ProductModel::TITLE],
ProductModel::LONG_TITLE => $body[ProductModel::LONG_TITLE],
ProductModel::DESCRIPTION => $body[ProductModel::DESCRIPTION],
ProductModel::SKU => $body[ProductModel::SKU],
ProductModel::PRICE => $body[ProductModel::PRICE],
ProductModel::SALES => $body[ProductModel::SALES],
ProductModel::CREATED_AT => $body[ProductModel::CREATED_AT],
ProductModel::UPDATED_AT => $body[ProductModel::UPDATED_AT]
],
'id' => $body[ProductModel::PRODUCT_ID],
'index' => self::INDEX,
'type' => self::TYPE,
];
// 商品資料更新到 ES
$this->client->update($params);
}
/**
* 刪除 ES 中的商品資料
* @param $body
*/
protected function deleteProduct($body)
{
// 刪除 ES 中的商品資料
$params = [
'id' => $body[ProductModel::PRODUCT_ID],
'index' => self::INDEX,
'type' => self::TYPE,
];
$this->client->delete($params);
}
小結
通過非同步的方式同步資料到 ElasticSearch,可以提高系統的併發處理能力。