AMQ初級使用(佇列模式+主題模式)
阿新 • • 發佈:2019-02-07
佇列模式
1.生產者程式碼:
package com.zzf.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class AppProducer { private static final String url="tcp://192.168.1.6:61616"; private static final String queueName="queue-text"; public static void main(String[] args) throws JMSException { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //((ActiveMQConnectionFactory) connectionFactory).setUserName("admin"); //((ActiveMQConnectionFactory) connectionFactory).setPassword("admin"); //2.建立Connection Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立接受者目標 Destination destination = session.createQueue(queueName); //6.建立目標 MessageProducer producer = session.createProducer(destination); for (int i = 0; i <10 ; i++) { //7.建立訊息 TextMessage textMessage=session.createTextMessage("訊息內容"+i); //8.傳送訊息 producer.send(textMessage); // System.out.println(textMessage.getText()); } //9.關閉連線 connection.close(); } }
上文的url可以改成“tcp://localhost:61616”,或者通過cmd->ipconfig
2.消費者程式碼:
package com.zzf.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class AppConsumer { private static final String url="tcp://192.168.1.6:61616"; private static final String queueName="queue-text"; public static void main(String[] args) throws JMSException { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //((ActiveMQConnectionFactory) connectionFactory).setUserName("admin"); //((ActiveMQConnectionFactory) connectionFactory).setPassword("admin"); //2.建立Connection Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立目標 Destination destination = session.createQueue(queueName); //6.建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); //7.建立一個監聽器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受訊息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //8.關閉連線 // connection.close(); } }
輸出(啟動消費者模式,再啟動生產者)
(佇列模式如上)
主題模式
1.釋出者程式碼:
package com.zzf.jms.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class AppProducer { private static final String url="tcp://192.168.1.6:61616"; private static final String topicName="topic-text"; public static void main(String[] args) throws JMSException { //1.建立ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); //((ActiveMQConnectionFactory) connectionFactory).setUserName("admin"); //((ActiveMQConnectionFactory) connectionFactory).setPassword("admin"); //2.建立Connection Connection connection = connectionFactory.createConnection(); //3.啟動連線 connection.start(); //4.建立會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.建立接受者目標 Destination destination = session.createTopic(topicName); //6.建立目標 MessageProducer producer = session.createProducer(destination); for (int i = 0; i <10 ; i++) { //7.建立訊息 TextMessage textMessage = session.createTextMessage("訊息內容" + i); //8.傳送訊息 producer.send(textMessage); // System.out.println(textMessage.getText()); } //9.關閉連線 connection.close(); } }
2.接受者程式碼:
package com.zzf.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class AppConsumer {
private static final String url="tcp://192.168.1.6:61616";
private static final String topicName="topic-text";
public static void main(String[] args) throws JMSException {
//1.建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//((ActiveMQConnectionFactory) connectionFactory).setUserName("admin");
//((ActiveMQConnectionFactory) connectionFactory).setPassword("admin");
//2.建立Connection
Connection connection = connectionFactory.createConnection();
//3.啟動連線
connection.start();
//4.建立會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.建立目標
Destination destination = session.createTopic(topicName);
//6.建立一個消費者
MessageConsumer consumer = session.createConsumer(destination);
//7.建立一個監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接受訊息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.關閉連線
// connection.close();
}
}
唯一需要改的地方是session.createTopic.
和佇列模式一樣,先啟動接受者(啟動一個以上),再啟動釋出者
每個接受者都接受到了釋出者的釋出資訊(公眾號)
注意:需要匯入jar包:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>