1. 程式人生 > >Java中介軟體JMS(四)之ActiveMQ整合spring之類轉換(三)

Java中介軟體JMS(四)之ActiveMQ整合spring之類轉換(三)

前幾章都是直接傳送MapMessage型別的資料,拿前面的例子來講,如果生產者傳送的是TextMessage,消費者也是必須TextMessage;如果我們自己要傳送的資料不是TextMessage型別,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增加一個轉換方法麼?其實spring早就考慮到這種情況了。轉化器在很多元件中都是必不缺少的東西Spring的MessageConverter介面提供了對訊息轉換的支援。

1、轉換類的相關程式碼POJO

新建一個類MsgPoJo,就是一個簡單的Pojo類。具體程式碼如下:

package jms.mq.spring;

import java.io.Serializable;

public class MsgPoJo implements Serializable{
	private String id;
	private String text;
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getText() {
		return text;
	}
	public void setText(String text) {
		this.text = text;
	}	
}

2.轉換類的實現

新建一個類MsgConverter.java,實現MessageConverter介面。生成的程式碼如下

package jms.mq.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;

public class MsgConverter implements MessageConverter{

	@Override
	public Object fromMessage(Message message) throws JMSException,
	MessageConversionException {
		if (!(message instanceof TextMessage)) {
			throw new MessageConversionException("Message is not TextMessage");
		}
		System.out.println("--轉換接收的訊息--");
		TextMessage textMessage = (TextMessage) message;
		MsgPoJo msgPojo = new MsgPoJo();
		String[] texts=textMessage.getText().split(",");
		msgPojo.setId(texts[0]);
		msgPojo.setText(texts[1]);
		return msgPojo;
	}

	@Override
	public Message toMessage(Object object, Session session) throws JMSException,
	MessageConversionException {
		if (!(object instanceof MsgPoJo)) {
			throw new MessageConversionException("obj is not MsgPojo");
		}
		System.out.println("--轉換髮送的訊息--");
		MsgPoJo msgPojo = (MsgPoJo) object;
		TextMessage textMessage = session.createTextMessage();
		textMessage.setText(msgPojo.getId()+","+msgPojo.getText());
		return  textMessage;
	}
}

程式碼很簡單就是做些轉換,有fromMessage和toMessage兩個方法,真好對應傳送轉換toMessage和接受轉換fromMessage。此時,傳送和接收訊息要換成template.convertAndSend(message);template.receiveAndConvert()。接下來我做一些配置,讓spring知道我們的轉換類。修改applicationContext.xml中jms模版配置的程式碼,修改後的程式碼如下:

	<!-- 類轉換 -->
	<bean id="msgConverter" class="jms.mq.spring.MsgConverter"></bean>
	
	<!-- 配置Jms模板 -->
	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="queueDest" />
		<!--<property name="receiveTimeout" value="10000" /> -->
		<!-- 類轉換 -->
		<property name="messageConverter" ref="msgConverter"></property>
	</bean>

注意:如果你有佇列監聽容器配置,配置jmsQueueTemplate和jmsTopicTemplate可能與佇列容器配置衝突。

3、業務相關程式碼和配置

在QueueProducerService.java增加convertAndSend()方法並在其實現類中實現,實現類的程式碼如下:

package jms.mq.spring;

import java.util.Date;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class QueueProducerService{
	JmsTemplate jmsTemplate;

	Destination destination;

	public void send() {
		MessageCreator messageCreator = new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage();
				message.setText("QueueProducerService傳送訊息"+new Date());
				return message;
			}

		};
		jmsTemplate.send(this.destination,messageCreator);
	}
	
	public void convertAndSend(){
		MsgPoJo msgPojo = new MsgPoJo();
		msgPojo.setId("1");
		msgPojo.setText("first msg");
		System.out.println("--傳送訊息:msgPojo.id為"+msgPojo.getId()+";msgPojo.text為"+msgPojo.getText());
		jmsTemplate.convertAndSend(this.destination, msgPojo);
	}


	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}
	
	public void setDestination(Destination destination) {
		this.destination = destination;
	}
}

同樣在QueueConsumerService.java中增加receiveAndConvert()方法並在其實現類中實現,實現類的程式碼如下:

package jms.mq.spring;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.springframework.jms.core.JmsTemplate;


public class QueueConsumerService{

	JmsTemplate jmsTemplate;

	Destination destination;

	public void receive() {
		TextMessage message = (TextMessage) jmsTemplate.receive();
		try {
			System.out.println("QueueConsumerService收到訊息:"+message.getText());

		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

	public void receiveAndConvert() {
		MsgPoJo msgPojo = (MsgPoJo)jmsTemplate.receiveAndConvert();
		if(msgPojo!=null){
			System.out.println("--收到訊息:msgPojo.id為"+msgPojo.getId()+";msgPojo.text為"+msgPojo.getText());
		}
	}

	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}
}

修改我們的兩個測試類,增加對轉換方法的呼叫,不再贅述,直接上程式碼:

QueueConsumerTest.java測試類

package jms.mq.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class QueueConsumerTest {
	private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

	private static void receive() {
		QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");
		consumerService.receive();
	}

	private static void receiveAndConvert() {
		QueueConsumerService consumerService = (QueueConsumerService) appContext.getBean("queueConsumerService");
		consumerService.receiveAndConvert();
	}


	public static void main(String[] args) {
		//receive();
		receiveAndConvert();
	}
}

QueueProducerTest.java測試類

package jms.mq.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class QueueProducerTest {
	private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

	private static void send() {
		QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");
		producerService.send();
	}
	
	private static void convertAndSend() {
		QueueProducerService producerService = (QueueProducerService) appContext.getBean("queueProducerService");
		producerService.convertAndSend();
	}

	public static void main(String[] args) {
		//send();
		convertAndSend();
	}

}

程式碼編寫完畢,我們看一下我們的勞動成果。首先執行生產者類和消費者控制檯資訊如下:



收到的內容與發的內容相同,說明轉換成功了。如果這一部分的程式使用的佇列跟上面的一樣,那你會發現傳送的時候打印出的資訊不值上面的一個,還包括一個接收的資訊,這是為什麼呢?瞭解spring原理的人應該知道,spring是把所有類都載入到內容中,當然也包括我們上門寫的按個實現MessageListener的一個消費者類,他們也在執行,如果監聽的地址跟你送的地址正好相同的話,他也有可能收到這個資訊。所以在測試的時候要注意修改配置檔案。

	<bean id="queueProducerService" class="jms.mq.spring.QueueProducerService">
		<property name="jmsTemplate" ref="jmsQueueTemplate" />
		<property name="destination" ref="queueDest" />  
	</bean>

	<bean id="queueConsumerService" class="jms.mq.spring.QueueConsumerService">
		<property name="jmsTemplate" ref="jmsQueueTemplate" />
		<property name="destination" ref="queueDest" /> 
	</bean>

4、監聽器上的使用方式

我再來學習一下跟監聽器聯合使用的方式,只在釋出訂閱者模式上演示一下。我們先來修改釋出者的實現方式,在釋出者中增加convertAndSend方法並在其實現類中實現,訂閱者監聽器沒有類轉換,不用修改,釋出者修改後的程式碼如下:

package jms.mq.spring;

import java.util.Date;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import jms.spring.QueueProducerService;

public class TopicPublisherService{
	JmsTemplate jmsTemplate;
	 
	Destination destination;

	public void send() {
		MessageCreator messageCreator = new MessageCreator() {

			public Message createMessage(Session session) throws JMSException {
				TextMessage message = session.createTextMessage();
				message.setText("QueueProducerService傳送訊息"+new Date());
				return message;
			}
		};
		jmsTemplate.send(this.destination,messageCreator);
	}

	public void convertAndSend(Object obj) {
		System.out.println("--傳送PoJo物件...");
		jmsTemplate.convertAndSend(destination, obj);
	}

	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void setDestination(Destination destination) {
		this.destination = destination;
	}

}

釋出訂閱者配置檔案如下

	<!-- 配置TopicJms模板 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="defaultDestination" ref="topicDest" />
		<!-- 配置是否為釋出訂閱者模式,預設為false -->
		<property name="pubSubDomain" value="true" />
		<!--<property name="receiveTimeout" value="10000" /> -->
		<property name="messageConverter" ref="msgConverter"></property>
	</bean>
<bean id="topicPublisherService" class="jms.mq.spring.TopicPublisherService">
		<property name="jmsTemplate" ref="jmsTopicTemplate" />
		<!-- <property name="destination" ref="topicDest" /> -->
		<property name="destination" ref="topicSubscriberMessageListenerDest" /> 
	</bean>

	<bean id="topicSubscriberService" class="jms.mq.spring.TopicSubscriberService">
		<property name="jmsTemplate" ref="jmsTopicTemplate" />
		<property name="destination" ref="topicDest" />
	</bean>

修改上面的釋出測試類,修改增加對新增方法的呼叫,修改後的內容如下:

package jms.mq.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class TopicPublisherTest {
	private static ApplicationContext appContext = new ClassPathXmlApplicationContext( "applicationContext.xml");

	private static void send() {
		TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");
		topicPublisherService.send();
	}
	private static void convertAndSend() {
		TopicPublisherService topicPublisherService = (TopicPublisherService) appContext.getBean("topicPublisherService");
		MsgPoJo msgPoJo = new MsgPoJo();
		msgPoJo.setId("1");
		msgPoJo.setText("測試內容");
		topicPublisherService.convertAndSend(msgPoJo);
	}


	public static void main(String[] args) {
		//send();
		convertAndSend();
	}
}

執行釋出測試類,執行結果如下:


寫在到這裡,ActiveMQ與spring整合就講完了,主要講了ActiveMQ與spring的簡單整合,監聽器和類轉換這些主要功能.

呵呵,寫到不好,請大家不要拍磚。



相關推薦

Java中介軟體JMS()ActiveMQ整合spring之類轉換()

前幾章都是直接傳送MapMessage型別的資料,拿前面的例子來講,如果生產者傳送的是TextMessage,消費者也是必須TextMessage;如果我們自己要傳送的資料不是TextMessage型別,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增

Java中介軟體JMS()ActiveMQ整合spring之類轉換

前幾章都是直接傳送MapMessage型別的資料,拿前面的例子來講,如果生產者傳送的是TextMessage,消費者也是必須TextMessage;如果我們自己要傳送的資料不是TextMessage型別,而消費者還是TextMessage的,那該怎麼辦?難道每次接受後都要增加一個轉換方法麼?其實sprin

Java中介軟體JMS(五)JMS入門

  3).JBoss 是 JBoss 公司開發的一個免費開源的應用伺服器,它提供了 EJB 執行的環境,並能夠結合 EJB 進行 JMS 訊息的收取,支援點到點模型和釋出/訂閱模型。  4).ActiveMQ 是一個基於 Apache 2.0 Licenced 釋出的開放原始碼的 JMS 產品,它能夠提供點到

Java中介軟體JMS()ActiveMQ整合spring監聽器(二)

對於讓spring管理監聽的實現方式有兩種方法,一種是自己寫監聽器,然後交給spring的監聽介面卡管理,再由監聽容器管理監聽介面卡,另一種是寫一個實現MessageListener介面的類。第一種在第一章涉及到,但是沒有交給spring託管.其實實現的方法很簡單,在j2e

Java中介軟體JMSActiveMQ入門

1.ActiveMQ概述 企業訊息軟體從80年代起就存在,它不只是一種應用間訊息傳遞風格,也是一種整合風格。因此,訊息傳遞可以滿足應用間的通知和互相操作。但是開源的解決方案是到最近10年才出現的。Apache ActiveMQ就是其中一種。它使應用間能以非同步,鬆耦合方式交流。ActiveMQ 是Apa

() 訊息中介軟體——使用 JmsTemplate 向 ActiveMQ 傳送 Queue 型別訊息

文章目錄 前言 傳送類的編寫 建立 類 JmsTemplateQueueProduct 方法一 send 方法二 convertAndS

activemq訊息中介軟體--JMS概述(1)

1 JMS概述 目前現在很多的RPC中介軟體技術都有如下問題: (1)同步通訊,客戶端發出呼叫請求,必須等待服務端處理完成以後返回結果才能繼續執行。 (2)客戶和服務物件的生命週期緊密耦合,客戶程序和服務程序都必須正常進行,如果由於服務物件的崩潰和網路故障導致客戶請求不可達,客戶收到

jmsactiveMQspring整合進階-實現一種負載均衡

="http://www.springframework.org/schema/beans            http://www.springframework.org/schema/beans/spring-beans-2.5.xsd            http://www.springframe

(六) 訊息中介軟體——使用 JmsTemplate 向 ActiveMQ 傳送和獲取 Topic 型別訊息

文章目錄 前言 回顧 Queue 型別訊息的傳送和請求過程 ActiveMQTopic 和 JmsTemplate Topic 型別訊息的特性

(五) 訊息中介軟體——使用 JmsTemplate 向 ActiveMQ 獲取 Queue 型別訊息

文章目錄 前言 為什麼不將傳送和接收寫到同一篇文章中? 傳送和接收的聯絡和區別-本質上是對某一個 ActiveMQQueue 的操作 讀取 Queue 中的訊息

《大型網站系統與JAVA中介軟體實踐》 第六章 訊息中介軟體

    如何保證一致性                     &

《大型網站系統與JAVA中介軟體實踐》 第五章 資料訪問層

         兩階段提交                 &nbs

《大型網站系統與JAVA中介軟體實踐》 第一章

 主要內容  1、將單機發展為分散式 分散式也可以看為大型單機 控制儲存和計算單元變得更大了 2、 NIO BIO AIO 3、 互動呼叫常見方式: 名稱 、規則 、master、 4、分散式的難點     1)、單點故障 &nb

訊息中介軟體JMS

一.什麼是訊息中介軟體      訊息中介軟體利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。   ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。Acti

訊息中介軟體學習1:ActiveMQ

TODO 歡迎使用Markdown編輯器 你好! 這是你第一次使用 Markdown編輯器 所展示的歡迎頁。如果你想學習如何使用Markdown編輯器, 可以仔細閱讀這篇文章,瞭解一下Markdown的基本語法知識。 新的改變 我們對Markdown編輯器進行了一些功能拓展與

訊息中介軟體入門「二」:在spring中使用activemq

訊息中介軟體入門「二」:在spring中使用activemq 在普通的java專案使用activemq一般需要經過:1.獲得連線工廠;2.建立連線;2.啟動連線;3.獲取會話(session);4.繫結連線地址(detination)5.獲得消費者/生產者;6.生產/消費訊息。