Netty實現高效能IOT伺服器(Groza)之精盡程式碼篇中
阿新 • • 發佈:2018-12-17
// Session resume: when the client did not set the clean-session flag, replay
// every stored in-flight message for this client identifier on the channel.
if (!msg.variableHeader().isCleanSession()) {
    // Stored PUBLISH messages and stored PUBREL messages are kept in two
    // separate services, both looked up by the client identifier.
    List<DupPublishMessageStore> dupPublishMessageStoreList =
            grozaDupPublishMessageStoreService.get(msg.payload().clientIdentifier());
    List<DupPubRelMessageStore> dupPubRelMessageStoreList =
            grozaDupPubRelMessageStoreService.get(msg.payload().clientIdentifier());

    // Re-send each stored PUBLISH with the DUP flag set, using the QoS, topic,
    // message id and payload bytes recorded in the store.
    for (DupPublishMessageStore dupPublishMessageStore : dupPublishMessageStoreList) {
        MqttFixedHeader publishFixedHeader = new MqttFixedHeader(
                MqttMessageType.PUBLISH,
                true,
                MqttQoS.valueOf(dupPublishMessageStore.getMqttQoS()),
                false,
                0);
        MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader(
                dupPublishMessageStore.getTopic(),
                dupPublishMessageStore.getMessageId());
        MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
                publishFixedHeader,
                publishVariableHeader,
                Unpooled.buffer().writeBytes(dupPublishMessageStore.getMessageBytes()));
        channel.writeAndFlush(publishMessage);
    }

    // Re-send each stored PUBREL.
    // MQTT 3.1.1 section 3.6.1 reserves the PUBREL fixed-header flag bits as
    // 0010 (DUP=0, QoS=1, RETAIN=0); a receiver must treat any other value as
    // a malformed packet. The previous DUP=1 / QoS=0 combination violated this.
    for (DupPubRelMessageStore dupPubRelMessageStore : dupPubRelMessageStoreList) {
        MqttFixedHeader pubRelFixedHeader = new MqttFixedHeader(
                MqttMessageType.PUBREL,
                false,
                MqttQoS.AT_LEAST_ONCE,
                false,
                0);
        MqttMessage pubRelMessage = MqttMessageFactory.newMessage(
                pubRelFixedHeader,
                MqttMessageIdVariableHeader.from(dupPubRelMessageStore.getMessageId()),
                null);
        channel.writeAndFlush(pubRelMessage);
    }
}