1. 程式人生 > >Netty協議棧開發

Netty協議棧開發

     由於現代軟體的複雜性,一個大型軟體系統往往會被人為地拆分稱為多個模組,另外隨著移動網際網路的興起,網站的規模越來越大,為了能夠支撐業務的發展,需要叢集和分散式部署。模組之間的通訊就需要進行跨節點通訊。

在傳統的Java應用中,通常使用4中方式進行跨節點通訊:

  • 通過RMI進行遠端服務呼叫
  • 通過Java Socket + Java序列化
  • RPC框架 Thrift、Apache的Avro等
  • 利用標準的公有協議進行跨節點呼叫,例如HTTP+XML,Restful+JSON或WebService

    跨節點的遠端服務呼叫,除了鏈路層的物理連線外,還需要對請求和響應訊息進行編解碼。 在請求和應答之外,還需要控制和管理類指令,例如鏈路建立的握手資訊,鏈路檢測的心跳資訊。這些功能組合到一起後,就會形成私有協議 。事實上只要能夠用於跨程序,跨主機資料交換的非標準協議都可以稱為私有協議。

Netty協議棧功能設計

在分散式組網環境下,每個Netty節點(Netty程序)之間建立長連線,使用Netty協議進行通訊。Netty節點沒有服務端和客戶端的區分,誰先發起連線,誰就是客戶端,另一方則是服務端。一個Netty節點既能作為客戶端連線另外也能做服務端被其他節點連線。

協議棧功能描述

  1. 基於Netty的NIO通訊框架,提供高效能的非同步通訊能力;
  2. 提供訊息的編解碼框架,實現POJO的序列化和反序列化
  3. 提供基於IP地址的白名單接入認證機制;
  4. 鏈路的有效性校驗機制;
  5. 鏈路的斷線重連機制;

通訊模型

具體步驟:

  1. Netty協議棧客戶端傳送握手請求資訊,攜帶節點ID等有效身份認證資訊;
  2. Netty協議服務端對握手請求訊息進行合法性校驗,包括節點ID有效性校驗、節點重複登入校驗和IP地址合法性校驗,校驗通過後,返回登入成功的握手應答訊息;
  3. 鏈路建立成功之後,客戶端傳送業務訊息;
  4. 鏈路成功之後,服務端傳送心跳訊息;(採用Ping-Pong機制)
  5. 鏈路建立成功之後,客戶端傳送心跳訊息;
  6. 鏈路建立成功之後,服務端傳送業務訊息;
  7. 服務端退出時,服務端關閉連線,客戶端感知對方關閉連線後,被動關閉客戶端連線。

訊息定義

Netty協議棧訊息定義包含兩部分:

Netty訊息定義表(NettyMessage)

  1. 訊息頭
  2. 訊息體
名稱 型別 長度 描述
header Header 變長 訊息頭定義
body Object 變長 對於請求訊息,它是方法引數;對於響應訊息,它是返回值

 Netty協議訊息頭定義(Header)

名稱 型別 長度 描述
crcCode 整型int 32

Netty訊息的校驗碼,由三部分組成:

1.0xABEF:固定值,表明該訊息時Netty協議訊息,2個位元組

2.主版本號:1~255,1個位元組

3.次版本號:1~255,1個位元組

crcCode=0xABEF+主版本號+次版本號

sessionID 整型long

64

叢集節點被全域性唯一,由會話ID生成器生成
type Byte 8

0: 表示請求訊息

1: 業務響應訊息

2: 業務ONE WAY訊息(即是請求又是響應訊息)

3: 握手請求訊息

4: 握手應答訊息

5: 心跳請求訊息

6: 心跳應答訊息

priority Byte 8 訊息優先順序: 0-255
length 整型int 32 訊息長度,整個訊息,包括訊息頭和訊息體
attachment Map<String,Object> 變長 可選欄位,用於擴充套件訊息頭

Netty協議支援的訊息欄位型別

Netty協議的編解碼規範

Netty協議NettyMessage的編碼規範如下:

  1. crcCode: java.nio.ByteBuffer.putInt(int value),如果採用其它快取區實現,必須與其等價
  2. length: java.nio.ByteBuffer.putInt(int value),如果採用其它緩衝區實現,必須與其等價
  3. sessionID: java.nio.ByteBuffer.putLong(long value),如果採用其它緩衝區實現,必須與其等價
  4. type: java.nio.ByteBuffer.put(byte b),如果採用其它緩衝區實現,必須與其等價
  5. priority: java.nio.ByteBuffer.put(byte b),如果採用其它緩衝區實現,必須與其等價
  6. attachment: 如果長度為0,表示沒有可選附件,則將長度編碼為0,即java.nio.ByteBuffer.putInt(0),如果大於0,表示有附件需要編碼,具體規則如下:
  • 首先對附件的個數進行編碼,java.nio.ByteBuffer.putInt(attachment.size());
  •  然後對Key進行編碼,先編碼長度,再將它轉換成byte陣列之後編碼內容。具體程式碼如:
String key = null;
byte[] value = null;
for (Map.Entry<String, Object> param: attachment:entrySet()) {
    key = param.getKey();
    buffer.writeString(key);
    value = marshaller.writeObject(param.getValue());
    buffer.writeBinary(value);
}
key = null;
value = null;

7.body的編碼: 通過JBoss Marshalling將其序列化為byte陣列,然後呼叫java.nio.ByteBuffer.put(byte[] src);將其寫入ByteBuffer緩衝區中。

由於整個訊息的長度必須等全部欄位都編碼完成後才能確認,所以死最後需要更新訊息頭的length欄位,將其重寫入ByteBuffer中。

Netty協議的解碼:

 

  1. crcCode: java.nio.ByteBuffer.getInt()獲取校驗碼欄位,如果採用其它快取區實現,必須與其等價
  2. length: java.nio.ByteBuffer.getInt()獲取Netty訊息的長度,如果採用其它緩衝區實現,必須與其等價
  3. sessionID: java.nio.ByteBuffer.getLong()獲取會話ID,如果採用其它緩衝區實現,必須與其等價
  4. type: java.nio.ByteBuffer.get()獲取訊息型別,如果採用其它緩衝區實現,必須與其等價
  5. priority: java.nio.ByteBuffer.get()獲取訊息優先順序,如果採用其它緩衝區實現,必須與其等價
  6. attachment: 它的解碼規則為-首先建立一個新的attachment物件,呼叫java.nio.ByteBuffer.getInt()獲取附件的長度,如果為0,說明附件為空,解碼結束,解析解訊息體,否則,根據長度通過for迴圈進行解碼。
  7. body: 使用JBoss marshaller對其進行解碼
String key = null;
Object value = null;
for (int i =0; i < size;i++) {
    key = buffer.readString();
    value = unmarshaller.readObject(buffer.readBinary());
    this.attachment.put(key, value);
}
key = null;
value = null;

鏈路的建立

Netty協議棧支援服務端和客戶端,對於使用Netty協議棧的應用程式而言,不需要刻意區分是客戶端還是服務端。在分散式組網環境中,一個節點可能即是服務端也是客戶端。

客戶端與服務端鏈路建立成功後,由客戶端傳送握手請求訊息,請求訊息的定義如下:

  • 訊息頭的type欄位為3;
  • 可選附件個數為0;
  • 訊息體為空;
  • 握手訊息的長度為22個位元組;

服務端接受客戶端的握手請求訊息後,如果IP校驗通過,返回握手成功應答訊息給客戶端,應用層鏈路建立成功,握手應答訊息定義如下:

  • 訊息頭type為4
  • 可選附件個數為0
  • 訊息體為byte型別的結果,"0"表示認證成功,"-1"表示認證失敗。

鏈路的關閉

由於採用長連線通訊,正常的業務執行期間,雙方通過心跳和業務訊息維持鏈路,任何一方不需要主動關閉連線。

但是,在以下情況下,客戶端和服務端需要關閉連線。

  1. 當對方宕機或者重啟時,會主動釋放鏈路,另一方讀取到作業系統的通知訊號,得到對方REST鏈路,需要關閉連線,釋放自身的控制代碼等資源。由於採用TCP全雙工通訊,通訊雙方都需要關閉連線,釋放資源;
  2. 訊息在讀寫過程中,發生了I/O異常,需要主動關閉連線;
  3. 心跳訊息讀寫過程中發生了I/O異常,需要主動關閉連線;
  4. 心跳超時,需要主動關閉連線;
  5. 發生編碼異常等不可恢復的錯誤時,需要主動關閉連線;

可靠性設計

1.心跳機制

  1. 當網路處於空閒時間達到了T(連續週期T沒有讀寫訊息)時,客戶端主動傳送Ping心跳訊息給服務端;
  2. 如果在下一個週期T到來時客戶端沒有收到對方傳送的Pong心跳應答訊息或者讀取到服務端傳送的其他業務訊息,則心跳失敗計數器+1
  3. 每當客戶端接收到服務的業務訊息或者Pong應答訊息時,將心跳失敗計數器清0;連續N次沒有接收到服務端的Pong訊息或者業務訊息,則關閉鏈路間隔INTERVAL時間後發起重連操作;
  4. 服務端網路空閒狀態持續時間達到T後,伺服器端將心跳失敗計數器+1;只要接收到客戶端傳送的Ping訊息或者其他業務訊息,計數器清0
  5. 伺服器端連續N次沒有接收到客戶端的Ping訊息或者其他業務訊息,則關閉鏈路,釋放資源,等待客戶端重連。

2.重連機制

    如果鏈路中斷,等待INTERVAL時間後,由客戶端發起重連操作,如果重連失敗,間隔週期INTERVAL之後再繼續重連,直到重連成功。

    為了保證服務端能夠有充足的時間釋放控制代碼資源,在首次斷連時客戶端需要等待INTERVAL時間之後再發起重連而不是失敗後立即重連。

    為了控制代碼資源能夠及時釋放,無論什麼場景下的重連失敗,客戶端必須保證自身資源被成功及時釋放,包括企鵝不限於SocketChannel、Socket等。

    重連失敗,需要記錄異常堆疊資訊,方便問題定位。

3.重複登入保護

    客戶端握手成功之後,鏈路處於正常狀態下,不允許客戶端重複登入,以防止客戶端在異常狀態下反覆重連導致控制代碼資源被耗盡

    服務端在接收到握手訊息後,首先進行ip合法性校驗,如果成功,則在快取的地址表中檢視客戶端是否已經登入,如果已經登入,則拒絕重複登入,返回錯誤碼-1,同時關閉鏈路,並且在服務端日誌中列印錯誤資訊。

    客戶端接收到握手失敗的應答訊息之後,關閉客戶端的TCP連線,等待INTERVAL時間之後,再次發起TCP連線,知道認證成功。

    為了防止由服務端和客戶端對鏈路狀態理解不一致的問題,當服務端連續N次心跳超時之後需要主動關閉鏈路,同時清空該客戶端的快取資訊,保證後續的客戶端可以重連,防止被重複登入保護機制拒絕掉。

4.訊息快取重發

    無論是客戶端還是服務端,在發生鏈路中斷之後,恢復鏈路之前,快取在訊息佇列的待發送的訊息不能丟失。等鏈路恢復之後,重新發送這些訊息,保證鏈路中斷期間訊息不丟失。同時考慮到記憶體溢位風險,應該在訊息快取佇列中設定上限,當達到上限之後,應該拒絕繼續向該佇列新增新的訊息。

安全性設計

    為了保證整個叢集環境的安全,內部長連線採用基於IP地址的安全認證機制,服務端對握手請求的IP進行合法性校驗。如果將Neety放到公網使用,需要採用更嚴格的安全認證機制,如基於祕鑰和AES加密,也可以採用SSL/TSL安全傳輸。

可擴充套件性設計

    Netty協議棧需要具備一定的擴充套件能力,例如統一的訊息攔截、介面日誌、安全、加密解密等可以被方便地新增和刪除,推薦使用Servelt的FilterChain機制,考慮到效能因素,不推薦AOP。

Netty協議棧開發

需要的jar

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.30.Final</version>
        </dependency>        
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling</artifactId>
            <version>1.4.10.Final</version>
        </dependency>
        <dependency>
            <groupId>org.jboss.marshalling</groupId>
            <artifactId>jboss-marshalling-serial</artifactId>
            <version>1.4.10.Final</version>
        </dependency>

資料結構定義

首先對Netty協議棧使用到的資料結構進行定義,Netty訊息如下:


import java.util.HashMap;
import java.util.Map;

/**
 * 訊息頭
 */
public final class Header {

    private int crcCode = 0xabef0101;

    private int length;// 訊息長度

    private long sessionID;// 會話ID

    private byte type;// 訊息型別

    private byte priority;// 訊息優先順序

    private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件

    /**
     * @return the crcCode
     */
    public final int getCrcCode() {
        return crcCode;
    }

    /**
     * @param crcCode the crcCode to set
     */
    public final void setCrcCode(int crcCode) {
        this.crcCode = crcCode;
    }

    /**
     * @return the length
     */
    public final int getLength() {
        return length;
    }

    /**
     * @param length the length to set
     */
    public final void setLength(int length) {
        this.length = length;
    }

    /**
     * @return the sessionID
     */
    public final long getSessionID() {
        return sessionID;
    }

    /**
     * @param sessionID the sessionID to set
     */
    public final void setSessionID(long sessionID) {
        this.sessionID = sessionID;
    }

    /**
     * @return the type
     */
    public final byte getType() {
        return type;
    }

    /**
     * @param type the type to set
     */
    public final void setType(byte type) {
        this.type = type;
    }

    /**
     * @return the priority
     */
    public final byte getPriority() {
        return priority;
    }

    /**
     * @param priority the priority to set
     */
    public final void setPriority(byte priority) {
        this.priority = priority;
    }

    /**
     * @return the attachment
     */
    public final Map<String, Object> getAttachment() {
        return attachment;
    }

    /**
     * @param attachment the attachment to set
     */
    public final void setAttachment(Map<String, Object> attachment) {
        this.attachment = attachment;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "資料頭:Header [crcCode=" + crcCode + ", length=" + length
                + ", sessionID=" + sessionID + ", type=" + type + ", priority="
                + priority + ", attachment=" + attachment + "]";
    }

}
import proprietary.protocol.dao.Header;

/**
 * Netty訊息
 */
public final class NettyMessage {

    private Header header;

    private Object body;

    /**
     * @return the header
     */
    public final Header getHeader() {
        return header;
    }

    /**
     * @param header the header to set
     */
    public final void setHeader(Header header) {
        this.header = header;
    }

    /**
     * @return the body
     */
    public final Object getBody() {
        return body;
    }

    /**
     * @param body the body to set
     */
    public final void setBody(Object body) {
        this.body = body;
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "NettyMessage [header=" + header + "]";
    }
}

訊息編解碼

分別定義NettyMessageDecoder和NettyMessageEncoder用於NettyMessage訊息的編解碼:

訊息編碼類:

/**
 * NettyMessage訊息編碼器
 */
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {

    MarshallingEncoder marshallingEncoder;

    public NettyMessageEncoder() throws IOException {
        this.marshallingEncoder = new MarshallingEncoder();
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
        if (null == msg || null == msg.getHeader()) {
            throw new Exception("The encode message is null");
        }
        //---寫入crcCode---
        sendBuf.writeInt((msg.getHeader().getCrcCode()));
        //---寫入length---
        sendBuf.writeInt((msg.getHeader().getLength()));
        //---寫入sessionId---
        sendBuf.writeLong((msg.getHeader().getSessionID()));
        //---寫入type---
        sendBuf.writeByte((msg.getHeader().getType()));
        //---寫入priority---
        sendBuf.writeByte((msg.getHeader().getPriority()));
        //---寫入附件大小---
        sendBuf.writeInt((msg.getHeader().getAttachment().size()));

        String key = null;
        byte[] keyArray = null;
        Object value = null;
        for (Map.Entry<String, Object> param : msg.getHeader().getAttachment()
                .entrySet()) {
            key = param.getKey();
            keyArray = key.getBytes("UTF-8");
            sendBuf.writeInt(keyArray.length);
            sendBuf.writeBytes(keyArray);
            value = param.getValue();
            marshallingEncoder.encode(value, sendBuf);
        }
        // for gc
        key = null;
        keyArray = null;
        value = null;

        if (msg.getBody() != null) {
            marshallingEncoder.encode(msg.getBody(), sendBuf);
        } else
            sendBuf.writeInt(0);
        // 之前寫了crcCode 4bytes,除去crcCode和length 8bytes即為更新之後的位元組
        sendBuf.setInt(4, sendBuf.readableBytes() - 8);
    }
}

Netty訊息編碼工具類:MarshallingEncoder

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import org.jboss.marshalling.Marshaller;

import java.io.IOException;

/**
 * 訊息編碼工具類
 */
@Sharable
public class MarshallingEncoder {

    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
    Marshaller marshaller;

    public MarshallingEncoder() throws IOException {
        marshaller = MarshallingCodecFactory.buildMarshalling();
    }

    // 使用marshall對Object進行編碼,並且寫入bytebuf...
    protected void encode(Object msg, ByteBuf out) throws Exception {
        try {
            //1. 獲取寫入位置
            int lengthPos = out.writerIndex();
            //2. 先寫入4個bytes,用於記錄Object物件編碼後長度
            out.writeBytes(LENGTH_PLACEHOLDER);
            //3. 使用代理物件,防止marshaller寫完之後關閉byte buf
            ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
            //4. 開始使用marshaller往bytebuf中編碼
            marshaller.start(output);
            marshaller.writeObject(msg);
            //5. 結束編碼
            marshaller.finish();
            //6. 設定物件長度
            out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
        } finally {
            marshaller.close();
        }
    }
}

Netty訊息解碼類:NettyMessageDecoder:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import proprietary.protocol.dao.Header;
import proprietary.protocol.dao.NettyMessage;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Netty訊息解碼類
 */
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {

    MarshallingDecoder marshallingDecoder;

    public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
                               int lengthFieldLength) throws IOException {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
        marshallingDecoder = new MarshallingDecoder();
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
            throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setCrcCode(frame.readInt());
        header.setLength(frame.readInt());
        header.setSessionID(frame.readLong());
        header.setType(frame.readByte());
        header.setPriority(frame.readByte());

        int size = frame.readInt();
        if (size > 0) {
            Map<String, Object> attch = new HashMap<String, Object>(size);
            int keySize = 0;
            byte[] keyArray = null;
            String key = null;
            for (int i = 0; i < size; i++) {
                keySize = frame.readInt();
                keyArray = new byte[keySize];
                frame.readBytes(keyArray);
                key = new String(keyArray, "UTF-8");
                attch.put(key, marshallingDecoder.decode(frame));
            }
            keyArray = null;
            key = null;
            header.setAttachment(attch);
        }
        if (frame.readableBytes() > 4) {
            message.setBody(marshallingDecoder.decode(frame));
        }
        message.setHeader(header);
        return message;
    }
}

這裡用Netty的LengthFieldBasedFrameDecoder解碼器,它支援自動的TCP粘包和半包處理,只需要給出標識訊息長度的欄位偏移量和訊息長度自身所佔的位元組數,Netty就能自動實現對半包的處理。

Netty訊息解碼工具類:MarshallingDecoder:

import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;

import java.io.IOException;
import java.io.StreamCorruptedException;

/**
 * Netty訊息解碼工具類
 */
public class MarshallingDecoder {

    private final Unmarshaller unmarshaller;

    /**
     * Creates a new decoder whose maximum object size is {@code 1048576} bytes.
     * If the size of the received object is greater than {@code 1048576} bytes,
     * a {@link StreamCorruptedException} will be raised.
     *
     * @throws IOException
     */
    public MarshallingDecoder() throws IOException {
        unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
    }

    protected Object decode(ByteBuf in) throws Exception {
        //1. 讀取第一個4bytes,裡面放置的是object物件的byte長度
        int objectSize = in.readInt();
        ByteBuf buf = in.slice(in.readerIndex(), objectSize);
        //2 . 使用bytebuf的代理類
        ByteInput input = new ChannelBufferByteInput(buf);
        try {
            //3. 開始解碼
            unmarshaller.start(input);
            Object obj = unmarshaller.readObject();
            unmarshaller.finish();
            //4. 讀完之後設定讀取的位置
            in.readerIndex(in.readerIndex() + objectSize);
            return obj;
        } finally {
            unmarshaller.close();
        }
    }
}
import org.jboss.marshalling.*;

import java.io.IOException;

/**
 * 解碼器工廠類
 */
public final class MarshallingCodecFactory {

    /**
     * 建立Jboss Marshaller
     *
     * @return
     * @throws IOException
     */
    protected static Marshaller buildMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        Marshaller marshaller = marshallerFactory
                .createMarshaller(configuration);
        return marshaller;
    }

    /**
     * 建立Jboss Unmarshaller
     *
     * @return
     * @throws IOException
     */
    protected static Unmarshaller buildUnMarshalling() throws IOException {
        final MarshallerFactory marshallerFactory = Marshalling
                .getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        final Unmarshaller unmarshaller = marshallerFactory
                .createUnmarshaller(configuration);
        return unmarshaller;
    }
}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;

import java.io.IOException;

/**
 * {@link ByteInput} implementation which reads its data from a {@link ByteBuf}
 */
public class ChannelBufferByteInput implements ByteInput {

    private final ByteBuf buffer;

    public ChannelBufferByteInput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // nothing to do
    }

    @Override
    public int available() throws IOException {
        return buffer.readableBytes();
    }

    @Override
    public int read() throws IOException {
        if (buffer.isReadable()) {
            return buffer.readByte() & 0xff;
        }
        return -1;
    }

    @Override
    public int read(byte[] array) throws IOException {
        return read(array, 0, array.length);
    }

    @Override
    public int read(byte[] dst, int dstIndex, int length) throws IOException {
        int available = available();
        if (available == 0) {
            return -1;
        }

        length = Math.min(available, length);
        buffer.readBytes(dst, dstIndex, length);
        return length;
    }

    @Override
    public long skip(long bytes) throws IOException {
        int readable = buffer.readableBytes();
        if (readable < bytes) {
            bytes = readable;
        }
        buffer.readerIndex((int) (buffer.readerIndex() + bytes));
        return bytes;
    }

}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;

import java.io.IOException;

/**
 * {@link ByteOutput} implementation which writes the data to a {@link ByteBuf}
 *
 *
 */
public class ChannelBufferByteOutput implements ByteOutput {

    private final ByteBuf buffer;

    /**
     * Create a new instance which use the given {@link ByteBuf}
     */
    public ChannelBufferByteOutput(ByteBuf buffer) {
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        // Nothing to do
    }

    @Override
    public void flush() throws IOException {
        // nothing to do
    }

    @Override
    public void write(int b) throws IOException {
        buffer.writeByte(b);
    }

    @Override
    public void write(byte[] bytes) throws IOException {
        buffer.writeBytes(bytes);
    }

    @Override
    public void write(byte[] bytes, int srcIndex, int length) throws IOException {
        buffer.writeBytes(bytes, srcIndex, length);
    }

    /**
     * Return the {@link ByteBuf} which contains the written content
     *
     */
    ByteBuf getBuffer() {
        return buffer;
    }
}

握手和安全認證

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import proprietary.protocol.dao.Header;
import proprietary.protocol.dao.NettyMessage;

/**
 * 握手認證的客戶端
 */
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {

//    private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class);

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward to the
     * next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p/>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(buildLoginReq());
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
     * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p/>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 如果是握手應答訊息,需要判斷是否認證成功
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            byte loginResult = (Byte) message.getBody();
            if (loginResult != (byte) 0) {
                // 握手失敗,關閉連線
                ctx.close();
            } else {
//                LOG.info("Login is ok : " + message);
                System.out.println("登入成功:"+message);
                ctx.fireChannelRead(msg);
            }
        } else
            //呼叫下一個channel鏈..
            ctx.fireChannelRead(msg);
    }

    /**
     * 構建登入請求
     */
    private NettyMessage buildLoginReq() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_REQ.value());
        message.setHeader(header);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

當客戶端和服務端TCP三次握手後,由客戶端構造握手請求訊息傳送給服務端,由於採用IP白名單認證機制,因此不需要攜帶訊息體,訊息體為空,訊息型別為“3:握手請求訊息”。握手請求傳送後,按照協議規範,服務端需要返回握手應答訊息。

這裡首先對握手應答訊息進行處理,判斷訊息是否是握手應答訊息,如果不是直接透傳給後面的ChannelHandler進行處理;如果是握手應答訊息則對應答結果進行判斷,如果非0說明認證失敗關閉鏈路,重新發起連線。

服務端的握手接入和安全認證程式碼:

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import proprietary.protocol.dao.Header;
import proprietary.protocol.dao.NettyMessage;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 服務端的握手和安全認證
 */
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {

//    private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class);

    /**
     * 本地快取
     */
    private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
    private String[] whitekList = {"127.0.0.1", "192.168.1.104"};

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
     * the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     * <p>
     * Sub-classes may override this method to change behavior.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;

        // 如果是握手請求訊息,處理,其它訊息透傳
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_REQ
                .value()) {
            String nodeIndex = ctx.channel().remoteAddress().toString();
            NettyMessage loginResp = null;
            // 重複登陸,拒絕
            if (nodeCheck.containsKey(nodeIndex)) {
                loginResp = buildResponse((byte) -1);
            } else {
                InetSocketAddress address = (InetSocketAddress) ctx.channel()
                        .remoteAddress();
                String ip = address.getAddress().getHostAddress();
                boolean isOK = false;
                for (String WIP : whitekList) {
                    if (WIP.equals(ip)) {
                        isOK = true;
                        break;
                    }
                }
                loginResp = isOK ? buildResponse((byte) 0)
                        : buildResponse((byte) -1);
                if (isOK)
                    nodeCheck.put(nodeIndex, true);
            }
            System.out.println("登入響應是:"+loginResp+" body :"+loginResp.getBody());
            ctx.writeAndFlush(loginResp);
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private NettyMessage buildResponse(byte result) {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.LOGIN_RESP.value());
        message.setHeader(header);
        message.setBody(result);
        return message;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        nodeCheck.remove(ctx.channel().remoteAddress().toString());// 刪除快取
        ctx.close();
        ctx.fireExceptionCaught(cause);
    }
}

這裡首先定義了重複登入保護和IP認證白名單列表,主要用於提升握手的可靠性。在channelRead方法中用於接入認證,首先根據客戶端的源地址進行重複登入判斷,如果已經成功登入,拒絕重複登入,以防止由於客戶端重複登入導致控制代碼洩露。然後通過ChannelHandlerContext的Channel介面獲取客戶端的InetSocketAddress地址,從中取得傳送方的源地址資訊,通過源地址進行白名單校驗,最後通過buildResponse方法構造握手應答訊息返回給客戶端。當發生異常時,需要將客戶端資訊從登入登錄檔中去註冊,以保證後續客戶端可以重連成功。

心跳檢測機制

握手成功後由客戶端主動傳送心跳訊息,服務端接收到心跳訊息後,返回心跳應答訊息。由於心跳訊息的目的是為了檢測鏈路的可用性,因此不需要攜帶訊息體。

客戶端傳送心跳訊息請求程式碼如下:


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import proprietary.protocol.dao.Header;
import proprietary.protocol.dao.NettyMessage;

import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * 客戶端心跳訊息
 */
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {

//    private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class);

    //使用定時任務傳送
    private volatile ScheduledFuture<?> heartBeat;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 當握手成功後,Login響應向下透傳,主動傳送心跳訊息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.LOGIN_RESP
                .value()) {
            //NioEventLoop是一個Schedule,因此支援定時器的執行,建立心跳計時器
            heartBeat = ctx.executor().scheduleAtFixedRate(
                    new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
                    TimeUnit.MILLISECONDS);
        } else if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_RESP
                .value()) {
            System.out.println("客戶端接收伺服器心跳訊息--------->"+message);
//            LOG.info("Client receive server heart beat message : ---> "
//                    + message);
        } else
            ctx.fireChannelRead(msg);
    }

    //Ping訊息任務類
    private class HeartBeatTask implements Runnable {
        private final ChannelHandlerContext ctx;

        public HeartBeatTask(final ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            NettyMessage heatBeat = buildHeatBeat();
//            LOG.info("Client send heart beat messsage to server : ---> "
//                    + heatBeat);
            System.out.println("客戶端將心跳資訊傳送到伺服器------>"+heatBeat);
            ctx.writeAndFlush(heatBeat);
        }

        private NettyMessage buildHeatBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_REQ.value());
            message.setHeader(header);
            return message;
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }
}

當握手成功後,握手請求Handler會繼續將握手成功訊息進行向下傳遞,HeartBeatReqHandler接收到之後對訊息進行判斷,如果是握手成功訊息,則啟動無限迴圈定時器用於定期傳送心跳訊息。

為了統一在一個Handler中處理所有心跳訊息,因此下面用於接收服務端傳送心跳應答訊息,並列印客戶端接收和傳送的心跳訊息。

服務端的心跳應答Handler:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import proprietary.protocol.dao.Header;
import proprietary.protocol.dao.NettyMessage;

/**
 * 服務端的心跳應答Handler:
 */
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {

//    private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        NettyMessage message = (NettyMessage) msg;
        // 返回心跳應答訊息
        if (message.getHeader() != null
                && message.getHeader().getType() == MessageType.HEARTBEAT_REQ
                .value()) {
//            LOG.info("Receive client heart beat message : ---> "
//                    + message);
            System.out.println("接收客戶心跳資訊:--->"+message);
            NettyMessage heartBeat = buildHeatBeat();
//            LOG.info("Send heart beat response message to client : ---> "
//                    + heartBeat);
            System.out.println("向客戶傳送心跳響應資訊:--->"+heartBeat);
            ctx.writeAndFlush(heartBeat);
        } else
            ctx.fireChannelRead(msg);
    }

    private NettyMessage buildHeatBeat() {
        NettyMessage message = new NettyMessage();
        Header header = new Header();
        header.setType(MessageType.HEARTBEAT_RESP.value());
        message.setHeader(header);
        return message;
    }

}

服務端的心跳Handler在接收到心跳請求訊息之後,構造心跳應答訊息返回,並列印接收和傳送的心跳訊息。

public enum MessageType {

    LOGIN_REQ((byte)3),
    LOGIN_RESP((byte)4),
    HEARTBEAT_REQ((byte)5),
    HEARTBEAT_RESP((byte)6),
    ;

    public byte value;

    MessageType(byte v){
        this.value = v;
    }

    public byte value(){
        return value;
    }

}
public final class NettyConstant {
    public static final String REMOTEIP = "127.0.0.1";
    public static final int PORT = 8080;
    public static final int LOCAL_PORT = 12088;
    public static final String LOCALIP = "127.0.0.1";
}

客戶端程式碼

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import proprietary.protocol.*;
import proprietary.protocol.NettyMessageDecoder;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * 客戶端
 */
public class NettyClient {

//    private static final Log LOG = LogFactory.getLog(NettyClient.class);

    private ScheduledExecutorService executor = Executors
            .newScheduledThreadPool(1);

    EventLoopGroup group = new NioEventLoopGroup();

    public void connect(int port, String host) throws Exception {

        // 配置客戶端NIO執行緒組

        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new NettyMessageDecoder(1024 * 1024, 4, 4));
                            ch.pipeline().addLast("MessageEncoder",
                                    new NettyMessageEncoder());
                            ch.pipeline().addLast("readTimeoutHandler",
                                    new ReadTimeoutHandler(50));
                            ch.pipeline().addLast("LoginAuthHandler",
                                    new LoginAuthReqHandler());
                            ch.pipeline().addLast("HeartBeatHandler",
                                    new HeartBeatReqHandler());
                        }
                    });
            // 發起非同步連線操作
            ChannelFuture future = b.connect(
                    new InetSocketAddress(host, port),
                    new InetSocketAddress(NettyConstant.LOCALIP,
                            NettyConstant.LOCAL_PORT)).sync();
            // 當對應的channel關閉的時候,就會返回對應的channel。
            // Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance.
            future.channel().closeFuture().sync();
        } finally {
            // 所有資源釋放完成之後,清空資源,再次發起重連操作
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        try {
                            connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 發起重連操作
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
    }

}

此處用於Netty訊息解碼,為了防止由於單條訊息國道導致記憶體溢位或者畸形碼流導致解碼錯位引起記憶體分配失敗,這裡對單條訊息做了最大長度進行上限限制。

這是新增了Netty訊息解碼器,用於協議訊息的自動編碼。隨後依次增加了讀超時Handler,握手請求Handler和心跳Handler。

利用Netty的ChannelPipeline和ChannelHandler機制,可以非常方便地實現功能解耦和業務產品的定製。通過Handler Chain的機制可以方便地實現切面攔截和定製,相比於AOP它的效能更高。

服務端程式碼

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import proprietary.protocol.*;
import proprietary.protocol.NettyMessageDecoder;

import java.io.IOException;

/**
 * 服務端
 */
public class NettyServer {

//    private static final Log LOG = LogFactory.getLog(NettyServer.class);

    public void bind() throws Exception {
        // 配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)
                            throws IOException {
                        ch.pipeline().addLast(
                                new NettyMessageDecoder(1024 * 1024, 4, 4));
                        ch.pipeline().addLast(new NettyMessageEncoder());
                        ch.pipeline().addLast("readTimeoutHandler",
                                new ReadTimeoutHandler(50));
                        ch.pipeline().addLast(new LoginAuthRespHandler());
                        ch.pipeline().addLast("HeartBeatHandler",
                                new HeartBeatRespHandler());
                    }
                });

        // 繫結埠,同步等待成功
        b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
        System.out.println("Netty服務啟動正常:"+(NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
//        LOG.info("Netty server start ok : "
//                + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
    }

    public static void main(String[] args) throws Exception {
        new NettyServer().bind();
    }
}

將服務端關閉後:

重試機制!!!!!!!

參考《Netty權威指南》