SpringBoot引入RabbitMQ(一):引入進SpringBoot中
本地安裝RabbitMQ參考:https://blog.csdn.net/qq_47588845/article/details/107986373
能進入這個頁面後,就可以開始操作了:
pom檔案:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#虛擬host 初始的就為/
virtual-host: /
listener:
simple:
acknowledge-mode: manual
concurrency: 5
max-concurrency: 5
publisher-returns: true
template:
mandatory: true
connection-timeout: 2000
host其實就是這個:
RabbitMQ模組結構:
因為是在單個Demo中測試,所以生產者、消費者放在一個專案中
MqConstant:
public class MqConstant {// 佇列:儲存訊息的容器,用來儲存訊息,直到訊息傳送給消費者 public static final String THE_MESSAGE_QUEUE = "theMessageQueue"; // 交換機:提供Producer到Queue之間的匹配,接收生產者傳送的訊息並將這些訊息按照路由規則轉發到訊息佇列, // 不會儲存訊息 ,如果沒有 Queue繫結到 Exchange 的話,它會直接丟棄掉 Producer 傳送過來的訊息public static final String THE_MESSAGE_EXCHANGE = "theMessageExchange"; // 路由鍵:訊息頭的一個屬性,用於標記訊息的路由規則,決定了交換機的轉發路徑 public static final String THE_MESSAGE_KEY = "theMessageKey"; }
用於存放所有傳送、消費訊息必要的3個引數:佇列(Queue)、交換機(Exchange)、路由鍵(Routing Key)
TheMessageDTO(訊息實體,一定要序列化):
@Data public class TheMessageDTO implementsSerializable { private Long id; private String messageInfo; }
Producer(生產者):
@Component public class TheMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(TheMessageDTO theMessageDTO) { System.out.println("傳送了一條訊息"); try { rabbitTemplate.convertAndSend(MqConstant.THE_MESSAGE_EXCHANGE, MqConstant.THE_MESSAGE_KEY, theMessageDTO); System.out.println("訊息傳送成功"); } catch (Exception e) { System.out.println("訊息傳送失敗"); e.printStackTrace(); } } }
現在先不寫消費者,先通過啟動一下專案,然後跑一下測試類看看是否可以傳送訊息
測試類:
@SpringBootTest @RunWith(SpringRunner.class) public class RabbitMqTest {
@Autowired private TheMessageProducer theMessageProducer; @Test public void sendMessage() { TheMessageDTO theMessageDTO = new TheMessageDTO(); theMessageDTO.setId(123L); theMessageDTO.setMessageInfo("這是一條訊息"); theMessageProducer.sendMessage(theMessageDTO); } }
看看控制檯執行結果:
雖然顯示傳送成功,但是控制檯卻列印了一條error日誌,粘貼出來看看:
ERROR 10564 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory :
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'theMessageExchange' in vhost '/', class-id=60, method-id=40)
意思就是沒有找到對應的交換機,去RabbitMQ頁面上檢視:
可以發現並沒有剛才定義的交換機,theMessageExchange
再檢視佇列:
同樣沒有對應的佇列:theMessageQueue
(注:first字首的是之前筆者個人測試用的)
其實在SpringBoot啟動時,會自動建立Exchange、Queue以及他們之間的Key,但是需要在有對應消費者時才會自動建立,因此只有生產者的情況下,需要自行建立他們
Consumer(消費者):
import com.rabbitmq.client.Channel; import com.zyuan.boot.rabbitmq.message.constant.MqConstant; import com.zyuan.boot.rabbitmq.message.dto.TheMessageDTO; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class TheMessageConsumer { @RabbitListener( bindings = { @QueueBinding( value = @Queue(value = MqConstant.THE_MESSAGE_QUEUE), exchange = @Exchange(value = MqConstant.THE_MESSAGE_EXCHANGE), key = MqConstant.THE_MESSAGE_KEY ) } ) public void consumeFirstMessage(@Payload TheMessageDTO theMessageDTO, Channel channel, Message message) { System.out.println("消費到了資訊:" + theMessageDTO.toString()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } }
然後重啟專案,檢視RabbitMQ頁面:
可以發現Exchange,Queue都有了,點選theMessageExchange
Routing key也在其中了
這時候再跑一下測試類:
再檢視Application的控制檯:
訊息已經成功傳送並且成功消費了