1. 程式人生 > >jms入門

jms入門

ssi 開會 == hang 測試 span time ceo 連接

一.所需jar(maven)

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.14.3</version>
</dependency>

二.創建生產者

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import javax.jms.*; import java.io.Serializable; public class JMSProducer { public static void main(String[] args) throws JMSException { ConnectionFactory connectionfactory =new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");
//創建與JMS服務的連接:ConnectionFactory被管理的對象,由客戶端創建,用來創建一個連接對象 Connection connection = connectionfactory.createConnection(); /* 確認消息的方式有如下三種: AUTO_ACKNOWLEDGE(自動通知) CLIENT_ACKNOWLEDGE(客戶端自行決定通知時機) DUPS_OK_ACKNOWLEDGE(延時//批量通知)
*/ /* 打開會話,一個單獨的發送和接受消息的線程上下文 為true時,事務會話必須session.commit(); */ Session session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE ); JMSProducer qs = new JMSProducer(); qs.sendTextMsg(session,"helli text","jmsText"); /* MapMessage mapMsg = session.createMapMessage(); mapMsg.setString("name", "張三"); mapMsg.setInt("age", 35); qs.sendMap(session, mapMsg, "queue.msgMap");//發送map類型的消息 JMS jms = new JMS();//發送Object類型消息 jms.setName("zhangsan"); jms.setSex("男"); qs.sendObj(session,jms,"queue.msgObj"); */ session.commit(); //在事務性會話中,只有commit之後,消息才會真正到達目的地 session.close(); connection.close(); } /* 發送文本消息 */ public void sendTextMsg(Session session,String MsgContent,String name) throws JMSException{ Queue queue = new ActiveMQQueue(name); // Topic topic=new ActiveMQTopic(name); 創建topic MessageProducer msgProducer = session.createProducer(queue); Message textMessage = session.createTextMessage(MsgContent); msgProducer.send(textMessage); /* //發送byte字節 byte[] bs={1,2}; BytesMessage msg1= session.createBytesMessage(); msg1.writeBytes(bs); msgProducer.send(msg1); //流消息 StreamMessage streamMessage = session.createStreamMessage(); streamMessage.writeString("streamMessage流消息"); streamMessage.writeLong(55); producer.send(streamMessage); */ } /* 發送MAP類型消息 */ public void sendMap(Session session, MapMessage map, String name) throws JMSException { Topic topic=new ActiveMQTopic(name); // Queue queue = new ActiveMQQueue(name); MessageProducer msgProducer1=session.createProducer(topic); msgProducer1.send(map); //msgProducer1.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 設置了重啟之後消息會丟失 //msgProducer1.setTimeToLive(1000*60*60); 消息有效期1小時 } /* 發送Object類型消息 */ public void sendObj(Session session,Object obj,String name) throws JMSException{ Destination queue = new ActiveMQQueue(name); //發送對象時必須讓該對象實現serializable接口 ObjectMessage objMsg=session.createObjectMessage((Serializable) obj); MessageProducer msgProducer = session.createProducer(queue); msgProducer.send(objMsg); } }

三.創建消費者

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.*;

public class JMSConsumer implements MessageListener{
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionfactory =null;
        Connection connection=null;
        Session session=null;
        if(connectionfactory==null){
            connectionfactory = new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");
            //接收對象時,設置這個為true
            ((ActiveMQConnectionFactory) connectionfactory).setTrustAllPackages(true);
        }
        if(connection==null){
            connection = connectionfactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = new ActiveMQQueue("que");//根據發送的名稱接受消息
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new JMSConsumer());//不繼承MessageListener時可以用consumer.receive()手動接受消息

        Topic queue1 = new ActiveMQTopic("topic-name");
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new JMSConsumer());

        Queue queue3 = new ActiveMQQueue("queue.msgMap");
        MessageConsumer consumer3 = session.createConsumer(queue3);
        consumer3.setMessageListener(new JMSConsumer());

        Queue queue2 = new ActiveMQQueue("queue.msgObj");
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new JMSConsumer());
    }

    public void onMessage(Message message) {
        //instanceof 測試它所指向的對象是否是TextMessage類
        if(message instanceof TextMessage){ //接受文本消息
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("message:"+message);
                System.out.println("發送的文本消息內容為:"+text.getText()); 
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
       
        if(message instanceof MapMessage){ //接收map消息
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("姓名:"+map.getString("name"));
                System.out.println("年齡:"+map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        
        if(message instanceof ObjectMessage){ //接收object
            try {
                System.out.println("ObjectMessage");
                ObjectMessage objMsg =(ObjectMessage) message;
                JMS jms=(JMS) objMsg.getObject();
                System.out.println(jms);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if(message instanceof BytesMessage){ //接收字節消息
            byte[] b = new byte[1024];
            int len = -1;
            BytesMessage byteMsg = (BytesMessage)message;
            try {
                while((len=byteMsg.readBytes(b))!=-1){
                    System.out.println(new String(b, 0, len));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
            /*
                  if(message instanceof StreamMessage){ //接收流消息
                    StreamMessage message = (StreamMessage)message;
                    System.out.println(message.readString());
                    System.out.println(message.readLong());
                }
             */

    }
}

jms入門