RabbitMQ Client Source Code Analysis (7): Channel and ChannelManager
阿新 • Published: 2018-12-16
RabbitMQ-java-client version: com.rabbitmq:amqp-client:4.3.0
RabbitMQ version: 3.6.15
Channel
- UML diagram
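- transmit(AMQCommand c): the transmit method. It takes the channel mutex, checks that the channel is still open, and then delegates the actual transmission to the AMQCommand.

    public void transmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            ensureIsOpen();
            quiescingTransmit(c);
        }
    }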
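The lock, check, delegate pattern in transmit can be sketched in isolation. This is a minimal illustration, not the client's code: `SketchChannel` and `SketchCommand` are hypothetical stand-ins, the command only records its name instead of writing frames, and `IllegalStateException` is used as a placeholder for whatever the real `ensureIsOpen()` throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AMQCommand: it knows how to send itself on a channel.
class SketchCommand {
    private final String name;

    SketchCommand(String name) {
        this.name = name;
    }

    // Here "transmitting" just records the command name on the channel.
    void transmit(SketchChannel channel) {
        channel.sent.add(name);
    }
}

// Hypothetical stand-in for the channel; only locking and delegation are modeled.
class SketchChannel {
    private final Object channelMutex = new Object();
    private boolean open = true;
    final List<String> sent = new ArrayList<>();

    void transmit(SketchCommand c) {
        synchronized (channelMutex) {
            ensureIsOpen();   // refuse to send on a closed channel
            c.transmit(this); // the command does the actual work
        }
    }

    void close() {
        synchronized (channelMutex) {
            open = false;
        }
    }

    private void ensureIsOpen() {
        if (!open) {
            throw new IllegalStateException("channel is closed");
        }
    }
}

class TransmitDemo {
    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel();
        channel.transmit(new SketchCommand("basic.publish"));
        System.out.println(channel.sent);
        channel.close();
        try {
            channel.transmit(new SketchCommand("basic.ack"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the open check and the send happen under the same mutex as close(), a command cannot slip through between the check and a concurrent close in this sketch.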
ChannelManager
- ChannelManager: responsible for managing Channels: creating a Channel, registering a new Channel, looking up a Channel, closing Channels, and so on.
- Constructor. A channelMax of 0 is replaced by the largest channel number the framing allows (65535); channel numbers are then allocated from the range 1 to channelMax.

    public ChannelManager(ConsumerWorkService workService, int channelMax,
                          ThreadFactory threadFactory, MetricsCollector metricsCollector) {
        if (channelMax == 0) {
            // The framing encoding only allows for unsigned 16-bit integers
            // for the channel number
            channelMax = (1 << 16) - 1;
        }
        _channelMax = channelMax;
        channelNumberAllocator = new IntAllocator(1, channelMax);

        this.workService = workService;
        this.threadFactory = threadFactory;
        this.metricsCollector = metricsCollector;
    }
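The channelMax fallback can be checked on its own. A minimal sketch, assuming a hypothetical helper `effectiveChannelMax` that is not part of the client and only mirrors the `if (channelMax == 0)` branch of the constructor:

```java
class ChannelMaxDemo {
    // Hypothetical helper: 0 means "no limit requested", so fall back to
    // the largest value an unsigned 16-bit channel number can hold.
    static int effectiveChannelMax(int channelMax) {
        return channelMax == 0 ? (1 << 16) - 1 : channelMax;
    }

    public static void main(String[] args) {
        System.out.println(effectiveChannelMax(0));    // prints 65535
        System.out.println(effectiveChannelMax(2047)); // prints 2047
    }
}
```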
- Creating a Channel: a channel number (channelNumber) is allocated through IntAllocator, and a Map maintains the mapping from channel number to Channel. If the allocator has no free number left (it returns -1), createChannel returns null. The new channel is opened only after it has been registered in the map.

    // Maintains the mapping from channel number to Channel
    private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();

    public ChannelN createChannel(AMQConnection connection) throws IOException {
        ChannelN ch;
        synchronized (this.monitor) {
            int channelNumber = channelNumberAllocator.allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                ch = addNewChannel(connection, channelNumber);
            }
        }
        ch.open(); // now that it's been safely added
        return ch;
    }

    private ChannelN addNewChannel(AMQConnection connection, int channelNumber) {
        if (_channelMap.containsKey(channelNumber)) {
            // That number's already allocated! Can't do it
            // This should never happen unless something has gone
            // badly wrong with our implementation.
            throw new IllegalStateException("We have attempted to "
                    + "create a channel with a number that is already in "
                    + "use. This should never happen. "
                    + "Please report this as a bug.");
        }
        ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
        _channelMap.put(ch.getChannelNumber(), ch);
        return ch;
    }

    protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber,
                                          ConsumerWorkService workService) {
        return new ChannelN(connection, channelNumber, workService, this.metricsCollector);
    }
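The allocate-then-register flow can be tried out with a simplified model. The sketch below is an assumption-laden illustration rather than the client's implementation: `SimpleIntAllocator` is a hypothetical BitSet-based replacement for IntAllocator (whose internals are not shown in this article), and channels are represented by plain strings.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical allocator: returns the lowest free number in [lo, hi], or -1 if none is free.
class SimpleIntAllocator {
    private final int lo;
    private final int hi;
    private final BitSet used = new BitSet();

    SimpleIntAllocator(int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
    }

    int allocate() {
        int n = used.nextClearBit(lo);
        if (n > hi) {
            return -1;
        }
        used.set(n);
        return n;
    }

    void free(int n) {
        used.clear(n);
    }
}

// Hypothetical registry mirroring the allocate / register / release steps.
class SimpleChannelRegistry {
    private final Object monitor = new Object();
    private final SimpleIntAllocator allocator;
    private final Map<Integer, String> channels = new HashMap<>();

    SimpleChannelRegistry(int channelMax) {
        this.allocator = new SimpleIntAllocator(1, channelMax);
    }

    // Returns the allocated channel number, or -1 when every number is in use.
    int create(String label) {
        synchronized (monitor) {
            int number = allocator.allocate();
            if (number == -1) {
                return -1;
            }
            if (channels.containsKey(number)) {
                throw new IllegalStateException("channel number already in use: " + number);
            }
            channels.put(number, label);
            return number;
        }
    }

    // Removes the channel and makes its number available again.
    void release(int number) {
        synchronized (monitor) {
            if (channels.remove(number) != null) {
                allocator.free(number);
            }
        }
    }
}

class ChannelRegistryDemo {
    public static void main(String[] args) {
        SimpleChannelRegistry registry = new SimpleChannelRegistry(2);
        System.out.println(registry.create("a")); // prints 1
        System.out.println(registry.create("b")); // prints 2
        System.out.println(registry.create("c")); // prints -1
        registry.release(1);
        System.out.println(registry.create("d")); // prints 1
    }
}
```

Holding one monitor across both the allocation and the map update keeps the allocator and the map consistent with each other, which is the same reason createChannel wraps both steps in `synchronized (this.monitor)`.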
- handleSignal: closes every managed Channel. When a Connection is closed, all of its Channels must be closed as well, and this is the method that does it. As the source shows, each channel is shut down asynchronously when a shutdownExecutor is configured, and the caller waits at most channelShutdownTimeout milliseconds for it. The main purpose is to avoid getting stuck in a JDK socket write: with blocking I/O (BIO), a socket write has no write timeout. For details see http://rabbitmq.1065348.n5.nabble.com/Long-timeout-if-server-host-becomes-unreachable-td30275.html

    public void handleSignal(final ShutdownSignalException signal) {
        Set<ChannelN> channels;
        synchronized (this.monitor) {
            channels = new HashSet<ChannelN>(_channelMap.values());
        }

        for (final ChannelN channel : channels) {
            releaseChannelNumber(channel);
            // async shutdown if possible
            // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
            Runnable channelShutdownRunnable = new Runnable() {
                @Override
                public void run() {
                    channel.processShutdownSignal(signal, true, true);
                }
            };
            if (this.shutdownExecutor == null) {
                channelShutdownRunnable.run();
            } else {
                Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
                try {
                    channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms",
                            channel.getChannelNumber(), channelShutdownTimeout);
                    channelShutdownTask.cancel(true);
                }
            }
            shutdownSet.add(channel.getShutdownLatch());
            channel.notifyListeners();
        }
        scheduleShutdownProcessing();
    }
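The submit, wait with timeout, cancel pattern used by handleSignal can be reproduced with plain `java.util.concurrent`. This is a minimal sketch under stated assumptions: `BoundedShutdown` and `runWithTimeout` are hypothetical names, and a sleeping task stands in for a channel shutdown that blocks on a socket write.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedShutdown {
    // Runs the task and reports whether it completed normally within the timeout.
    // With no executor the task runs inline, with no time bound at all.
    static boolean runWithTimeout(ExecutorService executor, Runnable task, long timeoutMs) {
        if (executor == null) {
            task.run();
            return true;
        }
        Future<?> future = executor.submit(task);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            future.cancel(true);
            return false;
        } catch (ExecutionException | TimeoutException e) {
            future.cancel(true); // interrupt the worker if it is still running
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            boolean fast = runWithTimeout(executor, () -> { }, 1000);
            boolean slow = runWithTimeout(executor, () -> {
                try {
                    Thread.sleep(5000); // simulates a shutdown that hangs
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 100);
            System.out.println("fast finished in time: " + fast);
            System.out.println("slow finished in time: " + slow);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that `cancel(true)` only interrupts the worker thread. A thread blocked in a classic blocking socket write does not necessarily respond to interruption, so the timeout mainly protects the caller from waiting forever rather than guaranteeing that the stuck task stops.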