1. 程式人生 > >分散式訊息通訊 ActiveMQ 原理 分析二

分散式訊息通訊 ActiveMQ 原理 分析二

本章重點:

1. unconsumedMessage 原始碼分析 
2. 消費端的 PrefetchSize 
3. 訊息的確認過程 
4. 訊息重發機制 
5. ActiveMQ 多節點高效能方案

訊息消費流程圖

unconsumedMessages資料的獲取過程

那我們來看看 ActiveMQConnectionFactory. createConnection 裡面做了什麼 事情。 

  1. 動態建立一個傳輸協議 
  2. 建立一個連線 
  3.  通過 transport.start() 
 protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {
            Transport transport = createTransport();
            connection = createActiveMQConnection(transport, factoryStats);

            connection.setUserName(userName);
            connection.setPassword(password);

            configureConnection(connection);

            transport.start();

            if (clientID != null) {
                connection.setDefaultClientID(clientID);
            }

            return connection;
        } catch (JMSException e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw e;
        } catch (Exception e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
        }
    }

transport.start()

在之前文章中,我已經分析了,其實transport是一個鏈式呼叫,是一個多層包裝的物件。

ResponseCorrelator(MutexTransport(WireFormatNegotiator(InactivityMonitor(TcpTransport()))) 
最終呼叫 TcpTransport.start()方法,然而這個類中並沒有 start,而是在父類ServiceSupport.start()中。

 public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            boolean success = false;
            stopped.set(false);
            try {
                preStart();
                doStart();
                success = true;
            } finally {
                started.set(success);
            }
            for(ServiceListener l:this.serviceListeners) {
                l.started(this);
            }
        }
    }

這塊程式碼看起來就比較熟悉了,我們之前看過的中介軟體的原始碼,通訊層都是獨 立來實現及解耦的。而 ActiveMQ 也是一樣,提供了 Transport 介面和TransportSupport 類。

這個介面的主要作用是為了讓客戶端有訊息被非同步傳送、同步傳送和被消費的能力。

接下來沿著 doStart()往下看,又呼叫TcpTransport.doStart() ,接著通過 super.doStart(),呼叫TransportThreadSupport.doStart(). 建立了一個執行緒,傳入的是 this,呼叫子類的 run 方法,也就是 TcpTransport.run()。

TcpTransport.run 

run 方法主要是從 socket 中讀取資料包,只要 TcpTransport 沒有停止,它就會不斷去呼叫 doRun。

@Override
    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread=Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            stoppedLatch.get().countDown();
            onException(e);
        } catch (Throwable e){
            stoppedLatch.get().countDown();
            IOException ioe=new IOException("Unexpected error occurred: " + e);
            ioe.initCause(e);
            onException(ioe);
        }finally {
            stoppedLatch.get().countDown();
        }
    }

TcpTransport.run 
run 方法主要是從 socket 中讀取資料包,只要 TcpTransport 沒有停止,它就會不斷去呼叫 doRun。

protected void doRun() throws IOException {
        try {
            Object command = readCommand();
            doConsume(command);
        } catch (SocketTimeoutException e) {
        } catch (InterruptedIOException e) {
        }
    }

TcpTransport.readCommand 
這裡面,通過 wireFormat 對資料進行格式化,可以認為這是一個反序列化過程。wireFormat 預設實現是 OpenWireFormat,activeMQ 自定義的跨語言的
wire 協議 。

 protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }

分析到這,我們差不多明白了傳輸層的主要工作是獲得資料並且把資料轉換為物件,再把物件物件傳給 ActiveMQConnection 

TransportSupport.doConsume 
TransportSupport 類中最重要的方法是 doConsume,它的作用就是用來“消費訊息” 。

    public void doConsume(Object command) {
        if (command != null) {
            if (transportListener != null) {
                transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }

TransportSupport 類中唯一的成員變數是 TransportListener ,這也意味著一個 Transport 支援類繫結一個傳送監聽器類,傳送監聽器介面TransportListener 最重要的方法就是 void  onCommand(Object command);,它用來處理命令, 這個 transportListener 是在哪裡賦值的呢?再回到 ActiveMQConnection 的構造方法中。傳遞了 ActiveMQConnection 自己本身,(ActiveMQConnection 是TransportListener 介面的實現類之一) 於是,訊息就這樣從傳送層到達了我們的連線層上。

 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
        
        //繫結傳輸物件
        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // Configure a single threaded executor who's core thread can timeout if
        // idle
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
                //thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        //transport繫結為自己
        this.transport.setTransportListener(this);

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }

從建構函式可以看出,建立 ActiveMQConnection 物件時,除了和 Transport相互繫結,還對執行緒池執行器 executor 進行了初始化。下面我們看看該類的核 心方法 。

onCommand 
這裡面會針對不同的訊息做分發,比如傳入的 command 是MessageDispatch,那麼這個 command 的 visit 方法就會呼叫processMessageDispatch 方法 

public void onCommand(final Object o) {
        final Command command = (Command)o;
        if (!closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    @Override
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        //等待Transport中斷處理完成
                        waitForTransportInterruptionProcessingToComplete();
                        //這裡通過消費者id獲取要分發的消費者物件
                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            // Copy in case a embedded broker is dispatching via
                            // vm://
                            // md.getMessage() == null to signal end of queue
                            // browse.
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
                            //呼叫會話ActiveMQSession自己的dispatch方法來處理這條訊息
                            dispatcher.dispatch(md);
                        } else {
                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
                        }
                        return null;
                    }
                     //如果傳入的是 ProducerAck,則呼叫的是下面這個方法,這裡我 們僅僅關注 MessageDispatch 
                       就行了 
                    @Override
                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }
                        return null;
                    }

在現在這個場景中,我們只關注 processMessageDispatch 方法,在這個方法中,只是簡單的去呼叫 ActiveMQSession 的 dispatch 方法來處理訊息, 
tips: command.visit, 這裡使用了介面卡模式,如果 command 是一個MessageDispatch,那麼它就會呼叫 processMessageDispatch 方法,其他方
法他不會關心,程式碼如下:MessageDispatch.visit 。

@Override 
 
public Response visit(CommandVisitor visitor) throws Exception { 
 
   return visitor.processMessageDispatch(this); 
 
} 

ActiveMQSession.dispatch(md) 
executor 這個物件其實是一個成員物件 ActiveMQSessionExecutor,專門負 責來處理訊息分發 。

 public void dispatch(MessageDispatch messageDispatch) {
        try {
            executor.execute(messageDispatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            connection.onClientInternalException(e);
        }
    }

ActiveMQSessionExecutor.execute 
這個方法的核心功能就是處理訊息的分發。 

 void execute(MessageDispatch message) throws InterruptedException {

        if (!startedOrWarnedThatNotStarted) {

            ActiveMQConnection connection = session.connection;
            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
                startedOrWarnedThatNotStarted = true;
            } else {
                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();

                // lets only warn when a significant amount of time has passed
                // just in case its normal operation
                if (elapsedTime > aboutUnstartedConnectionTimeout) {
                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
                             + " Received: " + message);
                    startedOrWarnedThatNotStarted = true;
                }
            }
        }
   
        //如果會話不是非同步分發且沒有使用sessionpool分支,則呼叫dispatch傳送訊息
        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
            dispatch(message);
        } else {
            //將訊息直接放入到佇列裡
            messageQueue.enqueue(message);
            wakeup();
        }
    }

預設是採用非同步訊息分發。所以,直接呼叫 messageQueue.enqueue,把訊息放到佇列中,並且呼叫 wakeup 方法 。

非同步分發的流程 

    public void wakeup() {
        //進一步驗證
        if (!dispatchedBySessionPool) {
            //判斷session是否為非同步分發
            if (session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized (this) {
                            if (this.taskRunner == null) {
                                if (!isRunning()) {
                                    // stop has been called
                                    return;
                                }
                                //通過 TaskRunnerFactory 建立了一個任務執行類 taskRunner,這裡把自己作為一 
                                //個 task 傳入到 createTaskRunner 中,說明當前 
                                //的類一定是實現了 Task 介面的. 簡單來說,就是通過執行緒池去執行一個任務,完 成 
                                //非同步排程,簡單吧 
                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
                                        "ActiveMQ Session: " + session.getSessionId());
                            }
                            taskRunner = this.taskRunner;
                        }
                    }
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                //同步分發
                while (iterate()) {
                }
            }
        }
    }

所以,對於非同步分發的方式,會呼叫 ActiveMQSessionExecutor 中的 iterate 方法,我們來看看這個方法的程式碼 。

iterate 
這個方法裡面做兩個事 

  1. 把消費者監聽的所有訊息轉存到待消費佇列中 
  2. 如果 messageQueue 還存在遺留訊息,同樣把訊息分發出去 
public boolean iterate() {

        // Deliver any messages queued on the consumer to their listeners.
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }

        // No messages left queued on the listeners.. so now dispatch messages
        // queued on the session
        MessageDispatch message = messageQueue.dequeueNoWait();
        if (message == null) {
            return false;
        } else {
            dispatch(message);
            return !messageQueue.isEmpty();
        }
    }

ActiveMQMessageConsumer.iterate 

 public boolean iterate() {
        MessageListener listener = this.messageListener.get();
        if (listener != null) {
            MessageDispatch md = unconsumedMessages.dequeueNoWait();
            if (md != null) {
                dispatch(md);
                return true;
            }
        }
        return false;
    }

同步分發的流程 
同步分發的流程,直接呼叫 ActiveMQSessionExcutor 中的 dispatch 方法,代 碼如下 

public void dispatch(MessageDispatch md) {
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                            if (redeliveryExceeded(md)) {
                                posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                                return;
                            }
                            ActiveMQMessage message = createActiveMQMessage(md);
                            beforeMessageIsConsumed(md);
                            try {
                                boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired();
                                if (!expired) {
                                    listener.onMessage(message);
                                }
                                afterMessageIsConsumed(md, expired);
                            } catch (RuntimeException e) {
                                LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
                                md.setRollbackCause(e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                    // schedual redelivery and possible dlq processing
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the next message.
                                    afterMessageIsConsumed(md, false);
                                }
                            }
                        } else {
                            if (!unconsumedMessages.isRunning()) {
                                // delayed redelivery, ensure it can be re delivered
                                session.connection.rollbackDuplicate(this, md.getMessage());
                            }

                            if (md.getMessage() == null) {
                                // End of browse or pull request timeout.
                                unconsumedMessages.enqueue(md);
                            } else {
                                if (!consumeExpiredMessage(md)) {
                                    unconsumedMessages.enqueue(md);
                                    if (availableListener != null) {
                                        availableListener.onMessageAvailable(this);
                                    }
                                } else {
                                    beforeMessageIsConsumed(md);
                                    afterMessageIsConsumed(md, true);

                                    // Pull consumer needs to check if pull timed out and send
                                    // a new pull command if not.
                                    if (info.getCurrentPrefetchSize() == 0) {
                                        unconsumedMessages.enqueue(null);
                                    }
                                }
                            }
                        }
                    } else {
                        // deal with duplicate delivery
                        ConsumerId consumerWithPendingTransaction;
                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
                            if (transactedIndividualAck) {
                                immediateIndividualTransactedAck(md);
                            } else {
                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
                            }
                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
                            session.getConnection().rollbackDuplicate(this, md.getMessage());
                            dispatch(md);
                        } else {
                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
                        }
                    }
                }
            }
            if (++dispatchedCount % 1000 == 0) {
                dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception e) {
            session.connection.onClientInternalException(e);
        }
    }

 到這裡為止,訊息如何接受以及他的處理方式的流程,我們已經搞清楚了,希望對大家理解 activeMQ 的核心機制有一定的幫助 。

消費端的 PrefetchSize 
還記得我們在分析消費端的原始碼的時候,所講到的 prefetchsize 嗎?這個prefetchsize 是做什麼的?我們接下來去研究一下 

原理剖析 

  1. activemq 的 consumer 端也有視窗機制,通過 prefetchSize 就可以設定視窗大小。不同的型別的佇列,prefetchSize 的預設值也是不一樣的 持久化佇列和非持久化佇列的預設值為 1000 ,持久化 topic 預設值為 100 ,非持久化佇列的預設值為 Short.MAX_VALUE-1 。
  2. 通過上面的例子,我們基本上應該知道 prefetchSize 的作用了,消費端會根據prefetchSize 的大小批量獲取資料,比如預設值是 1000,那麼消費端會預先載入 1000 條資料到本地的記憶體中。 

prefetchSize 的設定方法 
在 createQueue 中新增 consumer.prefetchSize,就可以看到效果 Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10"); 
既然有批量載入,那麼一定有批量確認,這樣才算是徹底的優化 
optimizeAcknowledge 

  1. ActiveMQ 提供了 optimizeAcknowledge 來優化確認,它表示是否開啟“優化ACK”,只有在為 true 的情況下,prefetchSize 以及optimizeAcknowledgeTimeout 引數才會有意義 。
  2. 優化確認一方面可以減輕 client 負擔(不需要頻繁的確認訊息)、減少通訊開銷,另一方面由於延遲了確認(預設 ack 了 0.65*prefetchSize 個訊息才確認),broker 再次傳送訊息時又可以批量傳送 。
  3. 如果只是開啟了 prefetchSize,每條訊息都去確認的話,broker 在收到確認後,也只是傳送一條訊息,並不是批量釋出,當然也可以通過設定DUPS_OK_ACK來手動延遲確認, 我們需要在 brokerUrl 指定 optimizeACK 選項 ConnectionFactory connectionFactory= new ActiveMQConnectionFactory ("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000"); 
  4. 注意,如果 optimizeAcknowledge 為 true,那麼 prefetchSize 必須大於 0. 當 prefetchSize=0 的時候,表示 consumer 通過 PULL 方式從 broker 獲取消 息 。

總結 

  1. 到目前為止,我們知道了 optimizeAcknowledge 和 prefetchSize 的作用,兩者協同工作,通過批量獲取訊息、並延遲批量確認,來達到一個高效的訊息消 費模型。它比僅減少了客戶端在獲取訊息時的阻塞次數,還能減少每次獲取消 息時的網路通訊開銷 。
  2. 需要注意的是,如果消費端的消費速度比較高,通過這兩者組合是能大大提升 consumer 的效能。如果 consumer 的消費效能本身就比較慢,設定比較大的 prefetchSize 反而不能有效的達到提升消費效能的目的。因為過大的prefetchSize 不利於 consumer 端訊息的負載均衡。因為通常情況下,我們都會部署多個 consumer 節點來提升消費端的消費效能。 
  3. 這個優化方案還會存在另外一個潛在風險,當訊息被消費之後還沒有來得及確 認時,client 端發生故障,那麼這些訊息就有可能會被重新發送給其他consumer,那麼這種風險就需要 client 端能夠容忍“重複”訊息。 

訊息的確認過程

ACK_MODE

通過前面的原始碼分析,基本上已經知道了訊息的消費過程,以及訊息的批量獲 取和批量確認,那麼接下來再瞭解下訊息的確認過程。

訊息確認有四種ACK_MODE,分別是:

  1. AUTO_ACKNOWLEDGE = 1 自動確認 
  2. CLIENT_ACKNOWLEDGE = 2 客戶端手動確認  
  3. DUPS_OK_ACKNOWLEDGE = 3 自動批量確認 
  4. SESSION_TRANSACTED = 0 事務提交併確認 

雖然 Client 端指定了 ACK 模式,但是在 Client 與 broker 在交換 ACK 指令的時候,還需要告知 ACK_TYPE,ACK_TYPE 表示此確認指令的型別,不同的
ACK_TYPE 將傳遞著訊息的狀態,broker 可以根據不同的 ACK_TYPE 對訊息進 行不同的操作。 

ACK_TYPE 

  1. DELIVERED_ACK_TYPE = 0 訊息"已接收",但尚未處理結束 
  2. STANDARD_ACK_TYPE = 2 "標準"型別,通常表示為訊息"處理成功",broker 端 可以刪除訊息了 
  3. POSION_ACK_TYPE = 1 訊息"錯誤",通常表示"拋棄"此訊息,比如訊息重發多次後,都無法正確處理時,訊息將會被刪除或者 DLQ(死信佇列) 
  4. REDELIVERED_ACK_TYPE = 3 訊息需"重發",比如 consumer 處理訊息時丟擲了異常,broker 稍後會重新發送此訊息 。
  5. INDIVIDUAL_ACK_TYPE = 4 表示只確認"單條訊息",無論在任何 ACK_MODE 下  UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一條訊息在轉發給“訂閱者”時,發現此訊息不符合 Selector 過濾條件,那麼此訊息將 不會轉發給訂閱者,訊息將會被儲存引擎刪除(相當於在 Broker 上確認了訊息)。 
    Client 端在不同的 ACK 模式時,將意味著在不同的時機發送 ACK 指令,每個 ACK Command 中會包含 ACK_TYPE,那麼 broker 端就可以根據 ACK_TYPE 來決定 此訊息的後續操作 。

訊息的重發機制原理 
訊息重發的情況 

在正常情況下,有幾中情況會導致訊息重新發送 :

  1. 在事務性會話中,沒有呼叫 session.commit 確認訊息或者呼叫session.rollback 方法回滾訊息 
  2. 在非事務性會話中,ACK 模式為 CLIENT_ACKNOWLEDGE 的情況下,沒有呼叫 acknowledge 或者呼叫了 recover 方法;  一個訊息被 redelivedred 超過預設的最大重發次數(預設 6 次)時,消費端會給 broker 傳送一個”poison ack”(ActiveMQMessageConsumer#dispatch),表示這個訊息有毒,告訴 broker 不要再發了。這個時候 broker 會把這個訊息放到 DLQ(死信佇列)。 

死信佇列 
ActiveMQ 中預設的死信佇列是 ActiveMQ.DLQ,如果沒有特別的配置,有毒的訊息都會被髮送到這個佇列。預設情況下,如果持久訊息過期以後,也會被 送到 DLQ 中。 

死信佇列配置策略 
預設所有佇列的死信訊息都被髮送到同一個預設死信佇列,不便於管理,可以 通過 individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略來進 行修改 。

<destinationPolicy> 
 
           <policyMap> 
 
             <policyEntries> 
 
               <policyEntry topic=">" > 
 
                 <pendingMessageLimitStrategy> 
 
                   <constantPendingMessageLimitStrategy limit="1000"/> 
 
                 </pendingMessageLimitStrategy> 
 
               </policyEntry>  
 
                // “>”表示對所有佇列生效,如果需要設定指定佇列,則直接寫隊 列名稱 
 
<policyEntry queue=">"> 
 
                 <deadLetterStrategy> 
 
                 //queuePrefix:設定死信佇列字首 
 
                //useQueueForQueueMessage 設定佇列儲存到死信。 
 
                   <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> 
 
               </deadLetterStrategy> 
 
               </policyEntry> 
 
             </policyEntries> 
 
           </policyMap> 
 
       </destinationPolicy> 

自動丟棄過期訊息 

<deadLetterStrategy> 
 
   <sharedDeadLetterStrategy processExpired="false" /> 
 
</deadLetterStrategy> 

死信佇列的再次消費 

當定位到訊息不能消費的原因後,就可以在解決掉這個問題之後,再次消費死 信佇列中的訊息。因為死信佇列仍然是一個佇列 。

ActiveMQ 靜態網路配置 
配置說明 
修改 activeMQ 伺服器的 activeMQ.xml, 增加如下配置 

<networkConnectors>       
 
    <networkConnector uri="static://(tcp://192.168.11.153:61616,tcp://192.168.11.154:61616 )"/>     
 
</networkConnectors> 

兩個 Brokers 通過一個 static 的協議來進行網路連線。一個 Consumer 連線到BrokerB 的一個地址上,當 Producer 在 BrokerA 上以相同的地址傳送訊息
是,此時訊息會被轉移到 BrokerB 上,也就是說 BrokerA 會轉發訊息到BrokerB 上 。

訊息迴流 
從 5.6 版本開始,在 destinationPolicy 上新增了一個選項replayWhenNoConsumers 屬性,這個屬性可以用來解決當 broker1 上有需要轉發的訊息但是沒有消費者時,把訊息迴流到它原始的 broker。同時把enableAudit 設定為 false,為了防止訊息迴流後被當作重複訊息而不被分發 通過如下配置,在 activeMQ.xml 中。 分別在兩臺伺服器都配置。即可完成消 息迴流處理 。

<policyEntry queue=">" enableAudit="false"> 
 
 <networkBridgeFilterFactory> 
 
    <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> 
 
 </networkBridgeFilterFactory> 
 
</policyEntry> 

動態網路連線 

  1. ActiveMQ 使用 Multicast 協議將一個 Service 和其他的 Broker 的 Service 連線起來。Multicast 能夠自動的發現其他 broker,從而替代了使用 static 功能列表 brokers。
  2. 用 multicast 協議可以在網路中頻繁 multicast://ipadaddress:port?transportOptions 。
  3. 基於 zookeeper+levelDB 的 HA 集 群搭建 activeMQ5.9 以後推出的基於 zookeeper 的 master/slave 主從實現。雖然ActiveMQ 不建議使用 LevelDB 作為儲存,主要原因是,社群的主要精力都幾種在 kahadb 的維護上,包括 bug 修復等。所以並沒有對 LevelDB 做太多的關注,所以他在是不做為推薦商用。但實際上在很多公司,仍然採用了LevelDB+zookeeper 的高可用叢集方案。而實際推薦的方案,仍然是基於KahaDB 的檔案共享以及 Jdbc 的方式來實現。 

配置 
在三臺機器上安裝 activemq,通過三個例項組成叢集。 
修改配置 

  1. directory:表示 LevelDB 所在的主工作目錄 
  2. replicas:表示總的節點數。比如我們的及群眾有 3 個節點,且最多允許一個節點出現故障,那麼這個值可以設定為 2,也可以設定為 3. 因為計算公式為 
  3. (replicas/2)+1. 如果我們設定為 4, 就表示不允許 3 個節點的任何一個節點出 錯。 bind:噹噹前的節點為 master 時,它會根據繫結好的地址和埠來進行

主從 複製協議 

  1. zkAddress:zk 的地址 
  2. hostname:本機 IP 
  3. sync:在認為訊息被消費完成前,同步資訊所儲存的策略。 
  4. local_mem/local_disk 

ActiveMQ 的優缺點 

  1. ActiveMQ 採用訊息推送方式,所以最適合的場景是預設訊息都可在短時間內被消費。
  2. 資料量越大,查詢和消費訊息就越慢,訊息積壓程度與訊息速度成反 比。
  3. 吞吐量低。由於 ActiveMQ 需要建立索引,導致吞吐量下降。這是無法克服的缺點,只要使用完全符合 JMS 規範的訊息中介軟體,就要接受這個級別的TPS。 
  4. 無分片功能。這是一個功能缺失,JMS 並沒有規定訊息中介軟體的叢集、分片機制。而由於 ActiveMQ 是偉企業級開發設計的訊息中介軟體,初衷並不是為了處理海量訊息和高併發請求。如果一臺伺服器不能承受更多訊息,則需要橫向 拆分。ActiveMQ 官方不提供分片機制,需要自己實現。 

適用場景 

  1. 對 TPS 要求比較低的系統,可以使用 ActiveMQ 來實現,一方面比較簡單,能 夠快速上手開發,另一方面可控性也比較好,還有比較好的監控機制和介面 不適用的場景 。
  2. 訊息量巨大的場景。ActiveMQ 不支援訊息自動分片機制,如果訊息量巨大, 導致一臺伺服器不能處理全部訊息,就需要自己開發訊息