1. 程式人生 > 其它 >微服務:ES

微服務:ES

SpringAMQP工作佇列

Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者繫結到一個佇列,共同消費佇列中的訊息

當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了。

再編寫一個接受者

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
    System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(20);
}

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
    System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now());
    Thread.sleep(200);
}

通過設定prefetch來調整訊息預取的數量

Work模型的使用:

  • 多個消費者繫結到一個佇列,同一條訊息只會被一個消費者處理

  • 通過設定prefetch來控制消費者預取的訊息數量


交換機

釋出訂閱的模型如圖:

可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • Publisher:生產者,也就是要傳送訊息的程式,但是不再發送到佇列中,而是發給X(交換機)
  • Exchange:交換機,圖中的X。一方面,接收生產者傳送的訊息。另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。Exchange有以下3種類型:
    • Fanout:廣播,將訊息交給所有繫結到交換機的佇列
    • Direct:定向,把訊息交給符合指定routing key 的佇列
    • Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列
  • Consumer:消費者,與以前一樣,訂閱佇列,沒有變化
  • Queue:訊息佇列也與以前一樣,接收訊息、快取訊息。

Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange繫結,或者沒有符合路由規則的佇列,那麼訊息會丟失!


Fanout型別

/*
*在consumer服務中,利用程式碼宣告佇列、交換機,並將兩者繫結
*在consumer服務中,編寫兩個消費者方法,分別監聽fanout.queue1和fanout.queue2
*在publisher中編寫測試方法,向itcast.fanout傳送訊息
*/
//步驟1:在consumer服務宣告Exchange、Queue、Binding
@Configuration
public class FanoutConfig {
    /**
     * 宣告交換機
     * @return Fanout型別交換機
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }
    /**
     * 第1個佇列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }
    /**
     * 繫結佇列和交換機
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    /**
     * 第2個佇列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }
    /**
     * 繫結佇列和交換機
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
//步驟2:在consumer服務宣告兩個消費者
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
    System.out.println("消費者1接收到Fanout訊息:【" + msg + "】");
}

@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
    System.out.println("消費者2接收到Fanout訊息:【" + msg + "】");
}
//步驟3:在publisher服務傳送訊息到FanoutExchange
@Test
public void testFanoutExchange() {
    // 佇列名稱
    String exchangeName = "itcast.fanout";
    // 訊息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

Direct型別

Direct Exchange 會將接收到的訊息根據規則路由到指定的Queue,因此稱為路由模式(routes)。

  • 每一個Queue都與Exchange設定一個BindingKey

  • 釋出者傳送訊息時,指定訊息的RoutingKey

  • Exchange將訊息路由到BindingKey與訊息RoutingKey一致的佇列

//基於註解方式完成Exchange、Queue的繫結
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消費者接收到direct.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消費者接收到direct.queue2的訊息:【" + msg + "】");
}
//publisher服務傳送訊息到DirectExchange
@Test
public void testSendDirectExchange() {
    // 交換機名稱
    String exchangeName = "itcast.direct";
    // 訊息
    String message = "hello, red!";
    // 傳送訊息
    rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

描述下Direct交換機與Fanout交換機的差異?

  • Fanout交換機將訊息路由給每一個與之繫結的佇列

  • Direct交換機根據RoutingKey判斷路由給哪個佇列

  • 如果多個佇列具有相同的RoutingKey,則與Fanout功能類似

基於@RabbitListener註解宣告佇列和交換機有哪些常見註解?

  • @Queue

  • @Exchange


Topic型別

Topic型別的ExchangeDirect相比,都是可以根據RoutingKey把訊息路由到不同的佇列。只不過Topic型別Exchange可以讓佇列在繫結Routing key 的時候使用萬用字元!

Routingkey 一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert

萬用字元規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

圖示:

解釋:

  • Queue1:繫結的是china.# ,因此凡是以 china.開頭的routing key 都會被匹配到。包括china.news和china.weather
  • Queue2:繫結的是#.news ,因此凡是以 .news結尾的 routing key 都會被匹配。包括china.news和japan.news
//訊息接收
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "china.#"
    //接受所有 china 開頭的訊息
))
public void listenTopicQueue1(String msg){
    System.out.println("消費者接收到topic.queue1的訊息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
    key = "#.news"
    //接受所有 .news 結尾的訊息
))
public void listenTopicQueue2(String msg){
    System.out.println("消費者接收到topic.queue2的訊息:【" + msg + "】");
}
//訊息傳送
@Test
public void testSendTopicExchange() {
    // 交換機名稱
    String exchangeName = "itcast.topic";
    // 訊息
    String message = "喜報!孫悟空大戰哥斯拉,勝!";
    // 傳送訊息
    rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

訊息轉換器

之前,Spring會把你傳送的訊息序列化為位元組傳送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件。

只不過,預設情況下Spring採用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 資料體積過大
  • 有安全漏洞
  • 可讀性差

配置JSON轉換器

顯然,JDK序列化方式並不合適。我們希望訊息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

在publisher和consumer兩個服務中都引入依賴:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

配置訊息轉換器。

在啟動類中新增一個Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

SpringAMQP中訊息的序列化和反序列化是怎麼實現的?

  • 利用MessageConverter實現的,預設是JDK的序列化

  • 注意傳送方與接收方必須使用相同的MessageConverter


Elasticsearch

elasticsearch是一款非常強大的開源搜尋引擎,可以幫助我們從海量資料中快速找到需要的內容。

elasticsearch結合kibana、Logstash、Beats,也就是elastic stack(ELK)。被廣泛應用在日誌資料分析、實時監控等領域。

elasticsearch底層是基於lucene來實現的。

Lucene的優勢:

  • 易擴充套件

  • 高效能(基於倒排索引)

Lucene的缺點:

  • 只限於Java語言開發

  • 學習曲線陡峭

  • 不支援水平擴充套件


倒排索引

倒排索引的概念是基於MySQL這樣的正向索引而言的。

倒排索引中有兩個非常重要的概念:

  • 文件(Document):用來搜尋的資料,其中的每一條資料就是一個文件。例如一個網頁、一個商品資訊
  • 詞條(Term):對文件資料或使用者搜尋資料,利用某種演算法分詞,得到的具備含義的詞語就是詞條。例如:我是中國人,就可以分為:我、是、中國人、中國、國人這樣的幾個詞條

建立倒排索引是對正向索引的一種特殊處理,流程如下:

  • 將每一個文件的資料利用演算法分詞,得到一個個詞條
  • 建立表,每行資料包括詞條、詞條所在文件id、位置等資訊
  • 因為詞條唯一性,可以給詞條建立索引,例如hash表結構索引

如圖:

倒排索引的搜尋流程如下(以搜尋"華為手機"為例):

1)使用者輸入條件"華為手機"進行搜尋。

2)對使用者輸入內容分詞,得到詞條:華為手機

3)拿著詞條在倒排索引中查詢,可以得到包含詞條的文件id:1、2、3。

4)拿著文件id到正向索引中查詢具體文件。

如圖:

雖然要先查詢倒排索引,再查詢倒排索引,但是無論是詞條、還是文件id都建立了索引,查詢速度非常快!無需全表掃描。


初識ES

索引

索引(Index),就是相同型別的文件的集合。

例如:

  • 所有使用者文件,就可以組織在一起,稱為使用者的索引;
  • 所有商品的文件,可以組織在一起,稱為商品的索引;
  • 所有訂單的文件,可以組織在一起,稱為訂單的索引;

因此,我們可以把索引當做是資料庫中的表。

資料庫的表會有約束資訊,用來定義表的結構、欄位的名稱、型別等資訊。因此,索引庫中就有對映(mapping),是索引中文件的欄位約束資訊,類似表的結構約束。

我們統一的把mysql與elasticsearch的概念做一下對比:

MySQL Elasticsearch 說明
Table Index 索引(index),就是文件的集合,類似資料庫的表(table)
Row Document 文件(Document),就是一條條的資料,類似資料庫中的行(Row),文件都是JSON格式
Column Field 欄位(Field),就是JSON文件中的欄位,類似資料庫中的列(Column)
Schema Mapping Mapping(對映)是索引中文件的約束,例如欄位型別約束。類似資料庫的表結構(Schema)
SQL DSL DSL是elasticsearch提供的JSON風格的請求語句,用來操作elasticsearch,實現CRUD

是不是說,我們學習了elasticsearch就不再需要mysql了呢?

並不是如此,兩者各自有自己的擅長支出:

  • Mysql:擅長事務型別操作,可以確保資料的安全和一致性

  • Elasticsearch:擅長海量資料的搜尋、分析、計算

因此在企業中,往往是兩者結合使用:

  • 對安全性要求較高的寫操作,使用mysql實現
  • 對查詢效能要求較高的搜尋需求,使用elasticsearch實現
  • 兩者再基於某種方式,實現資料的同步,保證一致性