MQTT 大訊息失敗原因排查
阿新 • • 發佈:2019-08-05
Background
小組內使用 MQTT 協議搭建了一個聊天伺服器,前天在測大訊息(超過5000漢字)時,連線直接變得不可用,後續傳送的訊息全部都收不到回覆。
伺服器環境:
Netty :4.1.32.Final
使用的是 Netty 包中自帶的 MqttDecoder
客戶端: Android
排查過程
- 由於所有的訊息都列印了日誌,因此先搜了一下伺服器日誌,發現日誌中並沒有傳送的訊息內容。
- 難道是客戶端在超長訊息時沒有傳送?使用
tcpdump
抓了包,發現客戶端正常傳送,並且所有的包服務端都已經 ack,但是後續服務端沒有發回響應,猜測是服務端在大訊息的情況下處理失敗了。tcpdump
-nn
打印出ip和埠,-X
列印網路包的內容,也可以使用-w
選項儲存到檔案裡,然後使用tcpdump
或wireshark
來分析
- 於是查了一下 MQTT 支援的最大 payload,MQTT 官方文件 中說明是 256M,這個大小肯定不會超過。
- 在服務端抓了下包,確認訊息已經收到,但是無確認訊息返回
- 開啟線上debug,發現收到了一個
PUBLISH
型別的訊息,但是訊息的 class 不為MqttPublishMessage
, 且 payload 中無資料,但在 Message 中有一個報錯訊息too large message: 56234 bytes
- Google 一下,有網友遇到了
- 檢視
MqttDecoder
, 發現 decoder 有最長 payload 限制(以下為部分程式碼),啟動程式碼裡呼叫的是預設建構函式,因此預設最長資料為8092
位元組。
public final class MqttDecoder extends ReplayingDecoder<DecoderState> { private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092; public MqttDecoder() { this(DEFAULT_MAX_BYTES_IN_MESSAGE); } public MqttDecoder(int maxBytesInMessage) { super(DecoderState.READ_FIXED_HEADER); this.maxBytesInMessage = maxBytesInMessage; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { switch (state()) { case READ_FIXED_HEADER: try { mqttFixedHeader = decodeFixedHeader(buffer); bytesRemainingInVariablePart = mqttFixedHeader.remainingLength(); checkpoint(DecoderState.READ_VARIABLE_HEADER); // fall through } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case READ_VARIABLE_HEADER: try { final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader); variableHeader = decodedVariableHeader.value; if (bytesRemainingInVariablePart > maxBytesInMessage) { throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes"); } bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; checkpoint(DecoderState.READ_PAYLOAD); // fall through } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case READ_PAYLOAD: try { final Result<?> decodedPayload = decodePayload( buffer, mqttFixedHeader.messageType(), bytesRemainingInVariablePart, variableHeader); bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed; if (bytesRemainingInVariablePart != 0) { throw new DecoderException( "non-zero remaining payload bytes: " + bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')'); } checkpoint(DecoderState.READ_FIXED_HEADER); MqttMessage message = MqttMessageFactory.newMessage( mqttFixedHeader, variableHeader, decodedPayload.value); mqttFixedHeader = null; variableHeader = null; out.add(message); break; } catch (Exception cause) { out.add(invalidMessage(cause)); return; } case BAD_MESSAGE: // Keep discarding until disconnection. buffer.skipBytes(actualReadableBytes()); break; default: // Shouldn't reach here. throw new Error(); } } private MqttMessage invalidMessage(Throwable cause) { checkpoint(DecoderState.BAD_MESSAGE); return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause); } }
- 長訊息的原因找到了,還剩一個問題,為什麼後續的訊息包括 ping 訊息就再也發不出去了?經過檢視程式碼,這與
MqttDecoder
的父類ReplayingDecoder
有關係,檢視原始碼有詳盡的類說明, 在讀取可變長度頭部時,如果payload 超過了最大限制,那麼直接丟擲異常。摘出程式碼如下:
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
} catch (Exception cause) {
out.add(invalidMessage(cause));
return;
}
在異常處理中,呼叫了 invalidMessage
方法,這個方法將 狀態設為 DecoderState.BAD_MESSAGE
, 在這個狀態下,所有的位元組都直接被丟棄。
case BAD_MESSAGE:
// Keep discarding until disconnection.
buffer.skipBytes(actualReadableBytes());
break;
也就是說此後的訊息都不會進入到業務處理邏輯,這條長連線廢掉了。
解決方案
- 客戶端對長訊息做字數限制和拆分,保證單條訊息不超過最大限制
- 服務端增大最大載荷長度,
MqttDecoder
提供了建構函式(不建議使用,這樣會增大伺服器處理時間和記