1. 程式人生 > >ActiveMQ兩種訊息模式以及為什麼使用MQ

ActiveMQ兩種訊息模式以及為什麼使用MQ

1.為什麼使用MQ

 a.高併發

在高併發分散式環境下,由於來不及同步處理,請求往往發生堵塞;通過訊息佇列,可以非同步處理請求,緩解系統的壓力;

b.鬆耦合性

一個應用傳送訊息到MQ之後並不關係訊息如何或者什麼時候被傳遞,同樣的訊息的接收者也不關係訊息從哪裡來的。在不同的環境中這樣做的好處是允許客戶端使用不同的語言編寫甚至使用不同的線路協議,MQ作為中間人存在,允許不同環境的整合和非同步互動。

2.點對點訊息

package com.imooc.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducer {

    //預設連線使用者名稱
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設連線密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連線地址
    private static final String BROKEURL = "tcp://localhost:61618";
    //傳送的訊息數量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //連線工廠
        ConnectionFactory connectionFactory;
        //連線
        Connection connection = null;
        //會話 接受或者傳送訊息的執行緒
        Session session;
        //訊息的目的地
        Destination destination;
        //訊息生產者
        MessageProducer messageProducer;
        //例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

        try {
            //通過連線工廠獲取連線
            connection = connectionFactory.createConnection();
            //啟動連線
            connection.start();
            //建立session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //建立一個名稱為HelloWorld的訊息佇列
            destination = session.createQueue("HelloWorld");
            //建立訊息生產者
            messageProducer = session.createProducer(destination);
            //傳送訊息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    /**
     * 傳送訊息
     * @param session
     * @param messageProducer  訊息生產者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            //建立一條文字訊息 
            TextMessage message = session.createTextMessage("ActiveMQ 傳送訊息" +i);
            System.out.println("傳送訊息:Activemq 傳送訊息" + i);
            //通過訊息生產者發出訊息 
            messageProducer.send(message);
        }

    }
}

package com.imooc.jms;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連線使用者名稱
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連線密碼
    private static final String BROKEURL = "tcp://localhost:61618";//預設連線地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//連線工廠
        Connection connection = null;//連線

        Session session;//會話 接受或者傳送訊息的執行緒
        Destination destination;//訊息的目的地

        MessageConsumer messageConsumer;//訊息的消費者

        //例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);

        try {
            //通過連線工廠獲取連線
            connection = connectionFactory.createConnection();
            //啟動連線
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個連線HelloWorld的訊息佇列
            destination = session.createQueue("HelloWorld");
            //建立訊息消費者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的訊息:" + textMessage.getText());
                }else {
                    break;
                }
            }


        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}
3.釋出者/訂閱模式
package com.imooc;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * MQ生產者
 */
public class MQ_Producer {
	
	public static final String USER_NAME = "admin";//mq使用者名稱
	public static final String PASSWORD = "admin";//mq密碼
	public static final String BROKER_URL = "tcp://localhost:61618";//mqURL
	
	private static ActiveMQConnectionFactory connectionFactory;
	
	private static Connection connection;
	
	private static Session session;
	
	private static Destination[] destinations;//目的地
	
	private static MessageProducer producer;
	
	static{
		connectionFactory = new ActiveMQConnectionFactory(
				USER_NAME, PASSWORD, BROKER_URL);
		
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session  = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
	}
	
	public static void sendMessage(String[] stocks) throws JMSException {
		for (int i = 0; i < stocks.length; i++) {
			Message message = createStockMessage(stocks[i]);
			producer = session.createProducer(destinations[i]);
			producer.send(destinations[i],message);
			System.out.println("Sender 傳送: " + message.getStringProperty("stock"));
		}
	}
	
	public static void setTopics(String[] stocks) throws JMSException {
		destinations = new Destination[stocks.length];
		for (int i = 0; i < stocks.length; i++) {
			destinations[i] = session.createTopic("STOCKS." + stocks[i]);
		}

	}
	
	
	private static Message createStockMessage(String stock) throws JMSException {
		MapMessage message = session.createMapMessage();
		message.setString("stock",stock);
		message.setDouble("price",1.00);
		message.setDouble("offer",0.01);
		message.setBoolean("up",true);
		return message;
	}
	
	
	
	
	
	public static void close() {
		if(session != null) {
			try {
				session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		if(connection != null) {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	
	
	
	
}
package com.imooc;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * MQ消費者
 */
public class MQ_Consumer {
	
	public static final String BROKER_URL = "tcp://localhost:61618";//mqURL
	
	private static ActiveMQConnectionFactory connectionFactory;
	
	private static Connection connection;
	
	private static Session session;
	
	private static MessageConsumer consumer;
	
	static {
		connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
	
	public static void setTopic(String stock) throws JMSException {
		Destination destination = session.createTopic("STOCKS." + stock);
		consumer = session.createConsumer(destination);
	}
	
	
	public static void setMessageListener(MessageListener listener) throws JMSException{
		consumer.setMessageListener(listener);
	}
	
	
	
	public static void close() {
		if(session != null) {
			try {
				session.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
		if(connection != null) {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	

}

public class MyListener implements MessageListener {

	@Override
	public void onMessage(Message message) {
		MapMessage map = (MapMessage) message;
		try {
			String stock = map.getString("stock");
			double price = map.getDouble("price");
			double offer = map.getDouble("offer");
			boolean up = map.getBoolean("up");
			System.out.println(stock + "---" + price + "---" + offer + "---" + up);
		} catch (JMSException e) {
			e.printStackTrace();
		}

package com.imooc;

import javax.jms.JMSException;

public class MQtestProducer {
	
	public static void main(String[] args) {
		
		String[] stocks = {"imooc1","imooc2","imooc3"};
		try {
			MQ_Producer.setTopics(stocks);
			MQ_Producer.sendMessage(stocks);
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			MQ_Producer.close();
		}
		
	}

}

package com.imooc;

import javax.jms.JMSException;

public class MQtestConsumer {
	
	public static void main(String[] args) {
		
		try {
			MQ_Consumer.setTopic("imooc1");
			MQ_Consumer.setMessageListener(new MyListener());
			
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			//註釋掉關閉連線的程式碼,否則程式停止,不能保持監聽
//			MQ_Consumer.close();
		}
	}

}




相關推薦

ActiveMQ訊息模式以及為什麼使用MQ

1.為什麼使用MQ  a.高併發 在高併發分散式環境下,由於來不及同步處理,請求往往發生堵塞;通過訊息佇列,可以非同步處理請求,緩解系統的壓力; b.鬆耦合性 一個應用傳送訊息到MQ之後並不關係訊息如何或者什麼時候被傳遞,同樣的訊息的接收者也不關係訊息從哪裡來的。在不同的環

ActiveMQ訊息模式,主題、佇列

  1、開發的模式流程如下: 2、佇列模式Queue 如果生產者產生了100條訊息,那麼兩個消費同時在的話,會分工合作來接收這100條訊息。就是每個消費者接收到50條來處理。 3、主題模式topic 如果生產者產生了100條訊息,消費者在還沒有訂閱這個主題之前,是

Redis的訊息模式

Redis的兩種訊息模式 佇列模式 釋出訂閱模式 佇列模式 佇列模式下每個消費者可以同時從多個伺服器讀取訊息,但是每個訊息只能被一個消費者讀取。 在佇列模式下其實每次插入的資料都是載入在最前面的,而先插入的資料在後面,列表中始終維持了一個佇列故稱之為佇

JMQ的訊息模式(點對點訊息模式、訂閱模式

一:JMQ的兩種訊息模式 訊息列隊有兩種訊息模式,一種是點對點的訊息模式,還有一種就是訂閱的模式. 1.1:點對點的訊息模式 點對點的模式主要建立在一個佇列上面,當連線一個列隊的時候,傳送端不需要知道接收端是否正在接收,可以直接向ActiveMQ傳送訊息,傳送的訊息,將

ActiveMQ的queue以及topic訊息處理機制分析

Q來作為jms匯流排,並且給大家介紹了activeMQ的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的專案需求,更好地使用activeMQ的兩種訊息處理模式。 1    queue與topic的技術特點對比 Topic Queue 概要 Publish Subscribe mes

ActiveMQ模式PTP和PUB/SUB<轉>

pub provide ops itl 通知 subscribe cin sdn cti 1.PTP模型 PTP(Point-to-Point)模型是基於隊列(Queue)的,對於PTP消息模型而言,它的消息目的是一個消息隊列(Queue),消息生產者每次發送消息總是把消

ActiveMQ訊息形式。

一、訊息的傳遞型別 點對點:即一個生產者和一個消費者一一對應 PTP的過程好比是兩個人打電話,這兩個人獨享這一條通訊鏈路。一方傳送訊息,另外一方接收 訊息 。在實際應用中因為有多個使用者對使用 PTP 的鏈路,它的通訊場景如下圖所示:

activemq中的訂閱模式以及訊息時長和確認機制

直接上程式碼 釋出主題 package com.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TopicPub {

ActiveMQ模式PTP和PUB/SUB

1.PTP模型 PTP(Point-to-Point)模型是基於佇列(Queue)的,對於PTP訊息模型而言,它的訊息目的是一個訊息佇列(Queue),訊息生產者每次傳送訊息總是把訊息送入訊息佇列中,訊息消費者總是從訊息佇列中讀取訊息.先進佇列的訊息將先被訊息消費者讀取

初學安卓開發隨筆之 Menu、toast 用法、活動的四啟動模式 以及 一個方便的Base活動類使用方法

pro 一點 cte edi standard oid nal xtend 解釋 Toast toast 是安卓系統的一種非常棒的提醒方式 首先定義一個彈出Toast的觸發點,比如可以是按鈕之類 其中 Toast.LENGTH_SHORT是指顯示時長 還有一個內置變量為To

FTP工作模式:主動模式(Active FTP)和被動模式

ftp 主動(port) 被動(pasv)模式在主動模式下,FTP客戶端隨機開啟一個大於1024的端口N向服務器的21號端口發起連接,然後開放N+1號端口進行監聽,並向服務器發出PORT N+1命令。服務器接收到命令後,會用其本地的FTP數據端口(通常是20)來連接客戶端指定的端口N+1,進行數據傳輸。在被

javaweb的開發模式

學生 log 其他 模式 圖片 cnblogs ima nbsp .com 以上截圖來自大神“孤傲蒼狼”的博客,學生收藏做筆記,無其他用法 javaweb的兩種開發模式

2.Mybatis開發模式

throws users pan except family RM Coding apache 不同 普通模式 自定義接口,接口實現類。 思考:需要sqlSessionFactory,生產sqlSession。 UserDao: package dao; import

SpringBoot配置Bean的方式--註解以及配置文件

cep tms ast doc ice print str PE 寫實 一、註解方式編寫實體類:package com.example.bean;import org.springframework.boot.context.properti

搜索引擎系列八:solr-部署詳解(solr部署模式介紹、獨立服務器模式詳解、SolrCloud分布式集群模式詳解)

nod 為什麽 用途 serve creat 復制 stand 數據 變量名 一、solr兩種部署模式介紹 Standalone Server 獨立服務器模式:適用於數據規模不大的場景 SolrCloud 分布式集群模式:適用於數據規模大,高可靠、高可用、高並發的場景 二

WebApp專家評委打分的進入模式

text ron wid 開始 兩種 模式 分享 評委打分 ... A模式: 當前PC端的前期設置如下: 【管理員允許時,只針對管理員指定選手】 選項選中。在現場時,管理員點擊 狀態未知 或下方紅框所示按鈕 發出打分允許指令時, 專家評委進入專家打分區後: 可以直接

Apache 工作模式 :prefork 、 worker

文章 serve 共享 生產 servers sta 能力 一個 請求 前言 1·最近這幾篇文章都在講Apache的一些安全與優化,這些針對服務器都是很重要的,掌握這些不僅提升了服務器的安全,還讓服務器的性能大大的提高。這樣就可以讓客戶有很好的體驗感。2·今天這篇文章依

常見的六設計模式以及應用場景

自己 產品 狀態 細節 是什麽 功能 順序 做什麽 核心 設計模式是對設計原則的具體化。用江湖話說就是武林秘籍,總結出來的一些固定套路,可以幫助有根基的程序員迅速打通任督二脈,從此做什麽都特別快。常用的模式及其場景如下。 1) 單例模式。 單例模式是一種常用的軟件設計模

設計模式(單例 簡單工廠)

一.設計模式 1.什麼是設計模式 2.為什麼用設計模式 3.什麼時候使用設計模式 4.怎樣用設計模式 二.單例模式 1.單例模式,是一種常用的軟體設計模式。在它的核心結構中只包含一個被稱為單例的特殊類。 通

區塊鏈C2C點對點系統搭建,區塊鏈點對點交易系統的交易模式你知道嗎?

在平時的交易過程中由於法幣交易受國家監管限制,現在許多交易所都沒有法幣交易版塊,我們平時看到的許多大型的交易所上面寫的是法幣交易,但是卻不是真正的法幣交易,而是點對點交易模式,像比較常見的ZB交易平臺,上面寫著法幣交易,確實C2C點對點交易模式,那麼接下來源中瑞黃顧問(具體加vx:ruiec1688)就給大家