如何在優雅地Spring 中實現消息的發送和消費
作者簡介:遼天,阿裏巴巴技術專家,Apache RocketMQ 內核控,擁有多年分布式系統研發經驗,對Microservice、Messaging和Storage等領域有深刻理解, 目前專註 RocketMQ 內核優化以及 Messaging 生態建設。
通過本文,您將了解到:
Spring的消息框架介紹
rocketmq-spring-boot具體實現
使用示例
插播一條廣告:本周六下午,Apache RocketMQ 開發者沙龍將來到杭州,歡迎大家到現場,活動詳情請點擊“閱讀原文”。
前言
上世紀90年代末,隨著Java EE(Enterprise Edition)的出現,特別是Enterprise Java Beans的使用需要復雜的描述符配置和死板復雜的代碼實現,增加了廣大開發者的學習曲線和開發成本,由此基於簡單的XML配置和普通Java對象(Plain Old Java Objects)的Spring技術應運而生,依賴註入(Dependency Injection), 控制反轉(Inversion of Control)和面向切面編程(AOP)的技術更加敏捷地解決了傳統Java企業及版本的不足。
隨著Spring的持續演進,基於註解(Annotation)的配置逐漸取代了XML文件配置, 2014年4月1日,Spring Boot 1.0.0正式發布,它基於“約定大於配置”(Convention over configuration)這一理念來快速地開發、測試、運行和部署Spring應用,並能通過簡單地與各種啟動器(如 spring-boot-web-starter)結合,讓應用直接以命令行的方式運行,不需再部署到獨立容器中。這種簡便直接快速構建和開發應用的過程,可以使用約定的配置並且簡化部署,受到越來越多的開發者的歡迎。
Apache RocketMQ是業界知名的分布式消息和流處理中間件,簡單地理解,它由Broker服務器和客戶端兩部分組成:
其中客戶端一個是消息發布者客戶端(Producer),它負責向Broker服務器發送消息;
另外一個是消息的消費者客戶端(Consumer),多個消費者可以組成一個消費組,來訂閱和拉取消費Broker服務器上存儲的消息。
為了利用Spring Boot的快速開發和讓用戶能夠更靈活地使用RocketMQ消息客戶端,Apache RocketMQ社區推出了spring-boot-starter實現。隨著分布式事務消息功能在RocketMQ 4.3.0版本的發布,近期升級了相關的spring-boot代碼,通過註解方式支持分布式事務的回查和事務消息的發送。
本文將對當前的設計實現做一個簡單的介紹,讀者可以通過本文了解將RocketMQ Client端集成為spring-boot-starter框架的開發細節,然後通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,發送和消費RocketMQ消息。
Spring 中的消息框架
順便在這裏討論一下在Spring中關於消息的兩個主要的框架,即Spring Messaging和Spring Cloud Stream。它們都能夠與Spring Boot整合並提供了一些參考的實現。和所有的實現框架一樣,消息框架的目的是實現輕量級的消息驅動的微服務,可以有效地簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關註於核心業務邏輯的處理。
2.1 Spring Messaging
Spring Messaging是Spring Framework 4中添加的模塊,是Spring與消息系統集成的一個擴展性的支持。它實現了從基於JmsTemplate的簡單的使用JMS接口到異步接收消息的一整套完整的基礎架構,Spring AMQP提供了該協議所要求的類似的功能集。 在與Spring Boot的集成後,它擁有了自動配置能力,能夠在測試和運行時與相應的消息傳遞系統進行集成。
單純對於客戶端而言,Spring Messaging提供了一套抽象的API或者說是約定的標準,對消息發送端和消息接收端的模式進行規定,不同的消息中間件提供商可以在這個模式下提供自己的Spring實現:在消息發送端需要實現的是一個XXXTemplate形式的Java Bean,結合Spring Boot的自動化配置選項提供多個不同的發送消息方法;在消息的消費端是一個XXXMessageListener接口(實現方式通常會使用一個註解來聲明一個消息驅動的POJO),提供回調方法來監聽和消費消息,這個接口同樣可以使用Spring Boot的自動化選項和一些定制化的屬性。
如果有興趣深入的了解Spring Messaging及針對不同的消息產品的使用,推薦閱讀這個文件。參考Spring Messaging的既有實現,RocketMQ的spring-boot-starter中遵循了相關的設計模式並結合RocketMQ自身的功能特點提供了相應的API(如,順序,異步和事務半消息等)。
2.2 Spring Cloud Stream
Spring Cloud Stream結合了Spring Integration的註解和功能,它的應用模型如下:
Spring Cloud Stream框架中提供一個獨立的應用內核,它通過輸入(@Input)和輸出(@Output)通道與外部世界進行通信,消息源端(Source)通過輸入通道發送消息,消費目標端(Sink)通過監聽輸出通道來獲取消費的消息。這些通道通過專用的Binder實現與外部代理連接。開發人員的代碼只需要針對應用內核提供的固定的接口和註解方式進行編程,而不需要關心運行時具體的Binder綁定的消息中間件。在運行時,Spring Cloud Stream能夠自動探測並使用在classpath下找到的Binder。
這樣開發人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在構建時包含進不同的Binder。在更加復雜的使用場景中,也可以在應用中打包多個Binder並讓它自己選擇Binder,甚至在運行時為不同的通道使用不同的Binder。
Binder抽象使得Spring Cloud Stream應用可以靈活的連接到中間件,加之Spring Cloud Stream使用利用了Spring Boot的靈活配置配置能力,這樣的配置可以通過外部配置的屬性和Spring Boo支持的任何形式來提供(包括應用啟動參數、環境變量和application.yml或者application.properties文件),部署人員可以在運行時動態選擇通道連接destination(例如,Kafka的topic或者RabbitMQ的exchange)。
Binder SPI的方式來讓消息中間件產品使用可擴展的API來編寫相應的Binder,並集成到Spring Cloud Steam環境,目前RocketMQ還沒有提供相關的Binder,我們計劃在下一步將完善這一功能,也希望社區裏有這方面經驗的同學積極嘗試,貢獻PR或建議。
spring-boot-starter的實現
在開始的時候我們已經知道,spring boot starter構造的啟動器對於使用者是非常方便的,使用者只要在pom.xml引入starter的依賴定義,相應的編譯,運行和部署功能就全部自動引入。因此常用的開源組件都會為Spring的用戶提供一個spring-boot-starter封裝給開發者,讓開發者非常方便集成和使用,這裏我們詳細的介紹一下RocketMQ(客戶端)的starter實現過程。
3.1. spring-boot-starter的實現步驟
對於一個spring-boot-starter實現需要包含如下幾個部分:
在pom.xml的定義
定義最終要生成的starter組件信息
<groupId>org.apache.rocketmq</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
定義依賴包,
它分為兩個部分: A、Spring自身的依賴包; B、RocketMQ的依賴包
<dependencies>
如何在優雅地Spring 中實現消息的發送和消費