spring 集成rabbitmq
mq.properties
mq.host=主機ip
mq.username=admin
mq.password=admin123
mq.port=5672
mq.queue.vip=test-queue
mq.exchange=test-exchange
mq.vhost=test
spring-rabbitmq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- RabbitMQ start -->
    <!-- NOTE(review): the ${mq.*} placeholders below need a property placeholder
         configurer that loads mq.properties; none is declared in this file, so it
         is presumably configured elsewhere. Please confirm. -->

    <!-- Connection settings -->
    <rabbit:connection-factory id="mqConnectionFactory" virtual-host="${mq.vhost}"
                               host="${mq.host}" username="${mq.username}" password="${mq.password}"
                               port="${mq.port}" />
    <rabbit:admin connection-factory="mqConnectionFactory" />

    <!-- Spring AMQP's stock JSON converter is Jackson-based; it turns the objects a
         producer sends into JSON before they are put on the queue. The original author
         swaps in a fastjson-based implementation, citing fastjson being faster than Jackson.
         The class name must match the package the converter is actually declared in. -->
    <bean id="jsonMessageConverter" class="com.pptv.ucm.mq.util.FastJsonMessageConverter"></bean>

    <!-- Message queue client (template) used by producers -->
    <rabbit:template id="amqpTemplate" exchange="${mq.exchange}"
                     connection-factory="mqConnectionFactory" message-converter="jsonMessageConverter" />

    <!-- Queue declaration -->
    <!-- durable: whether the queue is persisted.
         exclusive: private queue usable only by its creator, removed automatically on disconnect.
         auto-delete: whether the queue is removed once all consumers have disconnected. -->
    <rabbit:queue id="my_queue_vip" name="${mq.queue.vip}"
                  durable="true" auto-delete="false" exclusive="false" />

    <!-- Exchange definition -->
    <!-- One exchange can have several queues bound to it, and one queue can be bound to
         several exchanges. Messages sent to an exchange with no bound queue are lost.
         direct: a message is forwarded only when its routing key matches a binding key exactly.
         topic: messages are forwarded by pattern rules (the most flexible mode). -->
    <rabbit:direct-exchange name="${mq.exchange}"
                            durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="my_queue_vip" key="vip_key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Listener (consumer) configuration.
         The connection-factory and queues attributes must reference bean ids declared
         above: mqConnectionFactory and my_queue_vip.
         NOTE(review): the rabbitmqService bean is not declared in this file; it must be
         defined elsewhere in the application context. -->
    <rabbit:listener-container
            connection-factory="mqConnectionFactory" acknowledge="auto">
        <rabbit:listener queues="my_queue_vip" ref="rabbitmqService" />
    </rabbit:listener-container>
</beans>
FastJsonMessageConverter
package com.pptv.ucm.mq.util;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.alibaba.fastjson.JSON;

/**
 * Message converter that serializes payloads to JSON with fastjson and reads
 * them back using the class information stored in the message properties.
 *
 * <p>Both directions use the same {@link ClassMapper}: the one configured on
 * this converter if there is one, otherwise a shared {@link DefaultClassMapper}.
 *
 * @author yakunMeng
 * @version V1.0 (created 2017-08-11)
 */
public class FastJsonMessageConverter extends AbstractJsonMessageConverter {

    private static final Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    /** Used when no class mapper has been configured on this converter. */
    private static final ClassMapper DEFAULT_CLASS_MAPPER = new DefaultClassMapper();

    public FastJsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON bytes in the default charset and records
     * content type, encoding, length and class information on the properties.
     *
     * @param object            payload to serialize
     * @param messageProperties properties to populate and attach to the message
     * @return the message carrying the JSON bytes
     * @throws MessageConversionException if the default charset is not supported
     */
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        byte[] bytes;
        try {
            bytes = JSON.toJSONString(object).getBytes(getDefaultCharset());
        } catch (IOException e) {
            throw new MessageConversionException("Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        // Write the type information with the same mapper fromMessage() reads it with.
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts a JSON message back into an object of the class named in its
     * properties. Messages without properties, or whose content type does not
     * contain {@code "json"}, are returned as their raw body bytes.
     *
     * @param message incoming message
     * @return the deserialized object, or the raw {@code byte[]} body when the
     *         message was not converted
     * @throws MessageConversionException if the content encoding is not supported
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(), encoding, targetClass);
                } catch (IOException e) {
                    throw new MessageConversionException("Failed to convert Message content", e);
                }
            } else {
                log.warn("Could not convert incoming message with content-type [" + contentType + "]");
            }
        }
        if (content == null) {
            // Fall back to the raw payload when nothing could be converted.
            content = message.getBody();
        }
        return content;
    }

    /** Returns the configured class mapper, or the shared default when none is set. */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : DEFAULT_CLASS_MAPPER;
    }

    /** Decodes {@code body} with {@code encoding} and parses it into an instance of {@code clazz}. */
    private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz)
            throws UnsupportedEncodingException {
        String contentAsString = new String(body, encoding);
        return JSON.parseObject(contentAsString, clazz);
    }
}
RabbitmqService
package com.pptv.ucm.service.impl;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;

import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

import com.pptv.ucm.common.util.ImportExecl;
import com.pptv.ucm.common.util.StringUtil;

import net.sf.json.JSONObject;

/**
 * RabbitMQ message listener (consumer).
 *
 * <p>Each message body is read as a UTF-8 JSON document with optional
 * {@code batch_code} and {@code file_url} fields. The file is resolved to a
 * local path, read as a spreadsheet, its rows are dumped to stdout, and the
 * local file is removed afterwards.
 *
 * @author st
 */
public class RabbitmqService implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitmqService.class);

    /**
     * Handles one incoming message. Failures are logged and not rethrown, so the
     * exception never reaches the listener container.
     *
     * @param message the received AMQP message
     */
    @Override
    public void onMessage(Message message) {
        log.info("消息消費者 = " + message);
        String content = null;
        String localExcelPath = null;
        try {
            content = new String(message.getBody(), "utf-8");
            if (StringUtil.isNotBlank(content)) {
                JSONObject object = JSONObject.fromObject(content);
                // Missing fields fall back to the empty string.
                String batchCode = object.has("batch_code") ? object.getString("batch_code") : "";
                String fileUrl = object.has("file_url") ? object.getString("file_url") : "";
                log.info("batch_code=" + batchCode + "file_url=" + fileUrl);

                // NOTE(review): getDiskPath presumably fetches the file to local disk - confirm.
                localExcelPath = ImportExecl.getDiskPath(fileUrl);
                System.out.println(localExcelPath);

                ImportExecl poi = new ImportExecl();
                printRows(poi.read(localExcelPath));
            }
        } catch (Exception e) {
            // Log with the throwable so the stack trace goes to the application log.
            log.error("報錯了e= " + e.getMessage(), e);
        } finally {
            // Remove the local file even when reading or parsing failed.
            deleteLocalFile(localExcelPath);
        }
        log.info(content);
    }

    /** Prints every row and its cells to stdout; does nothing for a null list. */
    private static void printRows(List<List<String>> rows) {
        if (rows == null) {
            return;
        }
        for (int i = 0; i < rows.size(); i++) {
            System.out.print("第" + (i) + "行");
            List<String> cellList = rows.get(i);
            for (int j = 0; j < cellList.size(); j++) {
                System.out.print(" " + cellList.get(j));
            }
            System.out.println();
        }
    }

    /** Deletes the local file at {@code path} if it exists and warns when deletion fails. */
    private static void deleteLocalFile(String path) {
        if (path == null) {
            return;
        }
        File file = new File(path);
        if (file.exists() && !file.delete()) {
            log.warn("Failed to delete local file " + path);
        }
    }
}
MQProducerImpl
package com.pptv.ucm.service.impl; import javax.annotation.Resource; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.pptv.ucm.service.IMQProducer; @Service public class MQProducerImpl implements IMQProducer { @Value(value = "${mq.queue}") private String queueId; @Value(value = "${mq.exchange}") private String mqExchange; @Value(value = "${mq.patt}") private String mqPatt; @Resource private AmqpTemplate amqpTemplate; public void sendQueue(Object object) { // convertAndSend 將Java對象轉換為消息發送至匹配key的交換機中Exchange amqpTemplate.convertAndSend(mqExchange, mqPatt, object); } }
spring 集成rabbitmq