Spring框架之jms原始碼完全解析
Spring框架之jms原始碼完全解析
我們在前兩篇文章中介紹了Spring兩大核心IOC(Inversion of Control控制反轉)和AOP(Aspect Oriented Programming面向切面程式設計)技術:Spring框架之beans原始碼完全解析和Spring框架之AOP原始碼完全解析,下面對Spring的jms原始碼進行分析,先對jms進行簡單的介紹,其次對Spring中jms模組原始碼檔案清單進行梳理,然後對jms的單獨使用和Spring整合jms使用進行演示,最後對Spring中jms模組有兩個核心JmsTemplate和訊息監聽器原始碼進行分析。
一、jms簡介
分散式系統訊息通訊技術主要包括:(1) RPC(Remote Procedure Call Protocol),一般是C/S方式,同步的,跨語言跨平臺,面向過程。(2)CORBA(Common Object Request Broker Architecture),CORBA從概念上擴充套件了RPC,面向物件的,企業級的(面向物件中介軟體還有DCOM)。(3) RMI(Remote Method Invocation),面向物件方式的 Java RPC。(4)WebService基於Web,C/S或B/S,跨系統跨平臺跨網路。多為同步呼叫,實時性要求較高。(5)MOM(Message oriented Middleware) 面向訊息中介軟體。
面向訊息中介軟體,主要適用於訊息通道、訊息匯流排、訊息路由和釋出/訂閱的場景。目前主流標準有JMS(Java Message Service)、AMQP(Advanced Message Queuing Protocol)和STOMP(Streaming Text Oriented Messaging Protocol)。AMQP是一個面向協議的,跟語言平臺無關的訊息傳遞應用層協議規範。STOMP是流文字定向訊息協議,是一種為MOM設計的簡單文字協議。AMQP和STOMP都是跟http處於同一層的協議。JMS是Java平臺上的面向介面的訊息規範,是一套API標準,並沒有考慮異構系統。
JMS即Java訊息服務應用程式介面,是一個Java平臺關於面向訊息中介軟體的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,絕大多數面向訊息中介軟體提供商都對JMS提供支援,JMS類似於JDBC(Java Database Connectivity),JDBC 是可以用來訪問許多不同關係資料庫的API,而JMS則提供同樣與廠商無關的訪問方法,用來訪問訊息收發服務。所以兩個應用程式之間要進行通訊,我們使用了一個JMS服務,進行中間的轉發,通過使用JMS,可以解除兩個程式之間的耦合,提高訊息靈活性,支援非同步性。
JMS程式設計模型包含的幾個要素:
(1)連線工廠。 連線工廠(ConnectionFactory)是由管理員建立,並繫結到JNDI(Java命名和目錄介面)樹中。針對兩種不同的jms訊息模型(點對點和釋出/訂閱),分別有QueueConnectionFactory和TopicConnectionFactory兩種。客戶端使用JNDI查詢連線工廠,然後利用連線工廠建立一個JMS連線。
(2)JMS連線。JMS連線(Connection)表示JMS客戶端和伺服器端之間的一個活動的連線,是由客戶端通過呼叫連線工廠的方法建立的。Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。
(3)JMS會話。JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀態。JMS會話建立在JMS連線上,表示客戶與伺服器之間的一個會話執行緒。Session是我們操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當我們需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
通俗的講,Connection(連線)是一個物理概念,是指一個通過網路建立的客戶端和專有伺服器或排程器之間的一個網路連線。Session(會話)是一個邏輯概念,它存在於例項中,一個Connection可以擁有多個Session,也可以沒有Session,同一個Connection上的多個Session之間不會相互影響。connection相當於修路,而session相當於通過這條道路的一次運輸。
(4)JMS目的。JMS目的(Destination),又稱為訊息佇列,是實際的訊息源。Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。所以Destination實際上就是兩種型別的物件:Queue、Topic。
(5)JMS訊息。訊息由兩部分組成:報頭和訊息主體。報頭由路由資訊以及有關該訊息的元資料組成。訊息主體則攜帶著應用程式的資料或有效負載。根據有效負載的型別來劃分,可以將訊息分為幾種型別:簡單文字(TextMessage)、可序列化的物件 (ObjectMessage)、屬性集合 (MapMessage)、位元組流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的訊息 (Message)。
通常有兩種型別:① 點對點(Point-to-Point)。在點對點的訊息系統中,訊息分發給一個單獨的使用者。點對點訊息往往與佇列(javax.jms.Queue)相關聯。② 釋出/訂閱(Publish/Subscribe)。釋出/訂閱訊息系統支援一個事件驅動模型,訊息生產者和消費者都參與訊息的傳遞。生產者釋出事件,而使用者訂閱感興趣的事件,並使用事件。該型別訊息一般與特定的主題(javax.jms.Topic)關聯。
(6)訊息的生產者。生產者(Message Producer)物件由Session物件建立,用於傳送訊息,將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。
(7)訊息消費者。 消費者(Message Consumer)物件由Session物件建立,用於接收訊息,接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
(8)訊息監聽器。訊息監聽器MessageListener,類似於鉤子函式,hook到訊息相關的事件中,換句話說,當訊息被建立、開始傳輸、轉發、傳輸中止、刪除時,會呼叫相應的鉤子函式。如註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。
Spring中整合JMS:
JMS是一個 Java 標準,定義了使用訊息代理的通用API 。Spring 通過基於模板的抽象為 JMS 功能提供了支援,這個模板就是 JmsTemplate 。使用 JmsTemplate能夠非常容易地在訊息生產方傳送佇列和主題訊息,在消費訊息的一方也能夠非常容易地接收這些訊息。 (模板方法模式是一種設計模式。通俗的講就是完成一件事情,有固定的數個步驟,但是每個步驟根據物件的不同,而實現細節不同。這樣可以在父類中定義一個完成該事情的總方法,按照完成事件需要的步驟去呼叫其每個步驟的實現方法。每個步驟的具體實現,由子類完成)。後面我們的程式碼分析也以JmsTemplate為核心進行分析。對於類似Java EE的訊息驅動Bean形式的非同步接收,Spring提供了大量用於建立訊息驅動POJOs的訊息監聽器。Spring還提供了一種建立訊息監聽器的宣告式方法。
根據《Spring 5 官方文件》:
(1)org.springframework.jms:定義了各種不同的JmsException異常類。在org.springframework.jms.support.JmsUtils的convertJmsAccessException方法中將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
(2)org.springframework.jms.annotation包提供了支援註解驅動監聽端點的必要基礎架構,通過使用@JmsListener實現。
(3)org.springframework.jms.config包提供了 JMS 名稱空間的解析實現,以及配置監聽容器和建立監聽端點的 java 配置支援。
(4)org.springframework.jms.connection包提供了適用於獨立應用程式的ConnectionFactory實現。 它還包含 Spring 對 JMS 的PlatformTransactionManager實現(即JmsTransactionManager)。這將允許 JMS 作為事務性資源無縫整合到 Spring 的事務管理機制中。
(5)org.springframework.jms.core包提供了使用 JMS 的核心功能。它包含了JMS 模板類,用來處理資源的建立與釋放,從而簡化 JMS 的使用,就像JdbcTemplate對 JDBC 做的一樣。
(6)org.springframework.jms.listener:提供了訊息監聽器及相關支援類。
(7)org.springframework.jms.remoting:提供基於JMS的RPC方案。
(8)org.springframework.jms.support包:提供了一些支援的功能函式。converter子包提供了 MessageConverter 抽象,進行 Java 物件和 JMS 訊息的互相轉換。destination子包提供了管理 JMS 目的地的不同策略,比如針對 JNDI 中儲存的目標的服務定位器。
二、jms模組原始碼檔案清單
1 jms/
1.1 JmsException:繼承自NestedRuntimeException,NestedRuntimeException又繼承自RuntimeException。
Java中所有異常的父類是Throwable類,在Throwable類下有兩大子類:一個是Error類,指系統錯誤異常,例如:VirtualMachineError 虛擬機器錯誤,ThreadDeath 執行緒死鎖。一般如果是Error類的異常的話,就是程式的硬傷,就好比是工廠裡斷水斷電,機器損壞了。另一個是Exception類,指編碼、環境、使用者操作輸入等異常,這個是比較常見的異常類,Exception類下面又有兩個子類,非檢查異常(又稱執行時異常RuntimeException)和檢查異常。
在RuntimeException異常中有幾個常見的子類,例如:InputMismatchException 輸入不匹配異常;ArithmeticException 算術運算異常;NullPointerException 空指標異常;ArrayIndexOutOfBoundsException 陣列下標越界異常;ClassCastException 型別轉換異常。
檢查異常中的子類有:IOException 檔案異常;SQLException SQL資料庫錯誤異常。
1.2 IllegalStateException
1.3 InvalidClientIDException
1.4 InvalidDestinationException
1.5 InvalidSelectorException
1.6 JmsSecurityException
1.7 MessageEOFException
1.8 MessageFormatException
1.9 MessageNotReadableException
1.10 MessageNotWriteableException
1.11 ResourceAllocationException
1.12 TransactionInProgressException
1.13 TransactionRolledBackException
1.14 UncategorizedJmsException
1.2-1.14的異常處理類都繼承自JmsException,適用場景如類名所示,其中UncategorizedJmsException表示當其他JmsException都匹配不到時丟擲該異常。org.springframework.jms.support.JmsUtils的convertJmsAccessException方法負責將javax.jms.JMSException異常類轉成成等價的org.springframework.jms.JmsException。
2 jms/annotation
2.1 JmsListener:該類是一個註解介面。java用@interface Annotation{ } 定義一個註解 @Annotation,一個註解是一個類。註解相當於一種標記,在程式中加上了註解就等於為程式加上了某種標記,以後javac編譯器,開發工具和其他程式可以用反射來了解你的類以及各種元素上有無任何標記,看你有什麼標記,就去幹相應的事。
@JmsListener註解用來宣告這是個監聽器方法,也就是標記這個方法被JMS訊息監聽器監聽。該類中屬性destination表示監聽的佇列名字,containerFactory表示用來建立JMS監聽器容器。處理@JmsListener註解主要靠JmsListenerAnnotationBeanPostProcessor。註冊JmsListenerAnnotationBeanPostProcessor可以手動進行,更便捷的是通過Spring的config檔案<jms:annotation-driven/>配置,或者使用@EnableJms註解兩種方式將註解的監聽器類自動放到監聽器容器中。
2.2 EnableJms:用@JmsListener這個註解的時候。需要在配置類(@Configuration類)上加上@EnableJms註解,並且要配置一個DefaultJmsListenerContainerFactory監聽容器工廠的Bean例項。
Spring根據註解@EnableJms自動掃描帶有@JmsListener的方法,併為其建立一個MessageListener把它包裝起來。而JmsListenerContainerFactory的Bean的作用就是為每個MessageListener建立MessageConsumer並啟動訊息接收迴圈。
Spring接收訊息的步驟:通過JmsListenerContainerFactory配合@EnableJms掃描所有@JmsListener方法,自動建立MessageConsumer、MessageListener以及執行緒池,啟動訊息迴圈接收處理訊息,最終由我們自己編寫的@JmsListener方法處理訊息,可能會由多執行緒同時併發處理。
2.3 JmsListenerAnnotationBeanPostProcessor:該後置處理器用來實現@JmsListener註解,將帶有@JmsListener方法註冊到指定的JMS訊息監聽器容器中。該類中afterSingletonsInstantiated方法的最關鍵的一句 registrar.afterPropertiesSet()即可完成所有監聽的註冊。這個後置處理器可以通過 <jms:annotation-driven> XML配置或者@EnableJms註解兩種方式自動註冊。
2.4 JmsBootstrapConfiguration:配置類,註冊一個用於處理@JmsListener註解的JmsListenerAnnotationBeanPostProcessor後置處理器。同時也註冊一個預設的JmsListenerEndpointRegistry。當使用@EnableJms註解時,這個配置類會被自動載入。
2.5 JmsListenerConfigurer:Spring管理bean實現的可選介面,這些管理bean用來自定義JMS監聽器端點的配置方式。
2.6 JmsListeners:註解容器,多個@JmsListener註解的組成的集合。
3 jms/config
3.1 JmsListenerContainerFactory:訊息監聽容器工廠介面,基於JmsListenerEndpoint。
3.2 AbstractJmsListenerContainerFactory:訊息監聽容器工廠的抽象基類。
3.3 DefaultJmsListenerContainerFactory:JmsListenerContainerFactory介面的預設實現,該工廠用來建立DefaultMessageListenerContainer。
3.4 DefaultJcaListenerContainerFactory:JmsListenerContainerFactory介面的一個實現,用來建立一個基於 JCA的MessageListener容器JmsMessageEndpointManager。
JCA (J2EE 聯結器架構,Java Connector Architecture)是對J2EE標準集的重要補充。因為它注重的是將Java程式連線到非Java程式和軟體包中介軟體的開發。
3.5 SimpleJmsListenerContainerFactory:JmsListenerContainerFactory介面的一個簡單實現,用來建立一個標準的SimpleMessageListenerContainer。
3.6 JmsNamespaceHandler:JMS名稱空間處理器。註冊了三種標籤元素對應的處理函式:"listener-container"、"jca-listener-container"、"annotation-driven"。
3.7 AbstractListenerContainerParser:用來解析JMS監聽器容器元素。
3.8 JmsListenerContainerParser:解析JMS的<listener-container>元素。
3.9 JcaListenerContainerParser:解析JMS的<jca-listener-container>元素。
3.10 AnnotationDrivenJmsBeanDefinitionParser:解析jms名字空間中的 'annotation-driven' 元素。
3.11 AbstractJmsListenerEndpoint:Jms監聽端點的基礎模型。
3.12 JmsListenerEndpoint:JMS listener endpoint的模型。藉助JmsListenerConfigurer可以用來註冊端點。
3.13 MethodJmsListenerEndpoint:JmsListenerEndpoint的一個實現,提供了一些方法用來為該endpoint處理到來的訊息。包括get/set 所屬bean、所屬方法、jms相關引數、spring上下文、訊息處理工廠等。
3.14 SimpleJmsListenerEndpoint:JmsListenerEndpoint介面的一個實現,提供了MessageListener,用來為給endpoint處理到來的訊息。
3.15 JmsListenerEndpointRegistrar:將JmsListenerEndpoint物件註冊到JmsListenerEndpointRegistry物件中。
3.16 JmsListenerEndpointRegistry:建立MessageListenerContainer例項,用來儲存註冊過的JmsListenerEndpoint,同時對這些訊息監聽容器的生命週期進行管理。不同於手動建立的MessageListenerContainer,通過註冊生成的監聽容器不屬於ApplicationContext管理的bean,不會被自動裝配。
如果需要管理註冊的訊息監聽容器則呼叫getListenerContainers()函式。如果要使用一個指定的訊息監聽容器,使用函式getListenerContainer(String),引數就是endpoint的id值。
3.17 JmsListenerConfigUtils:配置常量值,用於子包間的內部共享。
4 jms/connection
4.1 SingleConnectionFactory:connectionFactory是Spring用於建立到JMS伺服器連結的。Spring提供了多種connectionFactory,主要有SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:對於建立JMS伺服器連結的請求會一直返回同一個連結,並且會忽略Connection的close方法呼叫。
4.2 CachingConnectionFactory:繼承自SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還提供快取JMS資源功能,包括快取Session、MessageProducer和MessageConsumer。
Spring中傳送訊息的核心是JmsTemplate,然而Jmstemplate的問題是在每次呼叫時都要開啟/關閉session和producter,效率很低,所以引申出了PooledConnectionFactory連線池,用於快取session和producter。然而這還不是最好的。從spring2.5.3版本後,Spring又提供了CachingConnectionFactory,這才是首選的方案。預設情況下, CachingConnectionFactory只快取一個session。
4.3 SmartConnectionFactory:繼承自ConnectionFactory介面,指示從該connectionFactory得到的Connection怎樣釋放掉。
4.4 DelegatingConnectionFactory:ConnectionFactory介面的實現類,對所有呼叫給定的目標ConnectionFactory進行代理。
4.5 ConnectionFactoryUtils:ConnectionFactory類的功能函式,特別是用於從一個指定的ConnectionFactory獲得transactional JMS resources。主要在框架內部使用,比如JmsTemplate、DefaultMessageListenerContainer會使用到該類。
4.6 CachedMessageConsumer:MessageConsumer的裝飾器,使一個共享的MessageConsumer例項能適應所有的呼叫。
4.7 CachedMessageProducer:MessageProducer裝飾器,使得一個共享的MessageProducer例項能適應多數呼叫。
4.8 ChainedExceptionListener:ExceptionListener介面的實現類,支援異常鏈。在java程式碼中常常會再捕獲一個異常後丟擲另外一個異常,並且希望把異常原始資訊儲存下來,這被稱為異常鏈。
4.9 JmsResourceHolder:JmsResourceHolder繼承了ResourceHolderSupport,作為Jms資源控制代碼,封裝了JMS的connection、session等資源。
4.10 JmsTransactionManager:JmsTransactionManager用於對JMS ConnectionFactory做事務管理。這將允許JMS應用利用Spring的事務管理特性。JmsTransactionManager在執行本地資源事務管理時將從指定的ConnectionFactory繫結一個Connection/Session這樣的配對到執行緒中。JmsTemplate會自動檢測這樣的事務資源,並對它們進行相應操作。
4.11 SessionProxy:繼承自Session的介面,被Session代理實現,用來獲得該代理的目標Session。
4.12 SynchedLocalTransactionFailedException:同步本地事務未完成時丟擲的異常。
4.13 TransactionAwareConnectionFactoryProxy:ConnectionFactory的代理,添加了Spring的事務功能,同事務JNDI ConnectionFactory類似。
4.14 UserCredentialsConnectionFactoryAdapter:ConnectionFactory的一個介面卡,授予使用者對於每個標準的 createConnection()方法呼叫的許可權。
5 jms/core
5.1 JmsMessageOperations:繼承了MessageSendingOperations、MessageReceivingOperations、MessageRequestReplyOperations幾個介面,包含關於JMS訊息的操作方法,包括send、receive、convertAndSend、receiveAndConvert等。
5.2 JmsMessagingTemplate:JmsMessageOperations介面的一個實現。
5.3 JmsOperations:詳細列出JMS一系列操作,該介面會被JmsTemplate實現。
5.4 JmsTemplate:核心類。在JDBC中,Spring提供了一個JdbcTemplate來簡化JDBC程式碼開發,同樣,Spring也提供了JmsTemplate來簡化JMS訊息處理的開發。
JmsTemplate其實是Spring對JMS更高一層的抽象,它封裝了大部分建立連線、獲取session及傳送接收訊息相關的程式碼,使得我們可以把精力集中在訊息的傳送和接收上。
5.5 MessageCreator:利用給定的Session建立一個JMS訊息。
5.6 MessagePostProcessor:和JmsTemplate的send方法一起用,將一個物件轉換成message。在一個訊息被轉換器處理後可以進行進一步修改。在設定JMS頭部和屬性的時候有用。
5.7 BrowserCallback:瀏覽JMS queue中的資訊的回撥函式。在JmsTemplate類中的browse、browseSelected函式中BrowserCallback作為一個引數傳入。(有些函式要求應用先傳給它一個函式,好在合適的時候呼叫,以完成目標任務。這個被傳入的、後又被呼叫的函式就稱為回撥函式callback function)。
5.8 ProducerCallback:send一個訊息到JMS destination的回撥函式。作為JmsTemplate類的execute函式一個引數。
5.9 SessionCallback:在一個給定的Session執行一系列操作的回撥函式。作為JmsTemplate的execute函式的引數使用。
jms/core/support
5.10 JmsGatewaySupport:方便應用類訪問JMS。使用該類時需要設定一個ConnectionFactory 或者JmsTemplate例項。如果存在ConnectionFactory ,那麼它通過createJmsTemplate方法會建立自己的JmsTemplate。
6 jms/listener
6.1 AbstractJmsListeningContainer:繼承自JmsDestinationAccessor,作為所有Message Listener Container的公共基類。它主要提供了JMS connection的生命週期管理的功能,但是沒有對訊息接收的方式(主動接收方式或者非同步接收方式)等做任何假定。
6.2 MessageListenerContainer:框架內部使用的一個抽象類,用來表示一個訊息監聽器容器,不會被用來支援JMS和JCA模式的外部容器實現。
6.3 AbstractMessageListenerContainer: Spring訊息監聽器容器(如SimpleMessageListenerContainer、SimpleMessageListenerContainer)的父類。
6.4 AbstractPollingMessageListenerContainer:繼承自AbstractMessageListenerContainer,它提供了對於主動接收訊息(polling)的支援,以及支援外部的事務管理。
6.5 SimpleMessageListenerContainer:最簡單的訊息監聽器容器,用來從jms 訊息佇列中接收訊息,然後推送註冊到它內部的訊息監聽器(MessageListener)中,只能處理固定數量的JMS會話,且不支援事務。
在Spring框架中使用JMS傳遞訊息有兩種方式:JMS template和message listener container,前者用於同步收發訊息,後者主要用於非同步收訊息。
6.6 DefaultMessageListenerContainer:用於非同步訊息監聽的訊息監聽器容器。跟SimpleMessageListenerContainer一樣,DefaultMessageListenerContainer也支援建立多個Session和MessageConsumer來接收訊息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer建立了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable介面),並交給taskExecutor執行。
6.7 LocallyExposedJmsResourceHolder:JMS資源控制代碼JmsResourceHolder的子類,指示本地的資源。
6.8 SessionAwareMessageListener:SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收訊息的,假如我們在使用MessageListener處理接收到的訊息時我們需要傳送一個訊息通知對方我們已經收到這個訊息了,那麼這個時候我們就需要在程式碼裡面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到訊息後傳送一個回覆的訊息,它同樣為我們提供了一個處理接收到的訊息的onMessage方法。
6.9 SubscriptionNameProvider:訊息監聽器會實現該介面,表示一個持久的訂閱,否則訊息監聽器被用作一個預設的訂閱。
jms/listener/adapter
6.10 AbstractAdaptableMessageListener:JMS訊息監聽器介面卡的抽象類,提供系列方法用來提取JMS訊息的有效資訊。
6.11 JmsResponse:在執行狀態時,destination需要計算時使用該類,返回JMS監聽器的方法用來指示destination。如果在執行中不需要計算destination,推薦使用org.springframework.messaging.handler.annotation.SendTo @SendTo。
6.12 MessageListenerAdapter:MessageListenerAdapter類實現了MessageListener介面和SessionAwareMessageListener介面,它的主要作用是將接收到的訊息進行型別轉換,然後通過反射的形式把它委託給目標監聽器進行處理。MessageListenerAdapter會把接收到的訊息做如下轉換:
TextMessage轉換為String物件;
BytesMessage轉換為byte陣列;
MapMessage轉換為Map物件;
ObjectMessage轉換為對應的Serializable物件。
6.13 MessagingMessageListenerAdapter:MessageListener介面卡,援引一個可配置的InvocableHandlerMethod(用於在某個請求被控制器方法處理時,包裝處理所需的各種引數和執行處理邏輯)。
6.14 ListenerExecutionFailedException:監聽器方法執行失敗時丟擲的異常。
6.15 ReplyFailureException:需要回復的訊息傳送失敗丟擲的異常。
jms/listener/endpoint
6.16 JmsMessageEndpointFactory:JCA 1.7 MessageEndpointFactory工廠類的一個實現,為JMS監聽器提供了事務管理能力。
6.17 JmsMessageEndpointManager:GenericMessageEndpointManager的一個拓展,ActivationSpec配置中加入了對JMS的支援。
6.18 JmsActivationSpecFactory:基於JmsActivationSpecConfig用來建立JCA 1.5 ActivationSpec物件的工廠。
6.19 StandardJmsActivationSpecFactory:JmsActivationSpecFactory介面的標準實現,支援JMS 1.5中定義的標準JMS屬性,忽視Spring的"maxConcurrency" 、 "prefetchSize" 設定。
6.20 DefaultJmsActivationSpecFactory: JmsActivationSpecFactory介面的預設實現。支援JCA 1.5中所定義的標準額JMS屬性,也包括Spring拓展的一些設定,如"maxConcurrency" 、 "prefetchSize" 。
6.21 JmsActivationSpecConfig:啟用JMS message endpoint的一些通用的配置物件。
7 jms/remoting
7.1 JmsInvokerClientInterceptor:方法攔截器,序列化遠端觸發物件和反序列化遠端觸發結果物件,使用Java序列化方法,例如RMI。
7.2 JmsInvokerProxyFactoryBean:jms觸發代理的工廠bean,暴露bean引用的代理服務,使用特定的服務介面。
7.3 JmsInvokerServiceExporter:為了支援基於訊息的RPC,Spring提供了JmsInvokerServiceExporter,它可以把bean匯出為基於訊息的服務;同時為客戶端提供了JmsInvokerProxyFactoryBean來使用這些服務。
8 jms/support
8.1 JmsAccessor:定義了幾個用於訪問JMS服務的共通屬性,提供了建立Connection和Session的方法。是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer的父類。
8.2 JmsHeaderMapper:將訊息頭整合到要向外傳送的JMS訊息中的介面,或者從接收到的JMS訊息提取出訊息頭的資訊。
8.3 SimpleJmsHeaderMapper :JmsHeaderMapper介面的簡單實現。
8.4 JmsHeaders:將JMS屬性設定到通用訊息頭部或者從其提取出JMS屬性用到的預定義的名字或者字首。
8.5 JmsMessageHeaderAccessor:MessageHeaderAccessor介面的一個實現,能夠訪問JMS規範的頭header。
8.6 JmsUtils:JMS工具包,主要是框架內部使用。
8.7 QosSettings:收集Quality-of-Service設定,在傳送訊息時使用。
jms/support/converter
8.8 MessageConverter:在收發訊息時,將Java objects和JMS messages相互轉換。
8.9 SimpleMessageConverter: 實現String與TextMessage之間的相互轉換,位元組陣列與BytesMessage之間的相互轉換,Map與MapMessage之間的相互轉換以及Serializable物件與ObjectMessage之間的相互轉換。
8.10 MarshallingMessageConverter:使用JAXB庫實現訊息與XML格式之間的相互轉換。
8.11 MessagingMessageConverter:利用MessageConverter將messaging abstraction的Message和javax.jms.Message相互轉換。
8.12 SmartMessageConverter:MessageConverter的拓展,增加了轉換提示功能。
8.13 MappingJackson2MessageConverter:使用Jackson 2 JSON庫實現訊息與JSON格式之間相互轉換。
8.14 MessageType:定義幾個常量表示要轉換成得目標訊息的型別,有text、bytes、map、object。
8.15 MessageConversionException:MessageConverter出錯時丟擲的異常。
jms/support/destination
8.16 DestinationResolver:將指定的目的地名解析為目的地例項。 引數pubSubDomain用於指定是使用“釋出/訂閱”模式(解析後的目的地是Topic),還是使用“點對點”模式(解析後的目的地是Queue)。
8.17 BeanFactoryDestinationResolver:實現了DestinationResolver介面和BeanFactoryAware介面。它會根據指定的目的地名從BeanFactory中查詢目的地例項。
8.18 DynamicDestinationResolver:實現了DestinationResolver介面。根據指定的目的地名動態建立目的地例項。
8.19 CachingDestinationResolver:繼承了DestinationResolver,增加了快取的功能,在目的地失效的時候,removeFromCache方法會被呼叫;在JMS provider失效的時候,clearCache方法會被呼叫。
8.20 JndiDestinationResolver:繼承自JndiLocatorSupport, 同時實現了CachingDestinationResolver介面。如果在JMS provider中配置了靜態目的地,那麼JndiDestinationResolver通過JNDI查詢的方式獲得目的地例項。
8.21 JmsDestinationAccessor:提供了用於解析目的地的方法。destinationResolver屬性的預設值是DynamicDestinationResolver的例項,也就是說預設採用動態目的地解析的方式;pubSubDomain屬性用於指定是使用“釋出/訂閱”模式還是使用“點對點”模式,預設值是false(點對點模式)。
8.22 DestinationResolutionException:將指定的目的地名解析為目的地例項出錯丟擲的異常。
三、jms的使用演示
(一)jms的單獨使用
為了更好的理解Spring整合jms,先來看下單獨使用Java訊息服務的方式。以Java訊息服務的開源實現產品ActiveMQ為例。使用訊息服務,需要做三件事:1、開啟訊息伺服器。2、建立訊息生產者。3、建立訊息消費者。
1、開啟訊息伺服器
如果是Windows系統下可以直接雙擊ActiveMQ安裝目錄下的bin目錄下的activemq.bat檔案來啟動訊息伺服器。如果是Linux系統,進入activeMq安裝包下的bin目錄,使用./activemq start命令就可以啟動activemq服務。
2、建立訊息生產者
訊息的生產者主要用來將包含業務邏輯的訊息傳送到訊息伺服器。以下為傳送訊息測試,嘗試傳送三條訊息到訊息伺服器,訊息內容為“大家好,這是個測試”。
1 public class Sender{ 2 public static void main(String[] args) throws Exception{ 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start() 6 7 Session session = connection.creatSession(Boolean.True, Session.AUTO_ACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 10 MessageProducer producer = session.createProducer(destionation); 11 for (int i=0; i<3; i++){ 12 TextMessage message = session.createTextMessage("大家好,這是個測試"); 13 Tread.sleep(1000); 14 //通過訊息生產者發出訊息 15 producer.send(message); 16 } 17 session.commit(); 18 session.close(); 19 connection.close(); 20 } 21 }
3、建立訊息消費者
訊息的消費者用於連線訊息伺服器將伺服器中的訊息提取出來進行相應的處理。
1 public class Receiver{ 2 public static void main(String[] args) throws Exception { 3 ConnectionFactory connectionFactory = new ActiveMQConnectionFactroy(); 4 Connection connection = connectionFactory.createConnection(); 5 connection.start(); 6 7 final Session session = connection.createSession(Boolean.TRUE, Session.AUTOACKNOWLEDGE); 8 Destination destination = session.createQueue("my-queue"); 9 MessageConsumer consumer = session.createConsumer(destination); 10 11 int i = 0; 12 while(i<3){ 13 i++; 14 TextMessage message = (TextMessage) consumer.receive(); 15 session.commit(); 16 //TODO something... 17 System.out.println("收到訊息:" + message.getText()); 18 } 19 20 session.close(); 21 connection.close(); 22 } 23 }
執行時,先開啟訊息的生產者,向伺服器傳送訊息,然後開啟訊息的消費者。上述程式碼可以看出,和資料庫實現很相似,一系列的冗餘但是必不可少的程式碼用於建立connectionFactory、connection、session,利用session來 createQueue、createProducer、createConsumer,真正用於傳送和接收訊息的程式碼並不多。
Spring 通過基於模板方法的設計模式來解決這個問題,這個模板就是 JmsTemplate 。JmsTemplate封裝了大部分建立連線、獲取session及傳送接收訊息相關的程式碼,使得我們可以把精力集中在訊息的傳送和接收上。所以使用 JmsTemplate能夠非常容易地在訊息生產方傳送佇列和主題訊息,在消費訊息的一方也能夠非常容易地接收這些訊息。
(二)Spring整合jms
在Spring中使用jms同樣需要做三件事。1、配置檔案的配置。2、傳送訊息。3、接收訊息。
1、配置檔案的配置
上面我們提到了Spring將Connection的建立和關閉,Session的建立和關閉等操作都封裝到了JmsTemplate中,所以在Spring的核心配置檔案中首先要註冊JmsTemplate型別的bean。ActiveMQConnectionFactory用於連線訊息伺服器,ActiveMQQueue訊息佇列是實際的訊息源,也要註冊。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 </beans>
2、傳送訊息
Spring中使用jmsTemplate傳送訊息到訊息伺服器中去,省去了冗餘的Connection以及Session等的建立和銷燬過程。
1 public class HelloWorldSender{ 2 public static void main(String[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXmlApplicationContext(new string[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 5 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 6 Destination destination = (Destination) context.getBean("destination"); 7 8 jmsTemplate.send(destination, new MessageCreator(){ 9 public Message createMessage(Session session) throws JMSException{ 10 return session.createTextMessage("大家好,這是個測試"); 11 } 12 }); 13 } 14 }
3、接收訊息。
Spring中連線伺服器接收訊息示例如下:
1 public class HelloWorldReceiver{ 2 public static void main(string[] args) throws Exception{ 3 ApplicationContext context = new ClassPathXMLApplicationContext(new String[] {"test/activeMQ/Spring/applicationContext.xml"}); 4 JmsTemplate jmsTemplate = (JmsTemplate) context.getBean("jmsTemplate"); 5 Destination destination = (Destination) context.getBean("destination"); 6 7 TextMessage msg = (TextMessage) jmsTemplate.receive(destination); 8 System.out.println("received msg is:" + msg.getText()); 9 } 10 }
經過上面3步就完成了Spring訊息的傳送和接收。在HelloWorldSender傳送訊息類中使用jmsTemplate.send方法來發送訊息,沒有問題。在HelloWorldReceiver接收訊息類中,使用jmsTemplate.receive方法來接收訊息會存在一個問題,該方法只能接收一次訊息,如果未收到訊息則一直等待。利用訊息監聽器來解決這個問題,訊息監聽器可以迴圈監聽訊息伺服器上的訊息。訊息監聽器並非Spring獨有, Spring整合JMS的應用提供三種類型的訊息監聽器,分別是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。
MessageListener是最原始的訊息監聽器,它是JMS規範中定義的一個介面。其中定義了一個用於處理接收到的訊息的onMessage方法,該方法只接收一個Message引數。
SessionAwareMessageListener是Spring為我們提供的,它不是標準的JMS MessageListener。MessageListener的設計只是純粹用來接收訊息的,假如我們在使用MessageListener處理接收到的訊息時我們需要傳送一個訊息通知對方我們已經收到這個訊息了,那麼這個時候我們就需要在程式碼裡面去重新獲取一個Connection或Session。SessionAwareMessageListener的設計就是為了方便我們在接收到訊息後傳送一個回覆的訊息,它同樣為我們提供了一個處理接收到的訊息的onMessage方法,但是這個方法可以同時接收兩個引數,一個是表示當前接收到的訊息Message,另一個就是可以用來發送訊息的Session物件。
MessageListenerAdapter類實現了SessionAwareMessageListener介面和MessageListener介面,它的主要作用是將接收到的訊息進行型別轉換,然後通過反射的形式把它交給一個普通的Java類進行處理。TextMessage轉換為String物件;BytesMessage轉換為byte陣列;MapMessage轉換為Map物件;ObjectMessage轉換為對應的Serializable物件。
4、利用訊息監聽器接收訊息(第3步改進版)。
我們需要做兩步,第一建立一個訊息監聽器,第二為了使用訊息監聽器,修改配置檔案。
第一, 我們先來建立一個訊息監聽器:
1 public class MyMessageListener implements MessageListener{ 2 3 @Override 4 public void onMessage(Message arg0){ 5 TextMessage msg = (TextMessage) arg0; 6 try{ 7 System.out.println(msg.getText()); 8 } catch (JMSException e){ 9 e.printStackTrace(); 10 } 11 } 12 }
一旦有新訊息Spring會將訊息引導至訊息監聽器以方便使用者進行相應的邏輯處理。
第二,修改配置檔案。
為了使用訊息監聽器,需要在配置檔案中註冊訊息監聽器容器,並將訊息監聽器注入到訊息監聽器容器中。
1 <beans> 2 <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 3 <Property name="brokerURL"> 4 <value>tcp://localhost:61616</value> 5 </Property> 6 </bean> 7 8 <bean id="jmsTemplate" class="org.Springframework.jms.core.JmsTemplate"> 9 <Property name="connectionFactory"> 10 <ref bean="connectionFactory" /> 11 </Property> 12 </bean> 13 14 <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> 15 <constructor-arg index="0"> 16 <vaule>HelloWordQueue</vaule> 17 </constructor-arg> 18 </bean> 19 20 <bean id="myTextListener" class="test.activeMQ.Spring.MyMessageListener" /> 21 22 <bean id="javaConsumer" class="org.Springframework.jms.listener.DefaultMessageListenerContainer"> 23 <property name="ConnectionFactory" ref="connectionFactory" /> 24 <Property name="destination" ref="destination" /> 25 <Property name="messageListener" ref="myTextListener" /> 26 </bean> 27 28 </beans>
通過以上的修改配置就可以進行訊息的監聽功能了,一旦有訊息傳入訊息伺服器,則會被訊息監聽器監聽到,並由Spring將訊息內容引導至訊息監聽器的處理函式中等待使用者的進一步邏輯處理。
四、Spring中jms模組核心原始碼分析
從第三節可以看出,Spring中使用JmsTemplate模板類來進行傳送訊息和接收訊息操作,接收訊息可以使用訊息監聽器的方法來替代模板方法。所以Spring中jms模組核心主要有兩個:JmsTemplate和訊息監聽器。
(一)JmsTemplate傳送訊息
我們先來看JmsTemplate,在上面使用示例中,使用JmsTemplate傳送訊息的函式為:
1 jmsTemplate.send(destination, new MessageCreator(){ 2 public Message createMessage(Session session) throws JMSException{ 3 return session.createTextMessage("大家好,這是個測試"); 4 } 5 });
進入JmsTemplate類的send函式:
1 public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { 2 execute(session -> { 3 doSend(session, destination, messageCreator); 4 return null; 5 }, false); 6 }
(1)通用程式碼的抽取
呼叫了該類中的execute函式,繼續進入execute函式檢視原始碼邏輯:
1 public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException { 2 Assert.notNull(action, "Callback object must not be null"); 3 Connection conToClose = null; 4 Session sessionToClose = null; 5 try { 6 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 7 obtainConnectionFactory(), this.transactionalResourceFactory, startConnection); 8 if (sessionToUse == null) { 9 //建立connection 10 conToClose = createConnection(); 11 //根據connection建立session 12 sessionToClose = createSession(conToClose); 13 //是否開啟向服務推送連線資訊,只有接收資訊時需要,傳送時不需要 14 if (startConnection) { 15 conToClose.start(); 16 } 17 sessionToUse = sessionToClose; 18 } 19 if (logger.isDebugEnabled()) { 20 logger.debug("Executing callback on JMS Session: " + sessionToUse); 21 } 22 //呼叫回撥函式 23 return action.doInJms(sessionToUse); 24 } 25 catch (JMSException ex) { 26 throw convertJmsAccessException(ex); 27 } 28 finally { 29 //關閉session 30 JmsUtils.closeSession(sessionToClose); 31 //釋放連線 32 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); 33 } 34 }
execute函式封裝建立Connection、建立Session、關閉Session和關閉Connection等操作,這些程式碼都是傳送訊息都要做的工作,沒有差異,execute方法幫助我們抽離了這些冗餘程式碼是我們更加專注業務邏輯的實現。做完這些通用的操作後,通過呼叫回撥函式將程式引入使用者自定義實現的個性化處理。Spring使用execute方法封裝了冗餘程式碼,而將個性化的程式碼實現放在了回撥函式action.doInJms(sessionToUse)中。
(2)傳送訊息的實現
我們繼續看回調函式action.doInJms(sessionToUse)。在傳送訊息的功能中回撥函式通過區域性類實現。
1 new SessionCallback<Object>(){ 2 public Object doInJms(Session session) throws JMSException { 3 doSend(session, destination, messageCreator); 4 return null; 5 } 6 }
此時的傳送邏輯轉向了doSend方法,我們只需要關注該方法:
1 protected void doSend(Session session, Destination destination, MessageCreator messageCreator) 2 throws JMSException { 3 4 Assert.notNull(messageCreator, "MessageCreator must not be null"); 5 MessageProducer producer = createProducer(session, destination); 6 try { 7 Message message = messageCreator.createMessage(session); 8 if (logger.isDebugEnabled()) { 9 logger.debug("Sending created message: " + message); 10 } 11 doSend(producer, message); 12 // Check commit - avoid commit call within a JTA transaction. 13 if (session.getTransacted() && isSessionLocallyTransacted(session)) { 14 // Transacted session created by this template -> commit. 15 JmsUtils.commitIfNecessary(session); 16 } 17 } 18 finally { 19 JmsUtils.closeMessageProducer(producer); 20 } 21 }
1 protected void doSend(MessageProducer producer, Message message) throws JMSException { 2 if (this.deliveryDelay >= 0) { 3 producer.setDeliveryDelay(this.deliveryDelay); 4 } 5 if (isExplicitQosEnabled()) { 6 producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); 7 } 8 else { 9 producer.send(message); 10 } 11 }
最終的目標還是通過MessageProducer的send來發送訊息。
(二)JmsTemplate接收訊息
在上面使用示例中,使用JmsTemplate接收訊息的函式為:TextMessage msg = (TextMessage) jmsTemplate.receive(destination);我們進入jmsTemplate類的receive函式:
1 public Message receive(Destination destination) throws JmsException { 2 return receiveSelected(destination, null); 3 }
繼續進入JmsTemplate類的receiveSelected函式:
1 public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException { 2 return execute(session -> doReceive(session, destination, messageSelector), true); 3 }
繼續進入JmsTemplate類的doReceive函式:
1 protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector) 2 throws JMSException { 3 4 return doReceive(session, createConsumer(session, destination, messageSelector)); 5 }
1 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { 2 try { 3 // Use transaction timeout (if available). 4 long timeout = getReceiveTimeout(); 5 ConnectionFactory connectionFactory = getConnectionFactory(); 6 JmsResourceHolder resourceHolder = null; 7 if (connectionFactory != null) { 8 resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 9 } 10 if (resourceHolder != null && resourceHolder.hasTimeout()) { 11 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); 12 } 13 Message message = receiveFromConsumer(consumer, timeout); 14 if (session.getTransacted()) { 15 // Commit necessary - but avoid commit call within a JTA transaction. 16 if (isSessionLocallyTransacted(session)) { 17 // Transacted session created by this template -> commit. 18 JmsUtils.commitIfNecessary(session); 19 } 20 } 21 else if (isClientAcknowledge(session)) { 22 // Manually acknowledge message, if any. 23 if (message != null) { 24 message.acknowledge(); 25 } 26 } 27 return message; 28 } 29 finally { 30 JmsUtils.closeMessageConsumer(consumer); 31 } 32 }
其中程式碼Message message = receiveFromConsumer(consumer, timeout);中的receiveFromConsumer函式在JmsDestinationAccessor類(package org.springframework.jms.support.destination,JmsTemplate的父類)中定義,我們進入檢視原始碼:
1 protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException { 2 if (timeout > 0) { 3 return consumer.receive(timeout); 4 } 5 else if (timeout < 0) { 6 return consumer.receiveNoWait(); 7 } 8 else { 9 return consumer.receive(); 10 } 11 }
實現的方式和傳送相似,使用execute函式來封裝冗餘的公共操作,包括建立MessageConsumer,而最終的目標還是通過MessageConsumer(javax.jms.MessageConsumer包中)的receive來接收訊息。
(三)訊息監聽器
訊息監聽器容器是一個特殊的bean,一旦有訊息到達就可以獲取訊息,並通過呼叫onMessage()方法將訊息傳遞給一個訊息監聽器MessageListener。Spring提供了兩種訊息監聽器容器:
SimpleMessageListenerContainer(package org.springframework.jms.listener):最簡單的訊息監聽器容器,只能處理固定數量的JMS會話,且不支援事務。
DefaultMessageListenerContainer(package org.springframework.jms.listener):這個訊息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支援。
下面以DefaultMessageListenerContainer為例進行分析。在上面訊息監聽器的使用示例中,需要在配置檔案中註冊訊息監聽器容器,並將訊息監聽器注入到訊息監聽器容器中。我們只有把自定義的訊息監聽器注入到訊息監聽器容器中,容器才會把訊息轉給訊息監聽器進行處理。
DefaultMessageListenerContainer類的繼承關係如下:
DefaultMessageListenerContainer
-- AbstractPollingMessageListenerContainer
-- AbstractMessageListenerContainer
-- AbstractJmsListeningContainer
-- JmsDestinationAccessor
-- JmsAccessor
-- InitializingBean
-- BeanNameAware
-- DisposableBean
-- SmartLifecycle
-- MessageListenerContainer
-- SmartLifecycle
我們看到DefaultMessageListenerContainer類實現了InitializingBean介面,InitializingBean介面為bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是繼承該介面的類,在初始化bean的時候都會執行該方法。也就是說spring初始化bean的時候,如果該bean實現了InitializingBean介面,會自動呼叫afterPropertiesSet方法。DefaultMessageListenerContainer在其父類AbstractJmsListeningContainer中實現了該方法:
1 public void afterPropertiesSet() { 2 //驗證connectionFactory 3 super.afterPropertiesSet(); 4 //驗證配置檔案 5 validateConfiguration(); 6 初始化 7 initialize(); 8 }
DefaultMessageListenerContainer監聽器容器的初始化中包含了三句程式碼,前兩句用於屬性驗證,比如connectionFactory或者destination等屬性是否為空等,而真正用於初始化的操作委託在initialize函式中執行:
1 public void initialize() throws JmsException { 2 try { 3 //lifecycleMonitor用於控制生命週期的同步處理 4 synchronized (this.lifecycleMonitor) { 5 this.active = true; 6 this.lifecycleMonitor.notifyAll(); 7 } 8 doInitialize(); 9 } 10 catch (JMSException ex) { 11 synchronized (this.sharedConnectionMonitor) { 12 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 13 this.sharedConnection = null; 14 } 15 throw convertJmsAccessException(ex); 16 } 17 }
函式中呼叫了該類的抽象方法doInitialize,該函式實際在其子類DefaultMessageListenerContainer中實現(父類呼叫抽象方法,該抽象方法由子類實現):
1 protected void doInitialize() throws JMSException { 2 synchronized (this.lifecycleMonitor) { 3 for (int i = 0; i < this.concurrentConsumers; i++) { 4 scheduleNewInvoker(); 5 } 6 } 7 }
concurrentConsumers設定的是對每個listener在初始化的時候設定的併發消費者的個數,因為在spring中messageListener例項是單例的,spring-jms不能自作主張的建立多個messageListener例項來併發消費。所以spring在內部,建立了多個MessageConsumer例項,並使用consumer.receive()方法以阻塞的方式來獲取訊息,當獲取訊息後,再執行messageListener.onMessage()方法。concurrentConsumers屬性就是為了指定spring內部可以建立MessageConsumer的最大個數,當messageConsumer例項被建立後,將會封裝在一個Runner介面並交給taskExecutor來排程;如果consumer在一直沒有收到訊息,則會被置為“idle”並從consumer列表中移除;如果所有的consumer都處於active狀態,則會建立新的consumer例項直到達到maxConcurrentConsumers個數上限。通常taskExecutor的執行緒池容量稍大於concurrentConsumer。
我們繼續上述原始碼,doInitialize函式中呼叫了本類DefaultMessageListenerContainer中的scheduleNewInvoker方法:
1 private void scheduleNewInvoker() { 2 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 3 if (rescheduleTaskIfNecessary(invoker)) { 4 // This should always be true, since we're only calling this when active. 5 this.scheduledInvokers.add(invoker); 6 } 7 }
其中呼叫了父類AbstractJmsListeningContainer(package org.springframework.jms.listener)的rescheduleTaskIfNecessary方法:
1 protected final boolean rescheduleTaskIfNecessary(Object task) { 2 if (this.running) { 3 try { 4 doRescheduleTask(task); 5 } 6 catch (RuntimeException ex) { 7 logRejectedTask(task, ex); 8 this.pausedTasks.add(task); 9 } 10 return true; 11 } 12 else if (this.active) { 13 this.pausedTasks.add(task); 14 return true; 15 } 16 else { 17 return false; 18 } 19 }
這裡需要注意的是,子類DefaultMessageListenerContainer呼叫了父類AbstractJmsListeningContainer的rescheduleTaskIfNecessary方法,rescheduleTaskIfNecessary方法又呼叫回子類DefaultMessageListenerContainer的方法doRescheduleTask,即鉤子方法。所以doRescheduleTask方法是在DefaultMessageListenerContainer中定義的。
1 protected void doRescheduleTask(Object task) { 2 Assert.state(this.taskExecutor != null, "No TaskExecutor available"); 3 this.taskExecutor.execute((Runnable) task); 4 }
doRescheduleTask函式其實是在開啟一個執行緒執行Runnable。Spring根據concurrentConsumer數量建立了對應數量的執行緒,而每一個執行緒都作為一個獨立的接收者在迴圈接收訊息。
現在回到DefaultMessageListenerContainer的scheduleNewInvoker方法。我們上面介紹過DefaultMessageListenerContainer建立了concurrentConsumers所指定個數的AsyncMessageListenerInvoker(實現了SchedulingAwareRunnable介面),並交給taskExecutor執行。我們重點關注AsyncMessageListenerInvoker類(該類是DefaultMessageListenerContainer的一個內部類)。它是作為一個Runnable去執行,我們看下其run方法:
1 public void run() { 2 //併發控制 3 synchronized (lifecycleMonitor) { 4 activeInvokerCount++; 5 lifecycleMonitor.notifyAll(); 6 } 7 boolean messageReceived = false; 8 try { 9 //根據每個任務設定的最大處理訊息數量而做不同的處理 10 //小於0預設為無限制,一直能接収訊息 11 if (maxMessagesPerTask < 0) { 12 messageReceived = executeOngoingLoop(); 13