netty-channel的attachment原始碼解讀
阿新 • • 發佈:2018-12-18
netty的每一個channel都會有一個綁定當前channel的attachment(附件),有點類似ThreadLocal,但在netty裡你不能直接用ThreadLocal,因為一個EventLoop可能與多個channel繫結,一個執行緒可能處理幾個channel,這時ThreadLocal無法做到與某一channel繫結。
直接上程式碼吧,attachment用起來很簡單,呼叫channel的attr方法拿到Attribute,然後get/set即可
private void setLoginFlag(Channel channel, String deviceId) { channel.attr(ChannelConfig._LOGIN).set(true); channel.attr(ChannelConfig._DEVICE_ID).set(deviceId); log.info("device login success ,seviceId:{}", deviceId); }
然後看看attr這個方法,這裡聲明瞭一個volatile的原子陣列,例項化在attr方法裡懶載入。一個channel可能會被多個執行緒引用,attr可能有併發問題,需要做同步,但可以看到這裡並沒有加鎖,而是先通過原子更新工具CAS通過反射載入到欄位裡,看能否更新成功,如果不成功,則說明其他執行緒把這個attributes陣列例項化過了。這個陣列是用來儲存attachment的容器,index方法用於生成隨機的陣列下標,具體的演算法是key.id & 陣列長度-1,key.id儲存在AttributeKey裡,由一個AtomicInteger遞增獲得。
然後通過這個陣列下標看能不能拿到Atturibute,如果拿不到,那先宣告一個連結串列頭,然後把你傳進去的key追加到連結串列尾部,再把連結串列頭head用CAS放到原子數組裡,這個CAS的過程如果失敗了,說明其他執行緒把它放進去了,這時就需要用加鎖的方法,把head的引用指向attributes.get(i),加鎖迴圈連結串列直到連結串列尾部,然後把追加到尾部,這個迴圈過程也會先判斷一下這個key是否已經存在,如果存在了那就直接取出來。
@SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, AtomicReferenceArray> updater = AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, AtomicReferenceArray.class, "attributes"); private static final int BUCKET_SIZE = 4; private static final int MASK = BUCKET_SIZE - 1; // Initialize lazily to reduce memory consumption; updated by AtomicReferenceFieldUpdater above. @SuppressWarnings("UnusedDeclaration") private volatile AtomicReferenceArray<DefaultAttribute<?>> attributes; @SuppressWarnings("unchecked") @Override public <T> Attribute<T> attr(AttributeKey<T> key) { if (key == null) { throw new NullPointerException("key"); } AtomicReferenceArray<DefaultAttribute<?>> attributes = this.attributes; if (attributes == null) { // Not using ConcurrentHashMap due to high memory consumption. attributes = new AtomicReferenceArray<DefaultAttribute<?>>(BUCKET_SIZE); if (!updater.compareAndSet(this, null, attributes)) { attributes = this.attributes; } } int i = index(key); DefaultAttribute<?> head = attributes.get(i); if (head == null) { // No head exists yet which means we may be able to add the attribute without synchronization and just // use compare and set. At worst we need to fallback to synchronization and waste two allocations. head = new DefaultAttribute(); DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key); head.next = attr; attr.prev = head; if (attributes.compareAndSet(i, null, head)) { // we were able to add it so return the attr right away return attr; } else { head = attributes.get(i); } } synchronized (head) { DefaultAttribute<?> curr = head; for (;;) { DefaultAttribute<?> next = curr.next; if (next == null) { DefaultAttribute<T> attr = new DefaultAttribute<T>(head, key); curr.next = attr; attr.prev = curr; return attr; } if (next.key == key && !next.removed) { return (Attribute<T>) next; } curr = next; } } } private static int index(AttributeKey<?> key) { return key.id() & MASK; }