1. 程式人生 > >MQTT 大訊息失敗原因排查

MQTT 大訊息失敗原因排查

Background

小組內使用 MQTT 協議搭建了一個聊天伺服器,前天在測大訊息(超過5000漢字)時,連線直接變得不可用,後續傳送的訊息全部都收不到回覆。

伺服器環境:
Netty :4.1.32.Final
使用的是 Netty 包中自帶的 MqttDecoder

客戶端: Android

排查過程

  1. 由於所有的訊息都列印了日誌,因此先搜了一下伺服器日誌,發現日誌中並沒有傳送的訊息內容。
  2. 難道是客戶端在超長訊息時沒有傳送?使用 tcpdump 抓了包,發現客戶端正常傳送,並且所有的包服務端都已經 ack,但是後續服務端沒有發回響應,猜測是服務端在大訊息的情況下處理失敗了。
    1. tcpdump
      使用 -nn 打印出ip和埠,-X 列印網路包的內容,也可以使用-w 選項儲存到檔案裡,然後使用 tcpdumpwireshark 來分析
  3. 於是查了一下 MQTT 支援的最大 payload,MQTT 官方文件 中說明是 256M,這個大小肯定不會超過。
  4. 在服務端抓了下包,確認訊息已經收到,但是無確認訊息返回
  5. 開啟線上debug,發現收到了一個 PUBLISH 型別的訊息,但是訊息的 class 不為 MqttPublishMessage, 且 payload 中無資料,但在 Message 中有一個報錯訊息 too large message: 56234 bytes
  6. Google 一下,有網友遇到了
    同樣的問題
    , 雖然這個問題裡 MQTT 是 C 語言的。
  7. 檢視 MqttDecoder, 發現 decoder 有最長 payload 限制(以下為部分程式碼),啟動程式碼裡呼叫的是預設建構函式,因此預設最長資料為 8092 位元組。
public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER:  try {
                final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Result<?> decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    private MqttMessage invalidMessage(Throwable cause) {
      checkpoint(DecoderState.BAD_MESSAGE);
      return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}
  1. 長訊息的原因找到了,還剩一個問題,為什麼後續的訊息包括 ping 訊息就再也發不出去了?經過檢視程式碼,這與 MqttDecoder 的父類 ReplayingDecoder 有關係,檢視原始碼有詳盡的類說明, 在讀取可變長度頭部時,如果payload 超過了最大限制,那麼直接丟擲異常。摘出程式碼如下:
case READ_VARIABLE_HEADER:  try {
    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

在異常處理中,呼叫了 invalidMessage 方法,這個方法將 狀態設為 DecoderState.BAD_MESSAGE, 在這個狀態下,所有的位元組都直接被丟棄。

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

也就是說此後的訊息都不會進入到業務處理邏輯,這條長連線廢掉了。

解決方案

  1. 客戶端對長訊息做字數限制和拆分,保證單條訊息不超過最大限制
  2. 服務端增大最大載荷長度,MqttDecoder 提供了建構函式(不建議使用,這樣會增大伺服器處理時間和記