RabbitMQ整合Spring配置檔案詳解
阿新 • • 發佈:2018-12-07
一、rabbitmq 配置檔案 在web 專案開發過程中,一般分為生產者配置檔案和消費者配置檔案。
一、準備工作
安裝好rabbitmq,並在專案中增加配置檔案 rabbit.properties 內容如下:
rmq.ip=192.188.113.114
rmq.port=5672
rmq.producer.num=20
rmq.manager.user=admin
rmq.manager.password=admin
二、生產者配置檔案:producer.xml
- 配置連線類的bean
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <property name="username" value="${rmq.manager.user}" /> <property name="password" value="${rmq.manager.password}" /> <property name="host" value="${rmq.ip}" /> <property name="port" value="${rmq.port}" /> </bean>
- spring amqp預設的是jackson 的一個外掛,目的將生產者生產的資料轉換為json存入訊息佇列,由於fastjson的速度快於jackson,這裡替換為fastjson的一個實現,(可以替換成任意的MessageConvert實現)
<bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean>
- spring template 宣告, durable:是否持久化 ; exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除;auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列.。也可以在配置時指明預設使用的exchange與routingkey、·
<rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" routing-key="xxx" exchange="xxx"/>
- 佇列宣告,使用name屬性或者id屬性指定佇列名。durable屬性設定是夠將訊息持久化,exclusive屬性設定訊息佇列是否是獨佔的
<rabbit:queue id="test_delay_queue" durable="true" auto-delete="false" exclusive="false" name="test_delay_queue"> <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">60000</value> </entry> <entry key="x-dead-letter-exchange" value="test_Exchange"/> </rabbit:queue-arguments> </rabbit:queue>
- 配置exchange
direct
<rabbit:direct-exchange name="test_Exchange" durable="true" auto-delete="false" id="test_Exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue" key="test_key" />
</rabbit:bindings>
</rabbit:direct-exchange>
fanout
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_delay_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
topic
<binding queue="testqueue" pattern="*.*.test1"
<rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange">
<rabbit:bindings>
<rabbit:binding queue="test_queue" pattern="test_key.*.*" />
</rabbit:bindings>
</rabbit:topic-exchange>
在子標籤<rabbit:bindings>中宣告繫結鍵與佇列的對應關係
。
- 生產者(傳送端)程式碼:
@Resource
private RabbitTemplate rabbitTemplate;
public void sendMessage(CommonMessage msg){
try {
logger.error("傳送資訊開始");
System.out.println(rabbitTemplate.getConnectionFactory().getHost());
//傳送資訊 message-exchange 交換機 msg.getSource() 為 test_key
rabbitTemplate.convertAndSend("message-exchange",msg.getSource(), msg);
logger.error("傳送資訊結束");
} catch (Exception e) {
e.printStackTrace();
}
}
rabbitTemplate傳送訊息時若不指定exchange和routingkey則使用配置時預設的。
三、消費者配置
- 宣告訊息佇列的服務端連線,ConnectionFactory的配置與生存者相同
<rabbit:admin connection-factory="newRabbitConnFactory" />
- 配置佇列的中訊息的處理類
<bean id="refreshSingleStatisticJmsListener"
class="com.netease.edu.k12.server.course.manager.jms.listener.RefreshSingleStatisticJmsListener" />
類內容
@Component("refreshSingleStatisticJmsListener")
public class RefreshSingleStatisticJmsListener implements MessageListener {
@Override
public void onMessage(Message message) {
...
}
}
監聽類需要實現MessageListener介面並重寫onMessage方法,方法的入參Message中包含了訊息的載荷(一個位元組陣列,請根據需要進行反序列化)。
- 繫結監聽的佇列
<rabbit:listener-container
connection-factory="newRabbitConnFactory" prefetch="3" concurrency="50"
acknowledge="auto">
<rabbit:listener ref="refreshSingleStatisticJmsListener"
queues="refreshSingleStatistic-course-1.0.0${local_service_version_suffix}" />
<rabbit:listener ref="syncYktCourseEnrollJmsListener"
queues="ykt-courseEnrollSyncJmsQueue-key-${dubbo_study_version}"/>
</rabbit:listener-container>
<rabbit:listener>子標籤的ref屬性指定監聽處理類,queues指定被監聽的佇列。
這樣當監聽的佇列中有訊息時,訊息就會被取出交給監聽訊息的佇列進行相應操作。