1. 程式人生 > 實用技巧 >rabbitmq工作模式(一)釋出訂閱模式

rabbitmq工作模式(一)釋出訂閱模式

專案碼雲地址:https://gitee.com/menbbo/springboot-rabbitmq.git

RabbitMQ的工作模式包括了:工作佇列模式、釋出訂閱模式、路由模式、TOPIC(萬用字元模式)。

釋出訂閱模式的訊息釋出到消費流程是:

  (1)生產者將資訊傳送給交換機;

  (2)交換機與多個佇列進行繫結,每個消費者監聽自己的佇列;

  (3)交換機得到訊息之後,將訊息傳送給與其繫結的佇列中,每個繫結的佇列都會得到訊息,再由消費者去拉取訊息。

值得注意的是與同一個佇列繫結的消費者同一訊息只能由一個消費者消費,比如Queue1佇列綁定了consumer1與sonsumer3,此時一個佇列中的一個訊息只能被這兩個消費者中的一個消費。

springboot整合rabbitMQ實現釋出訂閱模式

首先建立produce模組,作為生產者模組。

(1)新建SpringBoot專案springboot-rabbit作為父工程,父工程下建立produce作為生產者模組匯入依賴。

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

(2)配置rabbitMQ的相關屬性

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
server.port=8082

(3)建立佇列,交換機,並進行繫結。此類中建立了hello與info兩個佇列,以及一個交換機,使用bingExchange函式將交換機與佇列繫結。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.amqp.core.Queue; @Configuration public class RabbitCofnig { @Bean(name = "helloQueue") public Queue CreateQueue() { return new Queue("hello"); //例項化佇列hello 消費者1監聽 } @Bean(name = "infoQueue") public Queue CreateQueue2(){ return new Queue("info"); //建立佇列info 由消費者2監聽 } @Bean public FanoutExchange createFunOutChange(){ return new FanoutExchange("Exchange"); //配置交換機 } @Bean public Binding bingExchange(@Qualifier("helloQueue") Queue queue, FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); //將交換機與相關佇列繫結 } @Bean public Binding bingExchange2(@Qualifier("infoQueue") Queue queue,FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }

(4)建立測試類,將資訊儲存到RabbitMQ中。

@SpringBootTest
class ProduceApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        rabbitTemplate.convertAndSend("Exchange","",context);
    }

}

接下來建立消費者

分別建立consumer、infoconsumer、infoconsumer2三個module作為消費者,其中infoconsumer、infoconsumer2與info佇列繫結,consumer與hello佇列繫結。以consumer模組為例。

(1)pom檔案中引入依賴

    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
    </dependency>

(2)application.properties檔案配置rabbitMQ

server.port=8083
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

(3)建立消費者類,從繫結的佇列中獲取資訊。

@Component
@RabbitListener(queues = "hello") //與hello佇列繫結
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
}

結果測試:

啟動生產者測試類,項佇列hello和info中新增資訊,從rabbitMQ的管理介面中可以看到資訊已經新增到佇列中。

依次啟動三個消費者,可以看出其中consumer與infoconsumer得到了資訊,而infoconsumer2沒有拿到資訊,這也證明了綁定了同一佇列的消費者,一個資訊只能由一個消費者消費,且採用輪詢的方式得到訊息。