A look at rocketmq's AccessChannel
阿新 • Published: 2019-12-31
Preface
This article looks at rocketmq's AccessChannel and how the client's message trace code uses it.
AccessChannel
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java
public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,

    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
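- AccessChannel defines two enum values, LOCAL and CLOUD. According to the Javadoc, LOCAL means connecting to a private IDC cluster and CLOUD means connecting to a cloud service.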
TraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java
public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}
- The start method of TraceDispatcher takes a parameter of type AccessChannel in addition to the name server address.
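To make the shape of that contract concrete, here is a minimal sketch in plain Java. It is not the RocketMQ implementation: `DispatcherSketch`, `SimpleTraceDispatcher`, and `InMemoryDispatcher` are hypothetical names, the nested `AccessChannel` enum is a local stand-in for the real one, and the interface is trimmed down (no `flush()`, no checked exceptions). It only illustrates that the channel is handed over at `start` time and kept for later use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; the real types live in org.apache.rocketmq.client.
class DispatcherSketch {

    // Local copy of the two enum values, so the sketch compiles on its own.
    enum AccessChannel { LOCAL, CLOUD }

    // Trimmed-down variant of TraceDispatcher (assumption: no flush(), no checked exceptions).
    interface SimpleTraceDispatcher {
        void start(String nameSrvAddr, AccessChannel accessChannel);
        boolean append(Object ctx);
        void shutdown();
    }

    // Keeps trace contexts in memory instead of sending them anywhere.
    static class InMemoryDispatcher implements SimpleTraceDispatcher {
        private final List<Object> buffer = new ArrayList<>();
        // Defaults to LOCAL until start() supplies a value.
        private AccessChannel accessChannel = AccessChannel.LOCAL;
        private boolean running = false;

        @Override
        public void start(String nameSrvAddr, AccessChannel accessChannel) {
            this.accessChannel = accessChannel;
            this.running = true;
        }

        @Override
        public boolean append(Object ctx) {
            // Reject data while the dispatcher is not running.
            return running && buffer.add(ctx);
        }

        @Override
        public void shutdown() {
            running = false;
        }

        AccessChannel channel() { return accessChannel; }

        int size() { return buffer.size(); }
    }
}
```

A caller would create the dispatcher, call `start("127.0.0.1:9876", AccessChannel.CLOUD)` (the address is only an example), and then `append` trace contexts.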
AsyncTraceDispatcher
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
public class AsyncTraceDispatcher implements TraceDispatcher {

    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    //......

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    //......

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);
            //......
        }

        //......
    }

    //......
}
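- AsyncTraceDispatcher stores the AccessChannel passed to start (the field defaults to AccessChannel.LOCAL). In the sendTraceDataByMQ method of its inner class AsyncAppenderRequest, the trace topic defaults to traceTopicName; when accessChannel is AccessChannel.CLOUD, the topic is instead built by appending regionId to TraceConstants.TRACE_TOPIC_PREFIX.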
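The topic selection described above can be isolated into a small, self-contained sketch. The class `TraceTopicSketch` and the method `resolveTraceTopic` are hypothetical names introduced for illustration, the nested enum is a local stand-in for the real AccessChannel, and the prefix string is a placeholder rather than the actual value of TraceConstants.TRACE_TOPIC_PREFIX.

```java
// Hypothetical sketch of the branch inside sendTraceDataByMQ; not the RocketMQ source.
class TraceTopicSketch {

    // Local stand-in for org.apache.rocketmq.client.AccessChannel.
    enum AccessChannel { LOCAL, CLOUD }

    // Placeholder value (assumption); the real constant is TraceConstants.TRACE_TOPIC_PREFIX.
    static final String TRACE_TOPIC_PREFIX = "TRACE_PREFIX_";

    // For CLOUD the topic is prefix + regionId; otherwise the configured topic is used as-is.
    static String resolveTraceTopic(AccessChannel channel, String configuredTopic, String regionId) {
        if (AccessChannel.CLOUD == channel) {
            return TRACE_TOPIC_PREFIX + regionId;
        }
        return configuredTopic;
    }

    public static void main(String[] args) {
        // prints "MY_TRACE_TOPIC"
        System.out.println(resolveTraceTopic(AccessChannel.LOCAL, "MY_TRACE_TOPIC", "region-a"));
        // prints "TRACE_PREFIX_region-a"
        System.out.println(resolveTraceTopic(AccessChannel.CLOUD, "MY_TRACE_TOPIC", "region-a"));
    }
}
```

Note that in the CLOUD branch the configured topic name is ignored entirely, which matches the excerpt: traceTopic is overwritten rather than combined with traceTopicName.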
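Summary
AccessChannel defines two enum values, LOCAL and CLOUD. The start method of TraceDispatcher takes an AccessChannel parameter, and AsyncTraceDispatcher keeps it in a field. When that field is AccessChannel.CLOUD, the sendTraceDataByMQ method of the inner class AsyncAppenderRequest uses TraceConstants.TRACE_TOPIC_PREFIX plus regionId as the trace topic; otherwise it uses traceTopicName.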