spring4學習記錄08-呼叫遠端服務RPC(非同步,activeMQ)
簡介
上一篇文章說的是應用之間同步進行互動的場景,這裡介紹非同步的使用。
概念
同步,當客戶端呼叫遠端方法時,客戶端必須等待遠端方法完成之後,才能繼續執行,即使遠端方法並不返回任何訊息,客戶端也必須阻塞到服務完成。
非同步,客戶端不需要等待伺服器處理訊息,甚至不需要等待客戶端把訊息傳送給了服務端,就可以繼續執行後面的流程,這是因為客戶端假定服務端最終可以收到並處理這條訊息。
傳送訊息
在非同步訊息中主要有兩個概念:訊息代理和目的地。
當一個應用傳送訊息時,會將訊息交給一個訊息代理,訊息代理類似郵局。訊息代理可以確保訊息被投遞到指定的目的地,目的地類似於郵筒。同時解放傳送者,使傳送者能夠繼續進行其他業務。
兩種通用的 目的地:佇列和主題。
佇列:點對點模型
訊息傳送 -- 》 佇列 --》接收者
主題:釋出-訂閱模型
訂閱者A
訊息傳送 -- > 主題 --> 訂閱者B
訂閱者C
使用jms傳送訊息基於activeMQ
Java訊息服務(Java message service,JMS)是一個Java標準,定義了使用訊息代理的通用API。
spring基於模板的抽象為JMS功能提供了支援,這個模板就是 jmstemplate。使用jmstemplate,能夠非常容易地在訊息生產方傳送佇列和主題,在接收方也非常容易接收訊息。
但是傳送訊息還需要一個訊息代理,這裡採用activeMQ。
在spring中搭建訊息代理
解壓到D盤
啟動訊息代理 開啟這個bat檔案
D:\apache-activemq-5.14.3\bin\win64\activemq.bat
建立連線工廠
ConnectionFactory是用於產生到JMS伺服器的連結的,Spring為我們提供了多個ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory對於建立JMS伺服器連結的請求會一直返回同一個連結,並且會忽略Connection的close方法呼叫。CachingConnectionFactory繼承了SingleConnectionFactory,所以它擁有SingleConnectionFactory的所有功能,同時它還新增了快取功能,它可以快取Session、MessageProducer和MessageConsumer。
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>
Spring提供的ConnectionFactory只是Spring用於管理ConnectionFactory的,真正產生到JMS伺服器連結的ConnectionFactory還得是由JMS服務廠商提供,並且需要把它注入到Spring提供的ConnectionFactory中。我們這裡使用的是ActiveMQ實現的JMS,所以在我們這裡真正的可以產生Connection的就應該是由ActiveMQ提供的ConnectionFactory。所以定義一個ConnectionFactory的完整程式碼應該如下所示
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.7.234:61616"/>
</bean>
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裡面注入一個ActiveMQConnectionFactory可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗。當使用PooledConnectionFactory時,我們在定義一個ConnectionFactory時應該是如下配置
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.7.234:61616"/>
</bean>
配置生產者
生產者負責產生訊息併發送到JMS伺服器
利用Spring為我們提供的JmsTemplate類來處理訊息傳送 對於訊息傳送者而言,它在傳送訊息的時候要知道自己該往哪裡發,為此,我們在定義JmsTemplate的時候需要往裡面注入一個Spring提供的ConnectionFactory物件
<!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
<bean id="senderJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- true/訂閱模式傳送,false/點對點的模式傳送 -->
<property name="pubSubDomain" value="false"/>
</bean>
配置目的地
用JmsTemplate進行訊息傳送的時候,我們需要知道訊息傳送的目的地(destination)
<!-- 設定兩個目的地,一個是佇列形式,一個是主題模式(訂閱/釋出) -->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 指定佇列名稱 -->
<constructor-arg>
<value>myQueue</value>
</constructor-arg>
</bean>
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 指定主題名稱 -->
<constructor-arg>
<value>myTopic</value>
</constructor-arg>
</bean>
定義傳送訊息的介面與實現類
介面類
package com.jx.spring.jms.server;
public interface AlertService {
void sendUserMsg(User user);
}
實現類
package com.jx.spring.jms.server;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
public class AlertServiceImpl implements AlertService{
private JmsOperations jmsOperations; //注入jms模板
@Override
public void sendUserMsg(final String dest , final String msg) {
jmsOperations.send( //傳送訊息
dest, //指定目的地
new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg); //建立訊息
}
}
);
}
public JmsOperations getJmsOperations() {
return jmsOperations;
}
public void setJmsOperations(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
}
為傳送訊息介面注入 生產者
<!-- 注入AlertServiceImpl -->
<bean id="alertService" class="com.jx.spring.jms.server.AlertServiceImpl">
<property name="jmsOperations" ref="sendTmsTemplate"/>
</bean>
編寫測試傳送訊息入口類
public static void main(String[] args) {
ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("server.xml");
AlertServiceImpl alert = (AlertServiceImpl) cxt.getBean("alertService");
String dest = "myQueue";
String msg = "hello";
alert.sendUserMsg(dest , msg);
}
配置消費者
生產者往指定目的地Destination傳送訊息後,接下來就是消費者對指定目的地的訊息進行消費了。
消費者是如何知道有生產者傳送訊息到指定目的地Destination了呢?這是通過Spring為我們封裝的訊息監聽容器MessageListenerContainer實現的,它負責接收資訊,並把接收到的資訊分發給真正的MessageListener進行處理。每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對於訊息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪裡去監聽,也就是說它還需要知道去監聽哪個JMS伺服器,這是通過在配置MessageConnectionFactory的時候往裡面注入一個ConnectionFactory來實現的。所以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定,一個是表示從哪裡監聽的ConnectionFactory;一個是表示監聽什麼的Destination;一個是接收到訊息以後進行訊息處理的MessageListener。
Spring一共為我們提供了兩種型別的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer會在一開始的時候就建立一個會話session和消費者Consumer,並且會使用標準的JMS MessageConsumer.setMessageListener()方法註冊監聽器讓JMS提供者呼叫監聽器的回撥函式。它不會動態的適應執行時需要和參與外部的事務管理。相容性方面,它非常接近於獨立的JMS規範,但一般不相容Java EE的JMS限制。
大多數情況下我們還是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會動態的適應執行時需要,並且能夠參與外部的事務管理。它很好的平衡了對JMS提供者要求低、先進功能如事務參與和相容Java EE環境。
定義處理訊息的MessageListener
package com.jx.spring.jms.client;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener{
@Override
public void onMessage(Message msg) {
TextMessage text = (TextMessage) msg;
System.out.println("client 1 收到一條訊息");
try {
System.out.println(text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
要定義處理訊息的MessageListener我們只需要實現JMS規範中的MessageListener介面就可以了。MessageListener介面中只有一個方法onMessage方法,當接收到訊息的時候會自動呼叫該方法
配置訊息監聽
<bean id="myMessageListener" class="com.jx.spring.jms.client.MyMessageListener"/>
配置訊息監聽容器
這裡省略connectionFactory 、destination 這兩個需要與傳送端保持一致即可
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="myQueue" />
<property name="messageListener" ref="myMessageListener" />
</bean>
編寫接收端
public static void main(String[] args) {
ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("client.xml");
System.out.println("接收者1啟動");
}
測試結果
執行傳送端測試類,與接收端測試類 (先後執行哪個端都可以)
控制檯輸出
附上完整的配置與Java類
傳送端spring配置檔案
<!-- 配置連線工廠,指定訊息代理的URL -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.7.234:61616"/>
</bean>
<!-- 配置傳送者 -->
<bean id="sendTmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- true/訂閱模式傳送,false/點對點的模式傳送 -->
<property name="pubSubDomain" value="false"/>
</bean>
<!-- 設定兩個目的地,一個是佇列形式,一個是主題模式(訂閱/釋出) -->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 指定佇列名稱-->
<constructor-arg>
<value>myQueue</value>
</constructor-arg>
</bean>
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 指定主題名稱 -->
<constructor-arg>
<value>myTopic</value>
</constructor-arg>
</bean>
<!-- 注入AlertServiceImpl -->
<bean id="alertService" class="com.jx.spring.jms.server.AlertServiceImpl">
<property name="jmsOperations" ref="sendTmsTemplate"/>
</bean>
傳送端介面與實現類
介面
public interface AlertService {
void sendUserMsg(String dest , String msg);
}
實現類
package com.jx.spring.jms.server;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
public class AlertServiceImpl implements AlertService{
private JmsOperations jmsOperations; //注入jms模板
@Override
public void sendUserMsg(final String dest , final String msg) {
jmsOperations.send( //傳送訊息
dest, //指定目的地
new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg); //建立訊息
}
}
);
}
public JmsOperations getJmsOperations() {
return jmsOperations;
}
public void setJmsOperations(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
}
傳送端傳送測試類
public static void main(String[] args) {
ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("server.xml");
AlertServiceImpl alert = (AlertServiceImpl) cxt.getBean("alertService");
String dest = "myQueue";
String msg = "hello";
alert.sendUserMsg(dest , msg);
}
接收端配置檔案
<!-- 配置連線工廠,指定訊息代理的URL -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="10"/>
</bean>
<bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.7.234:61616"/>
</bean>
<!-- 設定兩個目的地,一個是佇列形式,一個是主題模式(訂閱/釋出) -->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<!-- 指定佇列名稱 -->
<constructor-arg>
<value>myQueue</value>
</constructor-arg>
</bean>
<bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic">
<!-- 指定主題名稱 -->
<constructor-arg>
<value>myTopic</value>
</constructor-arg>
</bean>
<!-- 訊息監聽器 -->
<bean id="myMessageListener" class="com.jx.spring.jms.client.MyMessageListener"/>
<!-- 訊息監聽容器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="myQueue" />
<property name="messageListener" ref="myMessageListener" />
</bean>
接收端監聽類
package com.jx.spring.jms.client;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener{
@Override
public void onMessage(Message msg) {
TextMessage text = (TextMessage) msg;
System.out.println("client 1 收到一條訊息");
try {
System.out.println(text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接收端測試類
public static void main(String[] args) {
ClassPathXmlApplicationContext cxt = new ClassPathXmlApplicationContext("client.xml");
System.out.println("接收者1啟動");
}
其他說明
1 傳送端,接收端 連線工廠保持一致,目的地設定保持一致
2 若要傳送訂閱/釋出模式的,修改傳送者 pubSubDomain 改成 true
<!-- 配置傳送者 -->
<bean id="sendTmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
<property name="connectionFactory" ref="connectionFactory"/>
<!-- true/訂閱模式傳送,false/點對點的模式傳送 -->
<property name="pubSubDomain" value="false"/>
</bean>
3 MessageListener 是JMS標準的,純粹的接收訊息使用
SessionAwareMessageListener 是Spring為我們提供的,它不是標準的JMS MessageListener,可以在接收訊息的時候回覆一條訊息給傳送端
相關推薦
spring4學習記錄08-呼叫遠端服務RPC(非同步,activeMQ)
簡介 上一篇文章說的是應用之間同步進行互動的場景,這裡介紹非同步的使用。 概念 同步,當客戶端呼叫遠端方法時,客戶端必須等待遠端方法完成之後,才能繼續執行,即使遠端方法並不返回任何訊息,客戶端也必須阻塞到服務完成。 非同步,客戶端不需要等待伺服器處理
RPC呼叫遠端服務
RPC(Remote Procedure Call Protocol,遠端過程呼叫協議),可以使本機像呼叫本地類一樣呼叫遠端服務,只需要提供一個介面給呼叫端,就可以使用RPC工具類拿到遠端服務代理,進而呼叫遠端服務(釋出的RPC服務)。關於RPC的具體介紹可以參見知乎上的回
HttpClient實現簽名並呼叫遠端服務
使用treeMap傳參,實現字典序排列計算簽名並呼叫請求 // 轉發獲取角色等級請求 Map<String, String> params = new TreeMap<String, String>(); params.put("zoneId", zo
kubernetes學習記錄(5)——服務發現機制與Cluster DNS的安裝(無CA認證版)
服務發現機制 Kubernetes提供了兩種發現Service的方法: 1.環境變數 當Pod執行的時候,Kubernetes會將之前存在的Service的資訊通過環境變數寫到Pod中。 這種方法要求Pod必須要在Service之後啟動。 在Ser
jvm學習記錄--08 效能監控Java工具篇
前言 在jdk中提供了一些工具幫助開發人員解決一些問題。上一篇中提到的jps,jstack就是出自jdk。 jdk/bin目錄下提供了很多exe檔案其實都是jar檔案的包裝,真正的實現在jdk/lib/tools.jar中 jps–檢視Java
SpringBoot專案中使用webservice呼叫遠端服務
有一個webservice專案,對外提供資源查詢服務,使用了Apache的cxf框架,soap協議。如今都使用微服務,在一個springboot專案中需要遠端呼叫這個資源查詢服務,RPC的一種,既然是RPC,就需要有客戶端的存根(stub)和服務端的骨架(sk
基於HttpClient實現RPC遠端服務呼叫功能【SpringBoot專案】
一、什麼是RPC? RPC(Remote Procedure Call)—遠端過程呼叫,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。在OSI網路通訊模
如何實現RPC遠端服務呼叫
想要完成RPC呼叫,需要解決四個問題: 客戶端與服務端如何建立網路連線 服務端如何處理請求 資料傳輸採用什麼協議 資料該如何序列化和反序列化 1.客戶端與服務端如何建立網路連線 HTTP通訊 HTTP通訊是基於應用層HTTP協議的,而HT
遠端呼叫服務(RPC)和訊息佇列(Message Queue)對比及其適用/不適用場合分析
最近考慮把公司系統重構升級,將原有的垂直MVC架構遷移為分散式系統,因此著重瞭解了下遠端呼叫服務(RPC)和訊息佇列(MQ)。RPC和MQ都是用於分散式系統的兩個關鍵技術,並且裡面都有服務提供者和消費者的概念,可在一定程度上對系統進行解耦。但對於彼此應用場景的區分還不是特別
【學習記錄】CentOS建立Git服務器
font cat 添加 gen bsp microsoft 一行 highlight 登錄 0.所有代碼沒有特別說明都是在root權限下執行,其他用戶權限執行失敗時候,切換root用戶或者添加sudo前綴。 1.安裝git,並創建git用戶 yum install g
linux學習記錄-----vsftpdf服務安裝配置
安裝 關閉防火墻 防火墻 換行 服務器 查看 pass 17.1 word 1、掛載iso鏡像到mnt下:mount /dev/cdrom /mnt 2、安裝vsftpd服務器,在/mnt/Packages/下有所需要的安裝包 rpm -ivh vsftpd-*****
SpringCloud微服務搭建之fegin客戶端遠端服務呼叫
什麼是Feign Feign是一個宣告式的偽Http客戶端,它使得寫Http客戶端變得更簡單。使用Feign,只需要建立一個介面並註解。它具有可插拔的註解特性,可使用Feign 註解和JAX-RS註解。Feign支援可插拔的編碼器和解碼器。Feign預設集成了Ribbon,並和Eureka結合,預設實現了負
SpringBoot學習筆記08——解決Cache快取同類中呼叫失敗問題
問題描述 今天遇到了一個問題,使用快取的情況下,如果在快取服務類方法中呼叫快取的方法會呼叫失敗,就是this.快取方法名,這樣使用就不會從快取中獲取資料,而是直接呼叫快取方法,錯誤示例程式碼如下: package com.youyou.address.service; import org
dubbo其實很簡單,就是一個遠端服務呼叫的框架(1)
dubbo專題」dubbo其實很簡單,就是一個遠端服務呼叫的框架(1) 一、dubbo是什麼? 1)本質:一個Jar包,一個分散式框架,,一個遠端服務呼叫的分散式框架。 既然是新手教學,肯定很多同學不明白什麼是分散式和遠端服務呼叫,為什麼要分散式,為什麼要遠端呼叫。我簡單畫個對比圖說明(
Android-NDK學習記錄5-Jni呼叫例項方法
上一篇看了jni呼叫靜態方法和修改靜態欄位,這一篇學習了jni呼叫例項方法和修改例項欄位 呼叫例項方法,步驟: 找到類:利用FindClass,找到類 找到要呼叫的方法id:利用GetMethodID,找到方法id 建立例項物件:利用例項物件的構
Android-NDK學習記錄4-C呼叫Java靜態方法修改靜態欄位
一. jni互動相關-方法簽名 方法簽名在jni的使用中經常都會用到,在java中會有過載,那麼定位到一個方法的方式:類+方法名稱+方法簽名,那麼我們先學習下簽名規則: 基本型別簽名: 咱們基本型別有各自的簽名,如下表 型別名
遠端呼叫方式 ==> RPC與Http的比較
一.遠端呼叫方式 無論是微服務還是分散式服務(都是SOA,都是面向服務程式設計),都面臨著服務間的遠端呼叫。那麼服務間的遠端呼叫方式有哪些呢? 常見的遠端呼叫方式有以下幾種: RPC:Remote Produce Call遠端過程呼叫,類似的還有RMI(Remote M
MySQL通過Navicat實現遠端連線的過程 學習記錄
1.首先使用localhost登入到想要進行遠端連線的資料庫 2.開啟阿里雲伺服器上安裝的windows系統,以管理員許可權開啟命令提示視窗,輸入如下命令: mysql> grant all privileges on *.* to 'root'@'%' ident
Spring Cloud 入門教程(六): 用宣告式REST客戶端Feign呼叫遠端HTTP服務
首先簡單解釋一下什麼是宣告式實現? 要做一件事, 需要知道三個要素,where, what, how。即在哪裡( where)用什麼辦法(how)做什麼(what)。什麼時候做(when)我們納入how的範疇。 1)程式設計式實現: 每一個要素(where,wh
使用Spring Cloud Feign作為HTTP客戶端呼叫遠端HTTP服務
在Spring Cloud Netflix棧中,各個微服務都是以HTTP介面的形式暴露自身服務的,因此在呼叫遠端服務時就必須使用HTTP客戶端。我們可以使用JDK原生的URLConnection、Apache的Http Client、Netty的非同步HTTP Client,