訊息中介軟體系列-JMS例項(ActiveMQ)
阿新 • • 發佈:2019-01-09
一、ActiveMQ安裝、配置、啟動、視覺化介面
1、安裝
下載地址:http://activemq.apache.org/download.html
2、配置(conf目錄下)
1)使用者名稱密碼設定
2)開啟jmx監控
activemq.xml中進行如下修改
注:這裡的配置不是必須,根據需要自行配置
3、啟動
直接執行bin目錄下:activemq.bat
4、視覺化介面
瀏覽器中:http://localhost:8161/admin/index.jsp
使用者名稱,密碼在:jetty-realm.properties中設定
程式碼演示:
A、點對點的訊息模型,只需要一個訊息生成者和訊息消費者,下面我們編寫程式碼 。
訊息生產者:
package com.gcc.activemq.product; import java.awt.font.TextMeasurer; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.camel.component.ActiveMQComponent; /** * 訊息生產者(傳送者) * @author gcc * * 2018年1月20日 */ public class JMSProducer { //預設連線使用者名稱 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //預設連線密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //預設連線地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //傳送的訊息數量 private static final int SENDNUM = 100; public static void main(String[] args) { //連線工廠 ConnectionFactory connectionFactory; //連線 Connection connection =null; //會話 接收或傳送訊息的執行緒 Session session; //訊息的目的地 Destination destination; //訊息的生產者 MessageProducer messageProducer; //例項化連線工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { //建立連線 connection = connectionFactory.createConnection(); //開啟會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立訊息佇列 destination = session.createQueue("xiaochao"); //建立訊息生產者 messageProducer = session.createProducer(destination); //傳送訊息 sendMessage(session, messageProducer); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 傳送訊息 * @param session * @param messageProducer 訊息生產者 * @throws JMSException */ private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException { for(int i=0;i<JMSProducer.SENDNUM;i++) { TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i); System.out.println("傳送訊息:Activemq 傳送訊息" + i); messageProducer.send(message); } } }
訊息消費者:
package com.gcc.activemq.consume; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址 public static void main(String[] args) { ConnectionFactory connectionFactory;//連線工廠 Connection connection = null;//連線 Session session;//會話 接受或者傳送訊息的執行緒 Destination destination;//訊息的目的地 MessageConsumer messageConsumer;//訊息的消費者 //例項化連線工廠 connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL); try { //建立連線 connection = connectionFactory.createConnection(); //開啟會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立訊息佇列 destination = session.createQueue("xiaochao"); //建立訊息生產者 messageConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的訊息:" + textMessage.getText()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
執行
- 首先,啟動ActiveMQ,如何啟動ActiveMQ如何啟動,請看第二篇博文。在瀏覽器中輸入:http://localhost:8161/admin/,然後開始執行:
-
執行傳送者,eclipse控制檯輸出,如下圖:
此時,我們先看一下ActiveMQ伺服器,Queues內容如下:
我們可以看到建立了一個名稱為HelloWorld的訊息佇列,佇列中有10條訊息未被消費,我們也可以通過Browse檢視是哪些訊息,如下圖:
如果這些佇列中的訊息,被刪除,消費者則無法消費。 -
我們繼續執行一下消費者,eclipse控制檯列印訊息,如下:
此時,我們先看一下ActiveMQ伺服器,Queues內容如下:
我們可以看到HelloWorld的訊息佇列發生變化,多一個訊息者,佇列中的10條訊息被消費了,點選Browse檢視,已經為空了。
點選Active Consumers,我們可以看到這個消費者的詳細資訊:
B、主題釋出訂閱式(Topic)package com.gcc.activemq.consume; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址 public static void main(String[] args) { ConnectionFactory connectionFactory;//連線工廠 Connection connection = null;//連線 Session session;//會話 接受或者傳送訊息的執行緒 Destination destination;//訊息的目的地 MessageConsumer messageConsumer;//訊息的消費者 //例項化連線工廠 connectionFactory = new ActiveMQConnectionFactory(TopicConsumer.USERNAME, TopicConsumer.PASSWORD, TopicConsumer.BROKEURL); try { //建立連線 connection = connectionFactory.createConnection(); //開啟會話 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立訊息佇列 destination = session.createQueue("xiaochao"); //建立訊息消費者 messageConsumer = session.createConsumer(destination); //採用監聽的方式 ,註冊監聽,自動的去監聽裡觸發訊息 messageConsumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } } /** * 訊息監聽 * @author gcc * * 2018年1月20日 */ class Listener implements MessageListener{ public void onMessage(Message message) { // TODO Auto-generated method stub try { System.out.println("收到的訊息:"+((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
作者:scgyus