springboot如何整合mqtt訊息推送
阿新 • • 發佈:2019-01-31
1.需求分析
近期筆者的專案需要使用 MQTT 實現訊息推送,並選擇 EMQ 作為 MQTT 伺服器。上一篇文章講解了如何在 Linux 中安裝 MQTT 服務(安裝連結: https://blog.csdn.net/zhangxing52077/article/details/80567270 )。接下來,筆者將講解如何在 Spring Boot 中整合 MQTT。
2.實現方案
①pom依賴
<!-- MQTT: Spring Integration starter plus the stream and MQTT modules -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
②yml中配置mqtt(自定義配置)
# MQTT settings (custom properties under the "com.mqtt" prefix).
# Each key must sit on its own line with nested indentation; collapsed onto a
# single line after the leading '#', everything is parsed as one comment.
com:
  mqtt:
    host: tcp://ip:1883
    clientid: mqttjs_e8022a4d0b
    topic: good,test,yes
    username: zhangxing
    password: zxp52077
    timeout: 10
    keepalive: 20
③建立mqtt訊息屬性配置類
@Component @ConfigurationProperties(prefix = "com.mqtt") @Setter @Getter public classMqttConfiguration { private String host; private String clientid; private String topic; private String username; private String password; private int timeout; private int keepalive; }
④建立mqtt訊息推送實體
@Slf4j @Setter @Getter public class PushPayload { //推送型別 private String type; //推送物件 private String mobile; //標題 private String title; //內容 private String content; //數量 private Integer badge = 1; //鈴聲 private String sound = "default"; public PushPayload(String type, String mobile, String title, String content, Integer badge , String sound){ this.type = type; this.mobile = mobile; this.title = title; this.content = content; this.badge = badge; this.sound = sound; } public static class Builder{ //推送型別 private String type; //推送物件 private String mobile; //標題 private String title; //內容 private String content; //數量 private Integer badge = 1; //鈴聲 private String sound = "default"; public Builder setType(String type) { this.type = type; return this; } public Builder setMobile(String mobile) { this.mobile = mobile; return this; } public Builder setTitle(String title) { this.title = title; return this; } public Builder setContent(String content) { this.content = content; return this; } public Builder setBadge(Integer badge) { this.badge = badge; return this; } public Builder setSound(String sound) { this.sound = sound; return this; } public PushPayload bulid(){ return new PushPayload(type,mobile,title,content,badge,sound); } } public static Builder getPushPayloadBuider(){ return new Builder(); } @Override public String toString() { return JSON.toJSONString(this, SerializerFeature.DisableCircularReferenceDetect); } }
⑤建立mqtt訊息推送或訂閱客戶端
@Slf4j public class MqttPushClient { private MqttClient client; private static volatile MqttPushClient mqttPushClient = null; public static MqttPushClient getInstance(){ if(null == mqttPushClient){ synchronized (MqttPushClient.class){ if(null == mqttPushClient){ mqttPushClient = new MqttPushClient(); } } } return mqttPushClient; } private MqttPushClient() { connect(); } private void connect(){ try { client = new MqttClient(PropertiesUtil.MQTT_HOST, PropertiesUtil.MQTT_CLIENTID, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(PropertiesUtil.MQTT_USER_NAME); options.setPassword(PropertiesUtil.MQTT_PASSWORD.toCharArray()); options.setConnectionTimeout(PropertiesUtil.MQTT_TIMEOUT); options.setKeepAliveInterval(PropertiesUtil.MQTT_KEEP_ALIVE); try { client.setCallback(new PushCallback()); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 釋出,預設qos為0,非持久化 * @param topic * @param pushMessage */ public void publish(String topic,PushPayload pushMessage){ publish(0, false, topic, pushMessage); } /** * 釋出 * @param qos * @param retained * @param topic * @param pushMessage */ public void publish(int qos,boolean retained,String topic,PushPayload pushMessage){ MqttMessage message = new MqttMessage(); message.setQos(qos); message.setRetained(retained); message.setPayload(pushMessage.toString().getBytes()); MqttTopic mTopic = client.getTopic(topic); if(null == mTopic){ log.error("topic not exist"); } MqttDeliveryToken token; try { token = mTopic.publish(message); token.waitForCompletion(); } catch (MqttPersistenceException e) { e.printStackTrace(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題,qos預設為0 * @param topic */ public void subscribe(String topic){ subscribe(topic,0); } /** * 訂閱某個主題 * @param topic * @param qos */ public void subscribe(String topic,int qos){ try { client.subscribe(topic, qos); } 
catch (MqttException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { String kdTopic = "good"; PushPayload pushMessage = PushPayload.getPushPayloadBuider().setMobile("15345715326") .setContent("designModel") .bulid(); MqttPushClient.getInstance().publish(0, false, kdTopic, pushMessage);} }
⑥配置獲取類的編寫
/**
 * Exposes MQTT and Elasticsearch connection settings as static fields.
 *
 * <p>The values are read during class initialisation from the classpath
 * resources {@code /mqtt.yml} and {@code /elasticsearch.properties}, both
 * parsed with {@link Properties#load(InputStream)}. Each resource is read
 * exactly once. A missing resource, a missing integer key or a non-numeric
 * integer value aborts class initialisation with a descriptive exception.
 */
public class PropertiesUtil {

    public static String MQTT_HOST;
    public static String MQTT_CLIENTID;
    public static String MQTT_USER_NAME;
    public static String MQTT_PASSWORD;
    public static int MQTT_TIMEOUT;
    public static int MQTT_KEEP_ALIVE;

    public static final String ELASTIC_SEARCH_HOST;
    public static final int ELASTIC_SEARCH_PORT;
    public static final String ELASTIC_SEARCH_CLUSTER_NAME;

    static {
        // Read each resource once instead of once per property.
        Properties mqtt = loadMqttProperties();
        MQTT_HOST = mqtt.getProperty("MQTT_HOST");
        MQTT_CLIENTID = mqtt.getProperty("MQTT_CLIENTID");
        MQTT_USER_NAME = mqtt.getProperty("MQTT_USER_NAME");
        MQTT_PASSWORD = mqtt.getProperty("MQTT_PASSWORD");
        MQTT_TIMEOUT = intProperty(mqtt, "MQTT_TIMEOUT");
        MQTT_KEEP_ALIVE = intProperty(mqtt, "MQTT_KEEP_ALIVE");

        Properties es = loadEsProperties();
        ELASTIC_SEARCH_HOST = es.getProperty("ES_HOST");
        ELASTIC_SEARCH_PORT = intProperty(es, "ES_PORT");
        ELASTIC_SEARCH_CLUSTER_NAME = es.getProperty("ES_CLUSTER_NAME");
    }

    /** Loads the MQTT settings from {@code /mqtt.yml}. */
    private static Properties loadMqttProperties() {
        return loadProperties("/mqtt.yml");
    }

    /** Loads the Elasticsearch settings from {@code /elasticsearch.properties}. */
    private static Properties loadEsProperties() {
        return loadProperties("/elasticsearch.properties");
    }

    /**
     * Reads a classpath resource in {@link Properties} format.
     *
     * @param resource absolute classpath location, e.g. {@code /mqtt.yml}
     * @return the parsed properties
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws RuntimeException      if reading or closing the stream fails
     */
    private static Properties loadProperties(String resource) {
        try (InputStream in = PropertiesUtil.class.getResourceAsStream(resource)) {
            if (in == null) {
                // Without this check Properties.load(null) fails with a bare NullPointerException.
                throw new IllegalStateException("Classpath resource not found: " + resource);
            }
            Properties properties = new Properties();
            properties.load(in);
            return properties;
        } catch (IOException e) {
            throw new RuntimeException("Failed to read classpath resource " + resource, e);
        }
    }

    /**
     * Reads a required integer property.
     *
     * @param properties source properties
     * @param key        property name
     * @return the parsed value
     * @throws IllegalStateException if the key is absent or its value is not an integer
     */
    private static int intProperty(Properties properties, String key) {
        String raw = properties.getProperty(key);
        if (raw == null) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        try {
            // Properties.load keeps trailing whitespace in values, so trim before parsing.
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "Property " + key + " is not a valid integer: '" + raw + "'", e);
        }
    }
}
⑦mqtt推送回調類
/** * @auther zx * @date 2018/5/28 9:20 */ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 連線丟失後,一般在這裡面進行重連 System.out.println("連線斷開,可以做重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後得到的訊息會執行到這裡面 System.out.println("接收訊息主題 : " + topic); System.out.println("接收訊息Qos : " + message.getQos()); System.out.println("接收訊息內容 : " + new String(message.getPayload())); } }
3.效果測試
/**
 * Builds a sample payload and pushes it to the "yes" topic.
 * {@code mqttClientComponent} is declared outside this snippet.
 */
@Test
public void test() {
    PushPayload.Builder builder = PushPayload.getPushPayloadBuider();
    builder.setContent("test");
    builder.setMobile("119");
    builder.setType("2018");
    PushPayload samplePayload = builder.bulid();

    mqttClientComponent.push("yes", samplePayload);
}
mqtt客戶端效果顯示
好了,至此mqtt的整合已完結
我是張星,歡迎加入博主技術交流群,群號:313145288