Akka-CQRS(11)- akka-http for http-web-service: Marshalling-資料序列化
前面幾篇討論了關於gRPC方式的前後端連線整合方式。gRPC也是一個開放的標準,但講到普及性就遠遠不及基於http/1.1協議的web-service了。特別是gRPC的前端程式設計還是有一定的門檻,所以作為一種開放的網路大平臺還是必須考慮用web-service方式的整合。平臺服務api可以有兩樣選擇:一種是傳統web-service方式,新的一種是rest api款式。rest api比較適合資料庫表的crud操作。在2017年我曾經寫了一系列部落格介紹akka-http,這裡就不再敘述它的細節了。這篇我們只聚焦在解決當前問題上。在POS控制平臺例子裡不會涉及到POST操作,應該全部是GET型別的,如:
http://192.168.11.189:2588/pos/logon?opr=1010 http://192.168.11.189:2588/pos/logoff http://192.168.11.189:2588/pos/logsales?acct=001&dpt=01&code=978111&qty=3&price=1200 http://192.168.11.189:2588/pos/subtotal?level=0 http://192.168.11.189:2588/pos/discount?disctype=2&grouped=true&code=481&percent=20
可以看到,請求部分只是帶引數的uri,不含entity資料部分,資料通過querystring提供。但返回會有幾種資料型別:POSResponse,TxnsItems,vchState,這些都曾經在Protobuffer用IDL定義過:
message PBVchState { //單據狀態 string opr = 1; //收款員 int64 jseq = 2; //begin journal sequence for read-side replay int32 num = 3; //當前單號 int32 seq = 4; //當前序號 bool void = 5; //取消模式 bool refd = 6; //退款模式 bool susp = 7; //掛單 bool canc = 8; //廢單 bool due = 9; //當前餘額 string su = 10; //主管編號 string mbr = 11; //會員號 int32 mode = 12; //當前操作流程:0=logOff, 1=LogOn, 2=Payment } message PBTxnItem { //交易記錄 string txndate = 1; //交易日期 string txntime = 2; //錄入時間 string opr = 3; //操作員 int32 num = 4; //銷售單號 int32 seq = 5; //交易序號 int32 txntype = 6; //交易型別 int32 salestype = 7; //銷售型別 int32 qty = 8; //交易數量 int32 price = 9; //單價(分) int32 amount = 10; //碼洋(分) int32 disc = 11; //折扣率 (%) int32 dscamt = 12; //折扣額:負值 net實洋 = amount + dscamt string member = 13; //會員卡號 string code = 14; //編號(商品、卡號...) string acct = 15; //賬號 string dpt = 16; //部類 } message PBPOSResponse { int32 sts = 1; string msg = 2; PBVchState voucher = 3; repeated PBTxnItem txnitems = 4; }
那麼概括我們現在的主要工作包括:Uri解析,HttpResponse例項的構建和傳輸。
首先,用akka-http搭建一個http server框架:
import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ object HttpServerDemo extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val route = path("hello") { complete {"hello, http server "} } val (port, host) = (8011,"192.168.11.189") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) /* bindingFuture.foreach(s => println(s.localAddress.getHostString)) bindingFuture.foreach(_.unbind()) bindingFuture.onComplete { case Success(value) => value.unbind() } */ }
用akka-http的server api很快就完成了一個簡單的http-server。下一步研究一下如何構建返回的HttpResponse:httpresponse是從server端傳送到client端的。這個過程包括把HttpResponse Entity裡的資料從某種型別轉換成通訊用的二進位制資料流、到了客戶端再轉換成目標型別。akka-http的資料轉換機制Marshaller/Unmarshaller是通過型別轉換的隱式例項來實現的,akka-http提供了多個標準型別資料轉換的隱式例項,如StringMarshaller:
implicit val ByteArrayMarshaller: ToEntityMarshaller[Array[Byte]] = byteArrayMarshaller(`application/octet-stream`) def byteArrayMarshaller(contentType: ContentType): ToEntityMarshaller[Array[Byte]] = Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) } implicit val ByteStringMarshaller: ToEntityMarshaller[ByteString] = byteStringMarshaller(`application/octet-stream`) def byteStringMarshaller(contentType: ContentType): ToEntityMarshaller[ByteString] = Marshaller.withFixedContentType(contentType) { bytes => HttpEntity(contentType, bytes) } implicit val StringMarshaller: ToEntityMarshaller[String] = stringMarshaller(`text/plain`) def stringMarshaller(mediaType: MediaType.WithOpenCharset): ToEntityMarshaller[String] = Marshaller.withOpenCharset(mediaType) { (s, cs) => HttpEntity(mediaType withCharset cs, s) } def stringMarshaller(mediaType: MediaType.WithFixedCharset): ToEntityMarshaller[String] = Marshaller.withFixedContentType(mediaType) { s => HttpEntity(mediaType, s) } ...
因為akka-http提供了implicit val StringMarshaller,所以在上面的例子裡我可以直接寫成: complete("hello world!"),然後系統自動構建一個含字元型別資料entity的HttpResponse。Entity.dataBytes中的資料型別是由Entity.contentType指明的:
object ContentTypes { val `application/json` = ContentType(MediaTypes.`application/json`) val `application/octet-stream` = ContentType(MediaTypes.`application/octet-stream`) val `application/x-www-form-urlencoded` = ContentType(MediaTypes.`application/x-www-form-urlencoded`) val `text/plain(UTF-8)` = MediaTypes.`text/plain` withCharset HttpCharsets.`UTF-8` val `text/html(UTF-8)` = MediaTypes.`text/html` withCharset HttpCharsets.`UTF-8` val `text/xml(UTF-8)` = MediaTypes.`text/xml` withCharset HttpCharsets.`UTF-8` val `text/csv(UTF-8)` = MediaTypes.`text/csv` withCharset HttpCharsets.`UTF-8` val `application/grpc+proto` = ContentType(MediaTypes.`application/grpc+proto`) // used for explicitly suppressing the rendering of Content-Type headers on requests and responses val NoContentType = ContentType(MediaTypes.NoMediaType) }
客戶端收到HttpResponse後把收到的二進位制資料流轉換成MediaTypes指定的型別。當然,最基本的資料型別就是String了。所有客戶端都提供String型別的反序列化deserialization。理論上來講,我們可以用字元形式來描述任何型別資料,這樣我們可以把一個特殊型別例項轉成String,然後傳送給客戶端。客戶端再按照協議好的型別轉換規則把字元轉換成目標型別:
case class TextMessage(msg: String) val helloMsg: String = TextMessage("hello string message converter").toString val route = path("hello") { complete {helloMsg} }
不過,這種情況只適用於內部系統的資料交換,因為資料型別轉換的規則方式都是內部私有的。xml,json是開放平臺系統資料交換的標準資料型別描述語言,本身是字元String形式的,只是它用String描述型別的語法是行業標準的。客戶端可以按行業標準從一個xml/json檔案裡提取裡面的資料型別和例項。所以,自定義型別的資料轉換主要包括 型別->jsonstring->bytestring->jsonstring->型別。換句話說我們只要有隱式JsonMarshaller例項就可以完成大部分的資料交換工作了。
spray-json是akka-http自帶預設的一個json工具庫,它提供了通用的針對任何型別T的Marshaller/Unmarshaller: ToEntityMarshaller[T] 和 FromEntityUnmarshaller[T]。使用spay-json很簡單,如下:
import akka.http.scaladsl.marshallers.sprayjson._ import spray.json._ object JsonMarshaller extends SprayJsonSupport with DefaultJsonProtocol { //domain models case class Person(name:String, age: Int) case class Location(province: String, city: String, zipcode: Int) case class Employee(person: Person, loccation: Location) //collect your json format instances implicit val fmtPerson = jsonFormat2(Person.apply) implicit val fmtLocation = jsonFormat3(Location.apply) implicit val fmtEmployee = jsonFormat2(Employee.apply) }
使用Marshaller時只要import JsonMarshaller._ 把幾個型別的隱式轉換例項帶進可視域即可,如下:
import JsonMarshaller._ val person = Person("Jonh Doe", 23) val location = Location("GuangDong","ShenZhen",10223) val employee = Employee(person,location) val route = path("json") { complete {employee} }
就這麼簡單,試試看:
http://192.168.11.189:8011/json {"loccation":{"city":"ShenZhen","province":"GuangDong","zipcode":10223},"person":{"age":23,"name":"Jonh Doe"}}
沒錯,客戶端的確收到正確的json資料。還有一項需求是在Reponse裡返回一個數據流(多條資料),如當前交易專案清單。這個也比較容易:akka-http本身支援json-streaming。具體使用方法如下:
import akka.http.scaladsl.common.EntityStreamingSupport import akka.stream.scaladsl._ implicit val jsonStreamingSupport = EntityStreamingSupport.json() .withParallelMarshalling(parallelism = 4, unordered = false) val persons = List(person,Person("Peter Kung",28), Person("Ketty Wang",16)) val personDataSource: Source[Person,Any] = Source.fromIterator(() => persons.iterator) val route = path("json") { complete {employee} } ~ path("stream") { complete(personDataSource) }
在客戶端browser上測試:
http://192.168.11.189:8011/stream [{"age":23,"name":"Jonh Doe"},{"age":28,"name":"Peter Kung"},{"age":16,"name":"Ketty Wang"}]
也沒問題。下面是本次示範中使用的依賴和它們的版本:
libraryDependencies ++= Seq( "de.heikoseeberger" %% "akka-http-json4s" % "1.26.0", "org.json4s" %% "json4s-jackson" % "3.6.6", "org.json4s" %% "json4s-ext" % "3.6.6", "com.typesafe.akka" %% "akka-http" % "10.1.8" , "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23" )
&n