1. 程式人生 > >JMS釋出/訂閱訊息傳送例子

JMS釋出/訂閱訊息傳送例子

前言

基於上篇文章"基於Tomcat + JNDI + ActiveMQ實現JMS的點對點訊息傳送"很容易就可以編寫一個釋出/訂閱訊息傳送例子,相關環境準備與該篇文章基本類似,主要的區別如下。

在Tomcat中配置JNDI

配置連線工廠和話題

    <Resource name="topic/connectionFactory" auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
        factory
="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5" brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" /> <Resource name="topic/topic0" auth="Container"
type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="TestTopic" />

在Web工廠中編寫程式碼

新建一個釋出者Servlet

package pubSub;

import java.io.IOException;
import java.io.PrintWriter;

import javax.naming.InitialContext;
import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.jms.Topic; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.TopicPublisher; import javax.jms.DeliveryMode; import javax.jms.TopicSession; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; /** * Servlet implementation class JMSTest */ @WebServlet("/Publish") public class Publisher extends HttpServlet { private static final long serialVersionUID = 1L; /** * @see HttpServlet#HttpServlet() */ public Publisher() { super(); // TODO Auto-generated constructor stub } /** * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse * response) */ protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { PrintWriter out = response.getWriter(); try { // get the initial context InitialContext ctx = new InitialContext(); // lookup the topic object Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0"); // lookup the topic connection factory TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx .lookup("java:comp/env/topic/connectionFactory"); // create a topic connection TopicConnection topicConn = connFactory.createTopicConnection(); // create a topic session TopicSession topicSession = topicConn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // create a topic publisher TopicPublisher topicPublisher = topicSession.createPublisher(topic); topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // create the "Hello World" message TextMessage message = topicSession.createTextMessage(); message.setText("Hello World"); // publish the messages topicPublisher.publish(message); // print what we did out.write("Message published: " + message.getText()); // close the topic connection topicConn.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse * response) */ protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // TODO Auto-generated method stub } }

新建一個訂閱者Servlet

package pubSub;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class Receive
 */
@WebServlet("/Subscribe")
public class Subscriber extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public Subscriber() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        PrintWriter out = response.getWriter();

        try {
            // get the initial context
            InitialContext ctx = new InitialContext();

            // lookup the topic object
            Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");

            // lookup the topic connection factory
            TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx
                    .lookup("java:comp/env/topic/connectionFactory");

            // create a topic connection
            TopicConnection topicConn = connFactory.createTopicConnection();

            // create a topic session
            TopicSession topicSession = topicConn.createTopicSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // create a topic subscriber
            TopicSubscriber topicSubscriber = topicSession
                    .createSubscriber(topic);

            // start the connection
            topicConn.start();

            // receive the message
            TextMessage message = (TextMessage) topicSubscriber.receive();

            // print the message
            out.write("Message received: " + message.getText());

            // close the topic connection
            topicConn.close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
    }

}

執行Web工程,分別開啟多個標籤訪問訂閱servlet,然後訪問釋出servlet,結果如下:

在訂閱者訂閱訊息的時候,一開始沒接收到訊息,一旦釋出者釋出訊息後,訂閱者馬上收到訊息。

參考資料

相關推薦

JMS釋出/訂閱訊息傳送例子

前言 基於上篇文章"基於Tomcat + JNDI + ActiveMQ實現JMS的點對點訊息傳送"很容易就可以編寫一個釋出/訂閱訊息傳送例子,相關環境準備與該篇文章基本類似,主要的區別如下。 在Tomcat中配置JNDI 配置連線工廠和話題 <Resource name="topi

RocketMQ中介軟體訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

一、專案引用 <dependency>     <groupId>com.foriseland.fjf.mq</groupId>     <artifactI

Kafka-API中介軟體MQ訊息佇列在Maven專案中的配置使用操作 (分散式釋出訂閱訊息系統)

一、 Maven依賴 <dependency> <groupId>com.foriseland.fjf.mq</groupId> <artifactId>fjf-mq-kafka</artifactId> &

ActiveMQ實戰之 Topic釋出訂閱訊息

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

ActiveMQ之釋出- 訂閱訊息模式實現

一、概念 釋出者/訂閱者模型支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得訊息 在釋出者和訂閱者之間存在時間依賴性。釋出者需要建

分散式釋出訂閱訊息系統Kafka

文章目錄 Kafka概述 Kafka架構及核心概念 Kafka單節點單Broker部署之Zookeeper安裝 單節點單Broker部署 單節點多Broker部署及使用 Kafka容錯性測試

Apache Pulsar:雅虎開發的企業級釋出訂閱訊息系統

英文原文:https://streaml.io/blog/intro-to-pulsar/中文翻

[Python]zeromq:socket request/receive, 釋出/訂閱訊息, 程序間通訊

Request/Response Server: import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: mes

redis 的釋出訂閱訊息佇列

demo 一個釋出者,2個訂閱者 程式碼塊 Shell ➜ ~ redis-cli 127.0.0.1:6379> 127.0.0.1:6379> 127.0.0.1:6379> PUBLISH redisChat "learn publish su

分散式釋出訂閱訊息系統 Kafka

kafka是一種高吞吐量的分散式釋出訂閱訊息系統,她有如下特性: 通過O(1)的磁碟資料結構提供訊息的持久化,這種結構對於即使數以TB的訊息儲存也能夠保持長時間的穩定效能。 高吞吐量:即使是非常普通的硬體kafka也可以支援每秒數十萬的訊息。 支援通過kafka伺服器

釋出 訂閱 訊息系統

1.從監聽與釋出說起 我們寫js程式碼的時候都知道有這樣的事件:我們註冊一個click方法 ,此時我們就為這個按鈕添加了“監聽”,基於“點選”事件的監聽。以此來實現點選按鈕提交表單資料的目的,在這裡,

釋出訂閱訊息系統--kafka的解析。

Kafka介紹: Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功

Kafka(分散式釋出訂閱訊息系統)

http://kafka.apache.org/目前越來越多的開源分散式處理系統如Apache Storm、Spark都支援與Kafka整合。 使用場景:設想這樣一個情景:想分析使用者在網站上的的瀏覽行為。這些瀏覽日誌,存資料庫浪費,直接存硬碟又怕到時候操作效率低。 此時,

ActiveMQ釋出-訂閱訊息模式(同點對點模式的區別)

點對點與釋出訂閱最初是由JMS定義的。這兩種模式主要區別或解決的問題就是傳送到佇列的訊息能否重複消費(多訂閱) 點對點: 訊息生產者生產訊息傳送到queue中,然後訊息消費者從queue中取出並且消費訊息。這裡要注意:  訊息被消費以後,queue中不再有儲存,所以訊息消費

大資料系列之分散式釋出訂閱訊息系統Kafka(一)Kafka簡介,組成,叢集安裝

1.Kafka簡介   Kafka如同JMS(Java Message Service)一樣,是一箇中間件,在異構系統間通訊,為不同的系統之間提供服務。我們知道JMS通過佇列(一對一)與主題(一對多)兩種形式提供服務,而Kafka則通過主題(topic),來給一組消費者提供

大資料系列之分散式釋出訂閱訊息系統Kafka(四)Kafka與Flume的3種整合

前面我們已經介紹了Flume,現在我們將Kafka與Flume整合 先看一下Flume的結構組成:            我們可以發現,將Flume與Kafka進行整合無非3種情況,Flume作為生產者——Sink輸出到Kafka,Flume作為消費者——Source接

分散式釋出訂閱訊息系統 Kafka 架構設計

底層API class SimpleConsumer { /* Send fetch request to a broker and get back a set of messages. */ public ByteBufferMessageSet fetch(FetchRequest re

訊息佇列-ActiveMQ學習筆記(三)-釋出-訂閱訊息模式實現

釋出-訂閱訊息模式與點對點模式類似,只不過在session建立訊息佇列時,由session.createQuene()變為session.createTopic()。 訊息釋出者程式碼: package com.feiyang.activemq2; import java

分散式釋出訂閱訊息系統—Apache Kafka

1.什麼是Kafka Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。

JMS傳送和接收例項-釋出/訂閱模式

傳送訊息 不管是將訊息傳送到佇列還是釋出到主題,程式設計的步驟是相同的,差別在於使用不同的JMS物件。具體定義見表: 傳送訊息的過程大體分為以下幾步; 1、獲得一個Weblogic Server上下文的引用; 2、建立連線工廠; 3、使用連線工廠建立一個連線; 4、使用連