1. 程式人生 > 其它 >ASP.NET Core 6.0 使用RabbitMQ

ASP.NET Core 6.0 使用RabbitMQ

1. 簡介

RabbitMQ是一個開源的,基於AMQP(Advanced Message Queuing Protocol)協議的完整的可複用的企業級訊息隊,RabbitMQ可以實現點對點,釋出訂閱等訊息處理模式。

RabbitMQ是一個開源的AMQP實現,伺服器端用Erlang語言編寫,支援Linux,windows,macOS,FreeBSD等作業系統,同時也支援很多語言,如:Python,Java,Ruby,PHP,C#,JavaScript,Go,Elixir,Objective-C,Swift等。

當今市面上有很多主流的訊息中介軟體,如老牌的ActiveMQ、RabbitMQ,炙手可熱的Kafka,阿里巴巴自主開發RocketMQ等。

不同MQ特點

  1. ActiveMQ
    是Apache出品,最流行的,能力強勁的開源訊息匯流排。它是一個完全支援JMS規範的的訊息中介軟體。豐富的API,多種叢集架構模式讓ActiveMQ在業界成為老牌的訊息中介軟體,在中小型企業頗受歡迎!
  2. Kafka
    是LinkedIn開源的分散式釋出-訂閱訊息系統,目前歸屬於Apache頂級專案。Kafka主要特點是基於Pull的模式來處理訊息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支援複製,不支援事務,對訊息的重複、丟失、錯誤沒有嚴格要求,適合產生大量資料的網際網路服務的資料收集業務。
  3. RocketMQ
    是阿里開源的訊息中介軟體,它是純Java開發,具有高吞吐量、高可用性、適合大規模分散式系統應用的特點。RocketMQ思路起源於Kafka,但並不是Kafka的一個Copy,它對訊息的可靠傳輸及事務性做了優化,目前在阿里集團被廣泛應用於交易、充值、流計算、訊息推送、日誌流式處理、binglog分發等場景。
  4. RabbitMQ
    RabbitMQ是使用Erlang語言開發的開源訊息佇列系統,基於AMQP協議來實現。AMQP的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內對資料一致性、穩定性和可靠性要求很高的場景,對效能和吞吐量的要求還在其次。

RabbitMQ比Kafka可靠,Kafka更適合IO高吞吐的處理,一般應用在大資料日誌處理或對實時性(少量延遲),可靠性(少量丟資料)要求稍低的場景使用,比如ELK日誌收集。

RabbitMQ的工作機制:

首先要知道RabbitMQ的三種角色:生產者、消費者、訊息伺服器

  • 生產者:訊息的建立者,負責建立和推送訊息到訊息伺服器
  • 消費者:訊息的接收方,接受訊息並處理訊息
  • 訊息伺服器:其實RabbitMQ本身,不會產生和消費訊息,相當於一箇中轉站,將生產者的訊息路由給消費者

RabbitMQ的一些角色

  • ConnectionFactory:連線管理,應用程式或消費方與RabbitMQ建立連線的管理器
  • Channel:通道,推送訊息的通道
  • Exchange:交換機,用於接收分配訊息到佇列中
  • Queue:儲存訊息
  • Routingkey:訊息會攜帶routingKey,決定訊息最終的佇列
  • BindingKey:Queue通過bindingKey與交換機繫結

2. 安裝

網上有許多RabbitMQ的安裝部落格,所以在此不介紹。可以安裝在 windows、linux、docker

web管理介面介紹

2.1. overview概覽

2.2. Admin使用者和虛擬主機管理

2.2.1. 新增使用者


上面的Tags選項,其實是指定使用者的角色,可選的有以下幾個:

  • 超級管理員(administrator)
    可登陸管理控制檯,可檢視所有的資訊,並且可以對使用者,策略(policy)進行操作。
  • 監控者(monitoring)
    可登陸管理控制檯,同時可以檢視rabbitmq節點的相關資訊(程序數,記憶體使用情況,磁碟使用情況等)
  • 策略制定者(policymaker)
    可登陸管理控制檯, 同時可以對policy進行管理。但無法檢視節點的相關資訊(上圖紅框標識的部分)。
  • 普通管理者(management)
    僅可登陸管理控制檯,無法看到節點資訊,也無法對策略進行管理。
  • 其他
    無法登陸管理控制檯,通常就是普通的生產者和消費者。

2.2.2. 建立虛擬主機


為了讓各個使用者可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。其實就是一個獨立的訪問路徑,不同使用者使用不同路徑,各自有自己的佇列、交換機,互相不會影響。

2.2.3. 支援的訊息模型


3. NET Core中使用RabbitMQ

RabbitMQ 從資訊接收者角度可以看做三種模式,一對一,一對多(此一對多並不是釋出訂閱,而是每條資訊只有一個接收者)和釋出訂閱。其中一對一是簡單佇列模式,一對多是Worker模式,而釋出訂閱包括髮布訂閱模式,路由模式和萬用字元模式,為什麼說釋出訂閱模式包含三種模式呢,其實發布訂閱,路由,萬用字元三種模式都是使用只是交換機(Exchange)型別不一致

3.1 簡單佇列

首先,我們需要建立兩個控制檯專案.Send(傳送者)和Receive(接收者),然後為兩個專案安裝RabbitMQ.Client驅動

install-package rabbitmq.client

然後在Send和Receive專案中編寫我們的訊息佇列程式碼

生產者程式碼

show code
using RabbitMQ.Client;
using System.Text;

Console.WriteLine("Hello, World! 生產者");

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 建立連線物件
var channel = connection.CreateModel();         // 建立連線會話物件

string queueName = "queue1";

// 宣告一個佇列
channel.QueueDeclare(
    queue: queueName,   // 佇列名稱
    durable: false,     // 是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊
    exclusive: false,   // 是否排他,如果一個佇列宣告為排他佇列,該佇列僅對時候次宣告它的連線可見,並在連線斷開時自動刪除
    autoDelete: false,  // 是否自動刪除,自動刪除的前提是:至少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除
    arguments: null     // 設定佇列的其他引數
);

string str = string.Empty;

do {
    Console.WriteLine("傳送內容:");
    str = Console.ReadLine()!;

    // 訊息內容
    byte[] body = Encoding.UTF8.GetBytes(str);

    // 傳送訊息
    channel.BasicPublish("", queueName, null, body);

    // Console.WriteLine("成功傳送訊息:" + str);
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

code describe

  • 可以看到 RabbitMQ 使用了 IConnectionFactory, IConnection和IModel 來建立連結和通訊管道, IConnection 例項物件只負責與 Rabbit 的連線,而傳送接收這些實際操作全部由會話通道進行。
  • 而後使用 QueneDeclare 方法進行建立訊息佇列,建立完成後可以在 RabbitMQ 的管理工具中看到此佇列,QueneDelare 方法需要一個訊息佇列名稱的必須引數.後面那些引數則代表快取,引數等資訊。
  • 最後使用 BasicPublish 來發送訊息,在一對一中 routingKey 必須和 queueName 一致。

消費者程式碼

show code
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

Console.WriteLine("Hello, World! 消費者1");

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 建立連線物件
IModel channel = connection.CreateModel();         // 建立連線會話物件

string queueName = "queue1";
//宣告一個佇列
channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
  exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
  arguments: null ////設定佇列的一些其它引數
);

// 建立消費者物件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));
};

// 消費者開啟監聽
channel.BasicConsume(queueName, true, consumer);

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 在接收者中是定義一個EventingBasicConsumer物件的消費者(接收者),這個消費者與會話物件關聯,
  • 然後定義接收事件,輸出從訊息佇列中接收的資料,
  • 最後使用會話物件的BasicConsume方法來啟動消費者監聽.消費者的定義也是如此簡單.
  • 不過注意一點,可以看到在接收者程式碼中也有宣告佇列的方法,其實這句程式碼可以去掉,但是如果去掉的話接收者在程式啟動時監聽佇列,而此時這個佇列還未存在,所以會出異常,所以往往會在消費者中也新增一個宣告佇列方法

此時,簡單訊息佇列傳輸就算寫好了,我們可以執行程式碼就行測試

3.2 Worker模式

Worker模式其實是一對多的模式,但是這個一對多並不是像釋出訂閱那種,而是資訊以順序的傳輸給每個接收者,我們可以使用上個例子來執行worker模式甚至,只需要執行多個接收者即可

預設情況下,RabbitMQ會順序的將message發給下一個消費者。每個消費者會得到平均數量的message。這種方式稱之為round-robin(輪詢).
但是很多情況下並不希望訊息平均分配,而是要消費快的多消費,消費少的少消費。還有很多情況下一旦其中一個宕機,那麼另外接收者的無法接收原本這個接收者所要接收的資料。

下面針對上面的兩個問題進行處理
首先我們先來看一下所說的宕機丟失資料一說,我們在上個例子Receive接收事件中新增執行緒等待

consumer.Received += (model, ea) => {
    Thread.Sleep(3000);
    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));
};

然後再次啟動兩個接收者進行測試

可以看到傳送者傳送了1-9的數字,第二個接收者在接收資料途中宕機,第一個接收者也並沒有去接收第二個接收者宕機後的資料,有的時候我們會有當接收者宕機後,其餘資料交給其它接收者進行消費,那麼該怎麼進行處理呢,解決這個問題得方法就是改變其訊息確認模式

Rabbit中存在兩種訊息確認模式

  • 自動模式 - 只要訊息從佇列獲取,無論消費者獲取到訊息後是否成功消費,都認為是訊息成功消費.
  • 手動模式 - 消費從佇列中獲取訊息後,伺服器會將該訊息處於不可用狀態,等待消費者反饋。如果消費者在消費過程中出現異常,斷開連線切沒有傳送應答,那麼RabbitMQ會將這個訊息重新投遞。

修改兩個消費者程式碼,並在其中一箇中延遲確認。

consumer.Received += (model, ea) => {
    Thread.Sleep(3000);
    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));
    
    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回訊息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設定false 關閉自動確認.

如果在延遲中消費者斷開連線,那麼RabbitMQ會重新投遞未確認的訊息

‘能者多勞’模式
能者多勞是給消費速度快的消費更多的訊息.少的責消費少的訊息.能者多勞是建立在手動確認基礎上實現。
在延遲確認的消費中新增BasicQos

3.3 Exchange模式(釋出訂閱模式,路由模式,萬用字元模式)

前面說過釋出,路由,萬用字元這三種模式其實可以算為一種模式,區別僅僅是互動機型別不同.在這裡出現了一個交換機的東西,傳送者將訊息傳送傳送到交換機,接收者建立各自的訊息佇列繫結到交換機,

通過上面三幅圖可以看出這三種模式本質就是一種訂閱模式,路由,萬用字元模式只是訂閱模式的變種模式。使其可以選擇傳送訂閱者中的接收者。
注意:交換機本身並不儲存資料,資料儲存在訊息佇列中,所以如果向沒有繫結訊息佇列的交換機中傳送資訊,那麼資訊將會丟失

3.3.1 釋出訂閱模式(Fanout)

生產者程式碼

show code
using RabbitMQ.Client;
using System.Text;

Console.WriteLine("Hello, World! 生產者");

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 建立連線物件
var channel = connection.CreateModel();         // 建立連線會話物件

#region 定義交換機
string exchangeName = "exchange1";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout); // 把交換機設定為 fanout 釋出訂閱模式
#endregion

string str;
do {
    Console.WriteLine("傳送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 訊息內容

    channel.BasicPublish(exchangeName, "", null, body); // 傳送訊息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

code describe

  • 程式碼與上面沒有什麼差異,只是由上面的訊息佇列宣告變成了交換機宣告(交換機型別為fanout),也就說傳送者傳送訊息從原來的直接傳送訊息佇列變成了傳送到交換機

消費者程式碼

show code
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

Console.WriteLine("Hello, World! 消費者1");

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 建立連線物件
IModel channel = connection.CreateModel();         // 建立連線會話物件

#region 宣告交換機
string exchangeName = "exchange1";
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
#endregion

#region 宣告佇列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("佇列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
  exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
  arguments: null ////設定佇列的一些其它引數
);
#endregion


channel.QueueBind(queueName, exchangeName, ""); // 將佇列與交換機繫結

channel.BasicQos(0, 1, false);  // 告訴Rabbit每次只能向消費者傳送一條資訊,再消費者未確認之前,不再向他傳送資訊

// 建立消費者物件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));

    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回訊息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設定false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 可以看到消費者程式碼與上面有些差異
  • 首先是宣告交換機(同上面一樣,為了防止異常)
  • 然後宣告訊息佇列並對交換機進行繫結,在這裡使用了隨機數,目的是宣告不重複的訊息佇列,如果是同一個訊息佇列,則就變成worker模式,也就是說對於釋出訂閱模式有多少接收者就有多少個訊息佇列,而這些訊息佇列共同從一個交換機中獲取資料

然後同時開兩個接收者,結果就如下

3.3.2 路由模式(Direct)

路由模式下,在釋出訊息時指定不同的routeKey,交換機會根據不同的routeKey分發訊息到不同的佇列中

生產者程式碼

show code
Console.WriteLine("Hello, World! 生產者");

Console.WriteLine($"輸入 routingKey:");
string routingKey = Console.ReadLine()!;

// 建立連線工廠物件
var factory = new ConnectionFactory() {
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 建立連線物件
var channel = connection.CreateModel();         // 建立連線會話物件

#region 定義交換機
string exchangeName = "exchange2";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
#endregion

string str;
do {
    Console.WriteLine("傳送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 訊息內容

    channel.BasicPublish(exchangeName, routingKey, null, body); // 傳送訊息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

申明一個routeKey值為key1,並在釋出訊息的時候告訴了RabbitMQ,訊息傳遞時routeKey必須匹配,才會被佇列接收否則訊息會被拋棄。

消費者程式碼

show code
Console.WriteLine("Hello, World! 消費者1");

Console.WriteLine($"輸入接受key名稱:");
string routeKey = Console.ReadLine()!;

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 建立連線物件
IModel channel = connection.CreateModel();         // 建立連線會話物件

#region 宣告交換機
string exchangeName = "exchange2";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
#endregion

#region 宣告佇列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("佇列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
  exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
  arguments: null ////設定佇列的一些其它引數
);
#endregion

channel.QueueBind(queueName, exchangeName, routeKey); // 將佇列與交換機繫結
channel.QueueBind(queueName, exchangeName, "key2"); 
channel.QueueBind(queueName, exchangeName, "key3"); // 可以通過繫結多個,來匹配多個路由 

// channel.BasicQos(0, 1, false);  // 告訴Rabbit每次只能向消費者傳送一條資訊,再消費者未確認之前,不再向他傳送資訊

// 建立消費者物件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));

    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回訊息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設定false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

code describe

  • 一個接收者訊息佇列可以宣告多個路由與交換機進行繫結

執行結果如下

3.3.3 萬用字元模式(Topic)

萬用字元模式與路由模式一致,只不過萬用字元模式中的路由可以宣告為模糊查詢,RabbitMQ擁有兩個萬用字元

  • #:匹配0-n個字元語句
  • *:匹配一個字元語句
  • 注意:RabbitMQ中萬用字元並不像正則中的單個字元,而是一個以“.”分割的字串,如 ”topic1.*“匹配的規則以topic1開始並且"."後只有一段語句的路由 例:“topic1.aaa”,“topic1.bb”
  • 而“#”可以匹配到 “topic1.aaa.bb”,“topic1.bb.cc”.

生產者程式碼

show code
Console.WriteLine("Hello, World! 生產者");

Console.WriteLine($"輸入 routingKey:");
string routingKey = Console.ReadLine()!;


// 建立連線工廠物件
var factory = new ConnectionFactory() {
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};
var connection = factory.CreateConnection();    // 建立連線物件
var channel = connection.CreateModel();         // 建立連線會話物件

#region 定義交換機
string exchangeName = "exchange3";

channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic);
#endregion

string str;
do {
    Console.WriteLine("傳送內容:");
    str = Console.ReadLine()!;

    byte[] body = Encoding.UTF8.GetBytes(str); // 訊息內容

    channel.BasicPublish(exchangeName, routingKey, null, body); // 傳送訊息
} while (str.Trim().ToLower() != "exit");

channel.Close();
connection.Close();

消費者程式碼

show code
Console.WriteLine("Hello, World! 消費者1");

Console.WriteLine($"輸入接受key名稱:");  // key.* 或者 key.#
string routeKey = Console.ReadLine()!;

var factory = new ConnectionFactory()       // 建立連線工廠物件
{
    HostName = "localhost",
    Port = 5672,
    UserName = "guest",
    Password = "guest"
};

IConnection connection = factory.CreateConnection();    // 建立連線物件
IModel channel = connection.CreateModel();         // 建立連線會話物件

#region 宣告交換機
string exchangeName = "exchange3";
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
#endregion

#region 宣告佇列
string queueName = exchangeName + "_" + new Random().Next(1, 1000);
Console.WriteLine("佇列名稱:" + queueName);

channel.QueueDeclare(
  queue: queueName,//訊息佇列名稱
  durable: false,//是否持久化,true持久化,佇列會儲存磁碟,伺服器重啟時可以保證不丟失相關資訊。
  exclusive: false,//是否排他,true排他的,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除.
  autoDelete: false,//是否自動刪除。true是自動刪除。自動刪除的前提是:致少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除.
  arguments: null ////設定佇列的一些其它引數
);
#endregion

channel.QueueBind(queueName, exchangeName, routeKey); // 將佇列與交換機繫結

// 建立消費者物件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {

    byte[] message = ea.Body.ToArray();
    Console.WriteLine("接收到的訊息為:" + Encoding.UTF8.GetString(message));
    channel.BasicAck(ea.DeliveryTag, true); // 開啟返回訊息確認
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer); // 將autoAck設定false 關閉自動確認.

Console.ReadKey();
channel.Dispose();
connection.Close();

只有在萬用字元匹配通過的情況下才會接收訊息

這裡引用兩個連結是對RabbitMQ中引數和方法的說明:
https://blog.csdn.net/fly_leopard/article/details/102821776
https://www.cnblogs.com/cuijl/p/8075130.html

部分內容來自:
https://blog.csdn.net/qq_44845339/article/details/114848670
https://www.cnblogs.com/yan7/p/9498685.html