springboot整合rabbitmq-釋出訂閱模式
阿新 • • 發佈:2020-12-28
本篇文章只涉及程式碼實現,具體的 RabbitMQ 原理可以參考前面的文章《RabbitMQ六種佇列模式》。
一、定義佇列、交換機與繫結
package com.shuofeng.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //釋出訂閱模式的配置,包括兩個佇列和對應的訂閱者,釋出者的交換機型別使用fanout(子網廣播),兩根網線binding用來繫結佇列到交換機 @Configuration public class PublishSubscribeConfig { @Bean public Queue myQueue1() { Queue queue = new Queue("queue1"); return queue; } @Bean public Queue myQueue2() { Queue queue = new Queue("queue2"); return queue; } @Bean public FanoutExchange fanoutExchange() { FanoutExchange fanoutExchange = new FanoutExchange("fanout"); return fanoutExchange; } @Bean public Binding binding1() { Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange()); return binding; } @Bean public Binding binding2() { Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange()); return binding; } }
啟動載入並建立佇列,交換機,同時把佇列繫結到交換機。
二、定義傳送物件MAIL
package com.shuofeng.dto; import java.io.Serializable; public class Mail implements Serializable { private static final long serialVersionUID = -8140693840257585779L; private String mailId; private String country; private Double weight; public Mail() { } public Mail(String mailId, String country, double weight) { this.mailId = mailId; this.country = country; this.weight = weight; } public String getMailId() { return mailId; } public void setMailId(String mailId) { this.mailId = mailId; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; } public double getWeight() { return weight; } public void setWeight(double weight) { this.weight = weight; } @Override public String toString() { return "Mail [mailId=" + mailId + ", country=" + country + ", weight=" + weight + "]"; } }
三、定義生產者
package com.shuofeng.producer;
import com.shuofeng.dto.Mail;
/**
 * Contract for components that publish {@link Mail} messages.
 */
public interface Publisher {

    /**
     * Publishes the given mail.
     *
     * @param mail the mail to publish
     */
    void publishMail(Mail mail);
}
package com.shuofeng.producer.impl; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.shuofeng.dto.Mail; import com.shuofeng.producer.Publisher; /** * 釋出訂閱模式 * @author menglingwei * */ @Service public class PublisherImpl implements Publisher { @Autowired private RabbitTemplate rabbitTemplate; @Override public void publishMail(Mail mail) { rabbitTemplate.convertAndSend("fanout", "", mail); } }
四、定義消費者
package com.shuofeng.customer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.shuofeng.dto.Mail;
/**
 * Listener number 1, subscribed to the {@code queue1} queue.
 */
@Component
public class QueueListener1 {

    /**
     * Prints a received mail to standard output.
     *
     * @param mail the mail delivered from {@code queue1}
     * @throws Exception declared by the signature; the body itself throws nothing checked
     */
    @RabbitListener(queues = "queue1")
    public void displayMail(Mail mail) throws Exception {
        final String description = mail.toString();
        System.out.println("佇列監聽器1號收到訊息" + description);
    }
}
五、測試
package com.shuofeng;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.web.WebAppConfiguration;
import com.shuofeng.dto.Mail;
import com.shuofeng.producer.impl.ProducerImpl;
import com.shuofeng.producer.impl.PublisherImpl;
/**
 * Spring Boot tests that send {@link Mail} messages through the producer and
 * publisher beans.
 *
 * <p>The test methods use the JUnit 5 (Jupiter) {@code @Test} annotation, so the
 * JUnit 4 {@code @RunWith(MockitoJUnitRunner.class)} runner annotation has been
 * removed: a JUnit 4 runner does not apply to Jupiter tests, and the Mockito
 * runner would not populate the {@code @SpyBean} fields from the Spring context.
 */
@SpringBootTest
@WebAppConfiguration
public class ProducerImplTest {

    @SpyBean
    private ProducerImpl producerImpl;

    @SpyBean
    private PublisherImpl publisherImpl;

    /** Sends a mail to the queue named {@code myqueue} through the producer. */
    @Test
    public void testSendMail() {
        this.producerImpl.sendMail("myqueue", newMail("001"));
    }

    /** Publishes a mail whose id is {@code Queue1} through the publisher. */
    @Test
    public void testSendMailByQueue1() {
        this.publisherImpl.publishMail(newMail("Queue1"));
    }

    /** Publishes a mail whose id is {@code Queue2} through the publisher. */
    @Test
    public void testSendMailByQueue2() {
        this.publisherImpl.publishMail(newMail("Queue2"));
    }

    /**
     * Builds the mail fixture shared by all tests; only the id varies.
     *
     * @param mailId identifier to assign to the mail
     * @return a mail with the given id, country {@code CHINA} and weight {@code 2.2}
     */
    private static Mail newMail(String mailId) {
        Mail mail = new Mail();
        mail.setMailId(mailId);
        mail.setCountry("CHINA");
        mail.setWeight(2.2);
        return mail;
    }
}
執行結果如下圖: