1. 程式人生 > >ActiveMq的訊息佇列的簡單使用

ActiveMq的訊息佇列的簡單使用

安裝

程式碼

  • 訊息的傳送者

package com.bonade.mall.mqTest;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import
javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.StringUtils; /** * 生產者 * @author lmf * */ public class Producter { // ActiveMq 的預設使用者名稱
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // ActiveMq 的預設登入密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // ActiveMQ 的連結地址 private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; AtomicInteger count = new
AtomicInteger(0); // 連結工廠 ConnectionFactory connectionFactory; // 連結物件 Connection connection; // 事務管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init() { try { // 建立一個連結工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL); // 從工廠中建立一個連結 connection = connectionFactory.createConnection(); // 開啟連結 connection.start(); // 建立一個事務(這裡通過引數可以設定事務的級別) session = connection .createSession(true, Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname) { try { // 建立一個訊息佇列 Queue queue = session.createQueue(disname); // 訊息生產者 MessageProducer messageProducer = null; if (threadLocal.get() != null) { messageProducer = threadLocal.get(); } else { messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while (true) { if(StringUtils.isNotBlank(disname)){ int num = count.getAndIncrement(); // 建立一條訊息 TextMessage msg = session.createTextMessage(Thread .currentThread().getName() + "productor:這是生產者從這裡傳送訊息!,count:" + num); // 傳送訊息 messageProducer.send(msg); // 提交事務 session.commit(); } } } catch (JMSException e) { e.printStackTrace(); } } }
  • 訊息的消費者
package com.bonade.mall.mqTest;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消費者
 * @author lmf
 *
 */
public class Comsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  • 測試程式碼

public class TestMq {
    public static void main(String[] args) {
        Producter producter = new Producter();
        producter.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        // Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        // Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        // Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        // Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable {
        Producter producter;

        public ProductorMq(Producter producter) {
            this.producter = producter;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    producter.sendMessage("TEST-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}       
public class TestConsumer {
     public static void main(String[] args){
            Comsumer comsumer = new Comsumer();
            comsumer.init();
            TestConsumer testConsumer = new TestConsumer();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        }

        private class ConsumerMq implements Runnable{
            Comsumer comsumer;
            public ConsumerMq(Comsumer comsumer){
                this.comsumer = comsumer;
            }

            @Override
            public void run() {
                while(true){
                    try {
                        comsumer.getMessage("TEST-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
}

需要注意先啟動TestMq 訊息的傳送者程式碼 然後啟動TestConsumer程式碼