微服務:ES
SpringAMQP工作佇列
Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者繫結到一個佇列,共同消費佇列中的訊息。
當訊息處理比較耗時的時候,可能生產訊息的速度會遠遠大於訊息的消費速度。長此以往,訊息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,多個消費者共同處理訊息處理,速度就能大大提高了。
再編寫一個接受者
@RabbitListener(queues = "simple.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消費者1接收到訊息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "simple.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消費者2........接收到訊息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
通過設定prefetch
來調整訊息預取的數量
Work模型的使用:
-
多個消費者繫結到一個佇列,同一條訊息只會被一個消費者處理
-
通過設定prefetch來控制消費者預取的訊息數量
交換機
釋出訂閱的模型如圖:
可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:
- Publisher:生產者,也就是要傳送訊息的程式,但是不再發送到佇列中,而是發給X(交換機)
- Exchange:交換機,圖中的X。一方面,接收生產者傳送的訊息。另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄。到底如何操作,取決於Exchange的型別。Exchange有以下3種類型:
- Fanout:廣播,將訊息交給所有繫結到交換機的佇列
- Direct:定向,把訊息交給符合指定routing key 的佇列
- Topic:萬用字元,把訊息交給符合routing pattern(路由模式) 的佇列
- Consumer:消費者,與以前一樣,訂閱佇列,沒有變化
- Queue:訊息佇列也與以前一樣,接收訊息、快取訊息。
Exchange(交換機)只負責轉發訊息,不具備儲存訊息的能力,因此如果沒有任何佇列與Exchange繫結,或者沒有符合路由規則的佇列,那麼訊息會丟失!
Fanout型別
/* *在consumer服務中,利用程式碼宣告佇列、交換機,並將兩者繫結 *在consumer服務中,編寫兩個消費者方法,分別監聽fanout.queue1和fanout.queue2 *在publisher中編寫測試方法,向itcast.fanout傳送訊息 */ //步驟1:在consumer服務宣告Exchange、Queue、Binding @Configuration public class FanoutConfig { /** * 宣告交換機 * @return Fanout型別交換機 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange("itcast.fanout"); } /** * 第1個佇列 */ @Bean public Queue fanoutQueue1(){ return new Queue("fanout.queue1"); } /** * 繫結佇列和交換機 */ @Bean public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange); } /** * 第2個佇列 */ @Bean public Queue fanoutQueue2(){ return new Queue("fanout.queue2"); } /** * 繫結佇列和交換機 */ @Bean public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange); } } //步驟2:在consumer服務宣告兩個消費者 @RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) { System.out.println("消費者1接收到Fanout訊息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) { System.out.println("消費者2接收到Fanout訊息:【" + msg + "】"); } //步驟3:在publisher服務傳送訊息到FanoutExchange @Test public void testFanoutExchange() { // 佇列名稱 String exchangeName = "itcast.fanout"; // 訊息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message); }
Direct型別
Direct Exchange 會將接收到的訊息根據規則路由到指定的Queue,因此稱為路由模式(routes)。
-
每一個Queue都與Exchange設定一個BindingKey
-
釋出者傳送訊息時,指定訊息的RoutingKey
-
Exchange將訊息路由到BindingKey與訊息RoutingKey一致的佇列
//基於註解方式完成Exchange、Queue的繫結
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消費者接收到direct.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消費者接收到direct.queue2的訊息:【" + msg + "】");
}
//publisher服務傳送訊息到DirectExchange
@Test
public void testSendDirectExchange() {
// 交換機名稱
String exchangeName = "itcast.direct";
// 訊息
String message = "hello, red!";
// 傳送訊息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
描述下Direct交換機與Fanout交換機的差異?
-
Fanout交換機將訊息路由給每一個與之繫結的佇列
-
Direct交換機根據RoutingKey判斷路由給哪個佇列
-
如果多個佇列具有相同的RoutingKey,則與Fanout功能類似
基於@RabbitListener註解宣告佇列和交換機有哪些常見註解?
-
@Queue
-
@Exchange
Topic型別
Topic
型別的Exchange
與Direct
相比,都是可以根據RoutingKey
把訊息路由到不同的佇列。只不過Topic
型別Exchange
可以讓佇列在繫結Routing key
的時候使用萬用字元!
Routingkey
一般都是有一個或多個單片語成,多個單詞之間以”.”分割,例如: item.insert
萬用字元規則:
#
:匹配一個或多個詞
*
:匹配不多不少恰好1個詞
圖示:
解釋:
- Queue1:繫結的是
china.#
,因此凡是以china.
開頭的routing key
都會被匹配到。包括china.news和china.weather - Queue2:繫結的是
#.news
,因此凡是以.news
結尾的routing key
都會被匹配。包括china.news和japan.news
//訊息接收
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
//接受所有 china 開頭的訊息
))
public void listenTopicQueue1(String msg){
System.out.println("消費者接收到topic.queue1的訊息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
//接受所有 .news 結尾的訊息
))
public void listenTopicQueue2(String msg){
System.out.println("消費者接收到topic.queue2的訊息:【" + msg + "】");
}
//訊息傳送
@Test
public void testSendTopicExchange() {
// 交換機名稱
String exchangeName = "itcast.topic";
// 訊息
String message = "喜報!孫悟空大戰哥斯拉,勝!";
// 傳送訊息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
訊息轉換器
之前,Spring會把你傳送的訊息序列化為位元組傳送給MQ,接收訊息的時候,還會把位元組反序列化為Java物件。
只不過,預設情況下Spring採用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
- 資料體積過大
- 有安全漏洞
- 可讀性差
配置JSON轉換器
顯然,JDK序列化方式並不合適。我們希望訊息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher和consumer兩個服務中都引入依賴:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
配置訊息轉換器。
在啟動類中新增一個Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
SpringAMQP中訊息的序列化和反序列化是怎麼實現的?
-
利用MessageConverter實現的,預設是JDK的序列化
-
注意傳送方與接收方必須使用相同的MessageConverter
Elasticsearch
elasticsearch是一款非常強大的開源搜尋引擎,可以幫助我們從海量資料中快速找到需要的內容。
elasticsearch結合kibana、Logstash、Beats,也就是elastic stack(ELK)。被廣泛應用在日誌資料分析、實時監控等領域。
elasticsearch底層是基於lucene來實現的。
Lucene的優勢:
-
易擴充套件
-
高效能(基於倒排索引)
Lucene的缺點:
-
只限於Java語言開發
-
學習曲線陡峭
-
不支援水平擴充套件
倒排索引
倒排索引的概念是基於MySQL這樣的正向索引而言的。
倒排索引中有兩個非常重要的概念:
- 文件(
Document
):用來搜尋的資料,其中的每一條資料就是一個文件。例如一個網頁、一個商品資訊 - 詞條(
Term
):對文件資料或使用者搜尋資料,利用某種演算法分詞,得到的具備含義的詞語就是詞條。例如:我是中國人,就可以分為:我、是、中國人、中國、國人這樣的幾個詞條
建立倒排索引是對正向索引的一種特殊處理,流程如下:
- 將每一個文件的資料利用演算法分詞,得到一個個詞條
- 建立表,每行資料包括詞條、詞條所在文件id、位置等資訊
- 因為詞條唯一性,可以給詞條建立索引,例如hash表結構索引
如圖:
倒排索引的搜尋流程如下(以搜尋"華為手機"為例):
1)使用者輸入條件"華為手機"
進行搜尋。
2)對使用者輸入內容分詞,得到詞條:華為
、手機
。
3)拿著詞條在倒排索引中查詢,可以得到包含詞條的文件id:1、2、3。
4)拿著文件id到正向索引中查詢具體文件。
如圖:
雖然要先查詢倒排索引,再查詢倒排索引,但是無論是詞條、還是文件id都建立了索引,查詢速度非常快!無需全表掃描。
初識ES
索引
索引(Index),就是相同型別的文件的集合。
例如:
- 所有使用者文件,就可以組織在一起,稱為使用者的索引;
- 所有商品的文件,可以組織在一起,稱為商品的索引;
- 所有訂單的文件,可以組織在一起,稱為訂單的索引;
因此,我們可以把索引當做是資料庫中的表。
資料庫的表會有約束資訊,用來定義表的結構、欄位的名稱、型別等資訊。因此,索引庫中就有對映(mapping),是索引中文件的欄位約束資訊,類似表的結構約束。
我們統一的把mysql與elasticsearch的概念做一下對比:
MySQL | Elasticsearch | 說明 |
---|---|---|
Table | Index | 索引(index),就是文件的集合,類似資料庫的表(table) |
Row | Document | 文件(Document),就是一條條的資料,類似資料庫中的行(Row),文件都是JSON格式 |
Column | Field | 欄位(Field),就是JSON文件中的欄位,類似資料庫中的列(Column) |
Schema | Mapping | Mapping(對映)是索引中文件的約束,例如欄位型別約束。類似資料庫的表結構(Schema) |
SQL | DSL | DSL是elasticsearch提供的JSON風格的請求語句,用來操作elasticsearch,實現CRUD |
是不是說,我們學習了elasticsearch就不再需要mysql了呢?
並不是如此,兩者各自有自己的擅長支出:
-
Mysql:擅長事務型別操作,可以確保資料的安全和一致性
-
Elasticsearch:擅長海量資料的搜尋、分析、計算
因此在企業中,往往是兩者結合使用:
- 對安全性要求較高的寫操作,使用mysql實現
- 對查詢效能要求較高的搜尋需求,使用elasticsearch實現
- 兩者再基於某種方式,實現資料的同步,保證一致性