1. 程式人生 > 其它 >springboot整合rabbitmq-釋出訂閱模式

springboot整合rabbitmq-釋出訂閱模式

技術標籤:mqjava

本篇文章只涉及到程式碼實現,具體rabbitmq原理可以參考前面文章<<RabbitMQ六種佇列模式>>。

一、定義佇列名稱

package com.shuofeng.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

//釋出訂閱模式的配置,包括兩個佇列和對應的訂閱者,釋出者的交換機型別使用fanout(子網廣播),兩根網線binding用來繫結佇列到交換機
@Configuration
public class PublishSubscribeConfig {
	
	@Bean
	public Queue myQueue1() {
		Queue queue = new Queue("queue1");
		return queue;
	}
	
	@Bean
	public Queue myQueue2() {
		Queue queue = new Queue("queue2");
		return queue;
	}
	
	@Bean
	public FanoutExchange fanoutExchange() {
		FanoutExchange fanoutExchange = new FanoutExchange("fanout");
		return fanoutExchange;
	}
	
	@Bean
	public Binding binding1() {
		Binding binding = BindingBuilder.bind(myQueue1()).to(fanoutExchange());
		return binding;
	}
	
	@Bean
	public Binding binding2() {
		Binding binding = BindingBuilder.bind(myQueue2()).to(fanoutExchange());
		return binding;
	}
}

啟動載入並建立佇列,交換機,同時把佇列繫結到交換機。

二、定義傳送物件MAIL

package com.shuofeng.dto;

import java.io.Serializable;

public class Mail implements Serializable {

    private static final long serialVersionUID = -8140693840257585779L;
    private String mailId;
    private String country;
    private Double weight;


    public Mail() {
    }

    public Mail(String mailId, String country, double weight) {
        this.mailId = mailId;
        this.country = country;
        this.weight = weight;
    }

    public String getMailId() {
        return mailId;
    }

    public void setMailId(String mailId) {
        this.mailId = mailId;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }

    public double getWeight() {
        return weight;
    }

    public void setWeight(double weight) {
        this.weight = weight;
    }

    @Override
    public String toString() {
        return "Mail [mailId=" + mailId + ", country=" + country + ", weight="
                + weight + "]";
    }

}

三、定義生產者

package com.shuofeng.producer;

import com.shuofeng.dto.Mail;

public interface Publisher {
	public void publishMail(Mail mail);
}
package com.shuofeng.producer.impl;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.shuofeng.dto.Mail;
import com.shuofeng.producer.Publisher;

/**
 * 釋出訂閱模式
 * @author menglingwei
 *
 */
@Service
public class PublisherImpl implements Publisher {
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void publishMail(Mail mail) {
		rabbitTemplate.convertAndSend("fanout", "", mail);
	}

}

四、定義消費者

package com.shuofeng.customer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.shuofeng.dto.Mail;

@Component
public class QueueListener1 {
	
	@RabbitListener(queues = "queue1")
	public void displayMail(Mail mail) throws Exception {
		System.out.println("佇列監聽器1號收到訊息" + mail.toString());
	}

}

五、測試

package com.shuofeng;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.web.WebAppConfiguration;

import com.shuofeng.dto.Mail;
import com.shuofeng.producer.impl.ProducerImpl;
import com.shuofeng.producer.impl.PublisherImpl;

@RunWith(MockitoJUnitRunner.class)
@SpringBootTest
@WebAppConfiguration
public class ProducerImplTest {
	
	@SpyBean
	private ProducerImpl producerImpl;
	
	@SpyBean
	private PublisherImpl publisherImpl;
	
	@Test
	public void testSendMail() {
		Mail mail = new Mail();
		mail.setMailId("001");
		mail.setCountry("CHINA");
		mail.setWeight(2.2);
		this.producerImpl.sendMail("myqueue", mail);
	}
	
	@Test
	public void testSendMailByQueue1() {
		Mail mail = new Mail();
		mail.setMailId("Queue1");
		mail.setCountry("CHINA");
		mail.setWeight(2.2);
		this.publisherImpl.publishMail(mail);
	}
	
	@Test
	public void testSendMailByQueue2() {
		Mail mail = new Mail();
		mail.setMailId("Queue2");
		mail.setCountry("CHINA");
		mail.setWeight(2.2);
		this.publisherImpl.publishMail(mail);
	}
	
}

執行結果如下圖: