1. 程式人生 > >ActiveMQ入門

ActiveMQ入門

1、環境: Windows XP apache-activemq-5.2.0-bin.zip 2、安裝 解壓縮到apache-activemq-5.2.0-bin.zip到一個目錄,比如C:\apache-activemq-5.2.0 3、配置 配置就在C:\apache-activemq-5.2.0\conf目錄下三個檔案 activemq.xml credentials.properties log4j.properties 4、啟動ActiveMQ 執行C:\apache-activemq-5.2.0\bin\activemq.bat
5、測試 ActiveMQ預設使用的TCP連線埠是61616, 通過檢視該埠的資訊可以測試ActiveMQ是否成功啟動 netstat -an|find "61616" C:\Documents and Settings\Administrator>netstat -an|find "61616"
    TCP        0.0.0.0:61616                    0.0.0.0:0                            LISTENING
下面是ActiveMQ5.2的一個最簡單例子! 環境還是apache-activemq-5.2.0-bin.zip,需要注意的是,開發時候,要將apache-activemq- 5.2.0-bin.zip解壓縮后里面的activemq-all-5.2.0.jar包加入到classpath下面,這個包包含了所有jms介面 api的實現。
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* 訊息的生產者(傳送者)
*
*/
public class JmsSender {
        public static void main(String[] args) throws JMSException {
                // ConnectionFactory :連線工廠,JMS 用它建立連線
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                                ActiveMQConnection.DEFAULT_USER,
                                ActiveMQConnection.DEFAULT_PASSWORD,
                                "tcp://192.168.14.117:61616");
                //JMS 客戶端到JMS Provider 的連線
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Session: 一個傳送或接收訊息的執行緒
                Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // Destination :訊息的目的地;訊息傳送給誰.
                // 獲取session注意引數值my-queue是Query的名字
                Destination destination = session.createQueue("my-queue");
                // MessageProducer:訊息生產者
                MessageProducer producer = session.createProducer(destination);
                //設定不持久化
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                //傳送一條訊息
                sendMsg(session, producer);
                session.commit();
                connection.close();
        }

        /**
         * 在指定的會話上,通過指定的訊息生產者發出一條訊息
         *
         * @param session    訊息會話
         * @param producer 訊息生產者
         */
        public static void sendMsg(Session session, MessageProducer producer) throws JMSException {
                //建立一條文字訊息
                TextMessage message = session.createTextMessage("Hello ActiveMQ!");
                //通過訊息生產者發出訊息
                producer.send(message);
                System.out.println("");
        }
}
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* 訊息的消費者(接受者)
*
*/
public class JmsReceiver {
        public static void main(String[] args) throws JMSException {
                // ConnectionFactory :連線工廠,JMS 用它建立連線
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                                ActiveMQConnection.DEFAULT_USER,
                                ActiveMQConnection.DEFAULT_PASSWORD,
                                "tcp://192.168.14.117:61616");
                //JMS 客戶端到JMS Provider 的連線
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Session: 一個傳送或接收訊息的執行緒
                Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                // Destination :訊息的目的地;訊息傳送給誰.
                // 獲取session注意引數值xingbo.xu-queue是一個伺服器的queue,須在在ActiveMq的console配置
                Destination destination = session.createQueue("my-queue");
                // 消費者,訊息接收者
                MessageConsumer consumer = session.createConsumer(destination);
                while (true) {
                        TextMessage message = (TextMessage) consumer.receive(1000);
                        if (null != message)
                                System.out.println("收到訊息:" + message.getText());
                        else
                                break;
                }
                session.close();
                connection.close();
        }
}
  啟動ActiveMQ,然後開始執行: 先執行傳送者,連續運行了三次,最後一次控制檯輸出:

Process finished with exit code 0
後執行接受者,輸出結果: 收到訊息Hello ActiveMQ!
收到訊息Hello ActiveMQ!
收到訊息Hello ActiveMQ!

Process finished with exit code 0
注意: 其中的埠61616是ActiveMQ預設的配置,在activemq.xml中,
   <!-- The transport connectors ActiveMQ will listen to -->
                <transportConnectors>
                        <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
                        <transportConnector name="ssl" uri="ssl://localhost:61617"/>
                        <transportConnector name="stomp" uri="stomp://localhost:61613"/>
                        <transportConnector name="xmpp" uri="xmpp://localhost:61222"/>
                </transportConnectors> 
,建議不要改動,都用這個埠多好,就像ftp都用21埠,也沒錯。 這是官方的HelloWorld例子,不過看著不順眼: