rabbitMq完整通訊(一)---producer
阿新 • • 發佈:2021-01-21
application.properties:
server.port=8080 spring.application.name=producer spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest 最後是pom
先建立兩個佇列:
package com..direct; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration; //配置類,隨系統啟動時,建立兩個佇列, 用來接收發送過來的資料 @Configuration public class DirectConf { @Bean public Queue queue() { // System.out.println("系統啟動時:建立一個queue的佇列到rabbitMQ"); return new Queue("queue"); } @Bean public Queue queueObject() {// System.out.println("系統啟動時:建立一個queueObject的佇列到rabbitMQ"); return new Queue("queueObject"); } }
建立佇列和交換器,並進行繫結:
package com..topic; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置類,隨系統啟動時,根據需求建立交換器和佇列, 用來接收服務端傳送過來的資料 @Configuration public class TopicConf { //系統啟動時:建立一個message的佇列到rabbitMQ @Bean(name="message") public Queue queueMessage() { System.out.println("系統啟動時:建立一個topic.order的佇列到rabbitMQ"); return new Queue("topic.order"); } //系統啟動時:建立一個exchange的交換器到rabbitMQ @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } //系統啟動時:將exchange的交換器與佇列繫結 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { System.out.println("系統啟動時:將exchange的交換器與topic.order佇列繫結"); return BindingBuilder.bind(queueMessage).to(exchange).with("topic.order"); } }
定義佇列傳送的方法:
package com..sender; import java.util.Map; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RabbitSender { //注入AmqpTemplate @Autowired private AmqpTemplate template; //由AmqpTemplate將資料傳送到指定的佇列 public void send(String queueName,String orderId) { System.out.println("由AmqpTemplate將資料傳送到指定的佇列"); template.convertAndSend(queueName, orderId); } //由AmqpTemplate將資料傳送到指定的佇列,主要用於傳送物件 public void sendObject(String queueName,Map user) { System.out.println("由AmqpTemplate將資料傳送到指定的佇列,主要用於傳送物件"); template.convertAndSend(queueName,user); } //由AmqpTemplate將資料傳送到交換機和佇列 public void sendTopic(String exchange, String queueName, String orderId) { System.out.println(Thread.currentThread().getName()+": 進入sendTopic方法"); System.out.println("%%%由AmqpTemplate將資料傳送到交換機"+exchange+" 和佇列 "+queueName); template.convertAndSend(exchange,queueName,orderId); } }
RabbitListener監聽服務端傳送到佇列的資料:
package com.wondersgroup.receive; import java.util.Map; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class OrderInfoReceive { //接收從topic.orderReceive佇列的資料(主要存放了服務端訂單查詢的結果) @RabbitListener(queues="topic.orderReceive") public void process1(String orderInfo) { //用User作為引數 System.out.println("監聽%%%====topic.orderReceive 佇列取到的 orderInfo :========:"+orderInfo); } }
POM:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>product</artifactId> <packaging>jar</packaging> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.21.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- 新增springboot對amqp的支援 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> </dependencies> </project>