1. 程式人生 > 實用技巧 >rabbitMq完整通訊(一)---producer

rabbitMq完整通訊(一)---producer

application.properties:

server.port=8080
spring.application.name=producer
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

pom.xml 的完整內容附在文末(見「POM:」一節)。

先建立兩個佇列:

package com..direct;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; //配置類,隨系統啟動時,建立兩個佇列, 用來接收發送過來的資料 @Configuration public class DirectConf { @Bean public Queue queue() { // System.out.println("系統啟動時:建立一個queue的佇列到rabbitMQ"); return new Queue("queue"); } @Bean public Queue queueObject() {
// System.out.println("系統啟動時:建立一個queueObject的佇列到rabbitMQ"); return new Queue("queueObject"); } }

建立佇列和交換器,並進行繫結:

package com..topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //配置類,隨系統啟動時,根據需求建立交換器和佇列, 用來接收服務端傳送過來的資料 @Configuration public class TopicConf { //系統啟動時:建立一個message的佇列到rabbitMQ @Bean(name="message") public Queue queueMessage() { System.out.println("系統啟動時:建立一個topic.order的佇列到rabbitMQ"); return new Queue("topic.order"); } //系統啟動時:建立一個exchange的交換器到rabbitMQ @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } //系統啟動時:將exchange的交換器與佇列繫結 @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { System.out.println("系統啟動時:將exchange的交換器與topic.order佇列繫結"); return BindingBuilder.bind(queueMessage).to(exchange).with("topic.order"); } }

定義佇列傳送的方法:

package com..sender;

import java.util.Map;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Component that publishes messages through the injected {@link AmqpTemplate}.
 */
@Component
public class RabbitSender {

    /** Template used for every publish; injected by Spring. */
    @Autowired
    private AmqpTemplate template;

    /**
     * Sends an order id as a message.
     *
     * <p>{@code queueName} is handed to the two-argument
     * {@code convertAndSend} overload, whose first argument is a routing key.
     * NOTE(review): this reaches the queue of the same name only when the
     * template publishes to the broker's default exchange — confirm the
     * template configuration.
     *
     * @param queueName routing key, expected to be the target queue's name
     * @param orderId   the payload to send
     */
    public void send(String queueName, String orderId) {
        System.out.println("由AmqpTemplate將資料傳送到指定的佇列");
        template.convertAndSend(queueName, orderId);
    }

    /**
     * Sends a map as the message payload; intended for sending objects.
     *
     * <p>The parameter is an unbounded-wildcard map rather than a raw type:
     * every existing caller still compiles, and the map is only passed through
     * to the template without being read or modified here.
     *
     * @param queueName routing key, expected to be the target queue's name
     * @param user      the map to send as the message payload
     */
    public void sendObject(String queueName, Map<?, ?> user) {
        System.out.println("由AmqpTemplate將資料傳送到指定的佇列,主要用於傳送物件");
        template.convertAndSend(queueName, user);
    }

    /**
     * Sends an order id to a named exchange.
     *
     * <p>Despite its name, {@code queueName} is passed as the routing-key
     * argument of the three-argument {@code convertAndSend} overload; which
     * queue receives the message depends on the exchange's bindings.
     *
     * @param exchange  name of the exchange to publish to
     * @param queueName routing key used for the publish
     * @param orderId   the payload to send
     */
    public void sendTopic(String exchange, String queueName, String orderId) {
        System.out.println(Thread.currentThread().getName()+":  進入sendTopic方法");
        System.out.println("%%%由AmqpTemplate將資料傳送到交換機"+exchange+" 和佇列 "+queueName);
        template.convertAndSend(exchange, queueName, orderId);
    }
}

RabbitListener監聽服務端傳送到佇列的資料:

package com.wondersgroup.receive;

import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener component for the {@code topic.orderReceive} queue.
 */
@Component
public class OrderInfoReceive {

    /**
     * Receives one message from the {@code topic.orderReceive} queue and prints
     * it to standard output. Per the original author's note, this queue holds
     * the results of the server side's order queries.
     *
     * @param orderInfo the message payload, received as a string
     */
    @RabbitListener(queues = "topic.orderReceive")
    public void process1(String orderInfo) {
        final String logLine =
                "監聽%%%====topic.orderReceive  佇列取到的  orderInfo :========:" + orderInfo;
        System.out.println(logLine);
    }
}

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>product</artifactId>
    <packaging>jar</packaging>

   <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
       <version>1.5.21.RELEASE</version>
    </parent>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Declared optional so devtools is not passed on transitively to modules that depend on this one.
             No scope element: "true" is not a valid Maven scope, so the default scope applies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- 新增springboot對amqp的支援 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>