01-消息中間件概述和ActiveMq入門
阿新 • • 發佈:2018-12-02
異步處理 while private not 啟動 多個 ati 設置 ack
1.mq解決的問題
- 系統異步處理
- 應用解耦
- 流量削峰
- 日誌處理
- 消息通信
2.消息中間件的2中模型
2.1 Point-to-Point(P2P) / 點對點 / 類比:送快遞
特點:
+ 一個消費生產者必須有一個消息消費者。一對一的關系
+ 一個消息發送到queue中,如果mqserver重啟,消息不會丟失(當然也可以設置為丟失。缺省是不會丟失的)
2.2 Topic/ 主題(發布訂閱(Pub/Sub) )/類比:廣播
特點:
+ 一個生產者生產的消息可以同時被多個消息消費者消費。一對多。
+ 一個消費者可以消費來自不同生產者的消息。
3. Java Messaging Service規範
3.1 JMS規範模型包含如下幾個要素
- 連接工廠
- 獲取連接
- 創建會話
- JMS的目的/broker
- 創建生產者
- 創建消費者
5. Hello ActiveMQ
5.1 原生api
package com.hs.gz.hellodemo.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQMessage; /** * 生產者 * @author hasee * */ public class Producer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEURL = "tcp://139.199.158.112:61616"; private static final int SENDNUM = 3; public static void main(String[] args) throws Exception { //工廠 ConnectionFactory factory; //連接 Connection connection; //會話 Session session; //目的地 Destination destination; //消費者 MessageProducer producer; //指定用戶名,密碼和url來創建連接工廠 factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); //從連接工廠中獲取麗連接 connection = factory.createConnection(); //從連接中創建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); /* 創建一個名為HelloWorld消息隊列*/ destination = session.createQueue("HelloWorld"); /*往隊列裏面註冊生產者*/ producer = session.createProducer(destination); for (int i = 0; i < SENDNUM; i++) { String msg = "發送消息"+i+" "+System.currentTimeMillis(); TextMessage textMessage = session.createTextMessage(msg); producer.send(textMessage); } System.out.println("生成者生產ok....."); producer.close(); session.close(); connection.close(); } } package com.hs.gz.hellodemo.mq.demo; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費者 * @author hasee * */ public class Consumer { /* 默認連接用戶名 */ private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; /* 默認連接密碼 */ private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; /* 默認連接地址 */ private static final String BROKEURL = "tcp://139.199.158.112:61616"; public static void main(String[] args) throws Exception { ConnectionFactory factory; Connection connection; Session session; Destination destination = null; MessageConsumer consumer; factory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); connection = factory.createConnection(); /* 啟動連接*/ connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("HelloWorld"); consumer = session.createConsumer(destination); //一直監聽mqserver,如果有待消費的消息就進行消費 Message message; while((message = consumer.receive()) != null ) { System.out.println("consumer..." + ((TextMessage)message).getText()); } consumer.close(); session.close(); connection.close(); } }
01-消息中間件概述和ActiveMq入門