1. 程式人生 > >Spring整合MQTT

Spring整合MQTT

Spring整合提供inbound與outbound通道介面卡以支援MQTT協議,當前實現使用Eclipse Paho MQTT Client庫。 通過DefaultMqttPahoClientFactory配置兩個介面卡,參考Paho文件獲取關於配置選項更多的資訊。

Inbound(訊息驅動)通道介面卡

     由MqttPahoMessageDrivenChannelAdapter實現,為方便起見,可以採用名稱空間的方式進行配置。最小配置可能會是這樣: <bean id="clientFactory"

  class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
 <property name="userName" value="${mqtt.username}"/>
 <property name="password" value="${mqtt.password}"/>
</bean>

<int-mqtt:message-driven-channel-adapter id
="mqttInbound"
 client-id="${mqtt.default.client.id}.src"
 url="${mqtt.url}"
 topics="sometopic"
 client-factory="clientFactory"
 channel="output"/> 屬性: <int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
 client-id="foo"
 url="tcp://localhost:1883"  
 topics="bar,baz"
 qos="1,2"
 converter="myConverter"
 client-factory="clientFactory"
 send-timeout="123"
 error-channel="errors"
 recovery-interval="10000"
 channel="out" /> 1)客戶端id 2)代理URL 3)介面卡會接受到訊息的一組以逗號分隔的主題 4)以逗號分隔的一組QoS值,可以是所有主題運用單一值,或者每一個主題一個值(列表必須同樣長度) 5)MqttMessageConverter(可選項),預設DefaultPahoMessageConverter生成訊息帶字串載荷(預設),攜帶頭部包括:      mqtt_topic     接收訊息主題      mqtt_duplicate     如果訊息重複,值為true      mqtt_qos     業務質量   DefaultPahoMessageConverter可配置為返回載荷原始byte[]型別,通過將其宣告為一個實體類<bean/>,並且設定payloadAsBytes屬性 6)客戶端工廠 7)傳送超時-如果通道可能會阻塞,才會運用(例如當前已滿的邊界QueueChannel) 8)錯誤通道--如果使用的話,ErrorMessage訊息下行異常會發送至該通道,載荷為MessagingException,包含錯誤訊息與原因 9)恢復間隔--控制在故障之後介面卡會嘗試重新連線的時間間隔,預設為10000ms(10s)      從4.1版本開始,程式設計方式改變介面卡訂閱的主題可以省略url,DefaultMqttPahoClientFactory屬性serverURIs可以提供服務端URI,例如,這將使能連線至HA高可用簇。      從4.2.2版本開始,當介面卡成功訂閱至主題後,釋出MqttSubscribedEvent,當連線/訂閱失敗時,釋出MqttConnectionFailedEvent。這些事件可以由實現ApplicationListener介面的實體類獲取。       新的屬性recoveryInterval控制在故障之後介面卡會嘗試重新連線的時間間隔,預設為10000ms(10s)      在4.2.3版本之前,當介面卡停止後,客戶端總是會解除訂閱。這是不正確的,因為如果客戶端QoS大於0,我們需要保持訂閱以便介面卡停止時到達的訊息在下一次開始時會傳送。這也需要設定客戶端工廠cleanSession屬性為false,預設值為true。      從4.2.3版本開始,介面卡不會解除訂閱(預設),如果cleanSession值為false。可以重寫該行為,通過設定工廠屬性consumerCloseAction,可以有以下值:UNSUBSCRIBE_ALWAYS, UNSUBSCRIBE_NEVER以及UNSUBSCRIBE_CLEAN,後者(預設)會解除訂閱僅當cleanSession屬性值為true。      回退至4.2.3之前的行為,使用UNSUBSCRIBE_ALWAYS。
執行時增加/刪除主題      從4.1版本開始,是可能的。提供方法addTopic()與removeTopic(),當增加主題時,可以可選地指明QoS(預設值為1),也可以通過傳送合適的訊息至<control-bus/>,攜帶合適的載荷修改主題:
myMqttAdapter.addTopic('foo', 1)
     停止/啟動介面卡對主題列表無影響(不會回退至配置裡原始設定值),更改不會保留至超出應用上下文生命週期;新的應用上下文會回退到配置的設定值。      當介面卡停止時(或者從代理斷開連線時)改變主題會在下一次連線建立時生效。
Java配置

     下面的Spring Boot應用程式提供了一個運用Java配置來配置inbound介面卡的例子:



  1. @SpringBootApplication
  2. public class MqttJavaApplication {
  3. public static void main(String[] args) {
  4. new SpringApplicationBuilder(MqttJavaApplication.class)
  5. .web( false)
  6. .run(args);
  7. }
  8. @Bean
  9. public MessageChannel mqttInputChannel() {
  10. return new DirectChannel();
  11. }
  12. @Bean
  13. public MessageProducer inbound() {
  14. MqttPahoMessageDrivenChannelAdapter adapter =
  15. new MqttPahoMessageDrivenChannelAdapter( "tcp://localhost:1883", "testClient",
  16. "topic1", "topic2");
  17. adapter.setCompletionTimeout( 5000);
  18. adapter.setConverter( new DefaultPahoMessageConverter());
  19. adapter.setQos( 1);
  20. adapter.setOutputChannel(mqttInputChannel());
  21. return adapter;
  22. }
  23. @Bean
  24. @ServiceActivator(inputChannel = "mqttInputChannel")
  25. public MessageHandler handler() {
  26. return new MessageHandler() {
  27. @Override
  28. public void handleMessage(Message<?> message) throws MessagingException {
  29. System.out.println(message.getPayload());
  30. }
  31. };
  32. }
  33. }

Outbound通道介面卡

     由ConsumerEndpoint內包裝的MqttPahoMessageHandler實現,為方便起見,可以採用名稱空間的方式配置:      從版本4.1開始,介面卡支援非同步傳送,避免阻塞直到傳送確認,如果想要的話,應用事件可被髮出使能應用確認傳遞。 屬性:
<int-mqtt:outbound-channel-adapter id="withConverter"
	client-id="foo"  
	url="tcp://localhost:1883"  
	converter="myConverter"  
	client-factory="clientFactory"  
	default-qos="1"  
	default-retained="true"  
	default-topic="bar"  
	async="false"  
	async-events="false"  
	channel="target" />
1)客戶端id 2) 代理URL 3)MqttMessageConverter(可選),預設DefaultPahoMessageConverter識別以下頭部:      mqtt_topic     訊息傳送主題      mqtt_retained     如果訊息保留的話,值為true      mqtt_qos     業務質量 4)客戶端工廠 5)預設業務質量(用於未發現mqtt_qos頭部的情況),如果自定義converter提供的話,不允許採用 6)保留標記符預設值(用於未發現mqtt_retained頭部的情況),如果自定義converter提供的話,不允許採用 7)訊息傳送預設主題(用於未發現mqtt_topic頭部的情況) 8)當為true,當傳送訊息時,呼叫者不會阻塞等待傳送確認,預設值:false(傳送阻塞直到傳送確認) 9)當async和async-events都為true時,發出MqttMessageSentEvent事件,包含訊息、主題以及由客戶端庫生成的訊息id,客戶端id和客戶端例項(每次客戶端連線增加)。當傳送由客戶端庫確認,發出MqttMessageDeliveredEvent,包含訊息號、客戶端號和客戶端例項,使傳送與傳送相關聯。這些事件可以由任意ApplicationListener接收,或者通過事件inbound通道介面卡。注意:在MqttMessageSentEvent之前可能會接收到MqttMessageDeliveredEvent。預設值為false。
     從版本4.1開始,可以省略url,DefaultMqttPahoClientFactory屬性serverURIs可以提供伺服器URI。例如,這將使能連線至HA高可用簇。 Java      下面的Spring Boot應用程式給出了一個使用Java配置配置outbound介面卡的例子:
  1. @SpringBootApplication
  2. @IntegrationComponentScan
  3. public class MqttJavaApplication {
  4. public static void main(String[] args) {
  5. ConfigurableApplicationContext context =
  6. new SpringApplicationBuilder(MqttJavaApplication.class)
  7. .web( false)
  8. .run(args);
  9. MyGateway gateway = context.getBean(MyGateway.class);
  10. gateway.sendToMqtt( "foo");
  11. }
  12. @Bean
  13. public MqttPahoClientFactory mqttClientFactory() {
  14. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  15. factory.setServerURIs( "tcp://host1:1883", "tcp://host2:1883");
  16. factory.setUserName( "username");
  17. factory.setPassword( "password");
  18. return factory;
  19. }
  20. @Bean
  21. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  22. public MessageHandler mqttOutbound() {
  23. MqttPahoMessageHandler messageHandler =
  24. new MqttPahoMessageHandler( "testClient", mqttClientFactory());
  25. messageHandler.setAsync( true);
  26. messageHandler.setDefaultTopic( "testTopic");
  27. return messageHandler;
  28. }
  29. @Bean
  30. public MessageChannel mqttOutboundChannel() {
  31. return new DirectChannel();
  32. }
  33. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  34. public interface MyGateway {
  35. void sendToMqtt(String data);
  36.      }
  37.        }

  1. @SpringBootApplication
  2. public class MqttJavaApplication {
  3. public static void main(String[] args) {
  4. new SpringApplicationBuilder(MqttJavaApplication.class)
  5. .web( false)
  6. .run(args);
  7. }
  8. @Bean
  9. public MessageChannel mqttInputChannel() {
  10. return new DirectChannel();
  11. }
  12. @Bean
  13. public MessageProducer inbound() {
  14. MqttPahoMessageDrivenChannelAdapter adapter =
  15. new MqttPahoMessageDrivenChannelAdapter( "tcp://localhost:1883", "testClient",
  16. "topic1", "topic2");
  17. adapter.setCompletionTimeout( 5000);
  18. adapter.setConverter( new DefaultPahoMessageConverter());
  19. adapter.setQos( 1);
  20. adapter.setOutputChannel(mqttInputChannel());
  21. return adapter;
  22. }
  23. @Bean
  24. @ServiceActivator(inputChannel = "mqttInputChannel")
  25. public MessageHandler handler() {
  26. return new MessageHandler() {
  27. @Override
  28. public void handleMessage(Message<?> message) throws MessagingException {
  29. System.out.println(message.getPayload());
  30. }
  31. };
  32. }
  33. }

Outbound通道介面卡

     由ConsumerEndpoint內包裝的MqttPahoMessageHandler實現,為方便起見,可以採用名稱空間的方式配置: