1. 程式人生 > >RabbitMq 結合Spring

RabbitMq 結合Spring

背景:前期再學習RabbitMq時,只是基於main方法的形式進行學習,但是實際中使用是結合spring 或者是spring boot的,所以就搜尋部落格檢視RabbitMq和Spring的整合方式,開始摸著石頭過河,估計是運氣不好,找了好多demo都是有bug,或者專案報錯,基於看了好多部落格,spring配置檔案配置的方式大致相同,於是開始邊摸索邊參考一些文件敲出一下demo;以下demo經測試可以正常執行,直接上程式碼;

spirng-mq.xml配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	 xsi:schemaLocation="http://www.springframework.org/schema/beans
			http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
			http://www.springframework.org/schema/rabbit
			http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
			">

	<!-- 載入配置檔案 -->
	<rabbit:connection-factory id="connectionFactory"
							   username="root"
							   password="root"
								virtual-host="/"
							   host="localhost"
							   port="5672"
							   />
    <!--定義mq管理-->
	<rabbit:admin connection-factory="connectionFactory"/>

	<!--宣告佇列-->
	<rabbit:queue name="queue_one" durable="true"/>
	<rabbit:queue name="queue_two" durable="true"/>

	<!--定義交換機繫結佇列-->
	<rabbit:direct-exchange name="IExchange" id="IExchange">
		<rabbit:bindings>
			<rabbit:binding queue="queue_one" key="queue_one_key"></rabbit:binding>
			<rabbit:binding queue="queue_two" key="queue_two_key"></rabbit:binding>
		</rabbit:bindings>
	</rabbit:direct-exchange>

	<!--訊息物件轉json類-->
	<bean id="jsonMessageConverter" class="com.hsnn.medstgmini.drug.rabbitmq.FastJsonMessageConverter"/>

	<!--定義模板-->
	<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="IExchange" message-converter="jsonMessageConverter"/>

	<!--定義消費者-->
	<bean name="Consume1Handler" class="com.hsnn.medstgmini.drug.rabbitmq.Consume1Handler"/>
	<bean name="Consume2Handler" class="com.hsnn.medstgmini.drug.rabbitmq.Consume2Handler"/>

	<!--消費者監聽佇列-->
	<rabbit:listener-container
		connection-factory="connectionFactory">
		<rabbit:listener ref="Consume1Handler" queues="queue_one"/>
		<rabbit:listener ref="Consume2Handler"  queues="queue_two"/>
 	</rabbit:listener-container>
		

</beans>

訊息物件轉json類預設採用的fackjson,但是考慮到效率問題,修改成FastJson進行物件json轉換操作,class類內容如下;

package com.hsnn.medstgmini.drug.rabbitmq;

import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.alibaba.fastjson.JSONObject;

public class FastJsonMessageConverter extends AbstractMessageConverter {

    @SuppressWarnings("unused")
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    public Object fromMessage(Message message)
            throws MessageConversionException {
        Object o = new CommonMessage();
        try{
            o = fromMessage(message, new CommonMessage());
        }catch(Exception e){
            log.error("queue message error : " + message);
            e.printStackTrace();
        }
        return o;
    }

    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message, T t) {
        String json = "";
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return (T) JSONObject.parseObject(json, t.getClass());
    }

    protected Message createMessage(Object objectToConvert,
                                    MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = JSONObject.toJSONString(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);

    }
}

message 訊息模型類如下:

package com.hsnn.medstgmini.drug.rabbitmq;
 
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
 
/**
 * 訊息模型
 *
 */
public class CommonMessage {
	/**
	 * 約定的幾個訊息源名稱
	 */
	private String source;
	/**
	 * 實體表名
	 */
	private String table;
	/**
	 * 主鍵
	 */
	private String primaryKey;
	/**
	 * 訊息實體bean
	 */
	private Object message;
 
	public String getSource() {
		return source;
	}
 
	public void setSource(String source) {
		this.source = source;
	}
 
	public String getTable() {
		return table;
	}
 
	public void setTable(String table) {
		this.table = table;
	}
 
 
	public String getPrimaryKey() {
		return primaryKey;
	}
 
	public void setPrimaryKey(String primaryKey) {
		this.primaryKey = primaryKey;
	}
 
	public Object getMessage() {
		return message;
	}
 
	public void setMessage(Object message) {
		this.message = message;
	}
 
	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.DEFAULT_STYLE);
	}
	
}

定義兩個消費者:進行消費

package com.hsnn.medstgmini.drug.rabbitmq;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.IOException;

/*
*消費者1
*
*/
public class Consume1Handler implements MessageListener {
    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonNode =mapper.readTree(message.getBody());
            System.out.println(jsonNode.get("id").asText()+":"+jsonNode.get("name").asText());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.hsnn.medstgmini.drug.rabbitmq;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import java.io.IOException;
/*
*消費者2
*/
public class Consume2Handler implements MessageListener {
    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        try {
            JsonNode jsonNode =mapper.readTree(message.getBody());
            System.out.println(jsonNode.get("id").asText()+":"+jsonNode.get("name").asText());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

測試類:

  @RequestMapping("toCheckList")
    public String toCheckList() {
        Map map = new HashMap();
        map.put("id","001");
        map.put("name","酒鬼");
        rabbitTemplate.convertAndSend("queue_one_key",map);

        map.put("id","002");
        map.put("name","撕家");
        rabbitTemplate.convertAndSend("queue_two_key",map);
        return MODEL_PATH + "checkList";

    }

專案啟動後,訪問相應路徑,會在控制檯列印:

001:酒鬼

002:撕家

到這裡配置就結束了,但是在配置過程中也踩過了一些坑,這裡也貼出來,共勉

遇到的坑:啟動過程中出現報錯,錯誤資訊如下:

org.springframework.amqp.AmqpIOException: java.io.IOException
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)
	at org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.createBareConnection(AbstractConnectionFactory.java:218)

遇到問題後百度嗎,但是結果都不是我想要的,百度的結果都是RabbitMq許可權問題,只需要登入RabbitMq進行重置許可權,如下

但是我重置後還是報錯,然後開始排查問題,去RabbitMq官網檢視,發現是連線時埠號配置錯誤導致的異常,RabbitMq的埠號資訊如下:

4369 -- erlang發現口

5672 --client端通訊口   --RabbitMq cliean連線埠號,及spring 配置中連線的prot值

15672 -- 管理介面ui埠  --RabbitMq管理訪問埠

25672 -- server間內部通訊口