ActiveMQ_Java實現生產者和消費者操作佇列
阿新 • • 發佈:2020-09-20
一、JMS編碼總體規範
二、建立Maven工程和引入Maven依賴
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.5</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.10</version> </dependency>
三、佇列-生產者生產訊息例項程式碼
public class JmsProducer { private static final String BROKER_URL = "tcp://192.168.229.129:61616"; private static final String QUEUE_NAME = "queue01"; private static final String TEXT_MESSAGE = "message"; public static void main(String[] args) throws JMSException { // 1、建立連線工廠ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); // 2、通過連線工廠獲取連線物件 Connection connection = connectionFactory.createConnection(); // 3、啟動JMS連線 connection.start(); // 4、通過連線物件建立JMS Session物件,第一個引數為是否開啟事務,第二個引數是訊息的簽收方式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、建立Destination (Destination是 Queue/Topic的父介面) Queue queue = session.createQueue(QUEUE_NAME); // 6、建立訊息的生產者,引數是Destination,也就是訊息的生產者會將訊息推送到哪裡. MessageProducer producer = session.createProducer(queue); // 7、建立訊息,建立三個訊息傳送至佇列 for (int i = 1; i < 4; i++) { TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE + "0" + i); producer.send(textMessage); } // 8、釋放資源,先開後關,後開先關 producer.close(); session.close(); connection.close(); System.out.println("訊息成功傳送至佇列"); } }
執行程式碼之後,登入ActiveMQ客戶端,可以看到訊息已經推送至佇列queue01中了,由於我們傳送了3個訊息,所以這裡的待處理訊息就是3,我們還沒有建立消費者,所以消費者數目為0,傳送了3個訊息,並且都進入了佇列,所以入隊的訊息數目為3,沒有消費者消費訊息,那麼出佇列的訊息數目為0.
佇列相關名詞解釋:
名稱 | 簡介 |
Number Of Pending Messages |
待處理的訊息數目:入隊的總數-出隊的總數 |
Number Of Consumers |
消費者數目:消費端的消費者數目 |
Messages Enqueued |
已經入隊的訊息數目,進入佇列的訊息的數目,這個數目只增不減,即使出隊了也不會減少 |
Messages Dequeued |
已經出隊的訊息數目,也就是消費者消費掉的訊息數目 |
四、佇列-消費者訊息例項程式碼
public class JmsConsumer {
private static final String BROKER_URL = "tcp://192.168.229.129:61616";
private static final String QUEUE_NAME = "queue01";
private static final String TEXT_MESSAGE = "message";
public static void main(String[] args) throws JMSException {
// 1、建立連線工廠物件
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// 2、通過連線工廠獲取連線物件
Connection connection = connectionFactory.createConnection();
// 3、啟動JMS連線
connection.start();
// 4、通過connection物件獲取JMS Session物件
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、建立Destination物件(Queue/Topic的父介面是Destination)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、建立消費者物件
MessageConsumer consumer = session.createConsumer(queue);
// 7、消費者消費訊息
while (true) {
// 時間為毫秒值,如果過了10s之後,消費者沒有接收到新的訊息,那麼消費者自動和佇列斷開
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
/*TextMessage textMessage = (TextMessage) consumer.receive();*/
if (textMessage != null) {
String text = textMessage.getText();
if (text != null) {
System.out.println("接收到的訊息是:" + text);
}
} else {
break;
}
}
// 8、釋放資源
consumer.close();
session.close();
connection.close();
}
}
五、消費者幾個細節
1、注意:consumer.receive()方法有兩個
// 一直接收訊息,消費者和佇列不會斷開連線
TextMessage textMessage = (TextMessage) consumer.receive();
// 超過了時間(毫秒值),消費者將和佇列自動斷開連線
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
01、先看 consumer.receive(),當沒有新訊息的時候,消費者一直處於等待的狀態,消費者數目為1
02、consumer.receive(10*1000):10秒鐘後,消費者將會和佇列斷開連線,消費者數目為0
2、消費者通過 consumer.setMessageListener(MessageListerer messageListener)的方式消費訊息
// 消費者通過訊息監聽的方式消費訊息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的訊息是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 保證在連線到MQ之前,控制檯不滅,也就是消費到了訊息之後才去釋放資源,Dubbo中也有
System.in.read();
3、先啟動生產者/消費者對訊息的影響
生產者 | 消費者 | 結果 |
先啟動生產者生產訊息 | 然後只啟動一號消費者 | 消費者可以正常消費訊息 |
先啟動生產者生產訊息 | 然後啟動一號消費者,接著啟動二號消費者 | 一號消費者消費完所有的訊息,二號消費者無法消費訊息 |
然後啟動生產者生產訊息 | 先啟動一號消費者,接著啟動二號消費者 | 一號消費者,二號消費者都能消費訊息,類似於負載均衡 |
Number Of Pending Messages