1. 程式人生 > >netty-channel的attachment原始碼解讀

netty-channel的attachment原始碼解讀

netty的每一個channel都會有一個綁定當前channel的attachment(附件),有點類似ThreadLocal,但在netty裡你不能直接用ThreadLocal,因為一個EventLoop可能與多個channel繫結,一個執行緒可能處理幾個channel,這時ThreadLocal無法做到與某一channel繫結。

直接上程式碼吧,attachment用起來很簡單,呼叫channel的attr方法拿到Attribute,然後get/set即可

 private void setLoginFlag(Channel channel, String deviceId) {
        channel.attr(ChannelConfig._LOGIN).set(true);
        channel.attr(ChannelConfig._DEVICE_ID).set(deviceId);
        log.info("device login success ,seviceId:{}", deviceId);
    }

然後看看attr這個方法,這裡聲明瞭一個volatile的原子陣列,例項化在attr方法裡懶載入。一個channel可能會被多個執行緒引用,attr可能有併發問題,需要做同步,但可以看到這裡並沒有加鎖,而是先通過原子更新工具CAS通過反射載入到欄位裡,看能否更新成功,如果不成功,則說明其他執行緒把這個attributes陣列例項化過了。這個陣列是用來儲存attachment的容器,index方法用於生成隨機的陣列下標,具體的演算法是key.id & 陣列長度-1,key.id儲存在AttributeKey裡,由一個AtomicInteger遞增獲得。

然後通過這個陣列下標看能不能拿到Atturibute,如果拿不到,那先宣告一個連結串列頭,然後把你傳進去的key追加到連結串列尾部,再把連結串列頭head用CAS放到原子數組裡,這個CAS的過程如果失敗了,說明其他執行緒把它放進去了,這時就需要用加鎖的方法,把head的引用指向attributes.get(i),加鎖迴圈連結串列直到連結串列尾部,然後把追加到尾部,這個迴圈過程也會先判斷一下這個key是否已經存在,如果存在了那就直接取出來。

 @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater =
            AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes");

    private static final int BUCKET_SIZE = 4;
    private static final int MASK = BUCKET_SIZE  - 1;

    // Initialize lazily to reduce memory consumption; updated by AtomicReferenceFieldUpdater above.
    @SuppressWarnings("UnusedDeclaration")
    private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes;

@SuppressWarnings("unchecked")
    @Override
    public <T> Attribute<T> attr(AttributeKey<T> key) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes;
        if (attributes == null) {
            // Not using ConcurrentHashMap due to high memory consumption.
            attributes = new AtomicReferenceArray<DefaultAttribute<?>>(BUCKET_SIZE);

            if (!updater.compareAndSet(this, null, attributes)) {
                attributes = this.attributes;
            }
        }

        int i = index(key);
        DefaultAttribute<?> head = attributes.get(i);
        if (head == null) {
            // No head exists yet which means we may be able to add the attribute without synchronization and just
            // use compare and set. At worst we need to fallback to synchronization and waste two allocations.
            head = new DefaultAttribute();
            DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
            head.next = attr;
            attr.prev = head;
            if (attributes.compareAndSet(i, null, head)) {
                // we were able to add it so return the attr right away
                return attr;
            } else {
                head = attributes.get(i);
            }
        }

        synchronized (head) {
            DefaultAttribute<?> curr = head;
            for (;;) {
                DefaultAttribute<?> next = curr.next;
                if (next == null) {
                    DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key);
                    curr.next = attr;
                    attr.prev = curr;
                    return attr;
                }

                if (next.key == key && !next.removed) {
                    return (Attribute<T>) next;
                }
                curr = next;
            }
        }
    }

    private static int index(AttributeKey<?> key) {
        return key.id() & MASK;
    }