ActiveMQ入門程式
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
//預設連線使用者名稱
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//預設連線密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//預設連線地址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//傳送的訊息數量
private static final int SENDNUM = 10;
public static void main(String[] args) throws Exception {
/*ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");*/
/**
* activemq.xml 配置密碼
*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory(
"bhz",
"bhz",
"tcp://localhost:61616");
//連線
Connection connection = null;
try {
connection = activeMQConnectionFactory.createConnection();
connection.start();
//建立session
// Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//開啟事物
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//訊息的目的地
Destination destination = session.createQueue("queue1");
//建立訊息生產者
MessageProducer messageProducer = session.createProducer(destination);
//Persistent 用來指定JMS Provider對訊息進行持久化操作,以免Provider fail的時候,丟失Message
//NON_Persistent 方式下的JMS Provider不會對消進憲持久化
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//傳送訊息
sendMessage(session, messageProducer);
//使用事物 Boolean.TRUE
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 傳送訊息
* @param session
* @param messageProducer 訊息生產者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = 0; i < Sender.SENDNUM; i++) {
//建立一條文字訊息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我的訊息內容,id為"+i);
messageProducer.send(textMessage);
System.out.println("生產者: "+textMessage.getText());
// TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
// System.out.println("生產者傳送訊息:Activemq 傳送訊息" + i);
//通過訊息生產者發出訊息
// messageProducer.send(message);
}
}
}
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連線地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//連線工廠
Connection connection = null;//連線
Session session;//會話 接受或者傳送訊息的執行緒
Destination destination;//訊息的目的地
MessageConsumer messageConsumer;//訊息的消費者
//例項化連線工廠
// connectionFactory = new ActiveMQConnectionFactory(Receiver.USERNAME, Receiver.PASSWORD, Receiver.BROKEURL);
/**
* activemq.xml 配置密碼之後
*/
connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz", Receiver.BROKEURL);
try {
//通過連線工廠獲取連線
connection = connectionFactory.createConnection();
//啟動連線
connection.start();
//建立session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個連線HelloWorld的訊息佇列
destination = session.createQueue("queue1");
//建立訊息消費者
messageConsumer = session.createConsumer(destination);
while (true) {
//receive 阻塞的等待生產者生產才接受 receive(100000)等待多久;
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage != null){
System.out.println("收到的訊息:" + textMessage.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量
這個要分兩種情況理解
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。
簡單的理解上面的意思就是
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1.
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2.
沒有消費者時 Pending Messages 和 入佇列數量一樣
有消費者消費的時候 Pedding會減少 出佇列會增加
到最後 就是 入佇列和出佇列的數量一樣多
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。
//Producer 開啟事物 並且使用Client的方式
Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
//Consumer 開啟事物 並且使用Client的方式
message Enqueued 進入佇列的資訊增加10條 但是Message Dequeued 出佇列的數量沒有增加