Integrating RabbitMQ with Spring
阿新 • Published: 2018-11-01
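Background: when I first studied RabbitMQ, I only ran it from a main method. In real projects, however, it is used together with Spring or Spring Boot, so I searched blogs for ways to integrate RabbitMQ with Spring and started feeling my way forward. Perhaps I was unlucky: many of the demos I found had bugs or failed to run. Since the Spring XML configuration was roughly the same across those posts, I worked out the demo below by trial and error while consulting the documentation. It has been tested and runs correctly, so here is the code.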
spring-mq.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection factory (broker connection settings) -->
    <rabbit:connection-factory id="connectionFactory"
                               username="root"
                               password="root"
                               virtual-host="/"
                               host="localhost"
                               port="5672"/>

    <!-- RabbitMQ admin: declares the queues, exchange and bindings on the broker -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Declare the queues -->
    <rabbit:queue name="queue_one" durable="true"/>
    <rabbit:queue name="queue_two" durable="true"/>

    <!-- Define the exchange and bind the queues to it -->
    <rabbit:direct-exchange name="IExchange" id="IExchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"></rabbit:binding>
            <rabbit:binding queue="queue_two" key="queue_two_key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Converter between message objects and JSON -->
    <bean id="jsonMessageConverter" class="com.hsnn.medstgmini.drug.rabbitmq.FastJsonMessageConverter"/>

    <!-- Define the template -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="IExchange"
                     message-converter="jsonMessageConverter"/>

    <!-- Define the consumers -->
    <bean name="Consume1Handler" class="com.hsnn.medstgmini.drug.rabbitmq.Consume1Handler"/>
    <bean name="Consume2Handler" class="com.hsnn.medstgmini.drug.rabbitmq.Consume2Handler"/>

    <!-- Consumers listen on the queues -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="Consume1Handler" queues="queue_one"/>
        <rabbit:listener ref="Consume2Handler" queues="queue_two"/>
    </rabbit:listener-container>
</beans>
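To make the binding section of the configuration concrete, here is a minimal stand-alone sketch of how a direct exchange picks a queue: a message goes to every queue whose binding key equals the routing key exactly. This is plain Java with no RabbitMQ dependency; the class `DirectExchangeSketch` and its methods are hypothetical names made up for illustration and are not part of Spring AMQP.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a direct exchange; hypothetical, for illustration only. */
public class DirectExchangeSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Plays the role of <rabbit:binding queue="..." key="..."/>. */
    public void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message sent with this routing key. */
    public List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue_one", "queue_one_key");
        exchange.bind("queue_two", "queue_two_key");

        System.out.println(exchange.route("queue_one_key")); // matches one binding
        System.out.println(exchange.route("unknown_key"));   // matches no binding
    }
}
```

A routing key with no matching binding yields an empty list here. On a real broker that corresponds to an unroutable message, which is why the keys passed to `convertAndSend` must match the `key` attributes in the XML exactly.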
By default the message-to-JSON converter uses Jackson, but for performance reasons I switched to FastJson for the object/JSON conversion. The class is as follows:
package com.hsnn.medstgmini.drug.rabbitmq;

import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.alibaba.fastjson.JSONObject;

public class FastJsonMessageConverter extends AbstractMessageConverter {

    private static final Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET;
    }

    // Converts an incoming message to a CommonMessage. If conversion fails,
    // the error is logged and an empty CommonMessage is returned.
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object o = new CommonMessage();
        try {
            o = fromMessage(message, new CommonMessage());
        } catch (Exception e) {
            log.error("queue message error : " + message, e);
        }
        return o;
    }

    // Decodes the message body with the configured charset and parses it
    // into an object of the same class as t.
    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message, T t) {
        String json = "";
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            log.error("unsupported charset : " + defaultCharset, e);
        }
        return (T) JSONObject.parseObject(json, t.getClass());
    }

    // Serializes the object to JSON and wraps the bytes in a Message,
    // setting content type, encoding and length on the properties.
    @Override
    protected Message createMessage(Object objectToConvert, MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSONObject.toJSONString(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }
}
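The converter sets the content length from the encoded byte array rather than from the string, which matters for non-ASCII payloads such as the Chinese names used in the test further down. The following stand-alone sketch shows the difference, along with the decode step that `fromMessage` performs before parsing. It is plain Java; `PayloadBytesSketch` is a hypothetical name, and the JSON string is written by hand instead of being produced by FastJson.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper illustrating the UTF-8 encode/decode steps of the converter. */
public class PayloadBytesSketch {

    /** Encodes a JSON string to bytes using UTF-8, like createMessage does. */
    public static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back to a string, like fromMessage does before parsing. */
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u9152\u9b3c is the two-character name used in the test data,
        // written with escapes so the source file encoding does not matter.
        String json = "{\"id\":\"001\",\"name\":\"\u9152\u9b3c\"}";
        byte[] body = encode(json);

        System.out.println("chars = " + json.length() + ", bytes = " + body.length);
        System.out.println("round trip ok = " + decode(body).equals(json));
    }
}
```

Each of the two Chinese characters takes three bytes in UTF-8, so the byte count exceeds the character count by four. Using `bytes.length` for the content length, as the converter does, is therefore the safe choice.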
The message model class is as follows:
package com.hsnn.medstgmini.drug.rabbitmq;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

/**
 * Message model
 */
public class CommonMessage {

    /**
     * One of the agreed message source names
     */
    private String source;

    /**
     * Entity table name
     */
    private String table;

    /**
     * Primary key
     */
    private String primaryKey;

    /**
     * Message entity bean
     */
    private Object message;

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public String getPrimaryKey() {
        return primaryKey;
    }

    public void setPrimaryKey(String primaryKey) {
        this.primaryKey = primaryKey;
    }

    public Object getMessage() {
        return message;
    }

    public void setMessage(Object message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE);
    }
}
Define two consumers to consume the messages:
package com.hsnn.medstgmini.drug.rabbitmq;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.IOException;

/*
 * Consumer 1
 */
public class Consume1Handler implements MessageListener {

    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonNode = mapper.readTree(message.getBody());
            // path() returns a "missing" node instead of null, so an absent
            // field prints as an empty string rather than throwing an NPE.
            System.out.println(jsonNode.path("id").asText() + ":" + jsonNode.path("name").asText());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.hsnn.medstgmini.drug.rabbitmq;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.IOException;

/*
 * Consumer 2
 */
public class Consume2Handler implements MessageListener {

    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonNode = mapper.readTree(message.getBody());
            // path() returns a "missing" node instead of null, so an absent
            // field prints as an empty string rather than throwing an NPE.
            System.out.println(jsonNode.path("id").asText() + ":" + jsonNode.path("name").asText());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Test code (a controller method):
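// rabbitTemplate is the injected RabbitTemplate bean defined in the XML above;
// MODEL_PATH is a constant defined elsewhere in the controller.
@RequestMapping("toCheckList")
public String toCheckList() {
    Map<String, String> map = new HashMap<>();

    // Routed to queue_one through the binding key queue_one_key
    map.put("id", "001");
    map.put("name", "酒鬼");
    rabbitTemplate.convertAndSend("queue_one_key", map);

    // Routed to queue_two through the binding key queue_two_key
    map.put("id", "002");
    map.put("name", "撕家");
    rabbitTemplate.convertAndSend("queue_two_key", map);

    return MODEL_PATH + "checkList";
}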
After the project starts, visit the corresponding path and the console prints:
001:酒鬼
002:撕家
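That completes the configuration. I did run into a pitfall along the way, so I am posting it here too in the hope that it helps others.
The pitfall: an error was reported during startup, with the following message: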
org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:218)
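I searched Baidu for the error, but none of the results were what I needed. They all blamed RabbitMQ permissions and said the fix was to log in to RabbitMQ and reset the user's permissions.
After resetting the permissions I still got the error, so I kept investigating. The RabbitMQ official site showed that the exception came from a wrong port number in the connection settings. RabbitMQ uses the following ports: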
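4369 -- Erlang node discovery port (epmd)
5672 -- client communication port; this is the port RabbitMQ clients connect to, i.e. the port value in the Spring configuration
15672 -- management UI port, used to access the RabbitMQ management console
25672 -- internal communication port between server nodes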
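Since the root cause was a wrong port, a quick check that the configured port actually accepts TCP connections can save time before digging into permissions. The following is a minimal sketch in plain Java; `PortCheckSketch` and `isReachable` are hypothetical names, and the host, ports and timeout are assumptions matching the defaults listed above. A successful connect only shows that something is listening on the port, not that it speaks AMQP.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
public class PortCheckSketch {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, unknown host, and so on
            return false;
        }
    }

    public static void main(String[] args) {
        String host = "localhost";      // assumed broker host
        int[] ports = {5672, 15672};    // client port and management UI port
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 1000));
        }
    }
}
```

If 15672 is reachable but 5672 is not, the broker is probably running while the client port is blocked or configured differently. If the Spring configuration points at 15672 or 25672 instead of 5672, the connection factory fails even though the port is open.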