activemq結合mqtt傳送p2p訊息
阿新 • • 發佈:2018-11-05
實現思路:所有使用者訂閱一個主題,當伺服器端發起推送時使用jms協議傳送訊息到主題,並設定附帶屬性為目標使用者的clientId,對該主題進行自定義分發策略
1.下載mqtt原始碼
自行下載,本案例以5.5.10為例
2.自定義分發策略
新增一個分發策略帶指定的原始碼包路徑:org.apache.activemq.broker.region.policy
注:一定要放在此包下面
以下為完整的class內容:
package org.apache.activemq.broker.region.policy;
import java.util.List;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
/** * ClientIdFilterDispatchPolicy dispatches messages in a topic to a given * client. Then the message with a "PTP_CLIENTID" property, can be received by a * mqtt client with the same clientId. * @org.apache.xbean.XBean
*/
public class ClientIdFilterDispatchPolicy extends SimpleDispatchPolicy {
private static final Log LOG = LogFactory.getLog(ClientIdFilterDispatchPolicy.class);
public static final String PTP_CLIENTID = "PTP_CLIENTID"; //可自定義訊息目標id在訊息屬性中的key private String ptpClientId = PTP_CLIENTID;
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("===============Enter ClientIdFilterDispatchPolicy........"); } // 獲取訊息中的目標 客戶端id Object clientId = node.getMessage().getProperty(ptpClientId); // 如果沒有,直接廣播 if (clientId == null) { return super.dispatch(node, msgContext, consumers); } if (LOG.isInfoEnabled()) { LOG.info("===============Client id : " + clientId); }
// 獲取當前訊息型別,此處主要是限制為主題模式 ActiveMQDestination destination = node.getMessage().getDestination(); int count = 0; // 遍歷所有訂閱者 for (Subscription sub : consumers) { if (LOG.isInfoEnabled()) { LOG.info("===============consumers id: " + sub.getContext().getClientId()); } // 不交於瀏覽器 if (sub.getConsumerInfo().isBrowser()) { continue; } // 只發送給感興趣的訂閱 if (!sub.matches(node, msgContext)) { sub.unmatched(node); continue; }
if (LOG.isInfoEnabled()) { LOG.info("==============destination clientId : " + clientId); } // 訊息中帶有的目標id不為空,也為主題模式,並且當前的消費者的id和訊息中的目標id相同,則投遞訊息 if (clientId != null && destination.isTopic() && clientId.equals(sub.getContext().getClientId())) { if (LOG.isInfoEnabled()) { LOG.info("==============Send p2p message to : " + clientId); LOG.info("==============Top ic : " + destination.isTopic()); } sub.add(node); count++; } else { // 過濾訊息,不進行投遞 LOG.info("==============Un consumers subscription!"); sub.unmatched(node); } }
return count > 0; }
public String getPtpClientId() { return ptpClientId; }
public void setPtpClientId(String ptpClientId) { this.ptpClientId = ptpClientId; }
} 紅色tips:
在類開頭的註釋中需要加 :@org.apache.xbean.XBean,否則編譯的時候無法載入到activemq-spring/actimvemq.xsd檔案內
3.maven編譯打包以及替換
maven專案頂層目錄執行:mvn -Dtest=false clean install
替換
activemq-spring-x.x.x.jar和activemq-broker-x.x.x-.jar兩個檔案
tips:
1)檔案目錄:
assembly/target 解壓apache-activemq-version-bin.zip檔案,進入解壓後的目錄/lib 資料夾中
2)程式碼編寫自定義的policy後,重新編譯,acitvemq會自動將自定義好的policy以首字母小寫的方式自動生產到activemq.xsd檔案中,如果該檔案沒有你自定義的policy定義,則再activemq.xml檔案中引用的時候會報錯。
activemq.xsd檔案是編譯打包後生成的,檔案路徑:activemq-spring-version.jar中,解壓後會看到
然後可以再裡面找到自定義的policy的定義。
4.activemq.xml檔案中配置自己寫的分發策略: <policyEntry topic="PTP.>"> <dispatchPolicy> <clientIdFilterDispatchPolicy /> </dispatchPolicy> </policyEntry> 如下圖所示,表示PTP開頭的所有主題都會進行該自定義分發策略
tips:如果自定義的policy沒有在指定的包目錄,或者類名包含了數字,或者其他原因導致沒有生成到 activemq.xsd 檔案中,那麼此步驟配置改項啟動的時候會出現錯誤 5.訊息傳送和接收案例: 1)服務端使用jms傳送訊息:
import java.util.Scanner;
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author linlinxiao * @version 1.0, 2017年7月21日 * @since V100R001C10 */ public class TopicProducerTest { public static void main(String[] args) throws JMSException { // 建立連線工廠 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); String PTP_CLIENTID = "PTP_CLIENTID";
// 鑑權,如沒有開啟可省略 factory.setUserName("admin"); factory.setPassword("admin123"); // 建立JMS連線例項,並啟動連線 Connection connection = factory.createConnection();
connection.start();
// 建立Session物件,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題 Topic topic = session.createTopic("PTP.test");
// 建立生成者 MessageProducer producer = session.createProducer(topic);
// 設定訊息不需持久化。預設訊息需要持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Scanner sc = new Scanner(System.in); boolean isStart = true; String userMsg = ""; String msg = ""; TextMessage message = null; String[] messages = null; String clientId = null; while (isStart) { userMsg = sc.nextLine(); if (StringUtils.isBlank(userMsg) || "stop".equals(userMsg)) { System.out.println("Stop producer message!"); isStart = false; } messages = userMsg.split(":"); msg = "Hello MQ,Client msg:" + messages[0]; message = session.createTextMessage(msg);
if (messages.length == 2) { clientId = messages[1]; }
// 傳送指定訊息,配合主題分發策略使用,以附帶使用者ID ,分發策略對特定的主題進行攔截解析分發 if (StringUtils.isNotBlank(clientId)) { message.setStringProperty(PTP_CLIENTID, clientId); }
// 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送 producer.send(message); } sc.close(); // 關閉連線 producer.close(); session.close(); connection.close();
} } 2)mqtt端接收使用
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author xiaolinlin * @version 1.0, 2017年6月8日 * @since V100R001C00 */ public class MqttTestClient { public static final String HOST = "tcp://localhost:1883";
public static final String TOPIC = "test";
private String clientId;
private MqttClient client;
private MqttConnectOptions options;
private String userName = "admin";
private String passWord = "admin123";
public MqttTestClient(String clientId) { this.clientId = clientId; }
private void start() throws MqttException { try { // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 client = new MqttClient(HOST, clientId, new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 options.setCleanSession(true); // 設定連線的使用者名稱 options.setUserName(userName); // 設定連線的密碼 options.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // // 設定回撥 // MqttTopic topic = client.getTopic(TOPIC+"/test/"); // // setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 // options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); client.setCallback(new ClientCallback(client, options)); // 訂閱訊息 int[] Qos = {1}; String[] topic1 = {"PTP/test"}; client.subscribe(topic1, Qos);
} catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) throws MqttException { String clientId = "admin1"; MqttTestClient client = new MqttTestClient(clientId); client.start(); } }
callback類:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author xiaolinlin * @version 1.0, 2017年6月8日 * @since V100R001C00 */ public class ClientCallback implements MqttCallback { private MqttClient client; private MqttConnectOptions options;
public ClientCallback(MqttClient client,MqttConnectOptions options) { this.client = client; this.options = options; }
@Override public void connectionLost(Throwable cause) { // 連線丟失後,一般在這裡面進行重連 System.out.println("連線斷開,可以做重連"); try { client.connect(options); } catch (MqttSecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } // while(!sampleClient.isConnected()){ // try { // Thread.sleep(1000); // sampleClient.connect(connOpts); // //客戶端每次上線都必須上傳自己所有涉及的訂閱關係,否則可能會導致訊息接收延遲 // sampleClient.subscribe(topicFilters,qos); // } catch (Exception e) { // e.printStackTrace(); // } // } }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後得到的訊息會執行到這裡面 System.out.println("接收訊息主題 : " + topic); System.out.println("接收訊息Qos : " + message.getQos()); String msg = new String(message.getPayload()); System.out.println("接收訊息到服務端內容 : " + msg); if(msg.contains("close")) { client.close(); } }
@Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); }
} 紅色tips: 在jms傳送訊息端主題格式為 P2P.smile,在mqtt客戶端訂閱使用的主題格式為P2P/smile。也就是說mqtt中和jms中使用的分隔符是不一樣的。這個坑很難察覺,容易誤導配置的策略不生效。
6.demo演示部分效果圖: 1)以控制檯模式啟動的activemq 觀看日誌結果:
會看到日誌記錄已經進入了我們自定義的策略 其他自行驗證
7.參考博文:https://blog.csdn.net/kimmking/article/details/17449019
import java.util.List;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
/** * ClientIdFilterDispatchPolicy dispatches messages in a topic to a given * client. Then the message with a "PTP_CLIENTID" property, can be received by a * mqtt client with the same clientId. * @org.apache.xbean.XBean
private static final Log LOG = LogFactory.getLog(ClientIdFilterDispatchPolicy.class);
public static final String PTP_CLIENTID = "PTP_CLIENTID"; //可自定義訊息目標id在訊息屬性中的key private String ptpClientId = PTP_CLIENTID;
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers) throws Exception { if (LOG.isInfoEnabled()) { LOG.info("===============Enter ClientIdFilterDispatchPolicy........"); } // 獲取訊息中的目標 客戶端id Object clientId = node.getMessage().getProperty(ptpClientId); // 如果沒有,直接廣播 if (clientId == null) { return super.dispatch(node, msgContext, consumers); } if (LOG.isInfoEnabled()) { LOG.info("===============Client id : " + clientId); }
// 獲取當前訊息型別,此處主要是限制為主題模式 ActiveMQDestination destination = node.getMessage().getDestination(); int count = 0; // 遍歷所有訂閱者 for (Subscription sub : consumers) { if (LOG.isInfoEnabled()) { LOG.info("===============consumers id: " + sub.getContext().getClientId()); } // 不交於瀏覽器 if (sub.getConsumerInfo().isBrowser()) { continue; } // 只發送給感興趣的訂閱 if (!sub.matches(node, msgContext)) { sub.unmatched(node); continue; }
if (LOG.isInfoEnabled()) { LOG.info("==============destination clientId : " + clientId); } // 訊息中帶有的目標id不為空,也為主題模式,並且當前的消費者的id和訊息中的目標id相同,則投遞訊息 if (clientId != null && destination.isTopic() && clientId.equals(sub.getContext().getClientId())) { if (LOG.isInfoEnabled()) { LOG.info("==============Send p2p message to : " + clientId); LOG.info("==============Top ic : " + destination.isTopic()); } sub.add(node); count++; } else { // 過濾訊息,不進行投遞 LOG.info("==============Un consumers subscription!"); sub.unmatched(node); } }
return count > 0; }
public String getPtpClientId() { return ptpClientId; }
public void setPtpClientId(String ptpClientId) { this.ptpClientId = ptpClientId; }
} 紅色tips:
然後可以再裡面找到自定義的policy的定義。
4.activemq.xml檔案中配置自己寫的分發策略: <policyEntry topic="PTP.>"> <dispatchPolicy> <clientIdFilterDispatchPolicy /> </dispatchPolicy> </policyEntry> 如下圖所示,表示PTP開頭的所有主題都會進行該自定義分發策略
tips:如果自定義的policy沒有在指定的包目錄,或者類名包含了數字,或者其他原因導致沒有生成到 activemq.xsd 檔案中,那麼此步驟配置改項啟動的時候會出現錯誤 5.訊息傳送和接收案例: 1)服務端使用jms傳送訊息:
import java.util.Scanner;
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author linlinxiao * @version 1.0, 2017年7月21日 * @since V100R001C10 */ public class TopicProducerTest { public static void main(String[] args) throws JMSException { // 建立連線工廠 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); String PTP_CLIENTID = "PTP_CLIENTID";
// 鑑權,如沒有開啟可省略 factory.setUserName("admin"); factory.setPassword("admin123"); // 建立JMS連線例項,並啟動連線 Connection connection = factory.createConnection();
connection.start();
// 建立Session物件,不開啟事務 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立主題 Topic topic = session.createTopic("PTP.test");
// 建立生成者 MessageProducer producer = session.createProducer(topic);
// 設定訊息不需持久化。預設訊息需要持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Scanner sc = new Scanner(System.in); boolean isStart = true; String userMsg = ""; String msg = ""; TextMessage message = null; String[] messages = null; String clientId = null; while (isStart) { userMsg = sc.nextLine(); if (StringUtils.isBlank(userMsg) || "stop".equals(userMsg)) { System.out.println("Stop producer message!"); isStart = false; } messages = userMsg.split(":"); msg = "Hello MQ,Client msg:" + messages[0]; message = session.createTextMessage(msg);
if (messages.length == 2) { clientId = messages[1]; }
// 傳送指定訊息,配合主題分發策略使用,以附帶使用者ID ,分發策略對特定的主題進行攔截解析分發 if (StringUtils.isNotBlank(clientId)) { message.setStringProperty(PTP_CLIENTID, clientId); }
// 傳送訊息。non-persistent 預設非同步傳送;persistent 默認同步傳送 producer.send(message); } sc.close(); // 關閉連線 producer.close(); session.close(); connection.close();
} } 2)mqtt端接收使用
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author xiaolinlin * @version 1.0, 2017年6月8日 * @since V100R001C00 */ public class MqttTestClient { public static final String HOST = "tcp://localhost:1883";
public static final String TOPIC = "test";
private String clientId;
private MqttClient client;
private MqttConnectOptions options;
private String userName = "admin";
private String passWord = "admin123";
public MqttTestClient(String clientId) { this.clientId = clientId; }
private void start() throws MqttException { try { // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 client = new MqttClient(HOST, clientId, new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 options.setCleanSession(true); // 設定連線的使用者名稱 options.setUserName(userName); // 設定連線的密碼 options.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // // 設定回撥 // MqttTopic topic = client.getTopic(TOPIC+"/test/"); // // setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 // options.setWill(topic, "close".getBytes(), 2, true); client.connect(options); client.setCallback(new ClientCallback(client, options)); // 訂閱訊息 int[] Qos = {1}; String[] topic1 = {"PTP/test"}; client.subscribe(topic1, Qos);
} catch (Exception e) { e.printStackTrace(); } }
public static void main(String[] args) throws MqttException { String clientId = "admin1"; MqttTestClient client = new MqttTestClient(clientId); client.start(); } }
callback類:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttSecurityException;
/** * [簡要描述]:<br/> * [詳細描述]:<br/> * * @author xiaolinlin * @version 1.0, 2017年6月8日 * @since V100R001C00 */ public class ClientCallback implements MqttCallback { private MqttClient client; private MqttConnectOptions options;
public ClientCallback(MqttClient client,MqttConnectOptions options) { this.client = client; this.options = options; }
@Override public void connectionLost(Throwable cause) { // 連線丟失後,一般在這裡面進行重連 System.out.println("連線斷開,可以做重連"); try { client.connect(options); } catch (MqttSecurityException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (MqttException e) { // TODO Auto-generated catch block e.printStackTrace(); } // while(!sampleClient.isConnected()){ // try { // Thread.sleep(1000); // sampleClient.connect(connOpts); // //客戶端每次上線都必須上傳自己所有涉及的訂閱關係,否則可能會導致訊息接收延遲 // sampleClient.subscribe(topicFilters,qos); // } catch (Exception e) { // e.printStackTrace(); // } // } }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後得到的訊息會執行到這裡面 System.out.println("接收訊息主題 : " + topic); System.out.println("接收訊息Qos : " + message.getQos()); String msg = new String(message.getPayload()); System.out.println("接收訊息到服務端內容 : " + msg); if(msg.contains("close")) { client.close(); } }
@Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); }
} 紅色tips: 在jms傳送訊息端主題格式為 P2P.smile,在mqtt客戶端訂閱使用的主題格式為P2P/smile。也就是說mqtt中和jms中使用的分隔符是不一樣的。這個坑很難察覺,容易誤導配置的策略不生效。
6.demo演示部分效果圖: 1)以控制檯模式啟動的activemq 觀看日誌結果:
會看到日誌記錄已經進入了我們自定義的策略 其他自行驗證
7.參考博文:https://blog.csdn.net/kimmking/article/details/17449019