RabbitMQ實踐--安裝、JAVA客戶端操作
RabbitMQ是一種訊息中介軟體,用於處理來自客戶端的非同步訊息。服務端將要傳送的訊息放入到佇列池中。接收端可以根據RabbitMQ配置的轉發機制接收服務端發來的訊息。RabbitMQ依據指定的轉發規則進行訊息的轉發、緩衝和持久化操作,主要用在多伺服器間或單伺服器的子系統間進行通訊,是分散式系統標準的配置。
RabbitMQ服務端安裝
Rabbitmq基於erlang語言開發,所有需要安裝erlang虛擬機器,各平臺參考官網安裝即可。Mac、linux的安裝方法在頁面的相對後面一點,也很簡單。
連結地址:http://www.erlang.org/downloads
同理,參考rabbitMQ官網來安裝RabbitMQ:
連結地址:
開啟管理外掛
使用Rabbit MQ 管理外掛,可以更好的視覺化方式檢視Rabbit MQ 伺服器例項的狀態
請在windows在RabbitMQ的安裝目錄 執行如下命令
# sbin\rabbitmq-plugins.bat enable rabbitmq_management
# net stop RabbitMQ && net start RabbitMQ
各平臺外掛管理命令:
開啟某個外掛:rabbitmq-plugins enable xxx
關閉某個外掛:rabbitmq-plugins disable xxx
注意:重啟伺服器後生效
然後開啟連線http://localhost:15672,以guest/guest登入就可以看到伺服器當前的執行狀態
文件地址
java客戶端操作實踐
“Hello World”
本小節建立一個很簡單的佇列,一個生產者,一個消費者。
新增maven依賴,此處對slf4j的實現是簡單的slf4j-simple實現,在真正的生產環境中建議使用log4j、logback等。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId >amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
程式碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//將訊息傳送到某個Queue上面去
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//如同資料庫連線一樣,依次關閉連線
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告接收某個佇列的訊息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//channel繫結佇列、消費者,autoAck為true表示一旦收到訊息則自動回覆確認訊息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
執行:
傳送端執行兩次
接收端會一直接收
Work Queues
分發訊息佇列,多個消費者
程式碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//將訊息傳送到某個Queue上面去
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10; i++) {//改為傳送十次訊息
String message = "Hello World " + (i+1);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//如同資料庫連線一樣,依次關閉連線
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Recv1 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告接收某個佇列的訊息
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//增加處理時間
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
此處Recv1與Recv2程式碼一致,不再贅述
執行
send傳送如下
[x] Sent ‘Hello World 1’
[x] Sent ‘Hello World 2’
[x] Sent ‘Hello World 3’
[x] Sent ‘Hello World 4’
[x] Sent ‘Hello World 5’
[x] Sent ‘Hello World 6’
[x] Sent ‘Hello World 7’
[x] Sent ‘Hello World 8’
[x] Sent ‘Hello World 9’
[x] Sent ‘Hello World 10’
Recv1接收如下
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World 1’
[x] Received ‘Hello World 3’
[x] Received ‘Hello World 5’
[x] Received ‘Hello World 7’
[x] Received ‘Hello World 9’
Recv2接收如下
[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘Hello World 2’
[x] Received ‘Hello World 4’
[x] Received ‘Hello World 6’
[x] Received ‘Hello World 8’
[x] Received ‘Hello World 10’
探索1:將Rev1的Thread.sleep(50)修改為50,意思是Rev1的處理能力比Rev2的處理能力強20倍,會發生什麼事情呢?
結果還是和原來一樣,每個Recv處理5個間隔一個的訊息,為什麼這樣呢?預設情況下,RabbitMQ採用輪詢的方式傳送message,所以只能一個輪一個地傳送。這個在機器處理能力不均勻的場景是不合適的,當然RabbitMQ是提供了配置方法的。
探索2:在Recv2處理到收到第二條訊息的時候,我們強制kill掉Recv2會發生什麼事情呢?剩餘的訊息會不會轉發到Recv1上呢?
不會的,當前的模式下,RabbitMQ傳送完訊息後就把快取的訊息刪除了,不關心訊息是否真正的處理是否完成,所以如果宕機等會發生訊息丟失的情況。這個時候就需要訊息確認機制了,Recv真正的收到訊息,處理完訊息後,RabbitMQ才刪除訊息。
修改方法如下:
Recv增加訊息確認反饋機制
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//增加處理時間
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
//此處增加訊息確認確認機制,envelope.getDeliveryTag()獲取訊息的唯一標識,false表示僅ack當前訊息
channel.basicAck(envelope.getDeliveryTag(), false);
}
//channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
探索3:訊息確認機制可以大體上保證消費端不丟失訊息,那麼Broker怎麼保證呢?
開啟持久化即可,當然達不到100%,畢竟持久化也是需要少量的時間,但這個時間可能造成微量損失。
程式碼改動如下:
//統一修改佇列名稱
private final static String QUEUE_NAME = "task_queue";
//宣告佇列的時候說明屬性
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//傳送訊息的時候說明儲存方式
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
為什麼要改佇列名字呢?因為RabbitMQ只會遵循首次建立佇列時候的屬性!後面宣告同一個名字的佇列的時候,如果沒有這個佇列則建立,有則忽略建立任務。當然,你也可以先刪除以前的佇列,然後再建立,就不用改佇列名了。
探索4:修正探索1的缺點
改成根據處理能力來分發訊息,程式碼修改如下:
Recv增加設定
int prefetchCount = 1;
channel.basicQos(prefetchCount);
最終整體程式碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Send {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//將訊息傳送到某個Queue上面去
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
for (int i = 0; i < 10; i++) {//改為傳送十次訊息
String message = "Hello World " + (i+1);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//如同資料庫連線一樣,依次關閉連線
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class Recv1 {
private final static String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//建立一個連線,並且從連線處獲取一個channel,為什麼用channel參考"RabbitMQ--整體綜述"
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
//宣告接收某個佇列的訊息
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//建立一個預設消費者,並在handleDelivery中回撥處理訊息內容
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
//增加處理時間
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
//此處增加訊息確認確認機制,envelope.getDeliveryTag()獲取訊息的唯一標識,false表示僅ack當前訊息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//channel繫結佇列、消費者,autoAck為true表示一旦收到訊息自動回覆確認訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
Publish/Subscribe
建立一個日誌系統,一個Send,兩個Recv。
重點是增加Exchange,通過fanout型別將訊息廣播給所有的的Recv。
這裡我們建立臨時佇列,從RabbitMQ中獲取佇列名,然後在不需要使用的時候刪除它。
程式碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
//建立連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告exchange,並用fanout型別
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 10; i++) {//改為傳送十次訊息
String message = "Hello World " + (i + 1);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
//關閉連線
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
//簡歷連線
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//宣告Exchange型別
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//獲取隨機佇列名稱
String queueName = channel.queueDeclare().getQueue();
//Exchange與queue繫結
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] " + queueName + "Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(queueName, true, consumer);
}
}
執行兩次ReceiveLogs,再執行EmitLog,就會發現ReceiveLogs都接受到了相同的訊息
Routing
重點是Exchange的direct型別。如果某些消費者只關注部分訊息怎麼辦?這個direct型別解決這類問題,也叫routing模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.*;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
private static final List<String> SEVERITIES = new ArrayList<>();
static {
SEVERITIES.add("info");
SEVERITIES.add("error");
SEVERITIES.add("warning");
}
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
for (int i = 0; i < 10; i++) {
String severity = SEVERITIES.get(new Random().nextInt(3));//隨機產生一個routingKey
String message = "some logs " + i;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
//繫結固定型別的routingKey
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "error");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class ReceiveLogsDirect2 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
//繫結固定型別的routingKey,只關心error
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
執行即可發現ReceiveLogsDirect2只收到了error的相關資訊,而ReceiveLogsDirect1接收到了所有訊息
Topics
Exchange的topic型別,用萬用字元的方式來匹配相應的接收資訊
符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞
程式碼如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "topic_logs";
private static final List<String> SEVERITIES = new ArrayList<>();
static {
// 符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞
SEVERITIES.add("*.*.rabbit");
SEVERITIES.add("a.b.rabbit");
SEVERITIES.add("c.rabbit");//丟失,因為不匹配
SEVERITIES.add("lazy.#");
SEVERITIES.add("lazy.a.b");
SEVERITIES.add("lazy.c");
SEVERITIES.add("*.orange.*");
SEVERITIES.add("a.orange.b");
SEVERITIES.add("c.orange");//丟失,因為不匹配
}
public static void main(String[] argv) throws java.io.IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
for (int i = 0; i < 30; i++) {
String severity = SEVERITIES.get(new Random().nextInt(SEVERITIES.size()));//隨機產生一個routingKey
String message = "some logs " + i;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
//繫結固定型別的topic
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
*
* @author xuexiaolei
* @version 2017年08月20日
*/
public class ReceiveLogsDirect2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
//繫結固定型別的topic
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
Remote procedure call (RPC)
RPC畢竟不是MQ擅長的事情,建議使用擅長的工具做擅長的事,所以此處不再贅述