1. 程式人生 > 實用技巧 >ActiveMQ訊息佇列

ActiveMQ訊息佇列

1.四種訊息佇列:
1.1(Storm-)Kafka
1.2RabbitMQ安全性最高
1.3RocketMQ==>阿里
1.4ActiveMQ

2.為什麼要使用訊息佇列?

主要解決系統之間的通訊問題
2.1. 儘量消除系統耦合性

2. 2.非同步訊息傳遞(非同步通訊)

2.3. 流量削峰

3.JMS(Java Message Service)Java 訊息服務

3.1. JMS是Java EE的規範之一,定義了訪問訊息中介軟體的介面;

3.2. JMS規範指出訊息傳遞應該是非同步的、非阻塞的;

4.JMS的核心API
4.1. ConnectionFactory:連線工廠,用於建立Connection

4.2. Connection:客戶端和MQ伺服器的一次連線
4.3. Session:一次會話

4.4. Destination:生產者生產訊息的目的地,消費者消費訊息的來源
Queue:只能消費一次
Topic:可以消費多次

4.5. MessageProducer:訊息生產者,用於將訊息傳送訊息佇列
4.6. MessageConsumer:消費者

4.7. Message:訊息
TextMessage
MapMessage
ObjectMessage
BytesMessage
StreamMessage
4.8. MessageListener

5.JMS訊息型別

5.1. 點對點訊息(P2P): 一條訊息只能被一個消費者消費,生產者和消費者沒有時間上的依賴性


5.2. 釋出訂閱 : 一條訊息可以被多個消費者消費
生產者和消費者有時間上的依賴性(生產者在生產訊息的時候,至少應該有一個消費者處於線上狀態)
可以建立一個持久化的消費者訂閱佇列

6.什麼是ActiveMQ
1. ActiveMQ是最受歡迎的、功能強大的開源訊息和整合伺服器

2. ActiveMQ 速度快,支援跨語言的客戶端和協議,很容易進行企業整合,支援許多高 級特性,完全支援JMS1.1和J2EE1.4

7.ActiveMQ的特點

7.1. 支援多語言客戶端和協議,如Java、C、C++、Ruby、Perl、Python、PHP

7.2. 支援許多高階特性,如訊息分組、虛擬目的地、萬用字元、組合目的地;

7.3. 完全支援JMS1.1和J2EE1.4;

7.4. 可以很容易整合到Spring應用程式中;

7.5. 通過大部分J2EE伺服器的測試,如TomEE、JBoss、WebLogic等;

7.6. 支援高效的JDBC持久化方式;

7.7. 叢集的支援;

7.8. ......

8.上程式碼

8.1生產者和消費者(原生狀態)

/**
 * 生產者
 */
public class HelloProducer {


    public static void main(String[] args) throws JMSException {
        // 1. 建立ConnectionFactory(使用者名稱、密碼、連線地址)
        // 叢集的情況:“failover:(tcp://192.168.1.100:61616,tcp://192.168.1.101:61616,tcp://192.168.1.102:61616)?Randomize=false”
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,
                "tcp://localhost:61616");
        //2.
        Connection connection = factory.createConnection();
        connection.start();

        //3.建立session
        //引數(是否開啟事務,客戶端自動簽收訊息)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立Destination(目的地)
        //引數:佇列名稱,如果不存在則建立一個新的佇列
        Queue queue = session.createQueue("hello");

        //5.建立生產者
        MessageProducer producer = session.createProducer(queue);

        //6.建立訊息,傳送訊息
        for(int i= 0;i<10;i++){
            TextMessage message = session.createTextMessage("這是第" + i + "條訊息");
            producer.send(message);
        }
        producer.close();
        session.close();
        connection.close();
        System.out.println("傳送完成");
    }
}
=============================================================================================
/**
* 消費者
*/
public class HelloConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("hello");

MessageConsumer consumer = session.createConsumer(queue);
while(true){
TextMessage msg = (TextMessage)consumer.receive();
if(msg!=null){
System.out.println(msg.getText().toString());
Thread.sleep(1000);
}
}
}
}

8.2生產者和消費者(手動簽收訊息)

/**
 * 生產者
 */
public class SelectorProducer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,"tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        //客戶端手動簽收訊息
        Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("selector");

        MessageProducer producer = session.createProducer(queue);
        TextMessage msg = session.createTextMessage("地址2");
        msg.setIntProperty("age",12);
        msg.setStringProperty("name","et");

        TextMessage msg2 = session.createTextMessage("地址2");
        msg2.setIntProperty("age",2);
        msg2.setStringProperty("name","et");

        producer.send(msg);
        producer.send(msg2);
        producer.close();
        session.close();
        connection.close();
        System.out.println("傳送成功");
    }
}
========================================================================================================
/**
* 消費者
*/
public class SelectConsumer {
public static void main(String[] args) throws JMSException, InterruptedException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Queue queue = session.createQueue("selector");

MessageConsumer consumer = session.createConsumer(queue, "name = 'et' and age = 2");
consumer.setMessageListener(message -> {
TextMessage textMessage = (TextMessage)message;
try {
System.out.println(textMessage.getText());
//簽收訊息,通知佇列刪除訊息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}

8.3釋出和訂閱狀態(及持久化訂閱)

/**
 * 釋出
 */
public class HelloPublisher {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(null,
                null,
                "tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("topic");

        MessageProducer producer = session.createProducer(topic);

        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("name","et2006");
        mapMessage.setInt("id",111111);
        producer.send(mapMessage);
        producer.close();
        session.close();
        connection.close();
        System.out.println("傳送完成");
    }
}
========================================================================================
/**
* 訂閱
*/
public class HelloSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Topic topic = session.createTopic("topic");

MessageConsumer consumer = session.createConsumer(topic);

consumer.setMessageListener(new HelloListener());
}
}
class HelloListener implements MessageListener{

@Override
public void onMessage(Message message) {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
==========================================================================================
/**
* 持久化訂閱者
*/
public class DurableSubscriber {
public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory(null,
null,
"tcp://localhost:61616");

Connection connection = factory.createConnection();
connection.setClientID("zs");
connection.start();

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("topic");
MessageConsumer consumer = session.createDurableSubscriber(topic,"zs");
consumer.setMessageListener(message -> {
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
int id = mapMessage.getInt("id");
String name = mapMessage.getString("name");
System.out.println(id + "-" + name);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}