1. 程式人生 > 實用技巧 >ActiveMQ_Java實現生產者和消費者操作佇列

ActiveMQ_Java實現生產者和消費者操作佇列

一、JMS編碼總體規範

二、建立Maven工程和引入Maven依賴

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.5</version>
</dependency>
<dependency>
  <groupId>org.apache.xbean</groupId>
  <artifactId>xbean-spring</artifactId>
  <version>4.10</version>
</dependency>

  

三、佇列-生產者生產訊息例項程式碼

public class JmsProducer {
    private static final String BROKER_URL = "tcp://192.168.229.129:61616";
    private static final String QUEUE_NAME = "queue01";
    private static final String TEXT_MESSAGE = "message";

    public static void main(String[] args) throws JMSException {
        // 1、建立連線工廠ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2、通過連線工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 3、啟動JMS連線
        connection.start();
        // 4、通過連線物件建立JMS Session物件,第一個引數為是否開啟事務,第二個引數是訊息的簽收方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、建立Destination (Destination是 Queue/Topic的父介面)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、建立訊息的生產者,引數是Destination,也就是訊息的生產者會將訊息推送到哪裡.
        MessageProducer producer = session.createProducer(queue);
        // 7、建立訊息,建立三個訊息傳送至佇列
        for (int i = 1; i < 4; i++) {
            TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE + "0" + i);
            producer.send(textMessage);
        }
        // 8、釋放資源,先開後關,後開先關
        producer.close();
        session.close();
        connection.close();
        System.out.println("訊息成功傳送至佇列");
    }
}

  執行程式碼之後,登入ActiveMQ客戶端,可以看到訊息已經推送至佇列queue01中了,由於我們傳送了3個訊息,所以這裡的待處理訊息就是3,我們還沒有建立消費者,所以消費者數目為0,傳送了3個訊息,並且都進入了佇列,所以入隊的訊息數目為3,沒有消費者消費訊息,那麼出佇列的訊息數目為0.

  佇列相關名詞解釋:

名稱 簡介

Number Of Pending Messages

待處理的訊息數目:入隊的總數-出隊的總數

Number Of Consumers

消費者數目:消費端的消費者數目

Messages Enqueued

已經入隊的訊息數目,進入佇列的訊息的數目,這個數目只增不減,即使出隊了也不會減少

Messages Dequeued

已經出隊的訊息數目,也就是消費者消費掉的訊息數目

四、佇列-消費者訊息例項程式碼

public class JmsConsumer {
    private static final String BROKER_URL = "tcp://192.168.229.129:61616";
    private static final String QUEUE_NAME = "queue01";
    private static final String TEXT_MESSAGE = "message";

    public static void main(String[] args) throws JMSException {
        // 1、建立連線工廠物件
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        // 2、通過連線工廠獲取連線物件
        Connection connection = connectionFactory.createConnection();
        // 3、啟動JMS連線
        connection.start();
        // 4、通過connection物件獲取JMS Session物件
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、建立Destination物件(Queue/Topic的父介面是Destination)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、建立消費者物件
        MessageConsumer consumer = session.createConsumer(queue);
        // 7、消費者消費訊息
        while (true) {
            // 時間為毫秒值,如果過了10s之後,消費者沒有接收到新的訊息,那麼消費者自動和佇列斷開
            TextMessage textMessage = (TextMessage) consumer.receive(10*1000);
            /*TextMessage textMessage = (TextMessage) consumer.receive();*/
            if (textMessage != null) {
                String text = textMessage.getText();
                if (text != null) {
                    System.out.println("接收到的訊息是:" + text);
                }
            } else {
                break;
            }
        }
        // 8、釋放資源
        consumer.close();
        session.close();
        connection.close();
    }
}

五、消費者幾個細節

  1、注意:consumer.receive()方法有兩個

// 一直接收訊息,消費者和佇列不會斷開連線
TextMessage textMessage = (TextMessage) consumer.receive();
// 超過了時間(毫秒值),消費者將和佇列自動斷開連線
TextMessage textMessage = (TextMessage) consumer.receive(10*1000);

  01、先看 consumer.receive(),當沒有新訊息的時候,消費者一直處於等待的狀態,消費者數目為1

  

  02、consumer.receive(10*1000):10秒鐘後,消費者將會和佇列斷開連線,消費者數目為0

  2、消費者通過 consumer.setMessageListener(MessageListerer messageListener)的方式消費訊息

// 消費者通過訊息監聽的方式消費訊息
consumer.setMessageListener(new MessageListener() {
	@Override
	public void onMessage(Message message) {
		if (message != null && message instanceof TextMessage) {
			TextMessage textMessage = (TextMessage) message;
			try {
				System.out.println("接收到的訊息是:" + textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
});
// 保證在連線到MQ之前,控制檯不滅,也就是消費到了訊息之後才去釋放資源,Dubbo中也有
System.in.read();

  3、先啟動生產者/消費者對訊息的影響

生產者 消費者 結果
先啟動生產者生產訊息 然後只啟動一號消費者 消費者可以正常消費訊息
先啟動生產者生產訊息 然後啟動一號消費者,接著啟動二號消費者 一號消費者消費完所有的訊息,二號消費者無法消費訊息
然後啟動生產者生產訊息 先啟動一號消費者,接著啟動二號消費者 一號消費者,二號消費者都能消費訊息,類似於負載均衡

Number Of Pending Messages