1. 程式人生 > >訊息中介軟體系列-JMS例項(ActiveMQ)

訊息中介軟體系列-JMS例項(ActiveMQ)

一、ActiveMQ安裝、配置、啟動、視覺化介面

1、安裝 
下載地址:http://activemq.apache.org/download.html
2、配置(conf目錄下) 
1)使用者名稱密碼設定 
設定使用者名稱密碼
2)開啟jmx監控 
activemq.xml中進行如下修改 
這裡寫圖片描述
注:這裡的配置不是必須,根據需要自行配置 
3、啟動 
直接執行bin目錄下:activemq.bat 
4、視覺化介面 
瀏覽器中:http://localhost:8161/admin/index.jsp
使用者名稱,密碼在:jetty-realm.properties中設定

程式碼演示:

A、點對點的訊息模型,只需要一個訊息生成者和訊息消費者,下面我們編寫程式碼

訊息生產者:

package com.gcc.activemq.product;

import java.awt.font.TextMeasurer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;



/**
 * 訊息生產者(傳送者)
 * @author gcc
 *
 * 2018年1月20日
 */
public class JMSProducer {
	 //預設連線使用者名稱
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設連線密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連線地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //傳送的訊息數量
    private static final int SENDNUM = 100;
    
    public static void main(String[] args) {
    	//連線工廠
		ConnectionFactory connectionFactory;
		//連線
		Connection connection =null;
		//會話 接收或傳送訊息的執行緒
		Session session;
		//訊息的目的地
		Destination destination;
		//訊息的生產者
		MessageProducer messageProducer;
		//例項化連線工廠
		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
		
		try {
			//建立連線
			connection = connectionFactory.createConnection();
			//開啟會話
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			//建立訊息佇列
			destination = session.createQueue("xiaochao");
			//建立訊息生產者
			messageProducer = session.createProducer(destination);
			//傳送訊息
			 sendMessage(session, messageProducer);
		} catch (JMSException e) {
			
			e.printStackTrace();
		}finally {
			if(connection!=null) {
				try {
					connection.close();
				} catch (JMSException e) {
					
					e.printStackTrace();
				}
			}
		}
		
	}

    /**
     * 傳送訊息
    
    * @param session 
    * @param messageProducer 訊息生產者
    * @throws JMSException
     */
	private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
		for(int i=0;i<JMSProducer.SENDNUM;i++) {
			TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
			System.out.println("傳送訊息:Activemq 傳送訊息" + i);
			messageProducer.send(message);
		}
		
	}

}

訊息消費者:
package com.gcc.activemq.consume;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {

	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址
    
    public static void main(String[] args) {
    	 ConnectionFactory connectionFactory;//連線工廠
         Connection connection = null;//連線

         Session session;//會話 接受或者傳送訊息的執行緒
         Destination destination;//訊息的目的地

         MessageConsumer messageConsumer;//訊息的消費者
       //例項化連線工廠
         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
         try {
 			//建立連線
 			connection = connectionFactory.createConnection();
 			//開啟會話
 			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 			//建立訊息佇列
 			destination = session.createQueue("xiaochao");
 			//建立訊息生產者
 			messageConsumer = session.createConsumer(destination);
 		
 			while (true) {
 	             TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
 	             if(textMessage != null){
 	                 System.out.println("收到的訊息:" + textMessage.getText());
 	             }else {
 	                 break;
 	             }
 	         }
 		} catch (JMSException e) {
 			
 			e.printStackTrace();
 		}finally {
 			if(connection!=null) {
 				try {
 					connection.close();
 				} catch (JMSException e) {
 					
 					e.printStackTrace();
 				}
 			}
 		}
         
         
	}
}

執行

  1. 首先,啟動ActiveMQ,如何啟動ActiveMQ如何啟動,請看第二篇博文。在瀏覽器中輸入:http://localhost:8161/admin/,然後開始執行:
  2. 執行傳送者,eclipse控制檯輸出,如下圖: 
    這裡寫圖片描述 
    此時,我們先看一下ActiveMQ伺服器,Queues內容如下: 
    這裡寫圖片描述
    我們可以看到建立了一個名稱為HelloWorld的訊息佇列,佇列中有10條訊息未被消費,我們也可以通過Browse檢視是哪些訊息,如下圖: 
    這裡寫圖片描述
    如果這些佇列中的訊息,被刪除,消費者則無法消費。

  3. 我們繼續執行一下消費者,eclipse控制檯列印訊息,如下: 
    這裡寫圖片描述 
    此時,我們先看一下ActiveMQ伺服器,Queues內容如下: 
    這裡寫圖片描述
    我們可以看到HelloWorld的訊息佇列發生變化,多一個訊息者,佇列中的10條訊息被消費了,點選Browse檢視,已經為空了。 
    點選Active Consumers,我們可以看到這個消費者的詳細資訊: 
    這裡寫圖片描述B主題釋出訂閱式(Topic)

    package com.gcc.activemq.consume;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TopicConsumer {
    	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址
        
        public static void main(String[] args) {
        	 ConnectionFactory connectionFactory;//連線工廠
             Connection connection = null;//連線
    
             Session session;//會話 接受或者傳送訊息的執行緒
             Destination destination;//訊息的目的地
    
             MessageConsumer messageConsumer;//訊息的消費者
           //例項化連線工廠
             connectionFactory = new ActiveMQConnectionFactory(TopicConsumer.USERNAME, TopicConsumer.PASSWORD, TopicConsumer.BROKEURL);
             try {
     			//建立連線
     			connection = connectionFactory.createConnection();
     			//開啟會話
     			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
     			//建立訊息佇列
     			destination = session.createQueue("xiaochao");
     			//建立訊息消費者
     			messageConsumer = session.createConsumer(destination);
     			//採用監聽的方式 ,註冊監聽,自動的去監聽裡觸發訊息
     			messageConsumer.setMessageListener(new Listener());
     			
     		} catch (JMSException e) {
     			
     			e.printStackTrace();
     		}finally {
     			if(connection!=null) {
     				try {
     					connection.close();
     				} catch (JMSException e) {
     					
     					e.printStackTrace();
     				}
     			}
     		}
             
             
    	}
    
    }
    /**
     * 訊息監聽
     * @author gcc
     *
     * 2018年1月20日
     */
    class Listener  implements MessageListener{
    
    	public void onMessage(Message message) {
    		 // TODO Auto-generated method stub  
            try {  
                System.out.println("收到的訊息:"+((TextMessage)message).getText());  
            } catch (JMSException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
    		
    	}
    	
    }
    

作者:scgyus