1. 程式人生 > >activemq傳送同步傳送和非同步傳送

activemq傳送同步傳送和非同步傳送

在預設大多數情況下,AcitveMQ 是以非同步模式傳送訊息。例外的情況:在沒有使用
事務的情況下,生產者以PERSISTENT 傳送模式傳送訊息。在這種情況下,send 方法都
是同步的,並且一直阻塞直到ActiveMQ 發回確認訊息:訊息已經儲存在永續性資料存
儲中。這種確認機制保證訊息不會丟失,但會造成生產者阻塞從而影響反應時間。
高效能的程式一般都能容忍在故障情況下丟失少量資料。如果編寫這樣的程式,可
以通過使用非同步傳送來提高吞吐量(甚至在使用PERSISTENT 傳送模式的情況下)。

    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int
deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: "
+ destination); } synchronized (sendMutex) { // tell the Broker we are about to start a new transaction doStartTransaction(); TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode); long expiration = 0L; if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } message.setJMSExpiration(expiration); message.setJMSPriority(priority); message.setJMSRedelivered(false); // transform to our own message format here ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); msg.setDestination(destination); msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. if (msg != message) { message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message msg.setBrokerPath(null); msg.setTransactionId(txid); if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } msg.setConnection(connection); msg.onSend(); msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } // 請看下面這一行就是判斷是同步傳送還是非同步傳送的。 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } } }

接下來看看同步傳送方法。

    public Object request(Object command) throws IOException {
        FutureResponse response = asyncRequest(command, null);  // 呼叫傳送方法
        return response.getResult(); // 從future方法阻塞等待返回
    }

看下asyncRequest方法。構建了一個FutureResponse物件。並且將其存到了
requestMap.put(new Integer(command.getCommandId()), future);這個map當中。
等broker有返回的時候,從requestMap裡面取出來這個future.

    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback, this);
        IOException priorError = null;
        synchronized (requestMap) {
            priorError = this.error;
            if (priorError == null) {
                requestMap.put(new Integer(command.getCommandId()), future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        }

        next.oneway(command);
        return future;
    }

看下FutureResponse裡面的這個回撥方法。

    public void set(Response result) {
        if (responseSlot.offer(result)) {
            if (responseCallback != null) {
                responseCallback.onCompletion(this);
            }
        }
    }

其中responseSlot是個阻塞佇列 private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
。複習下阻塞佇列的知識。

使用BlockingQueue的關鍵技術點如下:
1.BlockingQueue定義的常用方法如下:
1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則報異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止
2.BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1)ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
3.LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.