RabbitMQ (九) Spring整合RabbitMQ(1)
阿新 • • 發佈:2019-01-06
前面幾篇講解了如何使用rabbitMq,這一篇主要講解spring整合rabbitmq。
說明:
durable:是否持久化
exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除
auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
三:生產者程式
下面是消費者監聽配置
首先引入配置檔案org.springframework.amqp,如下
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.5.1</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.5.RELEASE</version> </dependency>
一:配置消費者和生成者公共部分
<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}" port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}" channel-cache-size="50"/> <rabbit:admin connection-factory="connectionFactory"/> <!--定義訊息佇列--> <rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/> <rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/> <!--繫結佇列--> <rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange>
說明:
durable:是否持久化
exclusive: 僅建立者可以使用的私有佇列,斷開後自動刪除
auto_delete: 當所有消費客戶端連線斷開後,是否自動刪除佇列
二:配置生成者
<!--建立訊息佇列模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spittle.fanout" message-converter="jsonMessageConverter"> </rabbit:template> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
三:生產者程式
public class Spittle implements Serializable {
private Long id;
private Spitter spitter;
private String message;
private Date postedTime;
public Spittle(Long id, Spitter spitter, String message, Date postedTime) {
this.id = id;
this.spitter = spitter;
this.message = message;
this.postedTime = postedTime;
}
public Long getId() {
return this.id;
}
public String getMessage() {
return this.message;
}
public Date getPostedTime() {
return this.postedTime;
}
public Spitter getSpitter() {
return this.spitter;
}
}
public class ProducerMain {
public static void main(String[] args) throws Exception {
ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml");
AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate");
for (int i = 0; i < 20; i++) {
System.out.println("Sending message #" + i);
Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date());
template.convertAndSend(spittle);
Thread.sleep(5000);
}
System.out.println("Done!");
}
}
其中convertAndSend方法預設第一個引數是交換機名稱,第二個引數是路由名稱,第三個才是我們傳送的資料,現在我們啟動程式,效果如下
第四個:消費者程式
首先編寫一個用於監聽生產者傳送資訊的程式碼
public class SpittleAlertHandler implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String body=new String(message.getBody(),"UTF-8");
System.out.println(body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
一定要注意實現MessageListener,我們只需要獲取message的body即可,通過json來轉換我們需要的程式(比如我們可以傳送一個map,map存放方法和實體,這樣我們可以通過反射來呼叫不同的程式來執行)。
下面我們配置消費者
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/>
</rabbit:listener-container>
<bean id="spittleListener" class="com.lp.summary.rabbitmq.impl.SpittleAlertHandler"/>
其中spittleListener是監聽的程式,method是執行的方法,queues是我們監聽的佇列,多個佇列可以逗號隔開(因為我們採用的是分發,所以三個佇列獲取的訊息是相同的,這裡為了簡便我放在一個監聽程式中了,其實我們可以寫三個消費者,每個消費者監聽一個佇列)
現在只需要啟動程式即可執行
public class ConsumerMain {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-consumer.xml");
}
}
當然direct跟上面的情況差不多,只不過這個是根據路由匹配,先把資料傳送到交換機,然後繫結路由和佇列,通過交換機id和路由來找到佇列,下面是一些主要的配置
<rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue>
<rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue>
<!--交換機定義-->
<!--rabbit:direct-exchange:定義exchange模式為direct,
意思就是訊息與一個特定的路由鍵完全匹配,才會轉發。
rabbit:binding:設定訊息queue匹配的key-->
<rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}">
<rabbit:bindings>
<rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/>
<rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--spring template宣告-->
<rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory"
message-converter="jsonMessageConverter"></rabbit:template>
<!--訊息物件轉成成json-->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
下面是消費者監聽配置
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener>
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener>
</rabbit:listener-container>
說明:
queues:監聽的佇列,多個的話用逗號(,)分隔
ref:監聽器
下面是程式
public static void main(String[] args) {
ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml");
MQProducer mqProducer=(MQProducer) context.getBean("mqProducer");
mqProducer.sendDateToQueue("spring.test.queueKey1","Hello World spring.test.queueKey1");
mqProducer.sendDateToQueue("spring.test.queueKey2","Hello World spring.test.queueKey2");
}