activemq傳送同步傳送和非同步傳送
在預設大多數情況下,AcitveMQ 是以非同步模式傳送訊息。例外的情況:在沒有使用
事務的情況下,生產者以PERSISTENT 傳送模式傳送訊息。在這種情況下,send 方法都
是同步的,並且一直阻塞直到ActiveMQ 發回確認訊息:訊息已經儲存在永續性資料存
儲中。這種確認機制保證訊息不會丟失,但會造成生產者阻塞從而影響反應時間。
高效能的程式一般都能容忍在故障情況下丟失少量資料。如果編寫這樣的程式,可
以通過使用非同步傳送來提高吞吐量(甚至在使用PERSISTENT 傳送模式的情況下)。
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
message.setJMSRedelivered(false);
// transform to our own message format here
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
msg.setDestination(destination);
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
// 請看下面這一行就是判斷是同步傳送還是非同步傳送的。
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
接下來看看同步傳送方法。
public Object request(Object command) throws IOException {
FutureResponse response = asyncRequest(command, null); // 呼叫傳送方法
return response.getResult(); // 從future方法阻塞等待返回
}
看下asyncRequest方法。構建了一個FutureResponse物件。並且將其存到了
requestMap.put(new Integer(command.getCommandId()), future);這個map當中。
等broker有返回的時候,從requestMap裡面取出來這個future.
public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback, this);
IOException priorError = null;
synchronized (requestMap) {
priorError = this.error;
if (priorError == null) {
requestMap.put(new Integer(command.getCommandId()), future);
}
}
if (priorError != null) {
future.set(new ExceptionResponse(priorError));
throw priorError;
}
next.oneway(command);
return future;
}
看下FutureResponse裡面的這個回撥方法。
public void set(Response result) {
if (responseSlot.offer(result)) {
if (responseCallback != null) {
responseCallback.onCompletion(this);
}
}
}
其中responseSlot是個阻塞佇列 private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
。複習下阻塞佇列的知識。
使用BlockingQueue的關鍵技術點如下:
1.BlockingQueue定義的常用方法如下:
1)add(anObject):把anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則報異常
2)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue裡,即如果BlockingQueue可以容納,則返回true,否則返回false.
3)put(anObject):把anObject加到BlockingQueue裡,如果BlockQueue沒有空間,則呼叫此方法的執行緒被阻斷直到BlockingQueue裡面有空間再繼續.
4)poll(time):取走BlockingQueue裡排在首位的物件,若不能立即取出,則可以等time引數規定的時間,取不到時返回null
5)take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的物件被加入為止
2.BlockingQueue有四個具體的實現類,根據不同需求,選擇不同的實現類
1)ArrayBlockingQueue:規定大小的BlockingQueue,其建構函式必須帶一個int引數來指明其大小.其所含的物件是以FIFO(先入先出)順序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其建構函式帶一個規定大小的引數,生成的BlockingQueue有大小限制,若不帶大小引數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定.其所含的物件是以FIFO(先入先出)順序排序的
3)PriorityBlockingQueue:類似於LinkedBlockQueue,但其所含物件的排序不是FIFO,而是依據物件的自然排序順序或者是建構函式的Comparator決定的順序.
4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的.
3.LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的資料結構不一樣,導致LinkedBlockingQueue的資料吞吐量要大於ArrayBlockingQueue,但線上程數量很大時其效能的可預見性低於ArrayBlockingQueue.