// 02 Storm source-code reading: inter-process message communication — the Netty client implementation
// Author: 阿新 — published 2019-02-01
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package backtype.storm.messaging.netty;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.*;

import backtype.storm.Config;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;

/**
 * A Netty client for sending task messages to a remote destination (Netty
 * server).
 *
 * Implementation details:
 *
 * - Sending messages, i.e. writing to the channel, is performed asynchronously.
 * - Messages are sent in batches to optimize for network throughput at the
 *   expense of network latency. The message batch size is configurable.
 * - Connecting and reconnecting are performed asynchronously.
 * - Note: The current implementation drops any messages that are being enqueued
 *   for sending if the connection to the remote destination is currently
 *   unavailable.
 * - A background flusher thread is run in the background. It will, at fixed
 *   intervals, check for any pending messages (i.e. messages buffered in
 *   memory) and flush them to the remote destination iff background flushing is
 *   currently enabled.
 */
public class Client extends ConnectionWithStatus implements IStatefulObject {

    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private static final String PREFIX = "Netty-Client-";
    // Connect immediately, with no delay.
    private static final long NO_DELAY_MS = 0L;
    // Upper bound (30 s) on the initial delay of the background flusher.
    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    // Give up flushing pending messages on close() after 10 minutes.
    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    // While closing, poll once per second for pending messages to drain.
    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    // Sentinel meaning "no background flush scheduled".
    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;

    // Exponential back-off policy used between reconnection attempts.
    private final StormBoundedExponentialBackoffRetry retryPolicy;
    // Netty client bootstrap.
    private final ClientBootstrap bootstrap;
    // Address of the remote destination (the Netty server).
    private final InetSocketAddress dstAddress;
    // Human-readable name of the destination, used in log messages.
    protected final String dstAddressPrefixedName;

    /**
     * The channel used for all write operations from this client to the remote
     * destination.
     */
    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(
            null);

    /**
     * Maximum number of reconnection attempts we will perform after a
     * disconnect before giving up.
     */
    private final int maxReconnectionAttempts;

    /**
     * Total number of connection attempts.
     */
    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);

    /**
     * Number of connection attempts since the last disconnect.
     */
    private final AtomicInteger connectionAttempts = new AtomicInteger(0);

    /**
     * Number of messages successfully sent to the remote destination.
     */
    private final AtomicInteger messagesSent = new AtomicInteger(0);

    /**
     * Number of messages that could not be sent to the remote destination.
     */
    private final AtomicInteger messagesLost = new AtomicInteger(0);

    /**
     * Number of messages buffered in memory.
     */
    private final AtomicLong pendingMessages = new AtomicLong(0);

    /**
     * This flag is set to true if and only if a client instance is being
     * closed.
     */
    private volatile boolean closing = false;

    /**
     * When set to true, then the background flusher thread will flush any
     * pending messages on its next run.
     */
    private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(
            false);

    /**
     * The absolute time (in ms) when the next background flush should be
     * performed.
     *
     * Note: The flush operation will only be performed if
     * backgroundFlushingEnabled is true, too.
     */
    private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(
            DISTANT_FUTURE_TIME_MS);

    /**
     * The time interval (in ms) at which the background flusher thread will be
     * run to check for any pending messages to be flushed.
     */
    private final int flushCheckIntervalMs;

    /**
     * How many messages should be batched together before sending them to the
     * remote destination.
     *
     * Messages are batched to optimize network throughput at the expense of
     * latency.
     */
    private final int messageBatchSize;

    // Current in-memory batch; null when there is nothing buffered.
    private MessageBatch messageBatch = null;
    // Guava-decorated scheduler used for reconnects and background flushing.
    private final ListeningScheduledExecutorService scheduler;
    protected final Map stormConf;

    /**
     * Creates a client, initiates the first (asynchronous) connection attempt,
     * and schedules the background flusher task.
     *
     * @param stormConf topology/cluster configuration map
     * @param factory   Netty channel factory shared across clients
     * @param scheduler executor used for reconnects and background flushing
     * @param host      remote destination host
     * @param port      remote destination port
     */
    @SuppressWarnings("rawtypes")
    Client(Map stormConf, ChannelFactory factory,
            ScheduledExecutorService scheduler, String host, int port) {
        closing = false;
        this.stormConf = stormConf;
        this.scheduler = MoreExecutors.listeningDecorator(scheduler);
        int bufferSize = Utils.getInt(stormConf
                .get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}",
                host, port, bufferSize);
        messageBatchSize = Utils.getInt(
                stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
        flushCheckIntervalMs = Utils.getInt(
                stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
        maxReconnectionAttempts = Utils.getInt(stormConf
                .get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
        int minWaitMs = Utils.getInt(stormConf
                .get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
        int maxWaitMs = Utils.getInt(stormConf
                .get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs,
                maxWaitMs, maxReconnectionAttempts);

        // Initiate connection to remote destination
        bootstrap = createClientBootstrap(factory, bufferSize);
        dstAddress = new InetSocketAddress(host, port);
        dstAddressPrefixedName = prefixedName(dstAddress);
        connect(NO_DELAY_MS);

        // Launch background flushing thread. The initial delay gives the
        // connection (including retries) a chance to be established first.
        pauseBackgroundFlushing();
        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs
                * maxReconnectionAttempts);
        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(),
                initialDelayMs, flushCheckIntervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates and configures the Netty client bootstrap.
     *
     * @param factory    channel factory to build channels from
     * @param bufferSize TCP send-buffer size in bytes
     * @return a configured {@link ClientBootstrap}
     */
    private ClientBootstrap createClientBootstrap(ChannelFactory factory,
            int bufferSize) {
        ClientBootstrap bootstrap = new ClientBootstrap(factory);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("sendBufferSize", bufferSize);
        bootstrap.setOption("keepAlive", true);
        // NOTE: fixed garbled class name "StormClientPipelineF9actory".
        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
        return bootstrap;
    }

    /**
     * Returns a log-friendly name for the destination address.
     */
    private String prefixedName(InetSocketAddress dstAddress) {
        if (null != dstAddress) {
            return PREFIX + dstAddress.toString();
        }
        return "";
    }

    /**
     * Creates the background flusher task. It runs at a fixed interval on the
     * shared scheduler and flushes any pending (buffered) messages to the
     * server, but only while background flushing is enabled and the scheduled
     * flush time has been reached.
     *
     * @return the flusher {@link Runnable}
     */
    private Runnable createBackgroundFlusher() {
        return new Runnable() {
            @Override
            public void run() {
                if (!closing && backgroundFlushingEnabled.get()
                        && nowMillis() > nextBackgroundFlushTimeMs.get()) {
                    // Snapshot the batch reference: messageBatch may be set to
                    // null concurrently by a flush, which previously could
                    // cause an NPE in the debug log call below.
                    MessageBatch batch = messageBatch;
                    if (batch != null) {
                        LOG.debug(
                                "flushing {} pending messages to {} in background",
                                batch.size(), dstAddressPrefixedName);
                    }
                    flushPendingMessages();
                }
            }
        };
    }

    /**
     * Disables the background flusher. Called while messages are being flushed
     * actively (e.g. when a batch fills up), so the two paths do not compete.
     */
    private void pauseBackgroundFlushing() {
        backgroundFlushingEnabled.set(false);
    }

    /**
     * Enables the background flusher so buffered messages will be sent on its
     * next run.
     */
    private void resumeBackgroundFlushing() {
        backgroundFlushingEnabled.set(true);
    }

    /**
     * Flushes any buffered messages to the server. Invoked by the background
     * flusher task and during {@link #close()}.
     */
    private synchronized void flushPendingMessages() {
        Channel channel = channelRef.get();
        if (containsMessages(messageBatch)) {
            if (connectionEstablished(channel)) {
                if (channel.isWritable()) {
                    pauseBackgroundFlushing();
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                } else if (closing) {
                    // Ensure background flushing is enabled so that we
                    // definitely have a chance to re-try the flush
                    // operation in case the client is being gracefully closed
                    // (where we have a brief time window where
                    // the client will wait for pending messages to be sent).
                    resumeBackgroundFlushing();
                }
            } else {
                closeChannelAndReconnect(channel);
            }
        }
    }

    private long nowMillis() {
        return System.currentTimeMillis();
    }

    /**
     * Schedules an asynchronous connection attempt to the server.
     *
     * We will retry connection with exponential back-off policy.
     *
     * @param delayMs delay in milliseconds before the attempt is executed
     */
    private synchronized void connect(long delayMs) {
        try {
            if (closing) {
                return;
            }
            if (connectionEstablished(channelRef.get())) {
                return;
            }
            connectionAttempts.getAndIncrement();
            if (reconnectingAllowed()) {
                totalConnectionAttempts.getAndIncrement();
                LOG.info(
                        "connection attempt {} to {} scheduled to run in {} ms",
                        connectionAttempts.get(), dstAddressPrefixedName,
                        delayMs);
                ListenableFuture<Channel> channelFuture = scheduler.schedule(
                        new Connector(dstAddress, connectionAttempts.get()),
                        delayMs, TimeUnit.MILLISECONDS);
                Futures.addCallback(channelFuture,
                        new FutureCallback<Channel>() {
                            @Override
                            public void onSuccess(Channel result) {
                                if (connectionEstablished(result)) {
                                    setChannel(result);
                                    LOG.info("connection established to {}",
                                            dstAddressPrefixedName);
                                    connectionAttempts.set(0);
                                } else {
                                    reconnectAgain(new RuntimeException(
                                            "Returned channel was actually not established"));
                                }
                            }

                            @Override
                            public void onFailure(Throwable t) {
                                reconnectAgain(t);
                            }

                            // Logs the failure and schedules the next attempt
                            // using the back-off policy.
                            private void reconnectAgain(Throwable t) {
                                String baseMsg = String.format(
                                        "connection attempt %s to %s failed",
                                        connectionAttempts,
                                        dstAddressPrefixedName);
                                String failureMsg = (t == null) ? baseMsg
                                        : baseMsg + ": " + t.toString();
                                LOG.error(failureMsg);
                                long nextDelayMs = retryPolicy.getSleepTimeMs(
                                        connectionAttempts.get(), 0);
                                connect(nextDelayMs);
                            }
                        });
            } else {
                close();
                throw new RuntimeException("Giving up to connect to "
                        + dstAddressPrefixedName + " after "
                        + connectionAttempts + " failed attempts");
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to connect to "
                    + dstAddressPrefixedName, e);
        }
    }

    private void setChannel(Channel channel) {
        channelRef.set(channel);
    }

    private boolean reconnectingAllowed() {
        return !closing
                && connectionAttempts.get() <= (maxReconnectionAttempts + 1);
    }

    private boolean connectionEstablished(Channel channel) {
        // Because we are using TCP (which is a connection-oriented transport
        // unlike UDP), a connection is only fully
        // established iff the channel is connected. That is, a TCP-based
        // channel must be in the CONNECTED state before
        // anything can be read or written to the channel.
        //
        // See:
        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
        // -
        // http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
        return channel != null && channel.isConnected();
    }

    /**
     * Reports the connection status of this client.
     *
     * Note: Storm will check via this method whether a worker can be activated
     * safely during the initial startup of a topology. The worker will only be
     * activated once all of its connections are ready.
     */
    @Override
    public Status status() {
        if (closing) {
            return Status.Closed;
        } else if (!connectionEstablished(channelRef.get())) {
            return Status.Connecting;
        } else {
            return Status.Ready;
        }
    }

    /**
     * Receiving messages is not supported by a client.
     *
     * @throws java.lang.UnsupportedOperationException
     *             whenever this method is being called.
     */
    @Override
    public Iterator<TaskMessage> recv(int flags, int clientId) {
        throw new UnsupportedOperationException(
                "Client connection should not receive any messages");
    }

    /**
     * Enqueues a single task message to be sent to the server. Delegates to
     * the batching {@link #send(Iterator)} path.
     *
     * @param taskId  destination task id
     * @param payload serialized message payload
     */
    @Override
    public void send(int taskId, byte[] payload) {
        TaskMessage msg = new TaskMessage(taskId, payload);
        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
        wrapper.add(msg);
        send(wrapper.iterator());
    }

    /**
     * Enqueue task messages to be sent to the remote destination (cf. `host`
     * and `port`). Messages are collected into batches; a full batch is
     * flushed immediately, a partial batch is either flushed (when the channel
     * is writable) or left for the background flusher.
     */
    @Override
    public synchronized void send(Iterator<TaskMessage> msgs) {
        if (closing) {
            int numMessages = iteratorSize(msgs);
            LOG.error(
                    "discarding {} messages because the Netty client to {} is being closed",
                    numMessages, dstAddressPrefixedName);
            return;
        }

        if (!hasMessages(msgs)) {
            return;
        }

        Channel channel = channelRef.get();
        if (!connectionEstablished(channel)) {
            // Closing the channel and reconnecting should be done before
            // handling the messages.
            closeChannelAndReconnect(channel);
            handleMessagesWhenConnectionIsUnavailable(msgs);
            return;
        }

        // Collect messages into batches (to optimize network throughput), then
        // flush them.
        while (msgs.hasNext()) {
            TaskMessage message = msgs.next();
            if (messageBatch == null) {
                messageBatch = new MessageBatch(messageBatchSize);
            }
            messageBatch.add(message);
            if (messageBatch.isFull()) {
                MessageBatch toBeFlushed = messageBatch;
                flushMessages(channel, toBeFlushed);
                messageBatch = null;
            }
        }

        // Handle any remaining messages in case the "last" batch was not full.
        if (containsMessages(messageBatch)) {
            if (connectionEstablished(channel) && channel.isWritable()) {
                // We can write to the channel, so we flush the remaining
                // messages immediately to minimize latency.
                pauseBackgroundFlushing();
                MessageBatch toBeFlushed = messageBatch;
                messageBatch = null;
                flushMessages(channel, toBeFlushed);
            } else {
                // We cannot write to the channel, which means Netty's internal
                // write buffer is full.
                // In this case, we buffer the remaining messages and wait for
                // the next messages to arrive.
                //
                // Background:
                // Netty 3.x maintains an internal write buffer with a high
                // water mark for each channel (default: 64K).
                // This represents the amount of data waiting to be flushed to
                // operating system buffers. If the
                // outstanding data exceeds this value then the channel is set
                // to non-writable. When this happens, a
                // INTEREST_CHANGED channel event is triggered. Netty sets the
                // channel to writable again once the data
                // has been flushed to the system buffers.
                //
                // See http://stackoverflow.com/questions/14049260
                resumeBackgroundFlushing();
                nextBackgroundFlushTimeMs.set(nowMillis()
                        + flushCheckIntervalMs);
            }
        }
    }

    private boolean hasMessages(Iterator<TaskMessage> msgs) {
        return msgs != null && msgs.hasNext();
    }

    /**
     * Drops the given messages because the connection is unavailable.
     *
     * We will drop pending messages and let at-least-once message replay kick
     * in.
     *
     * Another option would be to buffer the messages in memory. But this
     * option has the risk of causing OOM errors, especially for topologies
     * that disable message acking because we don't know whether the connection
     * recovery will succeed or not, and how long the recovery will take.
     */
    private void handleMessagesWhenConnectionIsUnavailable(
            Iterator<TaskMessage> msgs) {
        LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
        dropMessages(msgs);
    }

    /**
     * Counts and discards the given messages, updating the lost-message
     * metric.
     *
     * @param msgs messages to drop; the iterator is fully consumed
     */
    private void dropMessages(Iterator<TaskMessage> msgs) {
        // We consume the iterator by traversing and thus "emptying" it.
        int msgCount = iteratorSize(msgs);
        messagesLost.getAndAdd(msgCount);
        LOG.error("dropping {} message(s) destined for {}", msgCount,
                dstAddressPrefixedName);
    }

    private int iteratorSize(Iterator<TaskMessage> msgs) {
        int size = 0;
        if (msgs != null) {
            while (msgs.hasNext()) {
                size++;
                msgs.next();
            }
        }
        return size;
    }

    /**
     * Asynchronously writes the message batch to the channel. This is the core
     * send path, used both by the batching logic and the background flusher.
     *
     * If the write operation fails, then we will close the channel and trigger
     * a reconnect.
     */
    private synchronized void flushMessages(Channel channel,
            final MessageBatch batch) {
        if (!containsMessages(batch)) {
            return;
        }

        final int numMessages = batch.size();
        pendingMessages.getAndAdd(numMessages);
        LOG.debug("writing {} messages to channel {}", batch.size(),
                channel.toString());
        ChannelFuture future = channel.write(batch);
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future)
                    throws Exception {
                pendingMessages.getAndAdd(0 - numMessages);
                if (future.isSuccess()) {
                    LOG.debug("sent {} messages to {}", numMessages,
                            dstAddressPrefixedName);
                    messagesSent.getAndAdd(batch.size());
                } else {
                    LOG.error("failed to send {} messages to {}: {}",
                            numMessages, dstAddressPrefixedName,
                            future.getCause());
                    closeChannelAndReconnect(future.getChannel());
                    messagesLost.getAndAdd(numMessages);
                }
            }
        });
    }

    /**
     * Closes the given channel and triggers a reconnect.
     *
     * @param channel the channel to close; may be null
     */
    private synchronized void closeChannelAndReconnect(Channel channel) {
        if (channel != null) {
            channel.close();
            // Only the thread that wins the CAS schedules the reconnect, so a
            // burst of failures triggers a single reconnection sequence.
            if (channelRef.compareAndSet(channel, null)) {
                connect(NO_DELAY_MS);
            }
        }
    }

    private boolean containsMessages(MessageBatch batch) {
        return batch != null && !batch.isEmpty();
    }

    /**
     * Gracefully close this client.
     *
     * We will attempt to send any pending messages (i.e. messages currently
     * buffered in memory) before closing the client.
     */
    @Override
    public void close() {
        if (!closing) {
            LOG.info("closing Netty Client {}", dstAddressPrefixedName);
            // Set closing to true to prevent any further reconnection
            // attempts.
            closing = true;
            flushPendingMessages();
            waitForPendingMessagesToBeSent();
            closeChannel();
        }
    }

    /**
     * Blocks until all pending messages have been written out or the flush
     * timeout expires, polling once per flush interval.
     */
    private synchronized void waitForPendingMessagesToBeSent() {
        LOG.info("waiting up to {} ms to send {} pending messages to {}",
                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(),
                dstAddressPrefixedName);
        long totalPendingMsgs = pendingMessages.get();
        long startMs = nowMillis();
        while (pendingMessages.get() != 0) {
            try {
                long deltaMs = nowMillis() - startMs;
                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                    LOG.error(
                            "failed to send all pending messages to {} within timeout, {} of {} messages were not "
                                    + "sent", dstAddressPrefixedName,
                            pendingMessages.get(), totalPendingMsgs);
                    break;
                }
                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    private synchronized void closeChannel() {
        if (channelRef.get() != null) {
            channelRef.get().close();
            LOG.debug("channel to {} closed", dstAddressPrefixedName);
        }
    }

    /**
     * Collects metrics for the metrics framework. Counters are reset on read
     * (getAndSet(0)) so each call reports the delta since the previous call.
     *
     * @return a map of metric name to value
     */
    @Override
    public Object getState() {
        LOG.info("Getting metrics for client connection to {}",
                dstAddressPrefixedName);
        HashMap<String, Object> ret = new HashMap<String, Object>();
        ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
        ret.put("sent", messagesSent.getAndSet(0));
        ret.put("pending", pendingMessages.get());
        ret.put("lostOnSend", messagesLost.getAndSet(0));
        ret.put("dest", dstAddress.toString());
        String src = srcAddressName();
        if (src != null) {
            ret.put("src", src);
        }
        return ret;
    }

    /**
     * Returns the local address of the current channel as a string, or null if
     * no channel is connected.
     */
    private String srcAddressName() {
        String name = null;
        Channel c = channelRef.get();
        if (c != null) {
            SocketAddress address = c.getLocalAddress();
            if (address != null) {
                name = address.toString();
            }
        }
        return name;
    }

    @Override
    public String toString() {
        return String.format("Netty client for connecting to %s",
                dstAddressPrefixedName);
    }

    /**
     * Asynchronously establishes a Netty connection to the remote address,
     * returning a Netty Channel on success. Scheduled on the shared scheduler
     * by {@link Client#connect(long)}.
     */
    private class Connector implements Callable<Channel> {

        private final InetSocketAddress address;
        private final int connectionAttempt;

        public Connector(InetSocketAddress address, int connectionAttempt) {
            this.address = address;
            if (connectionAttempt < 1) {
                throw new IllegalArgumentException(
                        "connection attempt must be >= 1 (you provided "
                                + connectionAttempt + ")");
            }
            this.connectionAttempt = connectionAttempt;
        }

        @Override
        public Channel call() throws Exception {
            LOG.debug("connecting to {} [attempt {}]", address.toString(),
                    connectionAttempt);
            Channel channel = null;
            ChannelFuture future = bootstrap.connect(address);
            future.awaitUninterruptibly();
            Channel current = future.getChannel();
            if (future.isSuccess() && connectionEstablished(current)) {
                channel = current;
                LOG.debug("successfully connected to {}, {} [attempt {}]",
                        address.toString(), channel.toString(),
                        connectionAttempt);
            } else {
                LOG.debug("failed to connect to {} [attempt {}]",
                        address.toString(), connectionAttempt);
                if (current != null) {
                    current.close();
                }
            }
            return channel;
        }
    }
}