第17章-Spring消息
1 異步消息簡介
像RMI和Hessian/Burlap這樣的遠程調用機制是同步的。如圖17.1所示,當客戶端調用遠程方法時,客戶端必須等到遠程方法完成後,才能繼續執行。即使遠程方法不向客戶端返回任何信息,客戶端也要被阻塞直到服務完成。
消息則是異步發送的,如圖17.2所示,客戶端不需要等待服務處理消息,甚至不需要等待消息投遞完成。客戶端發送消息,然後繼續執行,這是因為客戶端假定服務最終可以收到並處理這條消息。
1.1 發送消息
在異步消息中有兩個主要的概念:消息代理(message broker)和目的地(destination)。當一個應用發送消息時,會將消息交給一個消息代理。
盡管不同的消息系統會提供不同的消息路由模式,但是有兩種通用的目的地:隊列(queue)和主題(topic)。每種類型都與特定的消息模型相關聯,分別是點對點模型(隊列)和發布/訂閱模型(主題)。
點對點消息模型
在點對點模型中,每一條消息都有一個發送者和一個接收者,如圖17.3所示。當消息代理得到消息時,它將消息放入一個隊列中。當接收者請求隊列中的下一條消息時,消息會從隊列中取出,並投遞給接收者。因為消息投遞後會從隊列中刪除,這樣就可以保證消息只能投遞給一個接收者。
圖17.3 消息隊列對消息發送者和消息接收者進行了解耦。雖然隊列可以有多個接收者,但是每一條消息只能被一個接收者取走
盡管消息隊列中的每一條消息只被投遞給一個接收者,但是並不意味著只能使用一個接收者從隊列中獲取消息。事實上,通常可以使用幾個接收者來處理隊列中的消息。不過,每個接收者都會處理自己所接收到的消息。
發布—訂閱消息模型
在發布—訂閱消息模型中,消息會發送給一個主題。與隊列類似,多個接收者都可以監聽一個主題。但是,與隊列不同的是,消息不再是只投遞給一個接收者,而是主題的所有訂閱者都會接收到此消息的副本,如圖17.4所示。
圖17.4 與隊列類似,主題可以將消息發送者與消息接收者進行解耦。與隊列不同的是,主題消息可以發送給多個主題訂閱者
1.2 評估異步消息的優點
同步通信機制訪問遠程服務的客戶端存在幾個限制,最主要的是:
- 同步通信意味著等待。當客戶端調用遠程服務的方法時,它必須等待遠程方法結束後才能繼續執行。如果客戶端與遠程服務頻繁通信,或者遠程服務響應很慢,就會對客戶端應用的性能帶來負面影響。
- 客戶端通過服務接口與遠程服務相耦合。如果服務的接口發生變化,此服務的所有客戶端都需要做相應的改變。
- 客戶端與遠程服務的位置耦合。客戶端必須配置服務的網絡位置,這樣它才知道如何與遠程服務進行交互。如果網絡拓撲進行調整,客戶端也需要重新配置新的網絡位置。
- 客戶端與服務的可用性相耦合。如果遠程服務不可用,客戶端實際上也無法正常運行。
無需等待
當使用JMS發送消息時,客戶端不必等待消息被處理,甚至是被投遞。客戶端只需要將消息發送給消息代理,就可以確信消息會被投遞給相應的目的地。
因為不需要等待,所以客戶端可以繼續執行其他任務。這種方式可以有效地節省時間,所以客戶端的性能能夠極大的提高。
面向消息和解耦
與面向方法調用的RPC通信不同,發送異步消息是以數據為中心的。這意味著客戶端並沒有與特定的方法簽名綁定。任何可以處理數據的隊列或主題訂閱者都可以處理由客戶端發送的消息,而客戶端不必了解遠程服務的任何規範。
位置獨立
同步RPC服務通常需要網絡地址來定位。這意味著客戶端無法靈活地適應網絡拓撲的改變。如果服務的IP地址改變了,或者服務被配置為監聽其他端口,客戶端必須進行相應的調整,否則無法訪問服務。
與之相反,消息客戶端不必知道誰會處理它們的消息,或者服務的位置在哪裏。客戶端只需要了解需要通過哪個隊列或主題來發送消息。因此,只要服務能夠從隊列或主題中獲取消息即可,消息客戶端根本不需要關註服務來自哪裏。
確保投遞
為了使客戶端可以與同步服務通信,服務必須監聽指定的IP地址和端口。如果服務崩潰了,或者由於某種原因無法使用了,客戶端將不能繼續處理。
但是,當發送異步消息時,客戶端完全可以相信消息會被投遞。即使在消息發送時,服務無法使用,消息也會被存儲起來,直到服務重新可以使用為止。
2 使用JMS發送消息
Java消息服務(Java Message Service ,JMS)是一個Java標準,定義了使用消息代理的通用API。
Spring通過基於模板的抽象為JMS功能提供了支持,這個模板也就是JmsTemplate。使用JmsTemplate,能夠非常容易地在消息生產方發送隊列和主題消息,在消費消息的那一方,也能夠非常容易地接收這些消息。Spring還提供了消息驅動POJO的理念:這是一個簡單的Java對象,它能夠以異步的方式響應隊列或主題上到達的消息。
我們將會討論Spring對JMS的支持,包括JmsTemplate和消息驅動POJO。但是在發送和接收消息之前,我們首先需要一個消息代理,它能夠在消息的生產者和消費者之間傳遞消息.
2.1 在Spring中搭建消息代理
ActiveMQ是一個偉大的開源消息代理產品,也是使用JMS進行異步消息傳遞的最佳選擇.
創建連接工廠
默認情況下,ActiveMQConnectionFactory會假設ActiveMQ代理監聽localhost的61616端口。對於開發環境來說,這沒有什麽問題,但是在生產環境下,ActiveMQ可能會在不同的主機和/端口上。如果是這樣的話,我們可以使用brokerURL屬性來指定代理的URL:
<bean id="connectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616" />
聲明ActiveMQ消息目的地
除了連接工廠外,我們還需要消息傳遞的目的地。目的地可以是一個隊列,也可以是一個主題,這取決於應用的需求。
不論使用的是隊列還是主題,我們都必須使用特定的消息代理實現類在Spring中配置目的地bean。例如,下面的<bean>
聲明定義了一個ActiveMQ隊列:
<bean id="spittleQueue" class="org.apache.activemq.command.ActiveMQQueue"
c:_="spittle.alert.queue" />
同樣,下面的<bean>
聲明定義了一個ActiveMQ主題:
<bean id="spittleTopic" class="org.apache.activemq.command.ActiveMQTopic"
c:_="spittle.alert.topic" />
ActiveMQ命名空間提供了另一種方式來聲明隊列和主題。對於隊列,我們可以使用<amq:quence>
元素來聲明:
<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
如果是JMS主題,我們可以使用<amq:topic>
元素來聲明:
<amq:topic id="spittleTopic" physicalName="spittle.alert.topic" />
不管是哪種類型,都是借助physicalName屬性指定消息通道的名稱。
2.2 使用Spring的JMS模板
處理失控的JMS代碼
傳統的JDBC代碼在處理連接、語句、結果集和異常時是多麽冗長和繁雜。遺憾的是,傳統的JMS使用了類似的編程模型.
使用JMS模版
針對如何消除冗長和重復的JMS代碼,Spring給出的解決方案是JmsTemplate。JmsTemplate可以創建連接、獲得會話以及發送和接收消息。這使得我們可以專註於構建要發送的消息或者處理接收到的消息。
另外,JmsTemplate可以處理所有拋出的笨拙的JMSException異常:
表17.1 Spring的JmsTemplate會捕獲標準的JMSException異常,再以Spring的非檢查型異常JmsException子類重新拋出
為了使用JmsTemplate,我們需要在Spring的配置文件中將它聲明為一個bean。如下的XML可以完成這項工作:
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate"
c:_-ref="connectionFactory" />
發送消息
在我們想建立的Spittr應用程序中,其中有一個特性就是當創建Spittle的時候提醒其他用戶(或許是通過E-mail)。我們可以在增加Spittle的地方直接實現該特性。但是搞清楚發送提醒給誰以及實際發送這些提醒可能需要一段時間,這會影響到應用的性能。當增加一個新的Spittle時,我們希望應用是敏捷的,能夠快速做出響應。
為了在Spittle創建的時候異步發送spittle提醒,讓我們為Spittr應用引入AlertService:
public interface AlertService {
void sendSpittleAlert(Spittle spittle);
}
- 實現類:
public class AlertServiceImpl implements AlertService {
private JmsOperations jmsOperations;
public AlertServiceImpl(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
public void sendSpittleAlert(final Spittle spittle) {
jmsOperations.send(
"spittle.alert.queue",
new MessageCreator() {
public Message createMessage(Session session)
throws JMSException {
return session.createObjectMessage(spittle);
}
}
);
}
}
設置默認目的地
如果你想指定要創建的目的地類型的話,那麽你可以將之前創建的隊列或主題的目的地bean裝配進來:
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate"
c:_-ref="connectionFactory"
p:defaultDestination-ref="spittleTopic"/>
現在,調用JmsTemplate的send()方法時,我們可以去除第一個參數了:
jmsOperations.send(
new MessageCreator() {
//…
)
在發送時,對消息進行轉換
除了send()方法,JmsTemplate還提供了convertAndSend()方法。與send()方法不同,convertAndSend()方法並不需要MessageCreator作為參數。這是因為convertAndSend()會使用內置的消息轉換器(message converter)為我們創建消息。
當我們使用convertAndSend()時,sendSpittleAlert()可以減少到方法體中只包含一行代碼:
public void sendSpittleAlert(Spittle spittle) {
jmsOperations.convertAndSend(spittle);
}
接收消息
當調用JmsTemplate的receive()方法時,JmsTemplate會嘗試從消息代理中獲取一個消息。如果沒有可用的消息,receive()方法會一直等待,直到獲得消息為止。圖17.6展示了這個交互過程。
在convertAndSend()中,我們已經看到了如何將對象轉換為Message。不過,它們還可以用在接收端,也就是使用JmsTemplate的receiveAndConvert():
public Spittle retrieveSpittleAlert() {
return (Spittle) jmsOperations.receiveAndConvert();
}
使用JmsTemplate接收消息的最大缺點在於receive()和receiveAndConvert()方法都是同步的。這意味著接收者必須耐心等待消息的到來,因此這些方法會一直被阻塞,直到有可用消息(或者直到超時)。同步接收異步發送的消息,是不是感覺很怪異?
這就是消息驅動POJO的用武之處。讓我們看看如何使用能夠響應消息的組件異步接收消息,而不是一直等待消息的到來。
2.3 創建消息驅動的POJO
配置消息監聽器
為POJO賦予消息接收能力的訣竅是在Spring中把它配置為消息監聽器。Spring的jms命名空間為我們提供了所需要的一切。首先,讓我們先把處理器聲明為bean:
<bean id="spittleHandler" class="spittr.alerts.SpittleAlertHandler" />
然後,為了把SpittleAlertHandler轉變為消息驅動的POJO,我們需要把這個bean聲明為消息監聽器:
<jms:listener-container>
<jms:listener destination="spittle.alert.queue"
ref="spittleHandler"
method="handleSpittleAlert" />
</jms:listener-container>
在這裏,我們在消息監聽器容器中包含了一個消息監聽器。消息監聽器容器(message listener container)是一個特殊的bean,它可以監控JMS目的地並等待消息到達。一旦有消息到達,它取出消息,然後把消息傳給任意一個對此消息感興趣的消息監聽器。如圖17.7展示了這個交互過程。
圖17.7 消息監聽器容器監聽隊列和主題。當消息到達時,消息將轉給消息監聽器(例如消息驅動的POJO)
為了在Spring中配置消息監聽器容器和消息監聽器,我們使用了Spring jms命名空間中的兩個元素。<jms:listener-container>
中包含了<jms:listener>
元素。這裏的connection-factory屬性配置了對connectionFactory的引用,容器中的每個<jms:listener>
都使用這個連接工廠進行消息監聽。
2.4 使用基於消息的RPC
導出基於JMS的服務
如果HttpInvokerServiceExporter可以導出基於HTTP通信的服務,那麽JmsInvoker-ServiceExporter就應該可以導出基於JMS的服務。
為了演示JmsInvokerServiceExporter是如何工作的,考慮如下的AlertServiceImpl。
,AlertServiceImpl使用了@Component註解來標註,所以它會被Spring自動發現並註冊為Spring應用上下文中ID
為alertService的bean。在配置JmsInvokerServiceExporter時,我們將引用這個bean:
導出器的屬性並沒有描述服務如何基於JMS通信的細節。但好消息是JmsInvokerServiceExporter可以充當JMS監聽器。因此,我們使用<jms:listenercontainer>
元素配置它:
使用基於JMS的服務
為了使用提醒服務,我們可以像下面那樣配置JmsInvokerProxyFactoryBean:
connectionFactory和queryName屬性指定了RPC消息如何被投遞——在這裏,也就是在給定的連接工廠中,我們所配置的消息代理裏面名為spitter.alert.queue的隊列。對於serviceInterface,指定了代理應該通過AlertService接口暴露功能。
多年來,JMS一直是Java應用中主流的消息解決方案。但是對於Java和Spring開發者來說,JMS並不是唯一的消息可選方案。在過去的幾年
中,高級消息隊列協議(Advanced Message Queuing Protocol ,AMQP)得到了廣泛的關註。
3 使用AMQP實現消息功能
AMQP的線路層協議規範了消息的格式,消息在生產者和消費者間傳送的時候會遵循這個格式。這樣AMQP在互相協作方面就要優於JMS——它不僅能跨不同的AMQP實現,還能跨語言和平臺。(AMQP能夠不局限於Java語言和平臺,那說明你已經快速抓到了重點。)
3.1 AMQP簡介
與之不同的是,AMQP的生產者並不會直接將消息發布到隊列中。AMQP在消息的生產者以及傳遞信息的隊列之間引入了一種間接的機制:Exchange。這種關系如圖17.8所示。
可以看到,消息的生產者將信息發布到一個Exchange。Exchange會綁定到一個或多個隊列上,它負責將信息路由到隊列上。信息的消費者會從隊列中提取數據並進行處理。
AMQP定義了四種不同類型的Exchange,每一種都有不同的路由算法,這些算法決定了是否要將信息放到隊列中。根據Exchange的算法不同,它可能會使用消息的routing key和/或參數,並將其與Exchange和隊列之間binding的routing key和參數進行對比。(routing key可以大致理解為Email的收件人地址,指定了預期的接收者。)如果對比結果滿足相應的算法,那麽消息將會路由到隊列上。否則的話,將不會路由到隊列上。
四種標準的AMQP Exchange如下所示:
- Direct:如果消息的routing key與binding的routing key直接匹配的話,消息將會路由到該隊列上;
- Topic:如果消息的routing key與binding的routing key符合通配符匹配的話,消息將會路由到該隊列上;
- Headers:如果消息參數表中的頭信息和值都與bingding參數表中相匹配,消息將會路由到該隊列上;
- Fanout:不管消息的routing key和參數表的頭信息/值是什麽,消息將會路由到所有隊列上。
3.2 配置Spring支持AMQP消息
當我們第一次使用Spring JMS抽象的時候,首先配置了一個連接工廠。與之類似,使用Spring AMQP前也要配置一個連接工廠。只不過,所要配置的不是JMS的連接工廠,而是需要配置AMQP的連接工廠。更具體來講,需要配置RabbitMQ連接工廠。
什麽是RabbitMQ
<connection-factory id="connectionFactory"
host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}" />
聲明隊列、Exchange以及binding
表17.3 Spring AMQP的rabbit命名空間包含了多個元素,用來創建隊列、Exchange以及將它們結合在一起的binding
這些配置元素要與
例如,如果你希望聲明名為spittle.alert.queue的隊列,只需要在Spring配置中添加如下的兩個元素即可:
<admin connection-factory="connectionFactory"/>
<queue id="spittleAlertQueue" name="spittle.alerts" />
對於簡單的消息來說,我們只需做這些就足夠了。這是因為默認會有一個沒有名稱的direct Exchange,所有的隊列都會綁定到這個Exchange上,並且routing key與隊列的名稱相同。在這個簡單的配置中,我們可以將消息發送到這個沒有名稱的Exchange上,並將routing key設定為spittle.alert.queue,這樣消息就會路由到這個隊列中。實際上,我們重新創建了JMS的點對點模型。
3.3 使用RabbitTemplate發送消息
配置RabbitTemplate的最簡單方式是使用rabbit命名空間的<template>
元素,如下所示:
<template id="rabbitTemplate"
connection-factory="connectionFactory"
routing-key="spittle.alerts" />
現在,要發送消息的話,我們只需要將模板bean註入到AlertServiceImpl中,並使用它來發送Spittle。如下的程序清單展現了一個新版本的AlertServiceImpl,它使用RabbitTemplate代替JmsTemplate來發送Spittle提醒。
public class AlertServiceImpl implements AlertService {
private RabbitTemplate rabbit;
@Autowired
public AlertServiceImpl(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendSpittleAlert(Spittle spittle) {
rabbit.convertAndSend("spittle.alert.exchange",
"spittle.alerts",
spittle);
}
}
可以看到,現在sendSpittleAlert()調用RabbitTemplate的convertAndSend()方法,其中RabbitTemplate是被註入進來的。它傳入了三個參數:Exchange的名稱、routing key以及要發送的對象。註意,這裏並沒有指定消息該路由到何處、要發送給哪個隊列以及期望哪個消費者來獲取消息。
convertAndSend()方法,它會自動將對象轉換為Message。它需要一個消息轉換器的幫助來完成該任務,默認的消息轉換器是SimpleMessageConverter,它適用於String、Serializable實例以及字節數組。Spring AMQP還提供了其他幾個有用的消息轉換器,其中包括使用JSON和XML數據的消息轉換器。
3.4 接收AMQP消息
使用RabbitTemplate來接收消息
RabbitTemplate提供了多個接收信息的方法。最簡單就是receive()方法,它位於消息的消費者端,對應於RabbitTemplate的send()方法。借助receive()方法,我們可以從隊列中獲取一個Message對象:
Message message=rabbit.receive("spittle.alert.queue");
或者,如果願意的話,你還可以配置獲取消息的默認隊列,這是通過在配置模板的時候,設置queue屬性實現的:
<template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spittle.alert.exchange"
routing-key="spittle.alerts"
queue="spittle.alert.queue"/>
這樣的話,我們在調用receive()方法的時候,不需要設置任何參數就能從默認隊列中獲取消息了:
Message message=rabbit.receive();
在獲取到Message對象之後,我們可能需要將它body屬性中的字節數組轉換為想要的對象。就像在發送的時候將領域對象轉換為Message一樣,將接收到的Message轉換為領域對象同樣非常繁瑣。因此,我們可以考慮使用RabbitTemplate的receiveAndConvert()方法作為替代方案:
Spittle spittle = (Spittle)rabbit.receiveAndConvert();
receiveAndConvert()方法會使用與sendAndConvert()方法相同的消息轉換器,將Message對象轉換為原始的類型。
調用receive()和receiveAndConvert()方法都會立即返回,如果隊列中沒有等待的消息時,將會得到null。這就需要我們來管理輪詢(polling)以及必要的線程,實現隊列的監控。
我們並非必須同步輪詢並等待消息到達,Spring AMQP還提供了消息驅動POJO的支持,這不禁使我們回憶起Spring JMS中的相同特性。讓我們看一下如何通過消息驅動AMQP POJO的方式來接收消息。
定義消息驅動的AMQP POJO
public class SpittleAlertHandler {
public void handleSpittleAlert(Spittle spittle) {
System.out.println(spittle.getMessage());
}
}
我們還需要在Spring應用上下文中將SpittleAlertHandler聲明為一個bean:
<beans:bean id="spittleListener" class="spittr.alerts.SpittleAlertHandler" />
最後,我們需要聲明一個監聽器容器和監聽器,當消息到達的時候,能夠調用SpittleAlertHandler。在基於JMS的MDP中,我們做過相同的事情,但是基於AMQP的MDP在配置上有一個細微的差別:
<listener-container connection-factory="connectionFactory" >
<listener ref="spittleListener"
method="handleSpittleAlert"
queues="spittleAlertQueue" />
</listener-container>
其中:
<queue id="spittleAlertQueue" name="spittle.alert.queue" />
源碼
https://github.com/myitroad/spring-in-action-4/tree/master/Chapter_17
附件列表
- alertSEBean.jpg
- AlertSI.jpg
- amqp.jpg
- asyn.jpg
- jmsExcep1.jpg
- jmsExcep2.jpg
- jmsInvokerProxy.jpg
- jmsRec.jpg
- listenBean.jpg
- mdp.jpg
- p2p.jpg
- rabbitSN.jpg
- syn.jpg
- topic.jpg
第17章-Spring消息