1. 程式人生 > Spring 整合 RabbitMQ

Spring 整合 RabbitMQ


pom.xml


<!-- Spring AMQP's RabbitMQ module; supplies the RabbitTemplate class and the
     rabbit: XML namespace used by the configuration files in this article. -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.2.RELEASE</version>
</dependency>


spring-rabbitmq-parent.xml


<?xml version="1.0" encoding="UTF-8"?>
<!--
    Configuration shared by the producer and the consumer: connection factory,
    admin, queue, and a direct exchange bound to that queue.
    NOTE(review): every ${rabbit.*} value is a property placeholder; no placeholder
    configurer is declared in this file, so one presumably exists elsewhere. Confirm.
-->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Connection factory: broker host, port and credentials come from the rabbit.* properties -->
    <rabbit:connection-factory id="connectionFactory" host="${rabbit.host}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}"/>

    <!-- Admin bean tied to the connection factory above -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration; the bean id "queue" is what bindings and listeners reference -->
    <rabbit:queue id="queue" name="${rabbit.queue.name}"/>

    <!-- Direct exchange with one binding: routing key ${rabbit.queue.key} maps to the queue bean above -->
    <rabbit:direct-exchange id="directExchange" name="${rabbit.direct.exchange.name}">
        <rabbit:bindings>
            <rabbit:binding queue="queue" key="${rabbit.queue.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>


spring-rabbitmq-producer.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer-side configuration: the shared definitions plus a RabbitTemplate bean. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Import the configuration shared by the producer and the consumer -->
    <import resource="spring-rabbitmq-parent.xml"/>

    <!-- RabbitTemplate declaration: exchange and routing-key are the defaults used when sending
         without explicit arguments; queue is the default queue for receive operations -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="${rabbit.direct.exchange.name}" queue="${rabbit.queue.name}" routing-key="${rabbit.queue.key}"/>
</beans>


spring-rabbitmq-consumer.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer-side configuration: the shared definitions plus a listener container. -->
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- Import the configuration shared by the producer and the consumer -->
    <import resource="spring-rabbitmq-parent.xml"/>

    <!-- Queue listener (observer style): when a message arrives on the queue, the listener
         bean registered for that queue is notified. "queues" refers to the queue bean id;
         "ref" is the listener bean name taken from ${rabbit.queue.listener}. -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="queue" ref="${rabbit.queue.listener}"/>
    </rabbit:listener-container>
</beans>


RabbitMQUtil.java


package com.app.core.util;

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Static helper for publishing text messages through a {@link RabbitTemplate}.
 *
 * <p>The template lives in a static field that is filled in by the
 * {@code @Autowired} instance setter. This class therefore has to be registered
 * as a Spring bean (the registration is not part of this class) before
 * {@link #send(String)} can be used.
 */
@Log4j2
public class RabbitMQUtil {
    /**
     * Template used to send messages; assigned by {@link #setTemplate(RabbitTemplate)}.
     */
    private static RabbitTemplate template;

    /**
     * Stores the injected template in the static field so the static
     * {@link #send(String)} method can reach it.
     *
     * @param template the template to publish with
     */
    @Autowired
    public void setTemplate(RabbitTemplate template) {
        RabbitMQUtil.template = template;
    }

    /**
     * Sends a text message using the template's no-argument-destination
     * {@code convertAndSend}, i.e. the exchange and routing key configured on
     * the template itself, then logs the content at INFO level.
     *
     * @param msg the message content
     * @throws IllegalStateException if no template has been injected yet
     */
    public static void send(String msg) {
        // Read the static field once and fail with a clear message instead of an
        // anonymous NullPointerException when Spring has not wired the template.
        RabbitTemplate current = template;
        if (current == null) {
            throw new IllegalStateException(
                    "RabbitTemplate has not been injected; register RabbitMQUtil as a Spring bean before calling send()");
        }

        current.convertAndSend(msg);

        if (log.isInfoEnabled()) {
            log.info("RabbitMQ訊息傳送成功,訊息內容:{}", msg);
        }
    }
}


QueueListener.java


package com.app.server.listener;

import lombok.extern.log4j.Log4j2;
import org.apache.commons.codec.CharEncoding;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;

/**
 * Message listener that logs the body of every delivered message as UTF-8 text.
 *
 * <p>Registered through {@code @Component}; the consumer configuration looks the
 * listener up by bean name.
 */
@Log4j2
@Component
public class QueueListener implements MessageListener {
    /**
     * Decodes the message body as UTF-8 and writes it to the log at INFO level.
     *
     * @param message the delivered message whose body is logged
     */
    @Override
    public void onMessage(Message message) {
        // The decoded text is only used for logging, so skip the work when INFO is off.
        if (!log.isInfoEnabled()) {
            return;
        }

        // A Charset constant (rather than a charset name) cannot raise
        // UnsupportedEncodingException, so no catch block is needed.
        String msgContent = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("RabbitMQ訊息接收成功,訊息內容:{}", msgContent);
    }
}


config.properties


# Broker connection settings consumed by the connection factory
rabbit.host=127.0.0.1
# NOTE(review): RabbitMQ's AMQP listener defaults to port 5672; confirm that 8080 is
# really the AMQP port of this broker and not an HTTP port.
rabbit.port=8080
# NOTE(review): credentials are left blank here; presumably to be filled in per environment.
rabbit.username=
rabbit.password=
# Name of the direct exchange, the routing key of its binding, and the queue name
rabbit.direct.exchange.name=exchange.demo.name
rabbit.queue.key=queue.demo.key
rabbit.queue.name=queue.demo.name
# Bean name of the message listener referenced by the listener container
rabbit.queue.listener=queueListener