從OTA測試淺談汽車電子測試發展趨勢
訊息中介軟體-RabbitM
一、基礎知識
1. 什麼是RabbitMQ
RabbitMQ是2007年釋出,是一個在AMQP(高階訊息佇列協議)基礎上完成的,簡稱MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法,由Erlang(專門針對於大資料高併發的語言)語言開發,可複用的企業訊息系統,是當前最主流的訊息中介軟體之一,具有可靠性、靈活的路由、訊息叢集簡單、佇列高可用、多種協議的支援、管理介面、跟蹤機制以及外掛機制。
2.什麼是訊息和佇列
1.訊息 就是資料,增刪改查的資料。例如在員工管理系統中增刪改查的資料
2.佇列 指的是一端進資料一端出資料,例如C#中(Queue資料結構)
3.什麼是訊息佇列
1.訊息佇列指:一端進訊息,一端出訊息
2.RabbitMQ就是實現了訊息佇列概念的一個元件,以面向物件的思想去理解,訊息佇列就是類,而RabbitMQ就是例項,當然不僅僅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以實現訊息佇列。
4.什麼地方使用RabbitMQ
1.在常見的單體架構中,主要流程是使用者UI操作發起Http請求>伺服器處理>然後由伺服器直接和資料庫互動,最後同步反饋使用者結果
2.在微服務架構中,例如下圖中的員工管理系統,UI與微服務通訊,主要是通過Http或者gRPC同步通訊
問題分析
在上述2種情況下,我們發現在UI請求時都是同步操作 ,第2種架構雖然將整體服務按業務拆分成不同的微服務並且對應各自的資料庫,但是在使用者與微服務通訊時,存在的問題依然沒有解決,例如資料庫的承載能力只能處理10w個請求,如果遇到高併發情況下,UI發起50w請求,那資料庫是遠遠承載不了的,從而導致如下問題。
1.高併發請求導致系統性能下降響應慢,同時資料庫承載風險加大
2.擴充套件性不強UI操作的互動對業務的依賴較大,導致使用者體驗下降
3.瞬時流量湧入巨大的話,伺服器可能直接掛了
解決方案
- 為了解決效能瓶頸問題。我們需要將同步通訊換成非同步通訊方式。因此就使用訊息佇列,使用者在UI中操作直接寫入RabbitMQ然後直接返回,剩下的業務操作由訊息佇列和各自的微服務來完成
RabbitMQ的優勢
-
非同步處理,響應快,增加了資料庫(伺服器的承載能力)
-
削峰,可以把流量的高峰分解到不同的時間段來處理
-
解耦(擴充套件性就更強),讓UI和業務獨立演化
-
高可用,處理器如果發生故障了,對其他的處理器沒有影響
RabbitMQ的不足
-
增加了系統複雜性,不方便除錯和開發,在使用RabbitMQ以前前端直接和服務互動,現在加了一層
-
即時性降低了,在某一程度上提升了使用者操作體驗,也降低了使用者體驗,但是避免不了,取長補短
-
更加依賴訊息隊列了
5.RabbitMQ組成概念
1.ConnectionFactory 為Connection的製造工廠。
2.Connection是RabbitMQ的socket連結,它封裝了socket協議相關部分邏輯。
3.Channel是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、繫結Queue與Exchange、釋出訊息等。
4.Exchange(交換機) 我們通常認為生產者將訊息投遞到Queue中,實際上實際的情況是,生產者將訊息傳送到Exchange,由Exchange將訊息路由到一個或多個Queue中(或者丟棄),而在RabbitMQ中的Exchange一共有4種策略,分別為:fanout(扇形)、direct(直連)、topic(主題)、headers(頭部)
二、如何落地RabbitMQ
1.RabbitMQ環境安裝
3.安裝完成之後,載入RabbitMQ管理外掛
rabbitmq-plugins enable rabbitmq_management
4.安裝成功訪問RabbitMQ管理後臺http://localhost:15672
2.建立系統業務
1.分別建立考勤服務,請假服務,計算薪酬服務,郵件服務,簡訊服務消費者角色
2.建立員工管理網站用於模擬前端呼叫,主要充當生產者角色
3.在員工管理網站和每一個模擬微服務中通過nuget引入RabbitMQ.Client
4.在員工管理網站中建立模擬新增考勤的控制器並加入生產者程式碼
//建立連線
using (var connection = factory.CreateConnection())
{
//建立通道
var channel = connection.CreateModel();
//定義佇列
channel.QueueDeclare("CreateAttendance", false, false, false, null);
string json = JsonConvert.SerializeObject(attendanceDto);
//建立內容物件
var properties = channel.CreateBasicProperties();
//傳送訊息
channel.BasicPublish(exchange: "",routingKey: "CreateAttendance",basicProperties: properties,body: Encoding.UTF8.GetBytes(json));
}
5.在考勤微服務中建立介面,並在介面中加入消費者程式碼
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//建立消費者事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
// 1、邏輯程式碼,新增到資料庫
var message = Encoding.UTF8.GetString(body.ToArray());
object json = JsonConvert.DeserializeObject(message);
Console.WriteLine(" [x] 建立考勤資訊 {0}", message);
};
//設定消費者屬性
//p1.監聽佇列p2.訊息確認ACK p3.消費者例項賦值
channel.BasicConsume(queue: "CreateAttendance",autoAck: false,consumer:consumer);
三、Exchange交換機及例項分析
1.Fanout Exchange (扇形交換機)
fanout型別的Exchange路由規則非常簡單,工作方式類似於多播一對多,它會把所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中。
1.生產者一個Exchange對應多個Queue,或者不宣告Queue
2.消費者定義Exchange,如果生產者定義了Queue,那必須將exchange和queue繫結,如果沒有定義佇列,那消費者自己宣告一個隨機Queue用於接收消費訊息
業務例項
當我們有員工需要請假,在員工管理系統提交請假,但是由於公司規定普通員工請假,需要傳送簡訊到他的主管領導,針對此業務場景我們需要呼叫請假服務的同時去傳送簡訊,這時需要兩個消費者(請假服務,簡訊服務)來消費同一條訊息,其實本質就是往RabbitMQ寫入一個能被多個消費者接收的訊息,所以可以使用 扇形交換機
,一個生產者,多個消費者.
生產者模擬使用呼叫控制器來實現
[HttpPost]
public IEnumerable<bool> CreateLeave(CreateLeaveDto createLeaveDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//定義交換機
channel.ExchangeDeclare(exchange: "Leave_fanout", type: "fanout");
string productJson = JsonConvert.SerializeObject(createLeaveDto);
var body = Encoding.UTF8.GetBytes(productJson);
var properties = channel.CreateBasicProperties();
//設定訊息持久化
properties.Persistent = true;
channel.BasicPublish(exchange: "Leave_fanout", routingKey: "", basicProperties: properties,body: body);
}
}
消費者實現IHostedService 介面建立一個監聽主機
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定義交換機
channel.ExchangeDeclare(exchange: "Leave_fanout", type: ExchangeType.Fanout);
//定義隨機佇列
var queueName = channel.QueueDeclare().QueueName;
//佇列和交換機繫結
channel.QueueBind(queueName,"Leave_fanout",routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、業務邏輯
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 建立請假 {0}", message);
// 1、自動確認機制缺陷,訊息是否正常新增到資料庫當中,所以需要使用手工確認
channel.BasicAck(ea.DeliveryTag, true);
};
// Qos(防止多個消費者,能力不一致,導致的系統質量問題。
// 每一次一個消費者只成功消費一個)
channel.BasicQos(0, 1, false);
// 訊息確認(防止訊息消費失敗)
channel.BasicConsume(queue: queueName ,autoAck: false,consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、關閉rabbitmq的連線
throw new NotImplementedException();
}
}
2.Direct Exchange (直連交換機)
直接交換器,工作方式類似於單播一對一,Exchange會將訊息傳送完全匹配ROUTING_KEY的Queue,缺陷是無法實現多生產者對一個消費者
1.生產者一個Exchange對應一個routingKey繫結,也可以宣告佇列並繫結,然後向指定的佇列傳送訊息。
2.消費者需要定義Exchange和routingKey,如果生產者宣告並綁定了佇列,那消費者必須繫結生產者指定的Queue來接收訊息,如果沒有指定Queue,那消費者需要自己宣告一個隨機Queue然後繫結用於接收訊息
當我們員工管理系統需要計算薪資並將結果以傳送簡訊的方式告訴員工,這個時候我們就不太適合用“扇形交換機”了,因為換做是你,你也不想你的工資全公司都知道吧?這個時候就需要定製了一對一的場景了,那就在生產訊息時使用直連交換機
根據routingKey傳送指定的消費者.
生產者模擬使用呼叫控制器來實現
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義交換機
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: "direct");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、傳送訊息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設定訊息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "CalculateSalary_direct",routingKey: "product-sms",basicProperties: properties,body: body);
}
}
消費者實現IHostedService 介面建立一個監聽主機
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定義交換機
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: ExchangeType.Direct);
// 2、定義隨機佇列
var queueName = channel.QueueDeclare().QueueName;
// 3、佇列要和交換機繫結起來
channel.QueueBind(queueName,"CalculateSalary_direct",routingKey: "product-sms");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、業務邏輯
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 傳送簡訊 {0}", message);
// 1、訊息是否正常新增到資料庫當中,所以需要使用手工確認
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消費訊息
channel.BasicQos(0, 1, false); // Qos(防止多個消費者,能力不一致,導致的系統質量問題。
// autoAck設為false 不進行自動確認
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、關閉rabbitmq的連線
throw new NotImplementedException();
}
}
3.Topic Exchange (主題交換機)
Exchange繫結佇列需要制定Key; Key 可以有自己的規則;Key可以有佔位符;或者# ,匹配一個單詞、#匹配多個單詞,在Direct基礎上加上模糊匹配;多生產者一個消費者,可以多對對,也可以多對1, 真實專案當中,使用主題交換機。可以滿足所有場景
1.生產者定義Exchange,然後不同的routingKey繫結
2.消費者定義Exchange,如果生產者定義了Queue,那必須將exchange和queue以及routingKey繫結,如果沒有定義佇列,那消費者自己宣告一個隨機Queue用於接收消費訊息,
3.消費者routingKey的模糊匹配,生產者傳送訊息時routingKey定義以sms.開頭, * 號只能匹配的routingKey為一級,例如(sms.A)或(sms.B)的傳送的訊息,# 能夠匹配的routingKey為一級及多級以上 ,例如 (sms.A)或者(sms.A.QWE.IOP)
在月底的時候我們需要把員工存在異常考勤資訊,薪資結算資訊,請假資訊分別以郵件的形式傳送給我們的員工查閱,我們知道這是一個典型的多個生產者,一個消費者場景,異常考勤資訊,薪資結算資訊,請假資訊分別需要生產訊息傳送到RabbitMQ,然後供我們員工消費
分別模擬3個生產者:異常考勤資訊,薪資結算資訊,請假資訊
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
//計算薪資生產者
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、傳送訊息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設定訊息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateSalary",basicProperties: properties,body: body);
}
}
//考勤生產者
public IEnumerable<bool> SendCalculateAttendance(CalculateAttendanceDto calculateAttendance)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
//3、傳送訊息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設定訊息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
}
}
//請假資訊生產者
public IEnumerable<bool> SendCalculateLeave(CalculateLeaveDto calculateLeave)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel()