springcloud 整合Mqtt,kafka
阿新 • • 發佈:2018-12-25
springcloud 整合Mqtt,kafka (rabbit與kafka整和方式相同)
原理說明:
springcloud整合mqtt主要是使用spring-boot-starter-integration、spring-integration-mqtt,springcloud整合kafka、rabbitmq主要是使用spring-cloud-starter-stream-kafka
springcloud整合mqtt
依賴:
<!--mqtt--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
整合程式碼:
import cn.enncloud.iot.iotmqtttransferkafkahps.constant.AdapterProperties; import cn.enncloud.iot.iotmqtttransferkafkahps.process.MessageMqttProcessHandler; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Optional; @Slf4j @Configuration public class MqttConfig { @Autowired private MessageMqttProcessHandler messageProcess; @Autowired private AdapterProperties adapterProperties; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { adapterProperties.getHost() }); options.setUserName(adapterProperties.getUsername()); options.setPassword(adapterProperties.getPassword().toCharArray()); options.setCleanSession(adapterProperties.isCleanSession()); factory.setConnectionOptions(options); return factory; } // publisher @Bean public IntegrationFlow mqttOutFlow() { //console input // return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(), // e -> e.poller(Pollers.fixedDelay(1000))) // .transform(p -> p + " sent to MQTT") // .handle(mqttOutbound()) // .get(); return IntegrationFlows.from(outChannelMqtt()) .handle(mqttOutbound()) .get(); } @Primary @Bean public MessageChannel outChannelMqtt() { return new DirectChannel(); } @Bean public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(adapterProperties.getPublisher(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(adapterProperties.getTopic()); return messageHandler; } // consumer @Bean public IntegrationFlow mqttInFlow() { return IntegrationFlows.from(mqttInbound()) // .transform(p -> p + ", received from MQTT") // .handle(logger()) .handle(accephandler()) .get(); } @Bean // 這裡注入處理邏輯服務 public MessageHandler accephandler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { Optional optional = Optional.ofNullable(message.getPayload()); if(optional.isPresent()){ messageProcess.doProcess(message); }else{ log.info("my"+message.getHeaders()); } } }; } private LoggingHandler logger() { LoggingHandler loggingHandler = new LoggingHandler("INFO"); loggingHandler.setLoggerName("siSample"); return loggingHandler; } @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(adapterProperties.getConsumer(), mqttClientFactory(), adapterProperties.getTopic()); adapter.setCompletionTimeout(adapterProperties.getTimeout()); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(adapterProperties.getQos()); return adapter; } @MessagingGateway(defaultRequestChannel = "outChannelMqtt") public interface MessageWriter{ void write(String data); } }
寫一個傳送訊息demo
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.MqttConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController public class SendMessageController { @Autowired MqttConfig.MessageWriter messageWriter; @RequestMapping(value = "send",method = RequestMethod.GET) public void send(@RequestParam(value = "message") String message){ log.info("收到meseage"+message); messageWriter.write(message); } }
yml中配置:
enn: mqtt: # todo host: tcp://**:1883 publisher: samplePublisher # todo consumer: dampleConsumerprod2 # 共享訂閱 # topic: $share/group1/allInOne # 本地共享訂閱 topic: $local/$share/group1/allInOne username: %% password: %% timeout: 5000 qos: 2 cleanSession: false
springcloud整合kafka(rabbitmq相同)
依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <!--<dependency>--> <!--<groupId>org.springframework.cloud</groupId>--> <!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>--> <!--</dependency>--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
整合程式碼(已經包含了使用邏輯):
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface KafkaChannel { /** * 發訊息的通道名稱 */ String CIM_OUTPUT = "cim_output"; /** * 訊息的訂閱通道名稱 */ // String CIM_INPUT = "cim_input"; /** * 發訊息的通道 * * @return */ @Output(CIM_OUTPUT) MessageChannel sendCIMMessage(); /** * 收訊息的通道 * * @return */ // @Input(CIM_INPUT) // SubscribableChannel recieveCIMMessage(); }
@EnableBinding(value = Sink.class) public class MessageKafkaCimHandler { @Autowired private MessageKafkaOutHandler messageKafkaOutHandler; private static final String CHARSET = "UTF-8"; @StreamListener(Sink.INPUT) public void doProcess(Message<byte[]> message){ messageKafkaOutHandler.doProcess(MessageBuilder.withPayload(kafkaData).build()); } }
import cn.enncloud.iot.iotmqtttransferkafkahps.configuration.KafkaChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.stereotype.Component; @Component @Slf4j @EnableBinding(value = KafkaChannel.class) public class MessageKafkaOutHandler { @Qualifier(value = KafkaChannel.CIM_OUTPUT) @Autowired private MessageChannel cimOutput; public boolean doProcess(Message<?> message) throws Exception{ return this.cimOutput.send(message); } }
yml配置檔案:
spring: cloud: stream: default-binder: kafka bindings: #cim_output自定義配置的,上看程式碼中的介面KafkaChannel .output,input不需要配置介面,預設提供,直接使用即可。 cim_output: destination: topic1 content-type: application/json binder: kafka producer: headerMode: raw output: destination: topic2 content-type: application/json binder: kafka producer: headerMode: raw input: destination: topic2 content-type: application/json binder: kafka group: hpsgroup consumer: headerMode: raw kafka: binder: brokers: *.*.*.1:9092,1.*.*.*:9092,1.*.*.*:9092 zk-nodes: *.*.*.1:2181,1.*.*.*:2181,1.*.*.*:2181 kafka: producer: client-id: mqtttransferkafka
最後提醒:
複製要注意把配置複製全了,都是血淚調試出來的必需品,少了可能就要開始湯坑了。