1. 程式人生 > >Java中簡單使用RabbitMQ進行訊息收發

Java中簡單使用RabbitMQ進行訊息收發

文章目錄


在專案中需要使用RabbitMQ作為訊息佇列,於是寫了一個RabbitMQ的服務提供類,這個服務提供類包含RabbitMQ相關例項的初始化及建立連線、訊息的接收以及訊息的傳送,想偷懶的童鞋拷過去改改配置什麼的就可直接使用,以此做個記錄,方便以後溫故而知新。

Windows中安裝RabbitMQ-Server

安裝Erlang

因為RabbitMQ是使用Erlang開發的,所以要使用RabbitMQ-Server首先就需要安裝Erlang的執行環境,可以在官網下載安裝,安裝後需要配置環境變數。

配置示例:

ERLANG_HOME:D:\software\erl10.1

Path末尾拼接:%ERLANG_HOME%\bin;

安裝RabbitMQ-Server

可在官網自行下載安裝,安裝後需要配置環境變數。

配置示例:

RABBITMQ_SERVER:D:\software\RabbitMQ Server\rabbitmq_server-3.7.9

Path末尾拼接:%RABBITMQ_SERVER%\sbin;

配置完成後便可啟動RabbitMQ-Server,初始使用者名稱與密碼是guest:guest。

其他具體的配置及命令,不詳述,請自行查詢。

需要使用的依賴包

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

RabbitMQ服務提供類,含訊息接收與傳送

import java.io.IOException;
import java.util.HashMap;
import java.util.Map; import java.util.concurrent.TimeoutException; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; /** * rabbitmq服務提供類 * @author dr */ @Component public class RabbitMQProvider { private Logger logger = LoggerFactory.getLogger(this.getClass()); /**是否啟用RabbitMQ*/ @Value("${rabbitmq.setting.enable}") private boolean enable; /**交換器名*/ @Value("${rabbitmq.setting.exchange.name}") private String exchangeName; /**下行佇列名*/ @Value("down.${push.setting.school}") private String downQueueName; @Value("down.${push.setting.school}") private String downRoutingKey; @Value("up.${push.setting.school}") private String upQueueName; @Value("up.${push.setting.school}") private String upRoutingKey; @Value("${rabbitmq.setting.username}") private String username; @Value("${rabbitmq.setting.password}") private String password; @Value("${rabbitmq.setting.host}") private String host; @Value("${rabbitmq.setting.port}") private Integer port; // 是否自動應答 private boolean autoAck = true; private Channel channelDown; private Channel channelUp; private Connection upConnection; private Connection downConnection; @Value("${rabbitmq.setting.message-life-time}") private Integer messageLifeTime; /**處理接收到的訊息的處理例項,即我們的業務程式碼*/ @Autowired private MQDealer mqDealer; // 初始化 @PostConstruct private void init() { if (!enable) return; try { // 建立連線 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setHost(host); factory.setPort(port); // 佇列引數 Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", messageLifeTime*1000);// 訊息過期時間 // 建立上行連線 upConnection = factory.newConnection(); // 建立上行通道 channelUp = upConnection.createChannel(); // 宣告建立配置上行佇列 channelUp.queueDeclare(upQueueName, true, false, false, args); // 將佇列與交換器繫結,並設定路由碼 channelUp.queueBind(upQueueName, exchangeName, upRoutingKey); downConnection = factory.newConnection(); channelDown = downConnection.createChannel(); channelDown.queueDeclare(downQueueName, true, false, false, args); channelDown.queueBind(downQueueName, exchangeName, downRoutingKey); receiveMessage(); } catch (Exception e) { logger.error("啟動MQ下行通道時出現異常!", e); } } /** * 持續監聽佇列以接收資料 * @throws IOException * @throws TimeoutException */ private void receiveMessage() throws IOException, TimeoutException { // 每次快取5個訊息在本地 channelDown.basicQos(5); channelDown.basicConsume(downQueueName, autoAck, "myConsumerTag", new DefaultConsumer(channelDown) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); logger.debug( downQueueName + " Received '" + message + "'" + ", routingKey: " + envelope.getRoutingKey()); // 處理接收到的訊息 mqDealer.deal(message); // 持續監聽 channelDown.basicConsume(downQueueName, autoAck, "myConsumerTag", this); channelDown.basicAck(envelope.getDeliveryTag(), true); } }); } /** * 向上行訊息佇列傳送一條訊息 * @param message * @throws IOException * @throws TimeoutException */ public void sendMessage(String message) throws IOException, TimeoutException { channelUp.basicPublish(exchangeName, upRoutingKey, true, MessageProperties.TEXT_PLAIN, message.getBytes()); logger.debug("send message to " + upQueueName + ": " + message); } }

解析

  • 因為網友反映使用單連線單通道同時傳送和接收訊息容易導致佇列阻塞,所以這裡採用了雙連線雙通道,分別負責接收和傳送;
  • 這裡使用rabbitMQ的方式是直接使用的rabbitMQ提供的開發包,而非Spring整合過的;
  • 建議設定訊息過期時間,因為太多訊息積壓在佇列中而沒有消費的話,可能會導致佇列的阻塞,我在開發中已經碰到了一次;
  • 在宣告佇列時,如果該佇列不存在則會建立,如果已經存在則直接使用現有的,需要注意的是,如果已經有現存佇列,那麼再次宣告時,需要屬性和現存的完全一致,否則會出現異常;
  • 這段程式碼中屬性的載入,連線、通道等例項的初始化使用了Spring的IoC,如果專案沒有使用Spring,需要自行載入和初始執行;