設計一個簡單的訊息佇列
由於分散式系統的廣泛應用,越來越多地涉及到系統間通訊。系統間通訊一般有兩種方式,一種是基於遠端過程呼叫的方式,另一種是基於訊息佇列的方式。基於訊息佇列的方式是指由應用中的某個系統負責傳送訊息,由關心這條訊息的系統負責接收訊息,並在接收到訊息後進行各自的業務處理。
目前主流的訊息中介軟體有RabbitMQ、RocketMQ、ActiveMQ、Kafka等
一、訊息佇列的作用
(1)解耦
訊息佇列的各種實現產品又叫,既然是中介軟體,就是用訊息佇列實現兩個模組的遠端呼叫,模組只關心自己的核心流程,而不依賴呼叫的執行結果。
(2)流量削峰
利用訊息佇列可以將短時間高併發請求持久化,然後逐步處理,從而削平高峰期的併發流量,改善系統性能
(3)日誌收集
利用訊息佇列產品在接收和持久化訊息方面的高效能,引入訊息佇列快速收集日誌資訊,避免為寫入日誌時的某些故障導致業務系統訪問阻塞、請求延遲等。
(4)事務最終一致性
二、訊息佇列的功能特點
訊息佇列這個屬於包含訊息和佇列兩個關鍵詞,訊息是指應用間傳遞的資料,可以使簡單的字串,也可以是複雜的結構化物件定義格式;佇列指訊息的進和出,它包含一個容器,至少需實現訊息的傳送、接收和暫存功能。在生產環境中,訊息佇列還需解決諸如訊息堆積、訊息持久化、可靠投遞、訊息重複、嚴格有序、叢集等各種問題。
訊息佇列的簡單模型:
Broker:訊息處理中心,負責訊息的接收、儲存、轉發
Producer:訊息生產者,負責產生和傳送訊息到訊息處理中心
Consumer:訊息消費者,負責從訊息中心獲取訊息,並進行相應的處理
三、用java實現一個簡單的訊息佇列
結構圖:
Broker類:
package com.youzi.MQ;
import java.util.concurrent.ArrayBlockingQueue;
/**
* 訊息處理中心
*/
public class Broker {
//設定儲存訊息的最大數量
private final static int MAX_SIZE = 5;
//儲存訊息的容器
private static ArrayBlockingQueue<String> MassageQueue = new ArrayBlockingQueue <String>(MAX_SIZE);
//生產訊息
public static void produce(String msg){
if (MassageQueue.offer(msg)){
System.out.println("成功向訊息中心投遞訊息:"+msg+",當前暫存訊息數目為"+MassageQueue.size());
}else{
System.out.println("訊息中心已滿,不能繼續放入訊息!");
}
System.out.println("==================================");
}
//消費訊息
public static String consume(){
String msg = MassageQueue.poll();
if(msg!=null){
System.out.println("已經消費訊息:"+msg+",當前暫存訊息數目為"+MassageQueue.size());
}else{
System.out.println("訊息處理中心已經沒有訊息可供消費!");
}
System.out.println("==================================");
return msg;
}
}
用BrokerServer類對外提供Broker類的服務:
package com.youzi.MQ;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
* 訊息佇列服務
*/
public class BrokerServer implements Runnable {
public static int SERVICE_PORT = 9999;
private final Socket socket ;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream());
){
while (true){
String str = in.readLine();
if (str==null){
continue;
}
System.out.println("接收到的原始資料為:"+str);
if (str.equals("CONSUME")){//CONSUME表示要消費一條訊息
String msg = Broker.consume();
out.println(msg);
out.flush();
}else{//其他情況都表示要生產訊息到訊息佇列中
Broker.produce(str);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(BrokerServer.SERVICE_PORT);
while(true){
BrokerServer bs = new BrokerServer(server.accept());
new Thread(bs).start();
}
}
}
客戶端訪問:
package com.youzi.MQ;
import org.omg.CORBA.portable.UnknownException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MQClient {
//生產訊息
public static void produce(String msg) throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try (
PrintWriter out = new PrintWriter(socket.getOutputStream());
){
out.println(msg);
out.flush();
}
}
//消費訊息
public static String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())){
//先向訊息佇列傳送CONSUME表示消費訊息
out.println("CONSUME");
out.flush();
//再從佇列獲取一條訊息
String message = in.readLine();
return message;
}
}
}
生產者客戶端測試類:
package com.youzi.MQ;
public class ProduceClient {
public static void main(String[] args) throws Exception {
MQClient client = new MQClient();
client.produce("Hello World4!!");
}
}
消費者客戶端測試類:
package com.youzi.MQ;
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MQClient client = new MQClient();
String message = client.consume();
System.out.println("獲取的訊息為:"+message);
}
}
先啟動服務端BrokerServer,因為訊息處理中心最大容量設定了5,這裡生產者啟動6次,服務端輸出:
然後啟動6次訊息消費端,服務端輸出: