restapi(4)- rest-mongo : MongoDB資料庫前端的httpserver
完成了一套標準的rest風格資料庫CRUD操作httpserver後發現有許多不足。主要是為了追求“通用”兩個字,想把所有服務介面做的更“範generic”些,結果反而限制了目標資料庫的特點,最終產生了一套功能弱小的玩具。比如說吧:標準rest風格getbyId需要所有的資料表都具備id這個欄位,有點傻。然後get返回的結果集又沒有什麼靈活的控制方法如返回數量、欄位、排序等。特別對MongoDB這樣的在查詢操作方面接近關係式資料庫的分散式資料庫:上篇提到過,它的query能力強大,條件組合靈活,如果不能在網路服務api中體現出來就太可惜了。所以,這篇博文會討論一套專門針對MongoDB的rest-server。我想達到的目的是:後臺資料庫是MongoDB,通過httpserver提供對MongoDB的CRUD操作,客戶端通過http呼叫CRUD服務。後臺開發對每一個數據庫表單使用統一的標準增添一套新的CRUD服務。希望如此能夠提高開發效率,減少程式碼出錯機會。
MongoDB是一種檔案型別資料庫,資料格式更加多樣化。在這次示範裡希望能把MongoDB有特點的資料型別以及它們的處理方法都介紹了,包括:日期型別,二進位制型別blob(圖片)等。順便提一下:普通大型文字檔案也可以用二進位制blob方式存入MongoDB,因為檔案在http傳輸過程中必須以byte方式進行,所以後臺httpserver接收的檔案格式是一串byte,不用任何格式轉換就可以直接存入MongoDB blob欄位。客戶端從後臺下載時就需要把bytes轉換成UTF8字元就可以恢復檔案內容了。
首先,我們先從Model開始,在scala裡用case class來表示。Model是MongoDB Document的對應。在scala程式設計裡我們是用case class 當作Document來操作的。我們設計的Model都會繼承一個ModelBase trait:
trait ModelBase[E] { def to: E } case class Person( userid: String = "", name: String = "", age: Option[Int] = None, dob: Option[MGODate] = None, //生日 address: Option[String] = None ) extends ModelBase[Document] { import org.mongodb.scala.bson._ override def to: Document = { var doc = Document( "userid" -> this.userid, "name" -> this.name) if (this.age != None) doc = doc + ("age" -> this.age.get) if (this.dob != None) doc = doc + ("dob" -> this.dob.get) if (this.address != None) doc = doc + ("address" -> this.address.getOrElse("")) doc } } object Person { val fromDocument: Document => Person = doc => { val keyset = doc.keySet Person( userid = doc.getString("userid"), name = doc.getString("name"), age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]], dob = {if (keyset.contains("dob")) Some(doc.getDate("dob")) else None }, address = mgoGetStringOrNone(doc,"address") ) } }
在上面例子裡Person對應MongoDB裡一個Document。除了注意對應型別屬性與表字段型別外,還提供了to,fromDecument兩個轉換函式。其中to函式是繼承ModelBase的,代表所有MongoDB Model都必須具備to這個函式。這點很重要,因為在從json構建成Model時,如果屬於ModelBase則肯定可以呼叫一個to函式:
class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])( ... post { entity(as[String]) { json => val extractedEntity: M = fromJson[M](json) val doc: Document = extractedEntity.to val futmsg = repository.insert(doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(c) => c.toString() case None => "insert may not complete!" } case Left(err) => err.getMessage } }
注意這個extractedEntity:我們現在還不能確定它的具體型別,是Person,Animal,Machine? 但我們確定它是M型別,而M<:ModalBase[Document],所以M是MongoDB Model。可以呼叫extractedEntity.to獲取一個Document。
仔細看,Person裡並不包括blob型別欄位。因為到現在我還沒有想到辦法在一個httprequest裡把多個欄位和圖片一次性發出來,必須分兩個request才能完成一個Document的上傳。httpserver收到兩個requests後還要進行requests的匹配對應管理,十分的複雜。所以含blob型別的Document只能把blob分拆到另一個Document裡,然後用這個Document唯一一個id欄位來連結:
case class Photo ( id: String, photo: Option[MGOBlob] ) extends ModelBase[Document] { override def to: Document = { var doc = Document("id" -> this.id) if (photo != None) doc = doc + ("photo" -> this.photo) doc } } object Photo { def fromDocument: Document => Photo = doc => { val keyset = doc.keySet Photo( id = doc.getString("id"), photo = mgoGetBlobOrNone(doc, "photo") ) } }
從另一個角度來講,把blob和正常欄位分開來儲存也有一定的優勢,最多也就是需要兩次query罷了。
第二部分是repository:資料庫操作函式:
class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) { def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = { var res = Seq[ResultOptions]() next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))} sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))} fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))} top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)} val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(andThen = res)) mgoQuery[Seq[R]](ctxFind,converter) } def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = { var res = Seq[ResultOptions]() next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))} sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))} fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))} top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)} val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(filter = Some(filtr),andThen = res)) mgoQuery[Seq[R]](ctxFind,converter) } def getOneDocument(filtr: Bson): DBOResult[Document] = { val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(filter = Some(filtr),firstOnly = true)) mgoQuery[Document](ctxFind,converter) } def insert(doc: Document): DBOResult[Completed] = { val ctxInsert = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Insert(Seq(doc))) mgoUpdate[Completed](ctxInsert) } def delete(filter: Bson): DBOResult[DeleteResult] = { val ctxDelete = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Delete(filter)) mgoUpdate[DeleteResult](ctxDelete) } def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = { val ctxUpdate = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Update(filter,update,None,!many)) mgoUpdate[UpdateResult](ctxUpdate) } def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = { val ctxUpdate = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Replace(filter,row)) mgoUpdate[UpdateResult](ctxUpdate) } }
這部分上篇博文討論過。最後是akka-http的核心部分:Route。MongoDB CRUD服務對外的api:
(get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) { (filter,fields,sort,top,next) => { dbor = { filter match { case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top) case None => repository.getAll(next,sort,fields,top) } } val futRows = dbor.value.value.runToFuture.map { eolr => eolr match { case Right(olr) => olr match { case Some(lr) => lr case None => Seq[M]() } case Left(_) => Seq[M]() } } complete(futureToJson(futRows)) } } ~ post { entity(as[String]) { json => val extractedEntity: M = fromJson[M](json) val doc: Document = extractedEntity.to val futmsg = repository.insert(doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(c) => c.toString() case None => "insert may not complete!" } case Left(err) => err.getMessage } } complete(futmsg) } } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) => val bson = Document(filter) if (set == None) { entity(as[String]) { json => val extractedEntity: M = fromJson[M](json) val doc: Document = extractedEntity.to val futmsg = repository.replace(bson, doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated." case None => "update may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } } else { set match { case Some(u) => val ubson = Document(u) dbou = repository.update(bson, ubson, many.getOrElse(true)) case None => dbou = Left(new IllegalArgumentException("missing set statement for update!")) } val futmsg = dbou.value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated." case None => "update may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) => val bson = Document(filter) val futmsg = repository.delete(bson).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getDeletedCount} rows deleted." case None => "delete may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } }
與上篇最大的區別就是這次的Route支援MongoDB特性的query string,bson型別的引數。如:
http://192.168.0.189:50081/private/crud/person http://192.168.0.189:50081/private/crud/person?filter={"userid":"c001"} http://192.168.0.189:50081/private/crud/person?sort={"userid":-1} http://192.168.0.189:50081/private/crud/person?filter={"userid":{$gt:"c000"}}&sort={"userid":-1}&top=3
可惜的是bson表示式中有些字元是url禁止的,所以必須預先處理一下。可以用公網的UrlEncoder線上轉換:
https://www.url-encoder.com {"userid":"c001"} -> %7B%22userid%22%3A%22c001%22%7D
在程式裡可以用軟體工具:"com.github.tasubo" % "jurl-tools" % "0.6" URLEncode.encode(xyz)
val sort = """ |{userid:-1} """.stripMargin val getAllRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort), ).addHeader(authentication)
blob服務的api Route:
pathPrefix("blob") { (get & path(Remaining)) { id => val filtr = equal("id", id) val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map { eodoc => eodoc match { case Right(odoc) => odoc match { case Some(doc) => if (doc == null) None else mgoGetBlobOrNone(doc, "photo") case None => None } case Left(_) => None } } onComplete(futOptPic) { case Success(optBlob) => optBlob match { case Some(blob) => withoutSizeLimit { encodeResponseWith(Gzip) { complete( HttpEntity( ContentTypes.`application/octet-stream`, ByteArrayToSource(blob.getData)) ) } } case None => complete(StatusCodes.NotFound) } case Failure(err) => complete(err) } } ~ (post & parameter('id)) { id => withoutSizeLimit { decodeRequest { extractDataBytes { bytes => val fut = bytes.runFold(ByteString()) { case (hd, bs) => hd ++ bs } onComplete(fut) { case Success(b) => val doc = Document("id" -> id, "photo" -> b.toArray) val futmsg = repository.insert(doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(c) => c.toString() case None => "insert may not complete!" } case Left(err) => err.getMessage } } complete(futmsg) case Failure(err) => complete(err) } } } } } }
注意:MongoRoute[M]是個範型別。我希望對任何Model的Route只需要指定M即可,如:
implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument)) implicit val picDao = new MongoRepo[Photo]("testdb","photo", None) ... pathPrefix("public") { (pathPrefix("crud")) { new MongoRoute[Person]("person")(personDao) .route ~ new MongoRoute[Photo]("photo")(picDao) .route } }
是否省力多了?但是,回到原來問題:blob型別在整個移動過程中都不需要進行格式轉換。所以id欄位名稱是指定的,這點在設計表結構時要注意。
如何測試一個httpserver還是比較頭痛的。用瀏覽器只能測試GET,其它POST,PUT,DELETE應該怎麼測試?其實可以用curl:
curl -i -X GET http://rest-api.io/items curl -i -X GET http://rest-api.io/items/5069b47aa892630aae059584 curl -i -X DELETE http://rest-api.io/items/5069b47aa892630aae059584 curl -i -X POST -H 'Content-Type: application/json' -d '{"name": "New item", "year": "2009"}' http://rest-api.io/items curl -i -X PUT -H 'Content-Type: application/json' -d '{"name": "Updated item", "year": "2010"}' http://rest-api.io/items/5069b47aa892630aae059584
下面寫兩個客戶端分別測試crud和blob:
TestCrudClient.scala
import akka.actor._ import akka.http.scaladsl.model.headers._ import scala.concurrent._ import scala.concurrent.duration._ import akka.http.scaladsl.Http import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.stream.ActorMaterializer import com.github.tasubo.jurl.URLEncode import com.datatech.rest.mongo.MongoModels.Person import de.heikoseeberger.akkahttpjson4s.Json4sSupport import org.json4s.jackson import com.datatech.sdp.mongo.engine.MGOClasses._ trait JsonCodec extends Json4sSupport { import org.json4s.DefaultFormats import org.json4s.ext.JodaTimeSerializers implicit val serilizer = jackson.Serialization implicit val formats = DefaultFormats ++ JodaTimeSerializers.all } object JsConverters extends JsonCodec object TestCrudClient { type UserInfo = Map[String,Any] def main(args: Array[String]): Unit = { import JsConverters._ implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end implicit val executionContext = system.dispatcher val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd")) val authRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.0.189:50081/auth", headers = List(authorization) ) val futToken: Future[HttpResponse] = Http().singleRequest(authRequest) val respToken = for { resp <- futToken jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String} } yield jstr val jstr = Await.result[String](respToken,2 seconds) println(jstr) scala.io.StdIn.readLine() val authentication = headers.Authorization(OAuth2BearerToken(jstr)) val sort = """ |{userid:-1} """.stripMargin val getAllRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.0.189:50081/public/crud/person?sort="+URLEncode.encode(sort), ).addHeader(authentication) val futGetAll: Future[HttpResponse] = Http().singleRequest(getAllRequest) println(Await.result(futGetAll,2 seconds)) scala.io.StdIn.readLine() var bf = """ |{"userid":"c888"} """.stripMargin println(URLEncode.encode(bf)) val delRequest = HttpRequest( HttpMethods.DELETE, uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf) ).addHeader(authentication) val futDel: Future[HttpResponse] = Http().singleRequest(delRequest) println(Await.result(futDel,2 seconds)) scala.io.StdIn.readLine() bf = """ |{"userid":"c001"} """.stripMargin val getRequest = HttpRequest( HttpMethods.GET, uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf), ).addHeader(authentication) val futGet: Future[HttpResponse] = Http().singleRequest(getRequest) println(Await.result(futGet,2 seconds)) scala.io.StdIn.readLine() val tiger = Person("c001","tiger chan",Some(56)) val john = Person("c002", "johnny dep", Some(60)) val peter = Person("c003", "pete brad", Some(58)) val susan = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) ) val ns = Person("c004", "susan boyr", Some(68),Some(mgoDate(1950,11,5)) ) val saveRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.0.189:50081/public/crud/person" ).addHeader(authentication) val futPost: Future[HttpResponse] = for { reqEntity <- Marshal(peter).to[RequestEntity] response <- Http().singleRequest(saveRequest.copy(entity=reqEntity)) } yield response println(Await.result(futPost,2 seconds)) scala.io.StdIn.readLine() var set = """ | {$set: | { | name:"tiger the king", | age:18 | } | } """.stripMargin val updateRequest = HttpRequest( HttpMethods.PUT, uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode( bf)+"&set="+URLEncode.encode(set)+"&many=true" ).addHeader(authentication) val futUpdate: Future[HttpResponse] = Http().singleRequest(updateRequest) println(Await.result(futUpdate,2 seconds)) scala.io.StdIn.readLine() val repRequest = HttpRequest( HttpMethods.PUT, uri = "http://192.168.0.189:50081/public/crud/person?filter="+URLEncode.encode(bf) ).addHeader(authentication) val futReplace: Future[HttpResponse] = for { reqEntity <- Marshal(susan).to[RequestEntity] response <- Http().singleRequest(updateRequest.copy(entity=reqEntity)) } yield response println(Await.result(futReplace,2 seconds)) scala.io.StdIn.readLine() system.terminate() } }
TestFileClient.scala
import akka.stream._ import java.nio.file._ import java.io._ import akka.http.scaladsl.model.headers._ import scala.concurrent._ import com.datatech.rest.mongo.FileStreaming._ import scala.concurrent.duration._ import akka.actor.ActorSystem import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ import akka.http.scaladsl.Http import akka.stream.scaladsl.{FileIO, Source} import scala.util._ case class FileUtil(implicit sys: ActorSystem) { import sys.dispatcher implicit val mat = ActorMaterializer() def createEntity(file: File): RequestEntity = { require(file.exists()) val formData = Multipart.FormData( Source.single( Multipart.FormData.BodyPart( "test", HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance Map("filename" -> file.getName)))) Await.result(Marshal(formData).to[RequestEntity], 3 seconds) } def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = { implicit val mat = ActorMaterializer() import sys.dispatcher val futResp = Http(sys).singleRequest( // Gzip.encodeMessage( request.copy(entity = dataEntity) //.addHeader(`Content-Encoding`(HttpEncodings.gzip)) // ) ) futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.dataBytes.map(_.utf8String).runForeach(println) case Success(r@HttpResponse(code, _, _, _)) => println(s"Upload request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to Upload file!") case Failure(err) => println(s"Upload failed: ${err.getMessage}") } } def downloadFileTo(request: HttpRequest, destPath: String) = { // val req = request.addHeader(`Content-Encoding`(HttpEncodings.gzip)) val futResp = Http(sys).singleRequest(request) //.map(Gzip.decodeMessage(_)) futResp .andThen { case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) => entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ => println(s"Download file saved to: $destPath") } case Success(r@HttpResponse(code, _, _, _)) => println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) => println("Unable to download file!") case Failure(err) => println(s"Download failed: ${err.getMessage}") } } } object TestFileClient { type UserInfo = Map[String,Any] def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() // needed for the future flatMap/onComplete in the end implicit val executionContext = system.dispatcher val helloRequest = HttpRequest(uri = "http://192.168.0.189:50081/") val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd")) val authRequest = HttpRequest( HttpMethods.POST, uri = "http://192.168.0.189:50081/auth", headers = List(authorization) ) val futToken: Future[HttpResponse] = Http().singleRequest(authRequest) val respToken = for { resp <- futToken jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String} } yield jstr val jstr = Await.result[String](respToken,2 seconds) println(jstr) scala.io.StdIn.readLine() val authentication = headers.Authorization(OAuth2BearerToken(jstr)) val entity = HttpEntity( ContentTypes.`application/octet-stream`, fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024) ) // val chunked = HttpEntity.Chunked.fromData( ContentTypes.`application/octet-stream`, fileStreamSource("/Users/tiger/pictures/MeTiger.png",1024) ) val uploadRequest = HttpRequest( HttpMethods.POST, // uri = "http://192.168.0.189:50081/private/file?filename=tiger.jpg", uri = "http://192.168.0.189:50081/public/crud/photo/blob?id=tiger.jpg", ).addHeader(authentication) //upload file Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds) //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds) val dlRequest = HttpRequest( HttpMethods.GET, // uri = "http://192.168.0.189:50081/api/file/mypic.jpg", uri = "http://192.168.0.189:50081/public/crud/photo/blob/tiger.jpg", ).addHeader(authentication) FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg") scala.io.StdIn.readLine() system.terminate() } }
下面是本次示範中的原始碼:
build.sbt
name := "rest-mongo" version := "0.1" scalaVersion := "2.12.8" scalacOptions += "-Ypartial-unification" val akkaVersion = "2.5.23" val akkaHttpVersion = "10.1.8" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http" % "10.1.8", "com.typesafe.akka" %% "akka-stream" % "2.5.23", "com.pauldijou" %% "jwt-core" % "3.0.1", "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0", "org.json4s" %% "json4s-native" % "3.6.1", "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", "org.slf4j" % "slf4j-simple" % "1.7.25", "org.json4s" %% "json4s-jackson" % "3.6.7", "org.json4s" %% "json4s-ext" % "3.6.7", // for scalikejdbc "org.scalikejdbc" %% "scalikejdbc" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test", "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", "com.h2database" % "h2" % "1.4.199", "com.zaxxer" % "HikariCP" % "2.7.4", "com.jolbox" % "bonecp" % "0.8.0.RELEASE", "com.typesafe.slick" %% "slick" % "3.3.2", //for cassandra 3.6.0 "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0", //for mongodb 4.0 "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0", "ch.qos.logback" % "logback-classic" % "1.2.3", "io.monix" %% "monix" % "3.0.0-RC3", "org.typelevel" %% "cats-core" % "2.0.0-M4", "com.github.tasubo" % "jurl-tools" % "0.6" )
MongoHttpServer.scala
package com.datatech.rest.mongo import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import pdi.jwt._ import AuthBase._ import MockUserAuthService._ import org.mongodb.scala._ import scala.collection.JavaConverters._ import MongoModels._ import MongoRepo._ import MongoRoute._ object MongoHttpServer extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher val settings: MongoClientSettings = MongoClientSettings.builder() .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava)) .build() implicit val client: MongoClient = MongoClient(settings) implicit val personDao = new MongoRepo[Person]("testdb","person", Some(Person.fromDocument)) implicit val picDao = new MongoRepo[Photo]("testdb","photo", None) implicit val authenticator = new AuthBase() .withAlgorithm(JwtAlgorithm.HS256) .withSecretKey("OpenSesame") .withUserFunc(getValidUser) val route = path("auth") { authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo => post { complete(authenticator.issueJwt(userinfo))} } } ~ pathPrefix("private") { authenticateOAuth2(realm = "private", authenticator.authenticateToken) { validToken => FileRoute(validToken) .route // ~ ... } } ~ pathPrefix("public") { (pathPrefix("crud")) { new MongoRoute[Person]("person")(personDao) .route ~ new MongoRoute[Photo]("photo")(picDao) .route } } val (port, host) = (50081,"192.168.0.189") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ => httpSys.terminate()) }
ModalBase.scala
package com.datatech.rest.mongo trait ModelBase[E] { def to: E }
MongoModel.scala
package com.datatech.rest.mongo import org.mongodb.scala._ import com.datatech.sdp.mongo.engine._ import MGOClasses._ object MongoModels { case class Person( userid: String = "", name: String = "", age: Option[Int] = None, dob: Option[MGODate] = None, address: Option[String] = None ) extends ModelBase[Document] { import org.mongodb.scala.bson._ override def to: Document = { var doc = Document( "userid" -> this.userid, "name" -> this.name) if (this.age != None) doc = doc + ("age" -> this.age.get) if (this.dob != None) doc = doc + ("dob" -> this.dob.get) if (this.address != None) doc = doc + ("address" -> this.address.getOrElse("")) doc } } object Person { val fromDocument: Document => Person = doc => { val keyset = doc.keySet Person( userid = doc.getString("userid"), name = doc.getString("name"), age = mgoGetIntOrNone(doc,"age").asInstanceOf[Option[Int]], dob = {if (keyset.contains("dob")) Some(doc.getDate("dob")) else None }, address = mgoGetStringOrNone(doc,"address") ) } } case class Photo ( id: String, photo: Option[MGOBlob] ) extends ModelBase[Document] { override def to: Document = { var doc = Document("id" -> this.id) if (photo != None) doc = doc + ("photo" -> this.photo) doc } } object Photo { def fromDocument: Document => Photo = doc => { val keyset = doc.keySet Photo( id = doc.getString("id"), photo = mgoGetBlobOrNone(doc, "photo") ) } } }
MongoRepo.scala
package com.datatech.rest.mongo import org.mongodb.scala._ import org.bson.conversions.Bson import org.mongodb.scala.result._ import com.datatech.sdp.mongo.engine._ import MGOClasses._ import MGOEngine._ import MGOCommands._ import com.datatech.sdp.result.DBOResult.DBOResult import MongoModels._ object MongoRepo { class MongoRepo[R](db:String, coll: String, converter: Option[Document => R])(implicit client: MongoClient) { def getAll(next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = { var res = Seq[ResultOptions]() next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))} sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))} fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))} top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)} val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(andThen = res)) mgoQuery[Seq[R]](ctxFind,converter) } def query(filtr: Bson, next:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = { var res = Seq[ResultOptions]() next.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_FILTER,Some(Document(b)))} sort.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_SORT,Some(Document(b)))} fields.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(Document(b)))} top.foreach {b => res = res :+ ResultOptions(FOD_TYPE.FOD_LIMIT,None,b)} val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(filter = Some(filtr),andThen = res)) mgoQuery[Seq[R]](ctxFind,converter) } def getOneDocument(filtr: Bson): DBOResult[Document] = { val ctxFind = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_QUERY) .setCommand(Find(filter = Some(filtr),firstOnly = true)) mgoQuery[Document](ctxFind,converter) } def insert(doc: Document): DBOResult[Completed] = { val ctxInsert = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Insert(Seq(doc))) mgoUpdate[Completed](ctxInsert) } def delete(filter: Bson): DBOResult[DeleteResult] = { val ctxDelete = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Delete(filter)) mgoUpdate[DeleteResult](ctxDelete) } def update(filter: Bson, update: Bson, many: Boolean): DBOResult[UpdateResult] = { val ctxUpdate = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Update(filter,update,None,!many)) mgoUpdate[UpdateResult](ctxUpdate) } def replace(filter: Bson, row: Document): DBOResult[UpdateResult] = { val ctxUpdate = MGOContext(dbName = db,collName=coll) .setActionType(MGO_ACTION_TYPE.MGO_UPDATE) .setCommand(Replace(filter,row)) mgoUpdate[UpdateResult](ctxUpdate) } } }
MongoRoute.scala
package com.datatech.rest.mongo import akka.http.scaladsl.server.Directives import scala.util._ import org.mongodb.scala._ import com.datatech.sdp.file.Streaming._ import org.mongodb.scala.result._ import MongoRepo._ import akka.stream.ActorMaterializer import com.datatech.sdp.result.DBOResult._ import org.mongodb.scala.model.Filters._ import com.datatech.sdp.mongo.engine.MGOClasses._ import monix.execution.CancelableFuture import akka.util._ import akka.http.scaladsl.model._ import akka.http.scaladsl.coding.Gzip object MongoRoute { class MongoRoute[M <: ModelBase[Document]](val pathName: String)(repository: MongoRepo[M])( implicit c: MongoClient, m: Manifest[M], mat: ActorMaterializer) extends Directives with JsonConverter { import monix.execution.Scheduler.Implicits.global var dbor: DBOResult[Seq[M]] = _ var dbou: DBOResult[UpdateResult] = _ val route = pathPrefix(pathName) { pathPrefix("blob") { (get & path(Remaining)) { id => val filtr = equal("id", id) val futOptPic: CancelableFuture[Option[MGOBlob]] = repository.getOneDocument(filtr).value.value.runToFuture.map { eodoc => eodoc match { case Right(odoc) => odoc match { case Some(doc) => if (doc == null) None else mgoGetBlobOrNone(doc, "photo") case None => None } case Left(_) => None } } onComplete(futOptPic) { case Success(optBlob) => optBlob match { case Some(blob) => withoutSizeLimit { encodeResponseWith(Gzip) { complete( HttpEntity( ContentTypes.`application/octet-stream`, ByteArrayToSource(blob.getData)) ) } } case None => complete(StatusCodes.NotFound) } case Failure(err) => complete(err) } } ~ (post & parameter('id)) { id => withoutSizeLimit { decodeRequest { extractDataBytes { bytes => val fut = bytes.runFold(ByteString()) { case (hd, bs) => hd ++ bs } onComplete(fut) { case Success(b) => val doc = Document("id" -> id, "photo" -> b.toArray) val futmsg = repository.insert(doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(c) => c.toString() case None => "insert may not complete!" } case Left(err) => err.getMessage } } complete(futmsg) case Failure(err) => complete(err) } } } } } } ~ (get & parameters('filter.?,'fields.?,'sort.?,'top.as[Int].?,'next.?)) { (filter,fields,sort,top,next) => { dbor = { filter match { case Some(fltr) => repository.query(Document(fltr),next,sort,fields,top) case None => repository.getAll(next,sort,fields,top) } } val futRows = dbor.value.value.runToFuture.map { eolr => eolr match { case Right(olr) => olr match { case Some(lr) => lr case None => Seq[M]() } case Left(_) => Seq[M]() } } complete(futureToJson(futRows)) } } ~ post { entity(as[String]) { json => val extractedEntity: M = fromJson[M](json) val doc: Document = extractedEntity.to val futmsg = repository.insert(doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(c) => c.toString() case None => "insert may not complete!" } case Left(err) => err.getMessage } } complete(futmsg) } } ~ (put & parameter('filter,'set.?, 'many.as[Boolean].?)) { (filter, set, many) => val bson = Document(filter) if (set == None) { entity(as[String]) { json => val extractedEntity: M = fromJson[M](json) val doc: Document = extractedEntity.to val futmsg = repository.replace(bson, doc).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated." case None => "update may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } } else { set match { case Some(u) => val ubson = Document(u) dbou = repository.update(bson, ubson, many.getOrElse(true)) case None => dbou = Left(new IllegalArgumentException("missing set statement for update!")) } val futmsg = dbou.value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getMatchedCount} matched rows, ${d.getModifiedCount} rows updated." case None => "update may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } } ~ (delete & parameters('filter, 'many.as[Boolean].?)) { (filter,many) => val bson = Document(filter) val futmsg = repository.delete(bson).value.value.runToFuture.map { eoc => eoc match { case Right(oc) => oc match { case Some(d) => s"${d.getDeletedCount} rows deleted." case None => "delete may not complete!" } case Left(err) => err.getMessage } } complete(futureToJson(futmsg)) } } } }
&n