ActiveMQ入門(一)
什麼是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
主要特點:
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
3. 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
4. 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
5. 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支援通過JDBC和journal提供高速的訊息持久化
7. 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
8. 支援Ajax
9. 支援與Axis的整合
10. 可以很容易得呼叫內嵌JMS provider,進行測試
JMS介紹
JMS的全稱是Java Message Service,即Java訊息服務。用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。
它主要用於在生產者和消費者之間進行訊息傳遞,生產者負責產生訊息,而消費者負責接收訊息。把它應用到實際的業務需求中的話我們可以在特定的時候利用生產者生成一訊息,並進行傳送,對應的消費者在接收到對應的訊息後去完成對應的業務邏輯。
對於訊息的傳遞有兩種型別:
一種是點對點的,即一個生產者和一個消費者一一對應;
另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。
· StreamMessage -- Java原始值的資料流
· MapMessage--一套名稱-值對
· TextMessage--一個字串物件
· ObjectMessage--一個序列化的 Java物件
· BytesMessage--一個位元組的資料流
ActiveMQ的安裝
下載
進入http://activemq.apache.org/下載ActiveMQ
安裝
安裝步驟:
第一步:安裝jdk,需要jdk1.7以上版本
第二步:解壓縮activeMQ的壓縮包。
第三步:進入bin目錄。
啟動:[[email protected] bin]# ./activemq start
停止:[[email protected] bin]# ./activemq stop
windows進入控制檯切換到解壓目錄下執行activemq start
第四步:訪問後臺管理。
使用者名稱:admin
密碼:admin
ActiveMQ的使用方法
JMS訊息傳送模式
在點對點或佇列模型下,一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。這種模式被概括為:只有一個消費者將獲得訊息。生產者不需要在接收者消費該訊息期間處於執行狀態,接收者也同樣不需要在訊息傳送時處於執行狀態。每一個成功處理的訊息都由接收者簽收。
釋出者/訂閱者模型支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得訊息.在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收訊息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連線時釋出的訊息將在訂閱者重新連線時重新發布。
JMS應用程式介面
ConnectionFactory 介面(連線工廠)
使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。
Connection 介面(連線)
連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。
Destination 介面(目標)
目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。
MessageConsumer 介面(訊息消費者)
由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。
MessageProducer 介面(訊息生產者)
由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。
Message 介面(訊息)
是在消費者和生產者之間傳送的物件,也就是說從一個應用程式創送到另一個應用程式。一個訊息有三個主要部分:
訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。
一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。
一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。
訊息介面非常靈活,並提供了許多方式來定製訊息的內容。
Session 介面(會話)
表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。
入門程式碼
生產者消費者模型
package com.hj.ActivityMQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.omg.CORBA.ACTIVITY_COMPLETED;
/**
*
* @author 生產者
*
*/
public class Producer {
// 預設連線使用者名稱
private static final String USERNAME = "admin";
// 預設連線密碼
private static final String PASSWORD = "admin";
// 預設連線地址
private static final String BROKER_URL = "tcp://localhost:61616";
public static void main(String[] args) {
// 連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
// 連線
Connection connection = connectionFactory.createConnection();
// 啟動連線
connection.start();
// 建立session
/*
* connection.createSession(paramA, paramB) A)paramA設定為true時: paramB的值忽略,
* acknowledgment mode被jms伺服器設定 SESSION_TRANSACTED 。 當一個事務被提交的時候,訊息確認就會自動發生。 B)
* paramA設定為false時: Session.AUTO_ACKNOWLEDGE為自動確認,當客戶成功的從receive方法返回的時候,或者從
* MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。 Session.CLIENT_ACKNOWLEDGE
* 為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的 acknowledge方法。jms伺服器才會刪除訊息。(預設是批量確認)
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 訊息目的地
Destination destination = session.createQueue("FirstQueue");
// 訊息生產者
MessageProducer producer = session.createProducer(destination);
// 設定不持久化,此處學習,實際根據專案決定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 傳送訊息
for (int i = 0; i < 5; i++) {
// 建立一條文字訊息
TextMessage message = session.createTextMessage("ActiveMQ: 這是第 " + i + " 條訊息");
System.out.println("send----->ActiveMQ: 這是第 " + i + " 條訊息");
// 生產者傳送訊息
producer.send(message);
}
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package com.hj.ActivityMQ;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
* @author 消費者
*
*/
public class Consumer {
// 預設連線使用者名稱
private static final String USERNAME = "admin";
// 預設連線密碼
private static final String PASSWORD = "admin";
// 預設連線地址
private static final String BROKER_URL = "tcp://localhost:61616";
public static void main(String[] args) {
// 連線工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
// 連線
Connection connection = connectionFactory.createConnection();
// 啟動連線
connection.start();
// 建立session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 訊息目的地
Destination destination = session.createQueue("FirstQueue");
// 訊息消費者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (message != null) {
System.out.println("接收到訊息: " + message.getText());
} else {
break;
}
}
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
結果:
可以看到生產消費者模型是非同步的。
訂閱者模式
package com.hj.ActivityMQPublisher;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
class Publisher {
// 預設連線使用者名稱
private static final String USERNAME = "admin";
// 預設連線密碼
private static final String PASSWORD = "admin";
// 預設連線地址
private static final String BROKER_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
String destination = arg(args, 0, "event");
int messages = 10000;
int size = 256;
String DATA = "abcdefghijklmnopqrstuvwxyz";
String body = "";
for (int i = 0; i < size; i++) {
body += DATA.charAt(i % DATA.length());
}
// 連線工廠
ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = new ActiveMQTopic(destination);
MessageProducer producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 1; i <= messages; i++) {
TextMessage msg = session.createTextMessage(body);
msg.setIntProperty("id", i);
producer.send(msg);
if ((i % 1000) == 0) {
System.out.println(String.format("Sent %d messages", i));
}
}
producer.send(session.createTextMessage("SHUTDOWN"));
connection.close();
}
private static String arg(String[] args, int index, String defaultValue) {
if (index < args.length)
return args[index];
else
return defaultValue;
}
}
package com.hj.ActivityMQPublisher;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.*;
class Listener {
// 預設連線使用者名稱
private static final String USERNAME = "admin";
// 預設連線密碼
private static final String PASSWORD = "admin";
// 預設連線地址
private static final String BROKER_URL = "tcp://localhost:61616";
public static void main(String[] args) {
String destination = arg(args, 0, "event");
// 連線工廠
ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
Connection connection = null;
try {
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = new ActiveMQTopic(destination);
MessageConsumer consumer = session.createConsumer(dest);
long start = System.currentTimeMillis();
long count = 1;
System.out.println("Waiting for messages...");
while (true) {
Message msg = consumer.receive();
if (msg instanceof TextMessage) {
String body = ((TextMessage) msg).getText();
if ("SHUTDOWN".equals(body)) {
long diff = System.currentTimeMillis() - start;
System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
//break;
} else {
if (count != msg.getIntProperty("id")) {
System.out.println("mismatch: " + count + "!=" + msg.getIntProperty("id"));
}
count = msg.getIntProperty("id");
if (count == 0) {
start = System.currentTimeMillis();
}
if (count % 1000 == 0) {
System.out.println(String.format("Received %d messages.", count));
}
count++;
}
} else {
System.out.println("Unexpected message type: " + msg.getClass());
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static String arg(String[] args, int index, String defaultValue) {
if (index < args.length)
return args[index];
else
return defaultValue;
}
}
結果:
訂閱者模式是同步的。
記一個踩過的坑
在配置activitymq是沒有注意,一開始使用了預設使用者名稱和密碼,導致報錯Exception in thread "main" javax.jms.JMSSecurityException: User name [null] or password is invalid.。查了好久是/conf/activitymq.xml配置問題,參考文章activitymq配置,我把配置檔案附上吧!!省得大家改了。複製就行了。
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<!-- START SNIPPET: example -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration
file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Allows accessing the server log -->
<bean id="logQuery"
class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false"
scope="singleton" init-method="start" destroy-method="stop">
</bean>
<!-- The <broker> element is used to configure the ActiveMQ broker. -->
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="localhost" dataDirectory="${activemq.data}">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<!-- The constantPendingMessageLimitStrategy is used to prevent slow
topic consumers to block producers and affect other consumers by limiting
the number of messages that are retained For more information, see: http://activemq.apache.org/slow-consumer-handling.html -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy
limit="1000" />
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!-- The managementContext is used to configure how ActiveMQ is exposed
in JMX. By default, ActiveMQ uses the MBean server that is started by the
JVM. For more information, see: http://activemq.apache.org/jmx.html -->
<managementContext>
<managementContext createConnector="false" />
</managementContext>
<!-- Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag). For more information,
see: http://activemq.apache.org/persistence.html -->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" />
</persistenceAdapter>
<!-- The systemUsage controls the maximum amount of space the broker will
use before disabling caching and/or slowing down producers. For more information,
see: http://activemq.apache.org/producer-flow-control.html -->
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100mb" />
</storeUsage>
<tempUsage>
<tempUsage limit="50mb" />
</tempUsage>
</systemUsage>
</systemUsage>
<!-- The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size
to 100MB -->
<transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp"
uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="stomp"
uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="mqtt"
uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="ws"
uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
<!-- destroy the spring context on shutdown to stop jetty -->
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans"
class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin"
password="admin" groups="users,admins" />
</users>
</simpleAuthenticationPlugin>
</plugins>
</broker>
<!-- Enable web consoles, REST and Ajax APIs and demos The web consoles
requires by default login, you can disable this in the jetty.xml file Take
a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details -->
<import resource="jetty.xml" />
</beans>
<!-- END SNIPPET: example -->