1. 程式人生 > >activemq訊息佇列

activemq訊息佇列

1.下載http://activemq.apache.org/activemq-5157-release.html包,解壓後直接雙擊C:\Users\Mac\Desktop\apache-activemq-5.15.7-bin\apache-activemq-5.15.7\bin\win64下的activemq.bat即可執行activemq服務,瀏覽器執行http://localhost:8161/admin/topics.jsp即可檢視訊息佇列資訊。(包括訊息生產者,訊息消費者,訊息佇列queue、訂閱topic)

2.預設是使用者密碼和使用者賬號是:system、manager,在config檔案下的

3.可手動修改配置:

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  useJmx="true">  /**   useJmx="true"
    <managementContext createConnector="true"/>   // createConnector="true"

4.queue普通佇列程式碼,一個訊息生產者和一個訊息消費者,

 生產者: 

/**
 * 訊息生產者
 * @author Mac
 *
 */
public class QueueProducer {

    public static void main(String[] args) {
        //連線資訊設定
        String username="system";
        String password="manager";
        String brokerUrl="failover://tcp://localhost:61616";
        //連線工廠
        ConnectionFactory cf=null;
        //連線
        Connection c=null;
        //會話 接受或傳送訊息訊息的執行緒
        Session session=null;
        //訊息目的地,訊息佇列
        Destination des=null;
        //訊息生產者
        MessageProducer mp=null;
        //傳送訊息
        TextMessage tm=null; 
        //例項化連線工廠
        cf=new ActiveMQConnectionFactory(username,password,brokerUrl);
        try {
            c=cf.createConnection();//建立連線
            c.start();
            session=c.createSession(true,Session.AUTO_ACKNOWLEDGE);//建立會話,auto_acknowledge自動應答
            des=session.createQueue("QueueTest");//建立名為queueTest名稱的訊息佇列
            mp=session.createProducer(des);//訊息生產者 
            tm=session.createTextMessage("queue測試新訊息666888");//建立訊息
            mp.send(tm);
            session.commit();
        } catch (Exception e) {
             e.printStackTrace();
        }finally{
            if(c!=null){
                try {
                    c.close();
                } catch (Exception
                        e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
        
    }
}
 

消費者: 

/**
 * 訊息消費者
 * @author Mac
 *
 */
public class QueueConsumer {
 
    public static void main(String[] args) {
        String username="system";
        String password="manager";
        String url="failover://tcp://localhost:61616";
        
        ConnectionFactory cf=null;//連線工廠
        Connection c=null;//連線例項
        Session session=null;//會話
        Destination des=null;//訊息佇列
        MessageConsumer mc=null;//訊息消費者
        
        cf=new ActiveMQConnectionFactory(username,password,url);
        
        try {
            c=cf.createConnection();
            c.start();
            session=c.createSession(true, Session.AUTO_ACKNOWLEDGE);
            des=session.createQueue("QueueTest");//連線到名為QueueTest的訊息佇列
            mc=session.createConsumer(des);//建立訊息消費者
            
            while(true){
                TextMessage tm=(TextMessage) mc.receive(100000);
                if(tm!=null){
                    String text=tm.getText();
                    System.out.println("成功消費到訊息佇列中的訊息:"+text);
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            try {
                c.close();
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
    }

5.topic訂閱模式程式碼:

生產者: 

/**
 * 主題生產者
 * @author Mac
 *
 */
public class TopicProducer {

    public static void main(String[] args) {
        
        String username="system";
        String password="manager";
        String url="failover://tcp://localhost:61616";
        
        ConnectionFactory cf=null;
        Connection c=null;
        Session s=null; 
        TextMessage tm=null;
        Topic top=null;
        MessageProducer mp=null; 
        try {
            cf=new ActiveMQConnectionFactory(username,password,url);//建立連線 
            c=cf.createConnection();
            c.start();//開始連線
            s=c.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立會話
            top=s.createTopic("TopicTest");//建立topic訂閱佇列
            
            mp=s.createProducer(top);//建立訂閱訊息生產者
            mp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//資料不持久化
             
              for (int i=0; i<10; i++) {
                    //建立要傳送的文字資訊
                  tm = s.createTextMessage("Topic主題測試" +(i+1));
                    //通過主題生產者發出訊息 
                    mp.send(tm);
                    System.out.println("傳送成功:" + tm.getText());
                } 
            
            s.commit(); 
        } catch (Exception e) {
             e.printStackTrace();
        }finally{
            if(null!=c){
                try {
                     //c.close();
                } catch (Exception e) { 
                    e.printStackTrace();
                }
            }
        }
        
    }
}
 

消費者: 

/*
 * 主題消費者
 */
public class TopicConsumer {

    public static void main(String[] args) {
        
        String username="system";
        String password="manager";
        String url="failover://tcp://localhost:61616";
        
        ConnectionFactory connectionFactory=null;
        Connection connection=null;
        Session session=null;  
        Topic topic=null;
        MessageConsumer messageConsumer=null; 
        
        connectionFactory=new ActiveMQConnectionFactory(username,password,url);
        try {
            connection=connectionFactory.createConnection();
            connection.start();
            session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            topic=session.createTopic("TopicTest"); 
            messageConsumer=session.createConsumer(topic);
            System.out.println("1");
            messageConsumer.setMessageListener(new myMessageListener()); 
            System.out.println("2");
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally{
            if(null!=connection){
                try {
                     //connection.close();
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                
                    e.printStackTrace();
                }
            }
        }
    }
}

class myMessageListener implements MessageListener{

    @Override
    public void onMessage(Message mes) {
        System.out.println("xx");
         TextMessage tm=(TextMessage) mes;
         try {
            System.out.println("訊息:"+tm.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
}