SpringBoot整合RabbitMQ解耦合
阿新 • • 發佈:2018-11-17
RabbitMQ的訊息接收者程式往往需要操作業務資料。如何將收發訊息的工具類與業務系統解耦合、擴大訊息工具類的適用範圍,是一個需要解決的問題。本例在訊息工具類與業務系統之間新增一層介面(IMsgProcess),參考下圖:
同步訊息
SyncSendMsg
package com.test.util; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; @Component public class SyncSendMsg { @Value("${spring.rabbitmq.host}") private String host = "localhost"; @Value("${spring.rabbitmq.port}") private String port = "5672"; @Value("${spring.rabbitmq.username}") private String userName = "admin2"; @Value("${spring.rabbitmq.password}") private String pwd = "admin2"; public byte[] toBytes(Object obj) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(obj); byte[] data = baos.toByteArray(); baos.close(); return data; } catch(Exception e) { e.printStackTrace(); } return null; } public boolean sendSyncObject(Object obj) { try { System.out.println("host="+host+",port="+port); //1.creeate ConnectionFactory ConnectionFactory cf = new ConnectionFactory(); cf.setHost(host); cf.setPort(Integer.parseInt(port)); cf.setUsername(userName); cf.setPassword(pwd); //2.create Conection Connection con = cf.newConnection(); //3.create Channel Channel channel = con.createChannel(); //4.create exchage String exgName = "test_rpc_exg"; channel.exchangeDeclare(exgName, "direct"); String queueName = "test_rpc"; String routeKey = "java.io.File"; boolean durable = true; boolean exclusive = false; boolean autoDelete = false; channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); String replyTo = channel.queueDeclare().getQueue(); final String corrId = java.util.UUID.randomUUID().toString(); 
//5.bind exchange and queue channel.queueBind(queueName,exgName,routeKey); //6.send msg AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder() .correlationId(corrId).replyTo(replyTo).build(); channel.basicPublish(exgName, routeKey, prop,toBytes(obj)); final BlockingQueue<byte[]> response = new ArrayBlockingQueue<byte[]>(1); Consumer csum = new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { String corrId2 = properties.getCorrelationId(); if(corrId.equals(corrId2)) { response.offer(body); } } }; channel.basicConsume(replyTo,true,csum); //從JVM阻塞佇列中獲取回覆訊息,如果沒收到當前執行緒阻塞 byte[] result = response.take(); String str = new String(result); //7.close Connection channel.close(); con.close(); if("true".equals(str)) return true; else return false; } catch(Exception e) { e.printStackTrace(); } return false; } }
SyncReceiveMsg
package com.test.util; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; @Component public class SyncReceiveMsg { @Autowired private IMsgProcess serv; @Value("${spring.rabbitmq.host}") private String host = "localhost"; @Value("${spring.rabbitmq.port}") private String port = "5672"; @Value("${spring.rabbitmq.username}") private String userName = "admin2"; @Value("${spring.rabbitmq.password}") private String pwd = "admin2"; @Scheduled(cron="0/2 * * ? * *") public void receive() { try { //System.out.println("receive = "+new java.sql.Timestamp(System.currentTimeMillis())); //1.creeate ConnectionFactory ConnectionFactory cf = new ConnectionFactory(); cf.setHost(host); cf.setPort(Integer.parseInt(port)); cf.setUsername(userName); cf.setPassword(pwd); //2.create Conection Connection con = cf.newConnection(); //3.create Channel final Channel channel = con.createChannel(); String queueName = "test_rpc"; Consumer csum = new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { String replyTo = properties.getReplyTo(); String corrId = properties.getCorrelationId(); AMQP.BasicProperties replyProp = new AMQP.BasicProperties().builder() .correlationId(corrId).build(); Object obj = toObject(body); System.out.println("obj==="+obj); boolean rtn = serv.process(obj); String result = rtn+""; channel.basicPublish("", replyTo, replyProp, result.getBytes()); 
channel.basicAck(envelope.getDeliveryTag(), false); } catch(Exception e) { e.printStackTrace(); } } }; boolean isAck = false; channel.basicConsume(queueName,isAck,csum); //7.close Connection //channel.close(); //con.close(); } catch(Exception e) { e.printStackTrace(); } } public Object toObject(byte[] data) { try { ByteArrayInputStream bais = new ByteArrayInputStream(data); ObjectInputStream ois = new ObjectInputStream(bais); Object obj = ois.readObject(); bais.close(); return obj; } catch(Exception e) { e.printStackTrace(); } return null; } }
IMsgProcess
package com.test.util;
/**
 * Callback through which the messaging helpers hand a received message to the
 * business layer, so the helpers need no direct dependency on business classes.
 */
public interface IMsgProcess {

    /**
     * Handles one received message payload.
     *
     * @param obj the payload object obtained from the message body
     * @return the outcome of handling the payload
     */
    Boolean process(Object obj);
}
同步訊息工具類應用
application.properties配置檔案
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
POM檔案
Controller類傳送訊息
服務實現類,實現IMsgProcess介面
啟動類
非同步訊息
交換機,佇列配置類
package com.test.util;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ exchange, queue and binding used for asynchronous messages.
 * If the exchange and queue have already been created through the RabbitMQ
 * management console, this class is no longer needed.
 *
 * @author java
 */
@Configuration
public class RabbitmqConf {

    // Declared final so the names cannot be reassigned at runtime and can be used
    // as compile-time constants (for example in annotation attributes).

    /** Name of the queue declared by {@link #queue1()}. */
    public static final String QUEUE1 = "test_queue1";

    /** Name of the direct exchange declared by {@link #exg1()}. */
    public static final String EXCHANGE1 = "test_exg1";

    /** Routing key that binds {@link #QUEUE1} to {@link #EXCHANGE1}. */
    public static final String ROUTINGKEY1 = "java";

    /**
     * @return the queue named {@link #QUEUE1}
     */
    @Bean
    public Queue queue1() {
        return new Queue(QUEUE1);
    }

    /**
     * @return the direct exchange named {@link #EXCHANGE1}
     */
    @Bean
    public DirectExchange exg1() {
        return new DirectExchange(EXCHANGE1);
    }

    /**
     * @return the binding of {@link #queue1()} to {@link #exg1()} under {@link #ROUTINGKEY1}
     */
    @Bean
    public Binding directBinding1() {
        return BindingBuilder.bind(queue1()).to(exg1()).with(ROUTINGKEY1);
    }
}
訊息監聽器
package com.test.util;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Listener that receives asynchronous RabbitMQ messages.
 * When a message arrives on the listened queue, its body is deserialized and
 * handed to the injected {@link IMsgProcess} (the original author's note says
 * this is where the message gets saved to the database; that depends on the
 * service implementation).
 *
 * @author java
 */
@Component
public class RabbitmqListener {

    @Autowired
    private IMsgProcess serv;

    /**
     * Handles one message from {@code test_queue1}.
     *
     * <p>Any exception is printed and not rethrown.
     * NOTE(review): because the exception is swallowed, the listener container
     * presumably treats the delivery as handled; confirm against the configured
     * acknowledge mode.
     *
     * @param msg the received AMQP message whose body is a Java-serialized object
     */
    @RabbitListener(queues="test_queue1")
    public void receive(Message msg) {
        try {
            byte[] data = msg.getBody();
            Object obj = toObject(data);
            // Null-safe: the callback returns a boxed Boolean.
            boolean rtn = Boolean.TRUE.equals(serv.process(obj));
            // Log the decoded object; the raw body is serialized binary, not text.
            System.out.println("Receive Msg=" + obj + ", processed=" + rtn);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deserializes a Java-serialized payload.
     *
     * <p>SECURITY NOTE(review): native Java deserialization of message bodies is
     * unsafe if the queue can receive data from untrusted producers; consider an
     * {@code ObjectInputFilter} or a data format such as JSON.
     *
     * @param data the serialized bytes
     * @return the deserialized object, or {@code null} if deserialization failed
     *         (the failure is printed to stderr)
     */
    public Object toObject(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
非同步訊息應用
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
POM檔案
Controller層傳送訊息
服務實現類