1. 程式人生 > >rabbitmq AMQP協議

rabbitmq AMQP協議

1. 協議說明

rabbitmq遵循 Advanced Message Queue Protocal(AMQP)協議。

2. spring 客戶端

spring java客戶端使用 com.rabbitmq.client.impl.FrameHandler處理底層二進位制協議(binary protocal)的傳輸。
實際實現類為:

package com.rabbitmq.client.impl;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import
java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import com.rabbitmq.client.AMQP; /** * A socket-based frame handler. */ public class SocketFrameHandler implements FrameHandler { /** The underlying socket */
private final Socket _socket; /** Socket's inputstream - data from the broker - synchronized on */ private final DataInputStream _inputStream; /** Socket's outputstream - data to the broker - synchronized on */ private final DataOutputStream _outputStream; /** Time to linger before closing the socket forcefully. */
public static final int SOCKET_CLOSING_TIMEOUT = 1; /** * @param socket the socket to use */ public SocketFrameHandler(Socket socket) throws IOException { _socket = socket; _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream())); _outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream())); }

可以看到SocketFrameHandler持有一個Socket連線,並打開了tcp的read和write IO輸入輸出流。

3. tcp連線心跳維持

為了維護tcp連線的狀態,rabbitmq沒有選擇使用tcp預設機制的keepalive,而是自己實現了一套心跳機器(在iso 應用層)

rabbitmq客戶端專門起了一個定時執行緒池(ScheduledThreadPoolExecutor)開發送心跳包:

//  The contents of this file are subject to the Mozilla Public License
//  Version 1.1 (the "License"); you may not use this file except in
//  compliance with the License. You may obtain a copy of the License
//  at http://www.mozilla.org/MPL/
//
//  Software distributed under the License is distributed on an "AS IS"
//  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
//  the License for the specific language governing rights and
//  limitations under the License.
//
//  The Original Code is RabbitMQ.
//
//  The Initial Developer of the Original Code is GoPivotal, Inc.
//  Copyright (c) 2007-2015 Pivotal Software, Inc.  All rights reserved.
//


package com.rabbitmq.client.impl;

import com.rabbitmq.client.AMQP;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledFuture;
import java.io.IOException;

import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * Manages heartbeat sending for a {@link AMQConnection}.
 * <p/>
 * Heartbeats are sent in a dedicated thread that is separate
 * from the main loop thread used for the connection.
 */
final class HeartbeatSender {

    private final Object monitor = new Object();

    private final FrameHandler frameHandler;
    private final ThreadFactory threadFactory;

    private ScheduledExecutorService executor;

    private ScheduledFuture<?> future;

    private boolean shutdown = false;

    private volatile long lastActivityTime;

    HeartbeatSender(FrameHandler frameHandler, ThreadFactory threadFactory) {
        this.frameHandler = frameHandler;
        this.threadFactory = threadFactory;
    }

    public void signalActivity() {
        this.lastActivityTime = System.nanoTime();
    }

    /**
     * Sets the heartbeat in seconds.
     */
    public void setHeartbeat(int heartbeatSeconds) {
        synchronized(this.monitor) {
            if(this.shutdown) {
                return;
            }

            // cancel any existing heartbeat task
            if(this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (heartbeatSeconds > 0) {
                // wake every heartbeatSeconds / 2 to avoid the worst case
                // where the last activity comes just after the last heartbeat
                long interval = SECONDS.toNanos(heartbeatSeconds) / 2;
                ScheduledExecutorService executor = createExecutorIfNecessary();
                Runnable task = new HeartbeatRunnable(interval);
                this.future = executor.scheduleAtFixedRate(
                    task, interval, interval, TimeUnit.NANOSECONDS);
            }
        }
    }

    private ScheduledExecutorService createExecutorIfNecessary() {
        synchronized (this.monitor) {
            if (this.executor == null) {
                this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
            }
            return this.executor;
        }
    }

    /**
     * Shutdown the heartbeat process, if any.
     */
    public void shutdown() {
        ExecutorService executorToShutdown = null;
        synchronized (this.monitor) {
            if (this.future != null) {
                this.future.cancel(true);
                this.future = null;
            }

            if (this.executor != null) {
                // to be safe, we shouldn't call shutdown holding the
                // monitor.
                executorToShutdown = this.executor;

                this.shutdown = true;
                this.executor = null;
            }
        }
        if(executorToShutdown != null) {
            executorToShutdown.shutdown();
        }
    }

    private final class HeartbeatRunnable implements Runnable {

        private final long heartbeatNanos;

        private HeartbeatRunnable(long heartbeatNanos) {
            this.heartbeatNanos = heartbeatNanos;
        }

        public void run() {
            try {
                long now = System.nanoTime();

                if (now > (lastActivityTime + this.heartbeatNanos)) {
                    frameHandler.writeFrame(new Frame(AMQP.FRAME_HEARTBEAT, 0));
                    frameHandler.flush();
                }
            } catch (IOException e) {
                // ignore
            }
        }
    }
}