java實現rabbitmq訊息的傳送接受
阿新 • • 發佈:2019-01-23
本文不介紹amqp和rabbitmq相關知識,請自行網上查閱
本文是基於spring-rabbit中介軟體來實現訊息的傳送接受功能
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
<!-- for rabbitmq --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>com.caucho</groupId> <artifactId>hessian</artifactId> <version>4.0.7</version> </dependency> </dependencies>
首先我們需要一個用來在app和rabbitmq之間傳遞訊息的持有物件
public class EventMessage implements Serializable{ private String queueName; private String exchangeName; private byte[] eventData; public EventMessage(String queueName, String exchangeName, byte[] eventData) { this.queueName = queueName; this.exchangeName = exchangeName; this.eventData = eventData; } public EventMessage() { } public String getQueueName() { return queueName; } public String getExchangeName() { return exchangeName; } public byte[] getEventData() { return eventData; } @Override public String toString() { return "EopEventMessage [queueName=" + queueName + ", exchangeName=" + exchangeName + ", eventData=" + Arrays.toString(eventData) + "]"; } }
為了可以傳送和接受這個訊息持有物件,我們還需要需要一個用來序列化和反序列化的工廠
public interface CodecFactory {
byte[] serialize(Object obj) throws IOException;
Object deSerialize(byte[] in) throws IOException;
}
下面是編碼解碼的實現類,用了hessian來實現,大家可以自行選擇序列化方式
public class HessionCodecFactory implements CodecFactory { private final Logger logger = Logger.getLogger(HessionCodecFactory.class); @Override public byte[] serialize(Object obj) throws IOException { ByteArrayOutputStream baos = null; HessianOutput output = null; try { baos = new ByteArrayOutputStream(1024); output = new HessianOutput(baos); output.startCall(); output.writeObject(obj); output.completeCall(); } catch (final IOException ex) { throw ex; } finally { if (output != null) { try { baos.close(); } catch (final IOException ex) { this.logger.error("Failed to close stream.", ex); } } } return baos != null ? baos.toByteArray() : null; } @Override public Object deSerialize(byte[] in) throws IOException { Object obj = null; ByteArrayInputStream bais = null; HessianInput input = null; try { bais = new ByteArrayInputStream(in); input = new HessianInput(bais); input.startReply(); obj = input.readObject(); input.completeReply(); } catch (final IOException ex) { throw ex; } catch (final Throwable e) { this.logger.error("Failed to decode object.", e); } finally { if (input != null) { try { bais.close(); } catch (final IOException ex) { this.logger.error("Failed to close stream.", ex); } } } return obj; } }
接下來就先實現傳送功能,新增一個介面專門用來實現傳送功能
public interface EventTemplate {
void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
}
SendRefuseException是自定義的傳送失敗異常類
下面是它的實現類,主要的任務就是將資料轉換為EventMessage
public class DefaultEventTemplate implements EventTemplate {
private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class);
private AmqpTemplate eventAmqpTemplate;
private CodecFactory defaultCodecFactory;
// private DefaultEventController eec;
//
// public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,
// CodecFactory defaultCodecFactory, DefaultEventController eec) {
// this.eventAmqpTemplate = eopAmqpTemplate;
// this.defaultCodecFactory = defaultCodecFactory;
// this.eec = eec;
// }
public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) {
this.eventAmqpTemplate = eopAmqpTemplate;
this.defaultCodecFactory = defaultCodecFactory;
}
@Override
public void send(String queueName, String exchangeName, Object eventContent)
throws SendRefuseException {
this.send(queueName, exchangeName, eventContent, defaultCodecFactory);
}
@Override
public void send(String queueName, String exchangeName, Object eventContent,
CodecFactory codecFactory) throws SendRefuseException {
if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) {
throw new SendRefuseException("queueName exchangeName can not be empty.");
}
// if (!eec.beBinded(exchangeName, queueName))
// eec.declareBinding(exchangeName, queueName);
byte[] eventContentBytes = null;
if (codecFactory == null) {
if (eventContent == null) {
logger.warn("Find eventContent is null,are you sure...");
} else {
throw new SendRefuseException(
"codecFactory must not be null ,unless eventContent is null");
}
} else {
try {
eventContentBytes = codecFactory.serialize(eventContent);
} catch (IOException e) {
throw new SendRefuseException(e);
}
}
// 構造成Message
EventMessage msg = new EventMessage(queueName, exchangeName,
eventContentBytes);
try {
eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg);
} catch (AmqpException e) {
logger.error("send event fail. Event Message : [" + eventContent + "]", e);
throw new SendRefuseException("send event fail", e);
}
}
}
註釋的地方稍後會用到,主要是防止資料資料傳送的地方沒有事先宣告
然後我們再實現接受訊息
首先我們需要一個消費介面,所有的消費程式都實現這個類
public interface EventProcesser {
public void process(Object e);
}
為了能夠將不同型別的訊息交由對應的程式來處理,我們還需要一個訊息處理介面卡
/**
* MessageListenerAdapter的Pojo
* <p>訊息處理介面卡,主要功能:</p>
* <p>1、將不同的訊息型別繫結到對應的處理器並本地快取,如將queue01+exchange01的訊息統一交由A處理器來出來</p>
* <p>2、執行訊息的消費分發,呼叫相應的處理器來消費屬於它的訊息</p>
*
*/
public class MessageAdapterHandler {
private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
private ConcurrentMap<String, EventProcessorWrap> epwMap;
public MessageAdapterHandler() {
this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
}
public void handleMessage(EventMessage eem) {
logger.debug("Receive an EventMessage: [" + eem + "]");
// 先要判斷接收到的message是否是空的,在某些異常情況下,會產生空值
if (eem == null) {
logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
return;
}
if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
return;
}
// 解碼,並交給對應的EventHandle執行
EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
if (eepw == null) {
logger.warn("Receive an EopEventMessage, but no processor can do it.");
return;
}
try {
eepw.process(eem.getEventData());
} catch (IOException e) {
logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
return;
}
}
protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
}
EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
if (oldProcessorWrap != null) {
logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
}
}
protected Set<String> getAllBinding() {
Set<String> keySet = epwMap.keySet();
return keySet;
}
protected static class EventProcessorWrap {
private CodecFactory codecFactory;
private EventProcesser eep;
protected EventProcessorWrap(CodecFactory codecFactory,
EventProcesser eep) {
this.codecFactory = codecFactory;
this.eep = eep;
}
public void process(byte[] eventData) throws IOException{
Object obj = codecFactory.deSerialize(eventData);
eep.process(obj);
}
}
}
這是正常情況下的訊息處理方式,如果rabbitmq訊息接受發生異常,也要監控到,新增一個消費類專門用來處理錯誤異常的訊息
public class MessageErrorHandler implements ErrorHandler{
private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
@Override
public void handleError(Throwable t) {
logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
}
}
接下來我們可能需要一個專門配置和rabbitmq通訊的一些資訊,比如地址,埠等資訊
public class EventControlConfig {
private final static int DEFAULT_PORT = 5672;
private final static String DEFAULT_USERNAME = "guest";
private final static String DEFAULT_PASSWORD = "guest";
private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
private static final int PREFETCH_SIZE = 1;
private String serverHost ;
private int port = DEFAULT_PORT;
private String username = DEFAULT_USERNAME;
private String password = DEFAULT_PASSWORD;
private String virtualHost;
/**
* 和rabbitmq建立連線的超時時間
*/
private int connectionTimeout = 0;
/**
* 事件訊息處理執行緒數,預設是 CPU核數 * 2
*/
private int eventMsgProcessNum;
/**
* 每次消費訊息的預取值
*/
private int prefetchSize;
public EventControlConfig(String serverHost) {
this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory());
}
public EventControlConfig(String serverHost, int port, String username,
String password, String virtualHost, int connectionTimeout,
int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) {
this.serverHost = serverHost;
this.port = port>0?port:DEFAULT_PORT;
this.username = username;
this.password = password;
this.virtualHost = virtualHost;
this.connectionTimeout = connectionTimeout>0?connectionTimeout:0;
this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM;
this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE;
}
public String getServerHost() {
return serverHost;
}
public int getPort() {
return port;
}
public String getUsername() {
return username;
}
public String getPassword() {
return password;
}
public String getVirtualHost() {
return virtualHost;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public int getEventMsgProcessNum() {
return eventMsgProcessNum;
}
public int getPrefetchSize() {
return prefetchSize;
}
}
具體的傳送、接受程式已經好了,接下來也是最重要的就是管理控制和rabbitmq的通訊
public interface EventController {
/**
* 控制器啟動方法
*/
void start();
/**
* 獲取傳送模版
*/
EventTemplate getEopEventTemplate();
/**
* 繫結消費程式到對應的exchange和queue
*/
EventController add(String queueName, String exchangeName, EventProcesser eventProcesser);
/*in map, the key is queue name, but value is exchange name*/
EventController add(Map<String,String> bindings, EventProcesser eventProcesser);
}
它的實現類如下:
/**
* 和rabbitmq通訊的控制器,主要負責:
* <p>1、和rabbitmq建立連線</p>
* <p>2、宣告exChange和queue以及它們的繫結關係</p>
* <p>3、啟動訊息監聽容器,並將不同訊息的處理者繫結到對應的exchange和queue上</p>
* <p>4、持有訊息傳送模版以及所有exchange、queue和繫結關係的本地快取</p>
* @author yangyong
*
*/
public class DefaultEventController implements EventController {
private CachingConnectionFactory rabbitConnectionFactory;
private EventControlConfig config;
private RabbitAdmin rabbitAdmin;
private CodecFactory defaultCodecFactory = new HessionCodecFactory();
private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
//queue cache, key is exchangeName
private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
//queue cache, key is queueName
private Map<String, Queue> queues = new HashMap<String, Queue>();
//bind relation of queue to exchange cache, value is exchangeName | queueName
private Set<String> binded = new HashSet<String>();
private EventTemplate eventTemplate; // 給App使用的Event傳送客戶端
private AtomicBoolean isStarted = new AtomicBoolean(false);
private static DefaultEventController defaultEventController;
public synchronized static DefaultEventController getInstance(EventControlConfig config){
if(defaultEventController==null){
defaultEventController = new DefaultEventController(config);
}
return defaultEventController;
}
private DefaultEventController(EventControlConfig config){
if (config == null) {
throw new IllegalArgumentException("Config can not be null.");
}
this.config = config;
initRabbitConnectionFactory();
// 初始化AmqpAdmin
rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
// 初始化RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
rabbitTemplate.setMessageConverter(serializerMessageConverter);
eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
}
/**
* 初始化rabbitmq連線
*/
private void initRabbitConnectionFactory() {
rabbitConnectionFactory = new CachingConnectionFactory();
rabbitConnectionFactory.setHost(config.getServerHost());
rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
rabbitConnectionFactory.setPort(config.getPort());
rabbitConnectionFactory.setUsername(config.getUsername());
rabbitConnectionFactory.setPassword(config.getPassword());
if (!StringUtils.isEmpty(config.getVirtualHost())) {
rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
}
}
/**
* 登出程式
*/
public synchronized void destroy() throws Exception {
if (!isStarted.get()) {
return;
}
msgListenerContainer.stop();
eventTemplate = null;
rabbitAdmin = null;
rabbitConnectionFactory.destroy();
}
@Override
public void start() {
if (isStarted.get()) {
return;
}
Set<String> mapping = msgAdapterHandler.getAllBinding();
for (String relation : mapping) {
String[] relaArr = relation.split("\\|");
declareBinding(relaArr[1], relaArr[0]);
}
initMsgListenerAdapter();
isStarted.set(true);
}
/**
* 初始化訊息監聽器容器
*/
private void initMsgListenerAdapter(){
MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
msgListenerContainer = new SimpleMessageListenerContainer();
msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
msgListenerContainer.setMessageListener(listener);
msgListenerContainer.setErrorHandler(new MessageErrorHandler());
msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 設定每個消費者訊息的預取值
msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
msgListenerContainer.setTxSize(config.getPrefetchSize());//設定有事務時處理的訊息數
msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
msgListenerContainer.start();
}
@Override
public EventTemplate getEopEventTemplate() {
return eventTemplate;
}
@Override
public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
}
public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
if(isStarted.get()){
initMsgListenerAdapter();
}
return this;
}
@Override
public EventController add(Map<String, String> bindings,
EventProcesser eventProcesser) {
return add(bindings, eventProcesser,defaultCodecFactory);
}
public EventController add(Map<String, String> bindings,
EventProcesser eventProcesser, CodecFactory codecFactory) {
for(Map.Entry<String, String> item: bindings.entrySet())
msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
return this;
}
/**
* exchange和queue是否已經繫結
*/
protected boolean beBinded(String exchangeName, String queueName) {
return binded.contains(exchangeName+"|"+queueName);
}
/**
* 宣告exchange和queue已經它們的繫結關係
*/
protected synchronized void declareBinding(String exchangeName, String queueName) {
String bindRelation = exchangeName+"|"+queueName;
if (binded.contains(bindRelation)) return;
boolean needBinding = false;
DirectExchange directExchange = exchanges.get(exchangeName);
if(directExchange == null) {
directExchange = new DirectExchange(exchangeName, true, false, null);
exchanges.put(exchangeName, directExchange);
rabbitAdmin.declareExchange(directExchange);//宣告exchange
needBinding = true;
}
Queue queue = queues.get(queueName);
if(queue == null) {
queue = new Queue(queueName, true, false, false);
queues.put(queueName, queue);
rabbitAdmin.declareQueue(queue); //宣告queue
needBinding = true;
}
if(needBinding) {
Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//將queue繫結到exchange
rabbitAdmin.declareBinding(binding);//宣告繫結關係
binded.add(bindRelation);
}
}
}
搞定,現在可以將DefaultEventTemplate裡的註釋去掉了,接下來最後完成單元測試,為了測試傳遞物件,建立一個PO
@SuppressWarnings("serial")
public class People implements Serializable{
private int id;
private String name;
private boolean male;
private People spouse;
private List<People> friends;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isMale() {
return male;
}
public void setMale(boolean male) {
this.male = male;
}
public People getSpouse() {
return spouse;
}
public void setSpouse(People spouse) {
this.spouse = spouse;
}
public List<People> getFriends() {
return friends;
}
public void setFriends(List<People> friends) {
this.friends = friends;
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "People[id="+id+",name="+name+",male="+male+"]";
}
}
建立單元測試
public class RabbitMqTest{
private String defaultHost = "127.0.0.1";
private String defaultExchange = "EXCHANGE_DIRECT_TEST";
private String defaultQueue = "QUEUE_TEST";
private DefaultEventController controller;
private EventTemplate eventTemplate;
@Before
public void init() throws IOException{
EventControlConfig config = new EventControlConfig(defaultHost);
controller = DefaultEventController.getInstance(config);
eventTemplate = controller.getEopEventTemplate();
controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
controller.start();
}
@Test
public void sendString() throws SendRefuseException{
eventTemplate.send(defaultQueue, defaultExchange, "hello world");
}
@Test
public void sendObject() throws SendRefuseException{
eventTemplate.send(defaultQueue, defaultExchange, mockObj());
}
@Test
public void sendTemp() throws SendRefuseException, InterruptedException{
String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未宣告的exchange
String tempQueue = "QUEUE_TEST_TEMP";//以前未宣告的queue
eventTemplate.send(tempQueue, tempExchange, mockObj());
//傳送成功後此時不會接受到訊息,還需要繫結對應的消費程式
controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
}
@After
public void end() throws InterruptedException{
Thread.sleep(2000);
}
private People mockObj(){
People jack = new People();
jack.setId(1);
jack.setName("JACK");
jack.setMale(true);
List<People> friends = new ArrayList<>();
friends.add(jack);
People hanMeiMei = new People();
hanMeiMei.setId(1);
hanMeiMei.setName("韓梅梅");
hanMeiMei.setMale(false);
hanMeiMei.setFriends(friends);
People liLei = new People();
liLei.setId(2);
liLei.setName("李雷");
liLei.setMale(true);
liLei.setFriends(friends);
liLei.setSpouse(hanMeiMei);
hanMeiMei.setSpouse(liLei);
return hanMeiMei;
}
class ApiProcessEventProcessor implements EventProcesser{
@Override
public void process(Object e) {//消費程式這裡只是列印資訊
Assert.assertNotNull(e);
System.out.println(e);
if(e instanceof People){
People people = (People)e;
System.out.println(people.getSpouse());
System.out.println(people.getFriends());
}
}
}
}
原始碼地址請點選這裡