RabbitMQ:RabbitMQ + Spring配置檔案rabbit標籤
阿新 • • 發佈:2018-11-10
RabbitMQ:RabbitMQ + Spring配置檔案rabbit標籤
1.消費者配置檔案和啟動類:
【Consumer.xml】:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings for the RabbitMQ broker. -->
    <!-- NOTE(review): the username and password are hard-coded here; consider moving them to an external properties file. -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="caoxia" password="caoxia123456" port="5672" virtual-host="/" channel-cache-size="5" />

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration. -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>

    <!-- Direct exchange and its binding: queue "spring.queue.tag" is bound with routing key "spring.queue.tag.key". -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="receiveMessageListener" class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.ReceiveMessageListener" />

    <!-- Queue listener (observer style): when a message arrives, the listener object registered on the corresponding queue is notified. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />
    </rabbit:listener-container>

    <!-- Disabled (left commented out by the author): Spring template declaration.
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange" routing-key="spring.queue.tag.key"
                     connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
    <bean id="jsonMessageConverter"
          class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.Gson2JsonMessageConverter" />
    -->
</beans>
【ConsumerMain.java】:
package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;

/**
 * Created by nazi on 2018/7/30.
 */
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Entry point of the consumer demo.
 *
 * <p>Loads {@code spring-rabbit-label/Consumer.xml} from the classpath, which
 * declares the connection factory, the queue/exchange binding and the listener
 * container for {@code receiveMessageListener}.
 */
public class ConsumerMain {

    /**
     * Starts the Spring context that hosts the message listener.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring-rabbit-label/Consumer.xml");
        // The context is intentionally left open so the listener keeps receiving;
        // the shutdown hook closes it (and its beans) when the JVM exits instead
        // of leaving the broker connection to be dropped abruptly.
        context.registerShutdownHook();
    }
}
2.生產者配置檔案和啟動類:
【Producer.xml】:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings for the RabbitMQ broker. -->
    <!-- NOTE(review): the username and password are hard-coded here; consider moving them to an external properties file. -->
    <rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="caoxia" password="caoxia123456" port="5672" virtual-host="/" channel-cache-size="5" />

    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queue declaration. -->
    <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag" />

    <!-- Exchange type tags: rabbit:fanout-exchange, rabbit:direct-exchange, rabbit:topic-exchange, rabbit:headers-exchange. -->
    <!-- Direct exchange and its binding: queue "spring.queue.tag" is bound with routing key "spring.queue.tag.key". -->
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP's default JSON converter is Jackson-based; its purpose is to turn the data produced by the
         producer into JSON before it is stored in the message queue. According to the author, Gson is faster
         than Jackson, so a Gson-based implementation is substituted here. -->
    <bean id="jsonMessageConverter" class="com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml.Gson2JsonMessageConverter" />

    <!-- Spring template declaration: default exchange and routing key used by the producer. -->
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange" routing-key="spring.queue.tag.key" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />

    <!-- Disabled (left commented out by the author): create the rabbitTemplate message template bean explicitly.
    <bean id="rabbitTemplate"
          class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <property name="queue" value="spring.queue.tag"></property>
        <property name="routingKey" value="spring.queue.tag.key"></property>
    </bean>
    -->
</beans>
【ProducerMain.java】:
package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;
/**
* Created by nazi on 2018/7/30.
*/
import com.caox.sharding.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point of the producer demo.
 *
 * <p>Loads {@code spring-rabbit-label/Producer.xml}, sends a single {@link User}
 * through the configured {@link RabbitTemplate}, and then closes the context.
 */
public class ProducerMain {

    /**
     * Publishes one demo message and shuts the Spring context down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("spring-rabbit-label/Producer.xml");
        try {
            AmqpTemplate amqpTemplate = context.getBean(RabbitTemplate.class);
            User user = new User();
            user.setName("niuniu");
            // No exchange/routing key is passed here: the template's defaults
            // declared in Producer.xml are used.
            amqpTemplate.convertAndSend(user);
        } finally {
            // Release the connection factory and other beans; without this the
            // context (and its broker connection) is never closed.
            context.close();
        }
    }
}
3.實現一個訊息監聽器ReceiveMessageListener.java(下方列出的是 MessageListener 介面定義;ReceiveMessageListener 需實現其 onMessage 方法)
package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;
import org.springframework.amqp.core.Message;
/**
* Listener interface to receive asynchronous delivery of Amqp Messages.
*
* <p>Declares a single callback, {@link #onMessage(Message)}.
* NOTE(review): this looks like a copy of Spring AMQP's own
* {@code MessageListener} contract placed in the demo package; confirm
* which interface the listener bean is actually expected to implement.
*
* Created by nazi on 2018/7/30.
*/
public interface MessageListener {
/**
* Callback for a delivered message.
*
* @param message the AMQP message handed to the listener
*/
void onMessage(Message message);
}
4.MessageConverter外掛實現(Gson)
spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於Gson的速度快於jackson,這裡替換為Gson的一個實現
【pom.xml引入jar包】:
<!-- Gson library; Gson2JsonMessageConverter uses com.google.gson.Gson for JSON (de)serialization. -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3</version>
</dependency>
【Gson2JsonMessageConverter.java 】:
package com.caox.rabbitmq.demo._13_spring_rabbitmq_label_xml;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;
import com.google.gson.Gson;
/**
* Created by nazi on 2018/7/30.
* 由於考慮到效率,如下使用Gson實現訊息轉換。
*/
/**
 * JSON {@code MessageConverter} that uses Gson instead of Jackson.
 *
 * <p>Outgoing objects are serialized with {@link Gson#toJson(Object)} using the
 * converter's default charset, and type information is written to the message
 * properties through a {@link ClassMapper}. Incoming messages whose content
 * type contains {@code "json"} are deserialized back to the mapped class; any
 * other message is returned as its raw body.
 */
public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {

    private static final Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);

    /** Fallback mapper used when no {@link ClassMapper} was set on this converter. */
    private static final ClassMapper classMapper = new DefaultClassMapper();

    private static final Gson gson = new Gson();

    public Gson2JsonMessageConverter() {
        super();
    }

    /**
     * Serializes {@code object} to JSON and wraps it in a {@link Message}.
     *
     * <p>Sets content type, content encoding and content length on
     * {@code messageProperties}, and lets the class mapper record the
     * object's class there.
     *
     * @param object            the payload to serialize
     * @param messageProperties the properties to populate and attach to the message
     * @return the message carrying the JSON bytes
     * @throws MessageConversionException if the default charset is not supported
     */
    @Override
    protected Message createMessage(Object object,
            MessageProperties messageProperties) {
        byte[] bytes;
        try {
            bytes = gson.toJson(object).getBytes(getDefaultCharset());
        }
        catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(getDefaultCharset());
        messageProperties.setContentLength(bytes.length);
        resolveClassMapper().fromClass(object.getClass(), messageProperties);
        return new Message(bytes, messageProperties);
    }

    /**
     * Converts an incoming message back to an object.
     *
     * @param message the received message
     * @return the deserialized object when the content type contains
     *         {@code "json"}; otherwise (or when deserialization yields
     *         {@code null}) the raw message body
     * @throws MessageConversionException if the content encoding is not supported
     */
    @Override
    public Object fromMessage(Message message)
            throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.contains("json")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = getDefaultCharset();
                }
                try {
                    // Use the same mapper as createMessage so both directions agree,
                    // and so an unset inherited mapper cannot cause a NullPointerException.
                    Class<?> targetClass = resolveClassMapper().toClass(properties);
                    content = convertBytesToObject(message.getBody(),
                            encoding, targetClass);
                }
                catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException(
                            "Failed to convert Message content", e);
                }
            }
            else {
                log.warn("Could not convert incoming message with content-type ["
                        + contentType + "]");
            }
        }
        if (content == null) {
            content = message.getBody();
        }
        return content;
    }

    /**
     * Returns the mapper configured on this converter, or the shared default
     * mapper when {@code getClassMapper()} returns {@code null}.
     */
    private ClassMapper resolveClassMapper() {
        ClassMapper configured = getClassMapper();
        return configured != null ? configured : classMapper;
    }

    /**
     * Decodes {@code body} with {@code encoding} and parses it as {@code clazz}.
     * A {@code null} body yields {@code null} rather than failing in the decoder.
     */
    private Object convertBytesToObject(byte[] body, String encoding,
            Class<?> clazz) throws UnsupportedEncodingException {
        if (body == null) {
            return null;
        }
        String contentAsString = new String(body, encoding);
        return gson.fromJson(contentAsString, clazz);
    }
}