1. 程式人生 > >Spring----監聽器容器

Spring----監聽器容器

first The move 得到 abstract ive mit .com 消息發送

消息監聽器容器是一個用於查看JMS目標等待消息到達的特殊的bean,一旦消息到達它就可以獲取到消息,並通過調用onMessage方法將消息傳遞一個MessageListener實現。Spring中消息監聽器容器的類型如下:

  ? SimpleMessageListenerContainer:最簡單的消息監聽器容器,只能處理固定數量的JMS會話,且不支持事務。

  ? DefaultMessageListenerContainer:這個消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事物的支持。

  ? serversession.ServerSessionMessage.ListenerContainer:這是功能最強大的消息監聽器,與DefaultMessageListenerContainer相同,它支持事務,但是它還允許動態地管理JMS會話。

下面以DefaultMessageListenerContainer為例進行分析,看看消息監聽器容器的實現。使用消息監聽器容器時一定要將自定義的消息監聽器置於到容器中,這樣才可以在收到消息時,容器把消息轉向監聽器處理。下面看一下它的類圖:

技術分享圖片

同樣我們看到了此類實現了InitializingBean接口,按照以往的風格我們還是首先查看接口方法afterPropertiesSet中的邏輯,其方法的實現在其父類AbstractJmsListeningContainer中。

public void afterPropertiesSet() {
        //驗證connectionFactory
super.afterPropertiesSet(); //驗證配置文件 validateConfiguration(); //初始化 initialize(); }

監聽器容器的初始化只包含了三句代碼,其中前兩句只用於屬性的驗證,而真正用於初始化的操作委托在initialize中執行。

public void initialize() throws JmsException {
        try {
            synchronized (this.lifecycleMonitor) {
                
this.active = true; this.lifecycleMonitor.notifyAll(); } doInitialize(); } catch (JMSException ex) { synchronized (this.sharedConnectionMonitor) { ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); this.sharedConnection = null; } throw convertJmsAccessException(ex); } }
protected void doInitialize() throws JMSException {
        synchronized (this.lifecycleMonitor) {
            for (int i = 0; i < this.concurrentConsumers; i++) {
                scheduleNewInvoker();
            }
        }
    }

這裏用到了concurrentConsumers屬性,對於此屬性的說明如下:

  消息監聽器允許創建多個Session和MessageConsumer來接收消息。具體的個數由concurrentConsumers屬性指定。需要註意的是,應該只是在Destination為Queue的時候才使用多個MessageConsumer(Queue中的一個消息只能被一個Consumer接收),雖然使用多個MessageConsumer會提高消息的處理性能,但是消息處理的順序不能得到保證。消息被接收的順序仍然是消息發送時的順序,但是由於消息可能被並發處理,因此,消息的處理順序可能和消息發送順序不同,此外,不應該在Destination為Topic的時候使用多個MessageConsumer,因為多個MessageConsumer會接收到同樣的消息。

對於具體的實現邏輯我們只能繼續查看源碼:

private void scheduleNewInvoker() {
        AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
        if (rescheduleTaskIfNecessary(invoker)) {
            // This should always be true, since we‘re only calling this when active.
            this.scheduledInvokers.add(invoker);
        }
    }
protected final boolean rescheduleTaskIfNecessary(Object task) {
        if (this.running) {
            try {
                doRescheduleTask(task);
            }
            catch (RuntimeException ex) {
                logRejectedTask(task, ex);
                this.pausedTasks.add(task);
            }
            return true;
        }
        else if (this.active) {
            this.pausedTasks.add(task);
            return true;
        }
        else {
            return false;
        }
    }

分析源碼得知,根據concurrentConsumers數量建立了對應數量的線程,而每個線程都作為一個獨立的接收者在循環接收消息。

反向追蹤rescheduleTaskIfNecessary傳入的參數invoker,發現這個參數是AsyncMessageListenerInvoker類型的,於是我們把焦點轉向這個類的實現,由於它是作為一個Runnable角色去執行,所以我們從這個類的分析從run方法開始。

技術分享圖片
public void run() {
            //並發控制
            synchronized (lifecycleMonitor) {
                activeInvokerCount++;
                lifecycleMonitor.notifyAll();
            }
            boolean messageReceived = false;
            try {
                //根據每個任務設置的最大處理消息數量而做不同處理,小於0默認為是無限制,一致接收消息
                if (maxMessagesPerTask < 0) {
                    messageReceived = executeOngoingLoop();
                }
                else {
                    int messageCount = 0;
                    //消息數量控制,一旦超出數量則停止循環
                    while (isRunning() && messageCount < maxMessagesPerTask) {
                        messageReceived = (invokeListener() || messageReceived);
                        messageCount++;
                    }
                }
            }
            catch (Throwable ex) {
                //清理操作,包括關閉Session等
                clearResources();
                if (!this.lastMessageSucceeded) {
                    // We failed more than once in a row or on startup -
                    // wait before first recovery attempt.
                    waitBeforeRecoveryAttempt();
                }
                this.lastMessageSucceeded = false;
                boolean alreadyRecovered = false;
                synchronized (recoveryMonitor) {
                    if (this.lastRecoveryMarker == currentRecoveryMarker) {
                        handleListenerSetupFailure(ex, false);
                        recoverAfterListenerSetupFailure();
                        currentRecoveryMarker = new Object();
                    }
                    else {
                        alreadyRecovered = true;
                    }
                }
                if (alreadyRecovered) {
                    handleListenerSetupFailure(ex, true);
                }
            }
            finally {
                synchronized (lifecycleMonitor) {
                    decreaseActiveInvokerCount();
                    lifecycleMonitor.notifyAll();
                }
                if (!messageReceived) {
                    this.idleTaskExecutionCount++;
                }
                else {
                    this.idleTaskExecutionCount = 0;
                }
                synchronized (lifecycleMonitor) {
                    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                        // We‘re shutting down completely.
                        scheduledInvokers.remove(this);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                        }
                        lifecycleMonitor.notifyAll();
                        clearResources();
                    }
                    else if (isRunning()) {
                        int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                        if (nonPausedConsumers < 1) {
                            logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                                    "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                        }
                        else if (nonPausedConsumers < getConcurrentConsumers()) {
                            logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                                    "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                                    "to be triggered by remaining consumers.");
                        }
                    }
                }
            }
        }
run

以上函數主要根據變量maxMessagePerTask的值來分為不同的情況處理,當然,函數中還使用了大量的代碼異常處理機制的數據維護。

其實核心的處理就是調用invokeListener來接收消息並激活消息監聽器,但是之所以兩種情況分開處理,正是考慮到了在無限制的循環接收消息的情況下,用戶可以設置標誌位running來控制消息的接收的暫停與恢復,並維護當前消息監聽器的數量。

private boolean executeOngoingLoop() throws JMSException {
            boolean messageReceived = false;
            boolean active = true;
            while (active) {
                synchronized (lifecycleMonitor) {
                    boolean interrupted = false;
                    boolean wasWaiting = false;
                    //如果當前任務已經處於激活狀態但是卻給了暫時中止的命令
                    while ((active = isActive()) && !isRunning()) {
                        if (interrupted) {
                            throw new IllegalStateException("Thread was interrupted while waiting for " +
                                    "a restart of the listener container, but container is still stopped");
                        }
                        if (!wasWaiting) {
                            //如果並非處於等待狀態則說明第一次執行,需要將激活的任務數量減少
                            decreaseActiveInvokerCount();
                        }
                        //開始進入等待狀態,等待任務的恢復命令
                        wasWaiting = true;
                        try {
                            //通過wait等待,也就是等待notify或者notifyAll
                            lifecycleMonitor.wait();
                        }
                        catch (InterruptedException ex) {
                            // Re-interrupt current thread, to allow other threads to react.
                            Thread.currentThread().interrupt();
                            interrupted = true;
                        }
                    }
                    if (wasWaiting) {
                        activeInvokerCount++;
                    }
                    if (scheduledInvokers.size() > maxConcurrentConsumers) {
                        active = false;
                    }
                }
                //正常處理流程
                if (active) {
                    messageReceived = (invokeListener() || messageReceived);
                }
            }
            return messageReceived;
        }

如果按照正常的流程其實是不會進入while循環的,而是直接進入函數invokeListener來接收消息並激活監聽器,但是,我們不可能讓循環一直持續下去,我們要考慮到暫停線程或者恢復線程的情況,這時,isRunning函數就派上用場了。

isRunning用來檢測標誌位this.running狀態而判斷是否需要進入while循環。由於要維護當前線程的激活數量,所以引入了wasWaiting變量,用來判斷線程是否處理等待狀態。如果線程首次進入等待狀態,則需要減少線程激活數量計數器。

當然,還有個地方需要提一下,就是線程等待不是一味的采用while循環來控制,因為如果單純的采用while循環會浪費CPU的始終周期,給資源造成巨大的浪費。這裏,Spring采用的是全局控制變量LifecycleMonitor的wait方法來暫停線程,所以,如果終止線程需要再次恢復的話,除了更改this.running標誌位外,還需要調用LifecycleMonitor.notify或者LifecycleMonitor.notifyAll來使得線程恢復。

接下來就是消息接收的處理了。

private boolean invokeListener() throws JMSException {
            this.currentReceiveThread = Thread.currentThread();
            try {
                //初始化資源包括首次創建的時候創建Session與Consumer
                initResourcesIfNecessary();
                boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
                //改變標誌位,信息成功處理
                this.lastMessageSucceeded = true;
                return messageReceived;
            }
            finally {
                this.currentReceiveThread = null;
            }
        }
protected boolean receiveAndExecute(
            Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer)
            throws JMSException {

        if (this.transactionManager != null) {
            // Execute receive within transaction.
            TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
            boolean messageReceived;
            try {
                messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
            }
            catch (JMSException | RuntimeException | Error ex) {
                rollbackOnException(this.transactionManager, status, ex);
                throw ex;
            }
            this.transactionManager.commit(status);
            return messageReceived;
        }

        else {
            // Execute receive outside of transaction.
            return doReceiveAndExecute(invoker, session, consumer, null);
        }
    }

由於DefaultMessageListenerContainer消息監聽器容器建立在SimpleMessageListenerContainer容器之上,添加了對事務的支持,那麽此時,事務特性已經開始了。如果用戶設置了this.transcationManager,也就是配置了事務,那麽消息的接收會被控制在事務之內,一旦出現任何異常都會回滾,而回滾操作也會交於事務管理器統一處理。

doReceiveAndExecute包含了整個消息的接收處理過程,由於參雜著事務,所以並沒有復用模板中的方法。

技術分享圖片
protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session,
            @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException {

        Connection conToClose = null;
        Session sessionToClose = null;
        MessageConsumer consumerToClose = null;
        try {
            Session sessionToUse = session;
            boolean transactional = false;
            if (sessionToUse == null) {
                sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
                        obtainConnectionFactory(), this.transactionalResourceFactory, true);
                transactional = (sessionToUse != null);
            }
            if (sessionToUse == null) {
                Connection conToUse;
                if (sharedConnectionEnabled()) {
                    conToUse = getSharedConnection();
                }
                else {
                    conToUse = createConnection();
                    conToClose = conToUse;
                    conToUse.start();
                }
                sessionToUse = createSession(conToUse);
                sessionToClose = sessionToUse;
            }
            MessageConsumer consumerToUse = consumer;
            if (consumerToUse == null) {
                consumerToUse = createListenerConsumer(sessionToUse);
                consumerToClose = consumerToUse;
            }
            //接收消息
            Message message = receiveMessage(consumerToUse);
            if (message != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
                            consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
                            sessionToUse + "]");
                }
                //模板方法,當消息接收且未處理前給子類機會做相應處理
                messageReceived(invoker, sessionToUse);
                boolean exposeResource = (!transactional && isExposeListenerSession() &&
                        !TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
                if (exposeResource) {
                    TransactionSynchronizationManager.bindResource(
                            obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
                }
                try {
                    //激活監聽器
                    doExecuteListener(sessionToUse, message);
                }
                catch (Throwable ex) {
                    if (status != null) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
                        }
                        status.setRollbackOnly();
                    }
                    handleListenerException(ex);
                    // Rethrow JMSException to indicate an infrastructure problem
                    // that may have to trigger recovery...
                    if (ex instanceof JMSException) {
                        throw (JMSException) ex;
                    }
                }
                finally {
                    if (exposeResource) {
                        TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
                    }
                }
                // Indicate that a message has been received.
                return true;
            }
            else {
                if (logger.isTraceEnabled()) {
                    logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
                            "session [" + sessionToUse + "] did not receive a message");
                }
                //接收到空消息的處理
                noMessageReceived(invoker, sessionToUse);
                // Nevertheless call commit, in order to reset the transaction timeout (if any).
                if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
                    commitIfNecessary(sessionToUse, null);
                }
                // Indicate that no message has been received.
                return false;
            }
        }
        finally {
            JmsUtils.closeMessageConsumer(consumerToClose);
            JmsUtils.closeSession(sessionToClose);
            ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
        }
    }
doReceiveAndExecute

上述函數代碼看似復雜,但是真正的邏輯不多,大多是固定的套路,而我們最關心就是監聽器的激活。

protected void doExecuteListener(Session session, Message message) throws JMSException {
        if (!isAcceptMessagesWhileStopping() && !isRunning()) {
            if (logger.isWarnEnabled()) {
                logger.warn("Rejecting received message because of the listener container " +
                        "having been stopped in the meantime: " + message);
            }
            rollbackIfNecessary(session);
            throw new MessageRejectedWhileStoppingException();
        }

        try {
            invokeListener(session, message);
        }
        catch (JMSException | RuntimeException | Error ex) {
            rollbackOnExceptionIfNecessary(session, ex);
            throw ex;
        }
        commitIfNecessary(session, message);
    }
protected void invokeListener(Session session, Message message) throws JMSException {
        Object listener = getMessageListener();

        if (listener instanceof SessionAwareMessageListener) {
            doInvokeListener((SessionAwareMessageListener) listener, session, message);
        }
        else if (listener instanceof MessageListener) {
            doInvokeListener((MessageListener) listener, message);
        }
        else if (listener != null) {
            throw new IllegalArgumentException(
                    "Only MessageListener and SessionAwareMessageListener supported: " + listener);
        }
        else {
            throw new IllegalStateException("No message listener specified - see property ‘messageListener‘");
        }
    }

通過層層調用,最終提取監聽器並使用invokeListener激活了監聽器,也就是激活了用戶自定義的監聽器邏輯。這裏還有一句代碼commitIfNecessary(session, message),完成的功能是session.commit()。完成消息服務的事務提交,涉及兩個事務,我們常說的DefaultMessageListenerContainer增加了對事務的支持,是通用的事務,也就是說我們在消息接收過程中如果產生其他操作,比如向數據庫中插入數據,一旦出現異常時就需要全部回滾,也包括回滾插入數據庫中的數據。但是除了我們常說的事務之外,對於消息本身還有一個事務,當接收一個消息的時候,必須使用事務提交的方式,這是在告訴消息服務器本地已經正常接收消息,消息服務器接收到本地的事務提交後便可以將此消息刪除,否則,當前消息會被其他接收者重新接收。

參考:《Spring源碼深度解析》 郝佳 編著;

Spring----監聽器容器