ActiveMQ 發送和就收消息
阿新 • • 發佈:2018-03-29
ace listen OS factor row conn ack 多個 tar
一、添加 jar 包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency>
二、消息傳遞的兩種形式
1、點對點:發送的消息只能被一個消費者接收,第一個消費者接收後,消息沒了
2、發布/訂閱:消息可以被多個消費者接收 。發完消息,如果沒有消費者接收,這消息會自動消失。也就是說,消費者服務必須是啟動的狀態。( topic 消息在 ActiveMQ 服務端默認不是持久化的,可以通過配置文件配置持久化 )
三、點對點發送消息
/** * 點到點形式發送消息 * @throws Exception */ @Test public void testQueueProducer() throws Exception{ //1、創建一個連接工廠,需要指定服務的 ip 和端口 String brokerURL = "tcp://192.168.25.129:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //2、使用工廠對象創建一個 Connection 對象Connection connection = connectionFactory.createConnection(); //3、開啟連接,調用 Connection 對象的 start 方法 connection.start(); //4、創建一個 Session 對象。 //第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義); //第二個參數:應答模式。自動應答或者手動應答,一般是自動應答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。 Queue queue = session.createQueue("test-queue"); //6、使用 Session 對象創建一個 Producer 對象 MessageProducer producer = session.createProducer(queue); //7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello ActiveMQ");*/ TextMessage textMessage = session.createTextMessage("hello ActiveMQ"); //8、發布消息 producer.send(textMessage); //9、關閉資源 producer.close(); session.close(); connection.close(); }
四、點對點接收消息
/** * 點對點接收消息 * @throws Exception */ @Test public void testQueueConsumer() throws Exception{ //1、創建一個 ConnectionFactory 對象連接 MQ 服務器 String brokerURL = "tcp://192.168.25.129:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //2、創建一個連接對象 Connection connection = connectionFactory.createConnection(); //3、開啟連接 connection.start(); //4、使用 Connection 對象 創建一個 Session 對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、創建一個 Destination 對象。queue 對象 Queue queue = session.createQueue("test-queue"); //6、使用 Session 對象創建一個消費者 MessageConsumer consumer = session.createConsumer(queue); //7、接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //8、打印結果 TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //9、等待接收消息。( 接收到消息後才網下面執行。關閉資源 ) System.in.read(); //10、關閉資源 consumer.close(); session.close(); connection.close(); }
五、廣播發送消息
/** * 廣播發送消息 * @throws Exception */ @Test public void testTopicProducer() throws Exception{ //1、創建一個連接工廠,需要指定服務的 ip 和端口 String brokerURL = "tcp://192.168.25.129:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //2、使用工廠對象創建一個 Connection 對象 Connection connection = connectionFactory.createConnection(); //3、開啟連接,調用 Connection 對象的 start 方法 connection.start(); //4、創建一個 Session 對象。 //第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義); //第二個參數:應答模式。自動應答或者手動應答,一般是自動應答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。 Topic topic = session.createTopic("test-topic"); //6、使用 Session 對象創建一個 Producer 對象 MessageProducer producer = session.createProducer(topic); //7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello ActiveMQ");*/ TextMessage textMessage = session.createTextMessage("hello ActiveMQ"); //8、發布消息 producer.send(textMessage); //9、關閉資源 producer.close(); session.close(); connection.close(); }
六、廣播接收消息
/** * 廣播接收消息 * @throws Exception */ @Test public void testTopicConsumer() throws Exception{ //1、創建一個 ConnectionFactory 對象連接 MQ 服務器 String brokerURL = "tcp://192.168.25.129:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //2、創建一個連接對象 Connection connection = connectionFactory.createConnection(); //3、開啟連接 connection.start(); //4、使用 Connection 對象 創建一個 Session 對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5、創建一個 Destination 對象。Topic 對象 Topic topic = session.createTopic("test-topic"); //6、使用 Session 對象創建一個消費者 MessageConsumer consumer = session.createConsumer(topic); //7、接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //8、打印結果 TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); System.out.println("topic消費者"); //9、等待接收消息。( 接收到消息後才網下面執行。關閉資源 ) System.in.read(); //10、關閉資源 consumer.close(); session.close(); connection.close(); }
ActiveMQ 發送和就收消息