1. 程式人生 > 其它 >SpringBoot引入RabbitMQ(一):引入進SpringBoot中

SpringBoot引入RabbitMQ(一):引入進SpringBoot中

本地安裝RabbitMQ參考:https://blog.csdn.net/qq_47588845/article/details/107986373

能進入這個頁面後,就可以開始操作了:

pom檔案:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置:

rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虛擬host 初始的就為/
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 5
publisher-returns: true
template:
mandatory: true
connection-timeout: 2000

host其實就是這個:

RabbitMQ模組結構:

因為是在單個Demo中測試,所以生產者、消費者放在一個專案中

MqConstant:

public class MqConstant {// 佇列:儲存訊息的容器,用來儲存訊息,直到訊息傳送給消費者
    public static final String THE_MESSAGE_QUEUE = "theMessageQueue";
    // 交換機:提供Producer到Queue之間的匹配,接收生產者傳送的訊息並將這些訊息按照路由規則轉發到訊息佇列,
    // 不會儲存訊息 ,如果沒有 Queue繫結到 Exchange 的話,它會直接丟棄掉 Producer 傳送過來的訊息
public static final String THE_MESSAGE_EXCHANGE = "theMessageExchange"; // 路由鍵:訊息頭的一個屬性,用於標記訊息的路由規則,決定了交換機的轉發路徑 public static final String THE_MESSAGE_KEY = "theMessageKey"; }

用於存放所有傳送、消費訊息必要的3個引數:佇列(Queue)、交換機(Exchange)、路由鍵(Routing Key)

TheMessageDTO(訊息實體,一定要序列化):

@Data
public class TheMessageDTO implements
Serializable { private Long id; private String messageInfo; }

Producer(生產者):

@Component
public class TheMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(TheMessageDTO theMessageDTO) {
        System.out.println("傳送了一條訊息");
        try {
            rabbitTemplate.convertAndSend(MqConstant.THE_MESSAGE_EXCHANGE, MqConstant.THE_MESSAGE_KEY, theMessageDTO);
            System.out.println("訊息傳送成功");
        } catch (Exception e) {
            System.out.println("訊息傳送失敗");
            e.printStackTrace();
        }
    }

}

現在先不寫消費者,先通過啟動一下專案,然後跑一下測試類看看是否可以傳送訊息

測試類:

@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqTest {
@Autowired private TheMessageProducer theMessageProducer; @Test public void sendMessage() { TheMessageDTO theMessageDTO = new TheMessageDTO(); theMessageDTO.setId(123L); theMessageDTO.setMessageInfo("這是一條訊息"); theMessageProducer.sendMessage(theMessageDTO); } }

看看控制檯執行結果:

雖然顯示傳送成功,但是控制檯卻列印了一條error日誌,粘貼出來看看:

ERROR 10564 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       :

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'theMessageExchange' in vhost '/', class-id=60, method-id=40)

意思就是沒有找到對應的交換機,去RabbitMQ頁面上檢視:

可以發現並沒有剛才定義的交換機,theMessageExchange

再檢視佇列:

 同樣沒有對應的佇列:theMessageQueue

(注:first字首的是之前筆者個人測試用的)

其實在SpringBoot啟動時,會自動建立Exchange、Queue以及他們之間的Key,但是需要在有對應消費者時才會自動建立,因此只有生產者的情況下,需要自行建立他們

Consumer(消費者):

import com.rabbitmq.client.Channel;
import com.zyuan.boot.rabbitmq.message.constant.MqConstant;
import com.zyuan.boot.rabbitmq.message.dto.TheMessageDTO;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class TheMessageConsumer {

    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(value = MqConstant.THE_MESSAGE_QUEUE),
                            exchange = @Exchange(value = MqConstant.THE_MESSAGE_EXCHANGE),
                            key = MqConstant.THE_MESSAGE_KEY
                    )
            }
    )
    public void consumeFirstMessage(@Payload TheMessageDTO theMessageDTO, Channel channel, Message message) {
        System.out.println("消費到了資訊:" + theMessageDTO.toString());
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

然後重啟專案,檢視RabbitMQ頁面:

 可以發現Exchange,Queue都有了,點選theMessageExchange

Routing key也在其中了

這時候再跑一下測試類:

 

 

再檢視Application的控制檯:

訊息已經成功傳送並且成功消費了