1. 程式人生 > >01-消息中間件概述和ActiveMq入門

01-消息中間件概述和ActiveMq入門

異步處理 while private not 啟動 多個 ati 設置 ack

1.mq解決的問題

  • 系統異步處理
  • 應用解耦
  • 流量削峰
  • 日誌處理
  • 消息通信

2.消息中間件的2中模型

2.1 Point-to-Point(P2P) / 點對點 / 類比:送快遞

技術分享圖片

特點:
+ 一個消費生產者必須有一個消息消費者。一對一的關系
+ 一個消息發送到queue中,如果mqserver重啟,消息不會丟失(當然也可以設置為丟失。缺省是不會丟失的)

2.2 Topic/ 主題(發布訂閱(Pub/Sub) )/類比:廣播

技術分享圖片

特點:
+ 一個生產者生產的消息可以同時被多個消息消費者消費。一對多。
+ 一個消費者可以消費來自不同生產者的消息。

3. Java Messaging Service規範

3.1 JMS規範模型包含如下幾個要素

  • 連接工廠
  • 獲取連接
  • 創建會話
  • JMS的目的/broker
  • 創建生產者
  • 創建消費者

5. Hello ActiveMQ

5.1 原生api


package com.hs.gz.hellodemo.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;

/**
 * 生產者
 * @author hasee
 *
 */
public class Producer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKEURL = "tcp://139.199.158.112:61616";
    private static final int SENDNUM = 3;
    public static void main(String[] args) throws Exception {
        //工廠
        ConnectionFactory factory;
        //連接
        Connection connection;
        //會話
        Session session;
        //目的地
        Destination destination;
        //消費者
        MessageProducer producer;
        //指定用戶名,密碼和url來創建連接工廠
        factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        //從連接工廠中獲取麗連接
        connection = factory.createConnection();
        //從連接中創建session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         /* 創建一個名為HelloWorld消息隊列*/
        destination = session.createQueue("HelloWorld");
        /*往隊列裏面註冊生產者*/
        producer = session.createProducer(destination);
        for (int i = 0; i < SENDNUM; i++) {
             String msg = "發送消息"+i+" "+System.currentTimeMillis();
             TextMessage textMessage = session.createTextMessage(msg);
             producer.send(textMessage);
        }
        System.out.println("生成者生產ok.....");
        producer.close();
        session.close();
        connection.close();
    }
}

package com.hs.gz.hellodemo.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消費者
 * @author hasee
 *
 */
public class Consumer {
    /* 默認連接用戶名 */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /* 默認連接密碼 */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /* 默認連接地址 */
    private static final String BROKEURL = "tcp://139.199.158.112:61616";
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory;
        Connection connection;
        Session session;
        Destination destination = null;
        MessageConsumer consumer;
        factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
        connection = factory.createConnection();
         /* 啟動連接*/
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue("HelloWorld");
        consumer = session.createConsumer(destination);
        //一直監聽mqserver,如果有待消費的消息就進行消費
        Message message;
        while((message = consumer.receive()) != null ) {
            System.out.println("consumer..." + ((TextMessage)message).getText());
        }
        
        consumer.close();
        session.close();
        connection.close();
    }
}

01-消息中間件概述和ActiveMq入門