? SimpleMessageListenerContainer:最簡單的消息監聽器容器,只能處理固定數量的JMS會話,且不支持事務。
? DefaultMessageListenerContainer:這個消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事物的支持。
? serversession.ServerSessionMessage.ListenerContainer:這是功能最強大的消息監聽器,與DefaultMessageListenerContainer相同,它支持事務,但是它還允許動態地管理JMS會話。
public void afterPropertiesSet() { //驗證connectionFactorysuper.afterPropertiesSet(); //驗證配置文件 validateConfiguration(); //初始化 initialize(); }
public void initialize() throws JmsException { try { synchronized (this.lifecycleMonitor) {this.active = true; this.lifecycleMonitor.notifyAll(); } doInitialize(); } catch (JMSException ex) { synchronized (this.sharedConnectionMonitor) { ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); this.sharedConnection = null; } throw convertJmsAccessException(ex); } }
protected void doInitialize() throws JMSException { synchronized (this.lifecycleMonitor) { for (int i = 0; i < this.concurrentConsumers; i++) { scheduleNewInvoker(); } } }
private void scheduleNewInvoker() { AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); if (rescheduleTaskIfNecessary(invoker)) { // This should always be true, since we‘re only calling this when active. this.scheduledInvokers.add(invoker); } }
protected final boolean rescheduleTaskIfNecessary(Object task) { if (this.running) { try { doRescheduleTask(task); } catch (RuntimeException ex) { logRejectedTask(task, ex); this.pausedTasks.add(task); } return true; } else if (this.active) { this.pausedTasks.add(task); return true; } else { return false; } }
public void run() { //並發控制 synchronized (lifecycleMonitor) { activeInvokerCount++; lifecycleMonitor.notifyAll(); } boolean messageReceived = false; try { //根據每個任務設置的最大處理消息數量而做不同處理,小於0默認為是無限制,一致接收消息 if (maxMessagesPerTask < 0) { messageReceived = executeOngoingLoop(); } else { int messageCount = 0; //消息數量控制,一旦超出數量則停止循環 while (isRunning() && messageCount < maxMessagesPerTask) { messageReceived = (invokeListener() || messageReceived); messageCount++; } } } catch (Throwable ex) { //清理操作,包括關閉Session等 clearResources(); if (!this.lastMessageSucceeded) { // We failed more than once in a row or on startup - // wait before first recovery attempt. waitBeforeRecoveryAttempt(); } this.lastMessageSucceeded = false; boolean alreadyRecovered = false; synchronized (recoveryMonitor) { if (this.lastRecoveryMarker == currentRecoveryMarker) { handleListenerSetupFailure(ex, false); recoverAfterListenerSetupFailure(); currentRecoveryMarker = new Object(); } else { alreadyRecovered = true; } } if (alreadyRecovered) { handleListenerSetupFailure(ex, true); } } finally { synchronized (lifecycleMonitor) { decreaseActiveInvokerCount(); lifecycleMonitor.notifyAll(); } if (!messageReceived) { this.idleTaskExecutionCount++; } else { this.idleTaskExecutionCount = 0; } synchronized (lifecycleMonitor) { if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) { // We‘re shutting down completely. scheduledInvokers.remove(this); if (logger.isDebugEnabled()) { logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size()); } lifecycleMonitor.notifyAll(); clearResources(); } else if (isRunning()) { int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount(); if (nonPausedConsumers < 1) { logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " + "Check your thread pool configuration! Manual recovery necessary through a start() call."); } else if (nonPausedConsumers < getConcurrentConsumers()) { logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " + "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " + "to be triggered by remaining consumers."); } } } } }run
private boolean executeOngoingLoop() throws JMSException { boolean messageReceived = false; boolean active = true; while (active) { synchronized (lifecycleMonitor) { boolean interrupted = false; boolean wasWaiting = false; //如果當前任務已經處於激活狀態但是卻給了暫時中止的命令 while ((active = isActive()) && !isRunning()) { if (interrupted) { throw new IllegalStateException("Thread was interrupted while waiting for " + "a restart of the listener container, but container is still stopped"); } if (!wasWaiting) { //如果並非處於等待狀態則說明第一次執行,需要將激活的任務數量減少 decreaseActiveInvokerCount(); } //開始進入等待狀態,等待任務的恢復命令 wasWaiting = true; try { //通過wait等待,也就是等待notify或者notifyAll lifecycleMonitor.wait(); } catch (InterruptedException ex) { // Re-interrupt current thread, to allow other threads to react. Thread.currentThread().interrupt(); interrupted = true; } } if (wasWaiting) { activeInvokerCount++; } if (scheduledInvokers.size() > maxConcurrentConsumers) { active = false; } } //正常處理流程 if (active) { messageReceived = (invokeListener() || messageReceived); } } return messageReceived; }
private boolean invokeListener() throws JMSException { this.currentReceiveThread = Thread.currentThread(); try { //初始化資源包括首次創建的時候創建Session與Consumer initResourcesIfNecessary(); boolean messageReceived = receiveAndExecute(this, this.session, this.consumer); //改變標誌位,信息成功處理 this.lastMessageSucceeded = true; return messageReceived; } finally { this.currentReceiveThread = null; } }
protected boolean receiveAndExecute( Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) throws JMSException { if (this.transactionManager != null) { // Execute receive within transaction. TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); boolean messageReceived; try { messageReceived = doReceiveAndExecute(invoker, session, consumer, status); } catch (JMSException | RuntimeException | Error ex) { rollbackOnException(this.transactionManager, status, ex); throw ex; } this.transactionManager.commit(status); return messageReceived; } else { // Execute receive outside of transaction. return doReceiveAndExecute(invoker, session, consumer, null); } }
protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException { Connection conToClose = null; Session sessionToClose = null; MessageConsumer consumerToClose = null; try { Session sessionToUse = session; boolean transactional = false; if (sessionToUse == null) { sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( obtainConnectionFactory(), this.transactionalResourceFactory, true); transactional = (sessionToUse != null); } if (sessionToUse == null) { Connection conToUse; if (sharedConnectionEnabled()) { conToUse = getSharedConnection(); } else { conToUse = createConnection(); conToClose = conToUse; conToUse.start(); } sessionToUse = createSession(conToUse); sessionToClose = sessionToUse; } MessageConsumer consumerToUse = consumer; if (consumerToUse == null) { consumerToUse = createListenerConsumer(sessionToUse); consumerToClose = consumerToUse; } //接收消息 Message message = receiveMessage(consumerToUse); if (message != null) { if (logger.isDebugEnabled()) { logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + sessionToUse + "]"); } //模板方法,當消息接收且未處理前給子類機會做相應處理 messageReceived(invoker, sessionToUse); boolean exposeResource = (!transactional && isExposeListenerSession() && !TransactionSynchronizationManager.hasResource(obtainConnectionFactory())); if (exposeResource) { TransactionSynchronizationManager.bindResource( obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); } try { //激活監聽器 doExecuteListener(sessionToUse, message); } catch (Throwable ex) { if (status != null) { if (logger.isDebugEnabled()) { logger.debug("Rolling back transaction because of listener exception thrown: " + ex); } status.setRollbackOnly(); } handleListenerException(ex); // Rethrow JMSException to indicate an infrastructure problem // that may have to trigger recovery... if (ex instanceof JMSException) { throw (JMSException) ex; } } finally { if (exposeResource) { TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); } } // Indicate that a message has been received. return true; } else { if (logger.isTraceEnabled()) { logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + sessionToUse + "] did not receive a message"); } //接收到空消息的處理 noMessageReceived(invoker, sessionToUse); // Nevertheless call commit, in order to reset the transaction timeout (if any). if (shouldCommitAfterNoMessageReceived(sessionToUse)) { commitIfNecessary(sessionToUse, null); } // Indicate that no message has been received. return false; } } finally { JmsUtils.closeMessageConsumer(consumerToClose); JmsUtils.closeSession(sessionToClose); ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); } }doReceiveAndExecute
protected void doExecuteListener(Session session, Message message) throws JMSException { if (!isAcceptMessagesWhileStopping() && !isRunning()) { if (logger.isWarnEnabled()) { logger.warn("Rejecting received message because of the listener container " + "having been stopped in the meantime: " + message); } rollbackIfNecessary(session); throw new MessageRejectedWhileStoppingException(); } try { invokeListener(session, message); } catch (JMSException | RuntimeException | Error ex) { rollbackOnExceptionIfNecessary(session, ex); throw ex; } commitIfNecessary(session, message); }
protected void invokeListener(Session session, Message message) throws JMSException { Object listener = getMessageListener(); if (listener instanceof SessionAwareMessageListener) { doInvokeListener((SessionAwareMessageListener) listener, session, message); } else if (listener instanceof MessageListener) { doInvokeListener((MessageListener) listener, message); } else if (listener != null) { throw new IllegalArgumentException( "Only MessageListener and SessionAwareMessageListener supported: " + listener); } else { throw new IllegalStateException("No message listener specified - see property ‘messageListener‘"); } }
通過層層調用,最終提取監聽器並使用invokeListener激活了監聽器,也就是激活了用戶自定義的監聽器邏輯。這裏還有一句代碼commitIfNecessary(session, message),完成的功能是session.commit()。完成消息服務的事務提交,涉及兩個事務,我們常說的DefaultMessageListenerContainer增加了對事務的支持,是通用的事務,也就是說我們在消息接收過程中如果產生其他操作,比如向數據庫中插入數據,一旦出現異常時就需要全部回滾,也包括回滾插入數據庫中的數據。但是除了我們常說的事務之外,對於消息本身還有一個事務,當接收一個消息的時候,必須使用事務提交的方式,這是在告訴消息服務器本地已經正常接收消息,消息服務器接收到本地的事務提交後便可以將此消息刪除,否則,當前消息會被其他接收者重新接收。
參考:《Spring源碼深度解析》 郝佳 編著;