1. 程式人生 > >ActiveMQ入門程式

ActiveMQ入門程式

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    
     //預設連線使用者名稱
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設連線密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連線地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
    //傳送的訊息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) throws Exception {
        /*ActiveMQConnectionFactory activeMQConnectionFactory = 
                new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, 
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");*/
        /**
         * activemq.xml  配置密碼
         */
        ActiveMQConnectionFactory activeMQConnectionFactory = 
                new ActiveMQConnectionFactory(
                "bhz", 
                "bhz", 
                "tcp://localhost:61616");
        //連線
        Connection connection = null;
        
        try {
             connection = activeMQConnectionFactory.createConnection();
             connection.start();
            
            //建立session
        //    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
             //開啟事物
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            //訊息的目的地
            Destination destination = session.createQueue("queue1");
            //建立訊息生產者
            MessageProducer messageProducer = session.createProducer(destination);
            //Persistent 用來指定JMS Provider對訊息進行持久化操作,以免Provider fail的時候,丟失Message
            //NON_Persistent 方式下的JMS Provider不會對消進憲持久化
            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //傳送訊息
            sendMessage(session, messageProducer);
            
            //使用事物    Boolean.TRUE    
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    
    
     /**
     * 傳送訊息
     * @param session
     * @param messageProducer  訊息生產者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < Sender.SENDNUM; i++) {
            //建立一條文字訊息 
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("我的訊息內容,id為"+i);
            messageProducer.send(textMessage);
            System.out.println("生產者: "+textMessage.getText());
         //   TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
         //   System.out.println("生產者傳送訊息:Activemq 傳送訊息" + i);
            //通過訊息生產者發出訊息 
         //   messageProducer.send(message);
        }

    }

}
 

 

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//連線工廠
        Connection connection = null;//連線

        Session session;//會話 接受或者傳送訊息的執行緒
        Destination destination;//訊息的目的地

        MessageConsumer messageConsumer;//訊息的消費者

        //例項化連線工廠
//        connectionFactory = new ActiveMQConnectionFactory(Receiver.USERNAME, Receiver.PASSWORD, Receiver.BROKEURL);
     /**
      * activemq.xml 配置密碼之後
      */
        connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz", Receiver.BROKEURL);

        try {
            //通過連線工廠獲取連線
            connection = connectionFactory.createConnection();
            //啟動連線
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個連線HelloWorld的訊息佇列
            destination = session.createQueue("queue1");
            //建立訊息消費者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                //receive  阻塞的等待生產者生產才接受              receive(100000)等待多久;
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的訊息:" + textMessage.getText());
                }else {
                    break;
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

 

Number Of Consumers  消費者 這個是消費者端的消費者數量

Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
Messages Enqueued 進入佇列的訊息  進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息  可以理解為是消費這消費掉的數量
這個要分兩種情況理解
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。
簡單的理解上面的意思就是
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1.
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2.


沒有消費者時  Pending Messages   和 入佇列數量一樣
有消費者消費的時候 Pedding會減少 出佇列會增加
到最後 就是 入佇列和出佇列的數量一樣多
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。 

 

 

 

//Producer   開啟事物 並且使用Client的方式

 Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);

//Consumer  開啟事物 並且使用Client的方式

message Enqueued 進入佇列的資訊增加10條  但是Message Dequeued 出佇列的數量沒有增加