1. 程式人生 > 實用技巧 >ActiveMQ生產者消費者編碼

ActiveMQ生產者消費者編碼

匯入依賴

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.0</version>
</dependency>

一個簡單的生產者

public class JmsProduce {

    //  linux 上部署的activemq 的 IP 地址 + activemq 的埠號
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String QUEUE_NAME = "queue01";

    

    public static void main(String[] args) throws  Exception{

        // 1 按照給定的url建立連線工產,這個構造器採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 2 通過連線工廠連線 connection  和 啟動
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();

        //  啟動
        connection.start();

        // 3 建立回話  session
        // 兩個引數,第一個事務, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4 建立目的地 (兩種 : 佇列/主題   這裡用佇列)
        Queue queue = session.createQueue(QUEUE_NAME);

        // 5 建立訊息的生產者
        MessageProducer messageProducer = session.createProducer(queue);

        // 6 通過messageProducer 生產 3 條 訊息傳送到訊息佇列中
        for (int i = 1; i < 4 ; i++) {

            // 7  建立字訊息
            TextMessage textMessage = session.createTextMessage("msg--" + i);

            // 8  通過messageProducer釋出訊息
            messageProducer.send(textMessage);

        }

        // 9 關閉資源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 訊息傳送到MQ完成 ****");

    }

}

在頁面上顯示

與之對應的訊息消費者

public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String QUEUE_NAME = "queue01";   // 1對1 的佇列

    public static void main(String[] args) throws Exception{

        // 1 按照給定的url建立連線工程,這個構造器採用預設的使用者名稱密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        // 2 通過連線工廠連線 connection  和 啟動
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();

        //  啟動
        connection.start();

        // 3 建立回話  session
        // 兩個引數,第一個事務, 第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4 建立目的地 (兩種 : 佇列/主題   這裡用佇列)
        Queue queue = session.createQueue(QUEUE_NAME);

        // 5 建立訊息的消費者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        while(true){
            // 這裡是 TextMessage 是因為訊息傳送者是 TextMessage , 接受處理的
            // 也應該是這個型別的訊息
						//receive()方法 帶參receive(4000L)超時4秒就會走
            TextMessage message = (TextMessage)messageConsumer.receive();
            if (null != message){
                System.out.println("****消費者的訊息:"+message.getText());
            }else {
                break;
            }
        }

        messageConsumer.close();
        session.close();
        connection.close();
    }
}

這個代表有一個訊息消費者處理訊息,並且處理了三條訊息

先啟動兩個消費者,再生產6條訊息,請問消費情況如何?

一人一半

activemq 好像自帶負載均衡,當先啟動兩個佇列(Queue)的消費者時,在啟動生產者發出訊息,此時的訊息平均的被兩個消費者消費。 並且消費者不會消費已經被消費的訊息(即為已經出隊的訊息)