ActiveMQ訊息佇列
1.四種訊息佇列:
1.1(Storm-)Kafka
1.2RabbitMQ安全性最高
1.3RocketMQ==>阿里
1.4ActiveMQ
2.為什麼要使用訊息佇列?
主要解決系統之間的通訊問題
2.1. 儘量消除系統耦合性
2. 2.非同步訊息傳遞(非同步通訊)
2.3. 流量削峰
3.JMS(Java Message Service)Java 訊息服務
3.1. JMS是Java EE的規範之一,定義了訪問訊息中介軟體的介面;
3.2. JMS規範指出訊息傳遞應該是非同步的、非阻塞的;
4.JMS的核心API
4.1. ConnectionFactory:連線工廠,用於建立Connection
4.2. Connection:客戶端和MQ伺服器的一次連線
4.3. Session:一次會話
4.4. Destination:生產者生產訊息的目的地,消費者消費訊息的來源
Queue:只能消費一次
Topic:可以消費多次
4.5. MessageProducer:訊息生產者,用於將訊息傳送訊息佇列
4.6. MessageConsumer:消費者
4.7. Message:訊息
TextMessage
MapMessage
ObjectMessage
BytesMessage
StreamMessage
4.8. MessageListener
5.JMS訊息型別
5.1. 點對點訊息(P2P): 一條訊息只能被一個消費者消費,生產者和消費者沒有時間上的依賴性
5.2. 釋出訂閱 : 一條訊息可以被多個消費者消費
生產者和消費者有時間上的依賴性(生產者在生產訊息的時候,至少應該有一個消費者處於線上狀態)
可以建立一個持久化的消費者訂閱佇列
6.什麼是ActiveMQ
1. ActiveMQ是最受歡迎的、功能強大的開源訊息和整合伺服器
2. ActiveMQ 速度快,支援跨語言的客戶端和協議,很容易進行企業整合,支援許多高 級特性,完全支援JMS1.1和J2EE1.4
7.ActiveMQ的特點
7.1. 支援多語言客戶端和協議,如Java、C、C++、Ruby、Perl、Python、PHP
7.2. 支援許多高階特性,如訊息分組、虛擬目的地、萬用字元、組合目的地;
7.3. 完全支援JMS1.1和J2EE1.4;
7.4. 可以很容易整合到Spring應用程式中;
7.5. 通過大部分J2EE伺服器的測試,如TomEE、JBoss、WebLogic等;
7.6. 支援高效的JDBC持久化方式;
7.7. 叢集的支援;
7.8. ......
8.上程式碼
8.1生產者和消費者(原生狀態)
/** * 生產者 */ public class HelloProducer { public static void main(String[] args) throws JMSException { // 1. 建立ConnectionFactory(使用者名稱、密碼、連線地址) // 叢集的情況:“failover:(tcp://192.168.1.100:61616,tcp://192.168.1.101:61616,tcp://192.168.1.102:61616)?Randomize=false” ConnectionFactory factory = new ActiveMQConnectionFactory(null, null, "tcp://localhost:61616"); //2. Connection connection = factory.createConnection(); connection.start(); //3.建立session //引數(是否開啟事務,客戶端自動簽收訊息) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.建立Destination(目的地) //引數:佇列名稱,如果不存在則建立一個新的佇列 Queue queue = session.createQueue("hello"); //5.建立生產者 MessageProducer producer = session.createProducer(queue); //6.建立訊息,傳送訊息 for(int i= 0;i<10;i++){ TextMessage message = session.createTextMessage("這是第" + i + "條訊息"); producer.send(message); } producer.close(); session.close(); connection.close(); System.out.println("傳送完成"); } }
=============================================================================================
/**
* 消費者
*/
public class HelloConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("hello");
MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage msg = (TextMessage)consumer.receive();
if(msg!=null){
System.out.println(msg.getText().toString());
Thread.sleep(1000);
}
}
}
}
8.2生產者和消費者(手動簽收訊息)
/** * 生產者 */ public class SelectorProducer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(null, null,"tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); //客戶端手動簽收訊息 Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); Queue queue = session.createQueue("selector"); MessageProducer producer = session.createProducer(queue); TextMessage msg = session.createTextMessage("地址2"); msg.setIntProperty("age",12); msg.setStringProperty("name","et"); TextMessage msg2 = session.createTextMessage("地址2"); msg2.setIntProperty("age",2); msg2.setStringProperty("name","et"); producer.send(msg); producer.send(msg2); producer.close(); session.close(); connection.close(); System.out.println("傳送成功"); } }
========================================================================================================
/**
* 消費者
*/
public class SelectConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("selector");
MessageConsumer consumer = session.createConsumer(queue, "name = 'et' and age = 2");
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
//簽收訊息,通知佇列刪除訊息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
8.3釋出和訂閱狀態(及持久化訂閱)
/** * 釋出 */ public class HelloPublisher { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory(null, null, "tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("topic"); MessageProducer producer = session.createProducer(topic); MapMessage mapMessage = session.createMapMessage(); mapMessage.setString("name","et2006"); mapMessage.setInt("id",111111); producer.send(mapMessage); producer.close(); session.close(); connection.close(); System.out.println("傳送完成"); } }
========================================================================================
/**
* 訂閱
*/
public class HelloSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new HelloListener());
}
}
class HelloListener implements MessageListener{
@Override
public void onMessage(Message message) {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
==========================================================================================
/**
* 持久化訂閱者
*/
public class DurableSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.setClientID("zs");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createDurableSubscriber(topic,"zs");
consumer.setMessageListener(message -> {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}