
A Look at RocketMQ's LatencyFaultTolerance

This article walks through RocketMQ's LatencyFaultTolerance and the classes that implement it.

LatencyFaultTolerance

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java

public interface LatencyFaultTolerance<T> {
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    boolean isAvailable(final T name);

    void remove(final T name);

    T pickOneAtLeast();
}
  • The LatencyFaultTolerance interface defines four methods: updateFaultItem, isAvailable, remove, and pickOneAtLeast
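To make the contract concrete, here is a minimal sketch of how a caller might combine these methods. FaultAwareChooser and its choose method are hypothetical names introduced only for illustration; they are not part of the RocketMQ client, and the real selection logic may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical caller, not RocketMQ code: prefer the first candidate that is
// currently available, otherwise fall back to pickOneAtLeast().
class FaultAwareChooser {
    static String choose(List<String> candidates, LatencyFaultTolerance<String> tolerance) {
        for (String name : candidates) {
            if (tolerance.isAvailable(name)) {
                return name;
            }
        }
        // pickOneAtLeast() may return null when no fault item has been recorded.
        String fallback = tolerance.pickOneAtLeast();
        return fallback != null ? fallback : candidates.get(0);
    }

    public static void main(String[] args) {
        // Stub implementation for the demo: only "broker-b" counts as available.
        LatencyFaultTolerance<String> stub = new LatencyFaultTolerance<String>() {
            @Override
            public void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
            }

            @Override
            public boolean isAvailable(String name) {
                return "broker-b".equals(name);
            }

            @Override
            public void remove(String name) {
            }

            @Override
            public String pickOneAtLeast() {
                return null;
            }
        };
        System.out.println(choose(Arrays.asList("broker-a", "broker-b"), stub)); // prints broker-b
    }
}

// The interface as quoted above, repeated so the sketch compiles on its own.
interface LatencyFaultTolerance<T> {
    void updateFaultItem(T name, long currentLatency, long notAvailableDuration);

    boolean isAvailable(T name);

    void remove(T name);

    T pickOneAtLeast();
}
```

The sketch assumes a non-empty candidate list; an empty list would make the final candidates.get(0) throw.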

LatencyFaultToleranceImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            // putIfAbsent returns the existing item if another thread inserted one first
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    @Override
    public void remove(final String name) {
        this.faultItemTable.remove(name);
    }

    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

    @Override
    public String toString() {
        return "LatencyFaultToleranceImpl{" +
            "faultItemTable=" + faultItemTable +
            ",whichItemWorst=" + whichItemWorst +
            '}';
    }

    //......
}
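  • LatencyFaultToleranceImpl implements the LatencyFaultTolerance interface. It maintains a faultItemTable whose key is the name and whose value is a FaultItem. Its updateFaultItem method stores currentLatency in the FaultItem for the given name and sets the item's startTimestamp to System.currentTimeMillis() + notAvailableDuration, creating the FaultItem if none exists yet
  • The isAvailable method looks up the faultItem in faultItemTable; if it is not null it returns faultItem.isAvailable(), otherwise it returns true. The remove method calls faultItemTable.remove(name)
  • The pickOneAtLeast method first copies the FaultItems of faultItemTable into a list and returns null if that list is empty. Otherwise it shuffles and then sorts tmpList and computes half (tmpList.size() / 2). If half is less than or equal to 0 it returns tmpList.get(0).getName(); otherwise it returns tmpList.get(i).getName(), where i is whichItemWorst.getAndIncrement() % half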
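The selection step in pickOneAtLeast can be sketched in isolation. The following is a simplified stand-in, not the RocketMQ class: Item keeps only a name and a latency, the counter argument plays the role of whichItemWorst.getAndIncrement(), and the names PickOneAtLeastSketch, Item, and pick are assumptions made for this example. Because the sort is ascending, the index i always lands in the better half of the list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class PickOneAtLeastSketch {
    // Simplified stand-in for FaultItem: only a name and a latency.
    static final class Item {
        final String name;
        final long latency;

        Item(String name, long latency) {
            this.name = name;
            this.latency = latency;
        }
    }

    // counter stands in for whichItemWorst.getAndIncrement() and must be non-negative.
    static String pick(List<Item> items, int counter) {
        if (items.isEmpty()) {
            return null;
        }
        List<Item> tmp = new ArrayList<>(items);
        // Shuffling first randomizes the relative order of equal items,
        // because the sort that follows is stable.
        Collections.shuffle(tmp);
        tmp.sort(Comparator.comparingLong((Item item) -> item.latency));

        int half = tmp.size() / 2;
        if (half <= 0) {
            return tmp.get(0).name; // a single item: nothing to rotate over
        }
        return tmp.get(counter % half).name; // rotate over the better half only
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("a", 10), new Item("b", 500), new Item("c", 50), new Item("d", 3000));
        for (int counter = 0; counter < 4; counter++) {
            System.out.println(pick(items, counter)); // prints a, c, a, c (one per line)
        }
    }
}
```

With four items of distinct latency, half is 2, so only the two lowest-latency names are ever returned, and consecutive counter values alternate between them.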

FaultItem

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java

    class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }

        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }

        @Override
        public int hashCode() {
            int result = getName() != null ? getName().hashCode() : 0;
            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
            return result;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o)
                return true;
            if (!(o instanceof FaultItem))
                return false;

            final FaultItem faultItem = (FaultItem) o;

            if (getCurrentLatency() != faultItem.getCurrentLatency())
                return false;
            if (getStartTimestamp() != faultItem.getStartTimestamp())
                return false;
            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;

        }

        @Override
        public String toString() {
            return "FaultItem{" +
                "name='" + name + '\'' +
                ",currentLatency=" + currentLatency +
                ",startTimestamp=" + startTimestamp +
                '}';
        }

        public String getName() {
            return name;
        }

        public long getCurrentLatency() {
            return currentLatency;
        }

        public void setCurrentLatency(final long currentLatency) {
            this.currentLatency = currentLatency;
        }

        public long getStartTimestamp() {
            return startTimestamp;
        }

        public void setStartTimestamp(final long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }

    }
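  • FaultItem implements the Comparable<FaultItem> interface and defines the name, currentLatency, and startTimestamp fields. Its isAvailable method evaluates (System.currentTimeMillis() - startTimestamp) >= 0, so an item becomes available again once the current time reaches startTimestamp. Its compareTo method orders items by isAvailable() (available items first), then by currentLatency, then by startTimestamp, both ascending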
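The resulting order is easier to see with concrete values. The sketch below mirrors the three comparison steps of compareTo, but replaces System.currentTimeMillis() with a fixed NOW constant so that the output is reproducible; FaultItemOrderingSketch, Item, NOW, and sortedNames are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FaultItemOrderingSketch {
    // Assumption for the demo: a fixed clock instead of System.currentTimeMillis().
    static final long NOW = 1_000L;

    static final class Item implements Comparable<Item> {
        final String name;
        final long currentLatency;
        final long startTimestamp;

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable() {
            return (NOW - startTimestamp) >= 0;
        }

        @Override
        public int compareTo(Item other) {
            // 1. available items sort before isolated ones
            if (isAvailable() != other.isAvailable()) {
                return isAvailable() ? -1 : 1;
            }
            // 2. lower latency first
            if (currentLatency != other.currentLatency) {
                return Long.compare(currentLatency, other.currentLatency);
            }
            // 3. earlier startTimestamp first
            return Long.compare(startTimestamp, other.startTimestamp);
        }
    }

    static List<String> sortedNames(List<Item> items) {
        List<Item> copy = new ArrayList<>(items);
        Collections.sort(copy);
        List<String> names = new ArrayList<>();
        for (Item item : copy) {
            names.add(item.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("isolated-fast", 5, NOW + 30_000), // still inside its isolation window
            new Item("slow", 400, NOW - 1),
            new Item("fast", 20, NOW - 1),
            new Item("fast-older", 20, NOW - 100));
        System.out.println(sortedNames(items)); // prints [fast-older, fast, slow, isolated-fast]
    }
}
```

Note that the isolated item sorts last even though it has the lowest latency, because availability is compared before latency.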

ThreadLocalIndex

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/common/ThreadLocalIndex.java

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}
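  • ThreadLocalIndex defines the threadLocalIndex and random fields. Its getAndIncrement method first reads index from threadLocalIndex.get(); if it is null, index is initialized with Math.abs(random.nextInt()) and reset to 0 if the result is negative. It then computes Math.abs(index + 1), resets that to 0 if it is negative, stores it with threadLocalIndex.set(index), and returns it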
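The check for a negative value after Math.abs looks redundant but is not: Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE, because the positive counterpart does not fit in an int. The small sketch below reproduces the increment step on plain ints to show the guard at work; AbsOverflowSketch and next are hypothetical names for this example.

```java
class AbsOverflowSketch {
    // Same arithmetic as the increment step in getAndIncrement(), on a plain int.
    static int next(int index) {
        int next = Math.abs(index + 1);
        if (next < 0) {
            next = 0; // only reached when index + 1 overflowed to Integer.MIN_VALUE
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE); // prints true
        System.out.println(next(41));                // prints 42
        System.out.println(next(Integer.MAX_VALUE)); // prints 0
    }
}
```

The guard keeps the returned index non-negative, so the later index % half in pickOneAtLeast cannot produce a negative list position.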

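Summary

  • The LatencyFaultTolerance interface defines the updateFaultItem, isAvailable, remove, and pickOneAtLeast methods. LatencyFaultToleranceImpl implements it and maintains a faultItemTable keyed by name with FaultItem values. Its updateFaultItem method stores currentLatency in the FaultItem for the given name and sets startTimestamp to System.currentTimeMillis() + notAvailableDuration, creating the FaultItem if none exists yet
  • The isAvailable method looks up the faultItem in faultItemTable; if it is not null it returns faultItem.isAvailable(), otherwise it returns true. The remove method calls faultItemTable.remove(name)
  • The pickOneAtLeast method copies the FaultItems of faultItemTable into a list and returns null if that list is empty. Otherwise it shuffles and sorts tmpList, computes half (tmpList.size() / 2), and returns tmpList.get(0).getName() if half is less than or equal to 0, or tmpList.get(i).getName() with i = whichItemWorst.getAndIncrement() % half otherwise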
