SpringBoot(十三):SpringBoot整合RabbitMQ
阿新 • • 發佈:2019-01-24
如果對RabbitMQ不熟悉的,建議先看RabbitMQ系列教程。
一、環境準備
- RabbitMQ 3.7.4
- SpringBoot 1.5.10.RELEASE
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 192.168.239.128
port: 5672
# username: test
# password: 123456
# virtual-host: /vhost_test
# publisher-confirms: true
二、簡單佇列
RabbitMQConfiguration.java 宣告佇列
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String QUEUE_SIMPLE_NAME = "test_simple_queue";
@Bean
public Queue queue(){
return new Queue(QUEUE_SIMPLE_NAME, false, false, false, null);
}
}
消費者
package cn.saytime.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費者
*/
@RabbitListener(queues = "test_simple_queue")
@Component
public class SimpleRecv {
@RabbitHandler
public void process(String message) {
System.out.println("[x] rev : " + message);
}
}
測試生產者
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSimpleQueue() {
String message = "Hello RabbitMQ !";
amqpTemplate.convertAndSend("test_simple_queue", message);
System.out.println("[x] send " + message + " ok");
}
}
執行test:
[x] send Hello RabbitMQ ! ok
[x] rev : Hello RabbitMQ !
三、工作佇列
However, “Fair dispatch” is the default configuration for spring-amqp
公平分發模式在Spring-amqp中是預設的,這種情況也是日常工作中使用最為正常的,輪詢模式用的較少,區別在於prefetch預設是1,如果設定為0就是輪詢模式。
3.1 公平分發模式
RabbitMQConfiguration.java 宣告佇列
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String QUEUE_WORK_NAME = "test_work_queue";
@Bean
public Queue workQueue(){
return new Queue(QUEUE_WORK_NAME, false, false, false, null);
}
}
消費者1
package cn.saytime.listener.workfair;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費者
*/
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv {
@RabbitHandler
public void process(String message){
System.out.println("[1] rev : " + message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消費者2
package cn.saytime.listener.workfair;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費者
*/
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv2 {
@RabbitHandler
public void process(String message){
System.out.println("[2] rev : " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試生產者
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testWorkFairQueue(){
for (int i = 0; i < 20; i++) {
String message = "Hello RabbitMQ " + i;
// 傳送訊息
amqpTemplate.convertAndSend("test_workfair_queue", message);
System.out.println(" [x] Sent '" + message + "'");
try {
Thread.sleep(i*100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
執行test:
[x] Sent 'Hello RabbitMQ 0'
[x] Sent 'Hello RabbitMQ 1'
[2] rev : Hello RabbitMQ 1
[1] rev : Hello RabbitMQ 0
[x] Sent 'Hello RabbitMQ 2'
[x] Sent 'Hello RabbitMQ 3'
[x] Sent 'Hello RabbitMQ 4'
[x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
[x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
[x] Sent 'Hello RabbitMQ 7'
[x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
[x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
[x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
[x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
[x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
[x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
[x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
[x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
[x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
[x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
[x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
[x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19
公平分發模式測試正常。
3.2 輪詢分發模式
修改application.yml
spring:
rabbitmq:
host: 192.168.239.128
port: 5672
# username: test
# password: 123456
# virtual-host: /vhost_test
# publisher-confirms: true
listener:
simple:
prefetch: 0
[x] Sent 'Hello RabbitMQ 0'
[x] Sent 'Hello RabbitMQ 1'
[1] rev : Hello RabbitMQ 0
[2] rev : Hello RabbitMQ 1
[x] Sent 'Hello RabbitMQ 2'
[x] Sent 'Hello RabbitMQ 3'
[x] Sent 'Hello RabbitMQ 4'
[x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
[x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
[x] Sent 'Hello RabbitMQ 7'
[x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
[x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
[x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
[x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
[x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
[x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
[x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
[x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
[x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
[x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
[x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
[x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19
可以看到消費者1消費偶數,消費者2消費奇數,表示輪詢分發正常。
四、訂閱模式
定義交換機、佇列、以及佇列與交換機的繫結關係。
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String EXCHANGE_FANNOUT_NAME = "test_exchange_fanout";
private static final String QUEUE_PS_SMS_NAME = "test_queue_fanout_sms";
private static final String QUEUE_PS_EMAIL_NAME = "test_queue_fanout_email";
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE_FANNOUT_NAME);
}
@Bean
public Queue fanoutSmsQueue(){
return new Queue(QUEUE_PS_SMS_NAME, false, false, false, null);
}
@Bean
public Queue fanoutEmailQueue(){
return new Queue(QUEUE_PS_EMAIL_NAME, false, false, false, null);
}
@Bean
public Binding smsQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutSmsQueue){
return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
}
@Bean
public Binding emailQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutEmailQueue){
return BindingBuilder.bind(fanoutEmailQueue).to(fanoutExchange);
}
}
消費者1 sms
package cn.saytime.listener.ps;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* sms消費者
*/
@RabbitListener(queues = "test_queue_fanout_sms")
@Component
public class SmsRecv {
@RabbitHandler
public void process(String message){
System.out.println("[sms] rev : " + message);
}
}
消費者2
package cn.saytime.listener.ps;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* email消費者
*/
@RabbitListener(queues = "test_queue_fanout_email")
@Component
public class EmailRecv {
@RabbitHandler
public void process(String message){
System.out.println("[email] rev : " + message);
}
}
測試訊息:
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testFanoutQueue() {
String message = "Hello, fanout message ";
// 傳送訊息
amqpTemplate.convertAndSend("test_exchange_fanout", "", message);
System.out.println(" [x] Sent '" + message + "'");
}
}
執行test結果:
[x] Sent 'Hello, fanout message '
[sms] rev : Hello, fanout message
[email] rev : Hello, fanout message
五、路由模式
跟訂閱模式一樣,只不過Configuration裡面配置的是DirectExchange,並設定路由鍵。
繫結關係:
BindingBuilder.bind(queue).to(exchange).with("info");
傳送訊息:
amqpTemplate.convertSendAndReceive(exchange, routingkey, message);
六、主題模式
跟路由模式一樣,只不過路由鍵可以模糊匹配,配置的是TopicExchange.