博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
restapi(3)- MongoDBEngine : MongoDB Scala编程工具库
阅读量:5096 次
发布时间:2019-06-13

本文共 55068 字,大约阅读时间需要 183 分钟。

最近刚好有同事在学习MongoDB,我们讨论过MongoDB应该置于服务器端然后通过web-service为客户端提供数据的上传下载服务。我们可以用上节讨论的respapi框架来实现针对MongoDB的CRUD操作。在谈到restapi之前我在这篇讨论先介绍一下MongoDB数据库操作的scala编程,因为与传统的SQL数据库操作编程有比较大的差别。

在前面有关sdp (streaming-data-processor)系列的博文中有一段是关于MongoDBEngine的。刚好把这套工具的使用在这里介绍一下。

MongoDBEngine是基于mongodb-scala-driver上开发的一套MongoDB数据库CRUD Scala编程工具,其主要功能可以从下面这三个函数中反映出来:

def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T]   // T => FindIterable  e.g List[Document] def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed]

其中: mgoUpdate功能包括:insert,update,delete,replace ...

            mgoQuery: find,count,distinct ...

            mgoAdmin: dropCollection, createCollection ...

首先需要注意的是它们的返回结果类型: DBOResult[T],实质上是 Future[Either[String,Option[T]]]

type DBOError[A] = EitherT[Task,Throwable,A] type DBOResult[A] = OptionT[DBOError,A]

看起来很复杂,实际容易解释:设计这个类型的初衷是针对数据库操作的,所以:

1、异步操作,所以用Future (Task即Future, 如:Task.runToFuture)

2、返回结果可能为空,所以用Option

3、发生错误结果也为空,但需要知道空值是由错误产生的,所以用了Either

把所有返回结果类型统一成DBOResult[T]是为了方便进行函数组合,如:

for {    a <- mgoQuery(...)    _ <- mgoUpdate(a, ...)    b <- mgoQuery(...)} yield b

但另一方面也为写代码带来一些麻烦,如从结构中抽出运算结果值:

mgoQuery[List[Document]](ctxFind).value.value.runToFuture {      case Success(eold) => eold match {        case Right(old) => old match {          case Some(ld) => ld.map(toPO(_)).foreach(showPO)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}") }

是有些麻烦,不过能更详细的了解命令执行过程,而且是统一标准的写法(ctlr-c, ctlr-v 就可以了)。

上面三个函数都有一个同样的MGOContext类型的入参数,这是一个命令类型:

case class MGOContext(                         dbName: String,                         collName: String,                         actionType: MGO_ACTION_TYPE = MGO_QUERY,                         action: Option[MGOCommands] = None,                         actionOptions: Option[Any] = None,                         actionTargets: Seq[String] = Nil                       ) {    ctx =>    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)    def setCollName(name: String): MGOContext = ctx.copy(collName = name)    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))    def toSomeProto = MGOProtoConversion.ctxToProto(this)  }  object MGOContext {    def apply(db: String, coll: String) = new MGOContext(db, coll)    def fromProto(proto: sdp.grpc.services.ProtoMGOContext): MGOContext =      MGOProtoConversion.ctxFromProto(proto)  }

可以看到MGOContext.action就是具体的操作命令。下面是一个mgoAdmin命令的示范:

val ctx = MGOContext("testdb","po").setCommand(    DropCollection("po"))  import monix.execution.Scheduler.Implicits.global  println(getResult(mgoAdmin(ctx).value.value.runToFuture))

mgoUpdate示范:

val optInsert = new InsertManyOptions().ordered(true)  val ctxInsert = ctx.setCommand(    Insert(Seq(po1,po2),Some(optInsert))  )  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))

我们选择MongoDB的主要目的是因为它分布式特性,适合大数据模式。但同时MongoDB具备强大的query功能,与传统SQL数据库模式相近,甚至还可以用索引。虽然MongoDB不支持数据关系,但对于我们这样的传统SQL老兵还是必然之选。MongoDB支持逗点查询组合,如:

val resultDocType = FindIterable[Document]    val resultOption = FindObservable(resultDocType)      .maxScan(...)    .limit(...)    .sort(...)    .project(...)

比如对查询结果进行排序,同时又抽选几个返回的字段可以写成:FindObservable(...).sort(...).project(...)。MongoEngine提供了一个ResultOptions类型:

case class ResultOptions(                            optType: FOD_TYPE,                            bson: Option[Bson] = None,                            value: Int = 0 ){     def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {      optType match {        case  FOD_FIRST        => find        case  FOD_FILTER       => find.filter(bson.get)        case  FOD_LIMIT        => find.limit(value)        case  FOD_SKIP         => find.skip(value)        case  FOD_PROJECTION   => find.projection(bson.get)        case  FOD_SORT         => find.sort(bson.get)        case  FOD_PARTIAL      => find.partial(value != 0)        case  FOD_CURSORTYPE   => find        case  FOD_HINT         => find.hint(bson.get)        case  FOD_MAX          => find.max(bson.get)        case  FOD_MIN          => find.min(bson.get)        case  FOD_RETURNKEY    => find.returnKey(value != 0)        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)      }    }

这个类型也是MGOContext类型的一个参数。 下面是一些用例:

val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))  val sort: Bson = (descending("ponum"))  val proj: Bson = (and(include("ponum","podate")                   ,include("vendor"),excludeId()))  val resSort = ResultOptions(FOD_SORT,Some(sort))  val resProj = ResultOptions(FOD_PROJECTION,Some(proj))  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))  val ctxFindArrayItem = ctx.setCommand(    Find(filter = Some(equal("podtl.qty",100)))  )

下面是一个完整的例子:

import akka.actor.ActorSystemimport akka.stream.ActorMaterializerimport org.mongodb.scala._import scala.collection.JavaConverters._import com.mongodb.client.model._import com.datatech.sdp.mongo.engine._import MGOClasses._import scala.util._object TestMongoEngine extends App {  import MGOEngine._  import MGOHelpers._  import MGOCommands._  import MGOAdmins._  // or provide custom MongoClientSettings  val settings: MongoClientSettings = MongoClientSettings.builder()    .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))    .build()  implicit val client: MongoClient = MongoClient(settings)  implicit val system = ActorSystem()  implicit val mat = ActorMaterializer() // implicit val ec = system.dispatcher  val ctx = MGOContext("testdb","po").setCommand(    DropCollection("po"))  import monix.execution.Scheduler.Implicits.global  println(getResult(mgoAdmin(ctx).value.value.runToFuture))scala.io.StdIn.readLine()  val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")  val po1 = Document (    "ponum" -> "po18012301",    "vendor" -> "The smartphone compay",    "podate" -> mgoDate(2017,5,13),    "remarks" -> "urgent, rush order",    "handler" -> pic,    "podtl" -> Seq(      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")    )  )  val po2 = Document (    "ponum" -> "po18022002",    "vendor" -> "The Samsung compay",    "podate" -> mgoDate(2015,11,6),    "podtl" -> Seq(      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")    )  )  val optInsert = new InsertManyOptions().ordered(true)  val ctxInsert = ctx.setCommand(    Insert(Seq(po1,po2),Some(optInsert))  )  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))  scala.io.StdIn.readLine()  case class PO (                  ponum: String,                  podate: MGODate,                  vendor: String,                  remarks: Option[String],                  podtl: Option[MGOArray],                  handler: Option[MGOBlob]                )  def toPO(doc: Document): PO = {    PO(      ponum = doc.getString("ponum"),      podate = doc.getDate("podate"),      vendor = doc.getString("vendor"),      remarks = mgoGetStringOrNone(doc,"remarks"),      podtl = mgoGetArrayOrNone(doc,"podtl"),      handler = mgoGetBlobOrNone(doc,"handler")    )  }  case class PODTL(                    item: String,                    price: Double,                    qty: Int,                    packing: Option[String],                    payTerm: Option[String]                  )  def toPODTL(podtl: Document): PODTL = {    PODTL(      item = podtl.getString("item"),      price = podtl.getDouble("price"),      qty = podtl.getInteger("qty"),      packing = mgoGetStringOrNone(podtl,"packing"),      payTerm = mgoGetStringOrNone(podtl,"payterm")    )  }  def showPO(po: PO) = {    println(s"po number: ${po.ponum}")    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")    println(s"vendor: ${po.vendor}")    if (po.remarks != None)      println(s"remarks: ${po.remarks.get}")    po.podtl match {      case Some(barr) =>        mgoArrayToDocumentList(barr)          .map { dc => toPODTL(dc)}          .foreach { doc: PODTL =>            print(s"==>Item: ${doc.item} ")            print(s"price: ${doc.price} ")            print(s"qty: ${doc.qty} ")            doc.packing.foreach(pk => print(s"packing: ${pk} "))            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))            println("")          }      case _ =>    }    po.handler match {      case Some(blob) =>        val fileName = s"/users/tiger/${po.ponum}.png"        mgoBlobToFile(blob,fileName)        println(s"picture saved to ${fileName}")      case None => println("no picture provided")    }  }  import org.mongodb.scala.model.Projections._  import org.mongodb.scala.model.Filters._  import org.mongodb.scala.model.Sorts._  import org.mongodb.scala.bson.conversions._  import org.mongodb.scala.bson.Document  val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))  val sort: Bson = (descending("ponum"))  val proj: Bson = (and(include("ponum","podate")                   ,include("vendor"),excludeId()))  val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))  val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))  val ctxFindArrayItem = ctx.setCommand(    Find(filter = Some(equal("podtl.qty",100)))  )  for {    _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {      case Success(eold) => eold match {        case Right(old) => old match {          case Some(ld) => ld.map(toPO(_)).foreach(showPO)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }    _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {      case Success(eop) => eop match {        case Right(op) => op match {          case Some(p) => showPO(_)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }    _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {      case Success(eops) => eops match {        case Right(ops) => ops match {          case Some(lp) => lp.foreach(showPO)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }  } yield()  scala.io.StdIn.readLine()  system.terminate()}

运行程序后结果如下:

Right(Some(The operation completed successfully))Right(Some(The operation completed successfully))po number: po18022002po date: 2015-12-06vendor: The Samsung compayno picture providedpo number: po18012301po date: 2017-06-13vendor: The smartphone compayno picture provided--------------------------------------------------------------po number: po18022002po date: 2015-12-06vendor: The Samsung compay==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard ==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days ==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury no picture provided-------------------------------

以下是本次讨论涉及的全部源代码:

build.sbt

name := "dt-dal"version := "0.2"scalaVersion := "2.12.8"scalacOptions += "-Ypartial-unification"val akkaVersion = "2.5.23"val akkaHttpVersion = "10.1.8"libraryDependencies := Seq(  // for scalikejdbc  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",  "com.h2database"  %  "h2" % "1.4.199",  "com.zaxxer" % "HikariCP" % "2.7.4",  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",  "com.typesafe.slick" %% "slick" % "3.3.2",  //for cassandra 3.6.0  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "1.1.0",  //for mongodb 4.0  "org.mongodb.scala" %% "mongo-scala-driver" % "2.6.0",  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.1.0",  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",  "io.monix" %% "monix" % "3.0.0-RC3",  "org.typelevel" %% "cats-core" % "2.0.0-M4")

converters/DBOResultType.scala

package com.datatech.sdp.resultimport cats._import cats.data.EitherTimport cats.data.OptionTimport monix.eval.Taskimport cats.implicits._import scala.concurrent._import scala.collection.TraversableOnceobject DBOResult {  type DBOError[A] = EitherT[Task,Throwable,A]  type DBOResult[A] = OptionT[DBOError,A]  implicit def valueToDBOResult[A](a: A): DBOResult[A] =         Applicative[DBOResult].pure(a)  implicit def optionToDBOResult[A](o: Option[A]): DBOResult[A] =         OptionT((o: Option[A]).pure[DBOError])  implicit def eitherToDBOResult[A](e: Either[Throwable,A]): DBOResult[A] = { //   val error: DBOError[A] = EitherT[Task,Throwable, A](Task.eval(e))         OptionT.liftF(EitherT.fromEither[Task](e))  }  implicit def futureToDBOResult[A](fut: Future[A]): DBOResult[A] = {       val task = Task.fromFuture[A](fut)       val et = EitherT.liftF[Task,Throwable,A](task)       OptionT.liftF(et)  }  implicit class DBOResultToTask[A](r: DBOResult[A]) {    def toTask = r.value.value  }  implicit class DBOResultToOption[A](r:Either[Throwable,Option[A]]) {    def someValue: Option[A] = r match {      case Left(err) => (None: Option[A])      case Right(oa) => oa    }  }  def wrapCollectionInOption[A, C[_] <: TraversableOnce[_]](coll: C[A]): DBOResult[C[A]] =    if (coll.isEmpty)      optionToDBOResult(None: Option[C[A]])    else      optionToDBOResult(Some(coll): Option[C[A]])}

filestream/FileStreaming.scala

package com.datatech.sdp.fileimport java.io.{ByteArrayInputStream, InputStream}import java.nio.ByteBufferimport java.nio.file.Pathsimport akka.stream.Materializerimport akka.stream.scaladsl.{FileIO, StreamConverters}import akka.util._import scala.concurrent.Awaitimport scala.concurrent.duration._object Streaming {  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer):ByteBuffer = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    (Await.result(fut, timeOut)).toByteBuffer  }  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer): Array[Byte] = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    (Await.result(fut, timeOut)).toArray  }  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer): InputStream = {    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>      hd ++ bs    }    val buf = (Await.result(fut, timeOut)).toArray    new ByteArrayInputStream(buf)  }  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(    implicit mat: Materializer) = {    val ba = new Array[Byte](byteBuf.remaining())    byteBuf.get(ba,0,ba.length)    val baInput = new ByteArrayInputStream(ba)    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))    source.runWith(FileIO.toPath(Paths.get(fileName)))  }  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(    implicit mat: Materializer) = {    val bb = ByteBuffer.wrap(bytes)    val baInput = new ByteArrayInputStream(bytes)    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))    source.runWith(FileIO.toPath(Paths.get(fileName)))  }  def InputStreamToFile(is: InputStream, fileName: String)(    implicit mat: Materializer) = {    val source = StreamConverters.fromInputStream(() => is)    source.runWith(FileIO.toPath(Paths.get(fileName)))  }}

logging/Log.scala

package com.datatech.sdp.loggingimport org.slf4j.Logger/**  * Logger which just wraps org.slf4j.Logger internally.  *  * @param logger logger  */class Log(logger: Logger) {  // use var consciously to enable squeezing later  var isDebugEnabled: Boolean = logger.isDebugEnabled  var isInfoEnabled: Boolean = logger.isInfoEnabled  var isWarnEnabled: Boolean = logger.isWarnEnabled  var isErrorEnabled: Boolean = logger.isErrorEnabled  var isTraceEnabled: Boolean = logger.isTraceEnabled  def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {    level match {      case 'debug | 'DEBUG => debug(msg)      case 'info | 'INFO => info(msg)      case 'warn | 'WARN => warn(msg)      case 'error | 'ERROR => error(msg)      case 'trace | 'TRACE => trace(msg)      case _ => // nothing to do    }  }  var stepOn: Boolean = false  def step(msg: => String): Unit = {    if(stepOn)      logger.info("\n****** {} ******\n",msg)  }  def trace(msg: => String): Unit = {    if (isTraceEnabled && logger.isTraceEnabled) {      logger.trace(msg)    }  }  def trace(msg: => String, e: Throwable): Unit = {    if (isTraceEnabled && logger.isTraceEnabled) {      logger.trace(msg, e)    }  }  def debug(msg: => String): Unit = {    if (isDebugEnabled && logger.isDebugEnabled) {      logger.debug(msg)    }  }  def debug(msg: => String, e: Throwable): Unit = {    if (isDebugEnabled && logger.isDebugEnabled) {      logger.debug(msg, e)    }  }  def info(msg: => String): Unit = {    if (isInfoEnabled && logger.isInfoEnabled) {      logger.info(msg)    }  }  def info(msg: => String, e: Throwable): Unit = {    if (isInfoEnabled && logger.isInfoEnabled) {      logger.info(msg, e)    }  }  def warn(msg: => String): Unit = {    if (isWarnEnabled && logger.isWarnEnabled) {      logger.warn(msg)    }  }  def warn(msg: => String, e: Throwable): Unit = {    if (isWarnEnabled && logger.isWarnEnabled) {      logger.warn(msg, e)    }  }  def error(msg: => String): Unit = {    if (isErrorEnabled && logger.isErrorEnabled) {      logger.error(msg)    }  }  def error(msg: => String, e: Throwable): Unit = {    if (isErrorEnabled && logger.isErrorEnabled) {      logger.error(msg, e)    }  }}

logging/LogSupport.scala

package com.datatech.sdp.loggingimport org.slf4j.LoggerFactorytrait LogSupport {  /**    * Logger    */  protected val log = new Log(LoggerFactory.getLogger(this.getClass))}

mgo/engine/ObservableToPublisher.scala

package com.datatech.sdp.mongo.engineimport java.util.concurrent.atomic.AtomicBooleanimport org.mongodb.{scala => mongoDB}import org.{reactivestreams => rxStreams}final case class ObservableToPublisher[T](observable: mongoDB.Observable[T])  extends rxStreams.Publisher[T] {  def subscribe(subscriber: rxStreams.Subscriber[_ >: T]): Unit =    observable.subscribe(new mongoDB.Observer[T]() {      override def onSubscribe(subscription: mongoDB.Subscription): Unit =        subscriber.onSubscribe(new rxStreams.Subscription() {          private final val cancelled: AtomicBoolean = new AtomicBoolean          override def request(n: Long): Unit =            if (!subscription.isUnsubscribed && !cancelled.get() && n < 1) {              subscriber.onError(                new IllegalArgumentException(                  s"Demand from publisher should be a positive amount. Current amount is:$n"                )              )            } else {              subscription.request(n)            }          override def cancel(): Unit =            if (!cancelled.getAndSet(true)) subscription.unsubscribe()        })      def onNext(result: T): Unit = subscriber.onNext(result)      def onError(e: Throwable): Unit = subscriber.onError(e)      def onComplete(): Unit = subscriber.onComplete()    })}

mgo/engine/MongoDBEngine.scala

package com.datatech.sdp.mongo.engineimport java.text.SimpleDateFormatimport java.util.Calendarimport akka.NotUsedimport akka.stream.Materializerimport akka.stream.alpakka.mongodb.scaladsl._import akka.stream.scaladsl.{Flow, Source}import org.bson.conversions.Bsonimport org.mongodb.scala.bson.collection.immutable.Documentimport org.mongodb.scala.bson.{BsonArray, BsonBinary}import org.mongodb.scala.model._import org.mongodb.scala.{MongoClient, _}import com.datatech.sdpimport sdp.file.Streaming._import sdp.logging.LogSupportimport scala.collection.JavaConverters._import scala.concurrent._import scala.concurrent.duration._object MGOClasses {  type MGO_ACTION_TYPE = Int  object MGO_ACTION_TYPE {    val MGO_QUERY = 0    val MGO_UPDATE = 1    val MGO_ADMIN = 2  }  /*  org.mongodb.scala.FindObservable    import com.mongodb.async.client.FindIterable    val resultDocType = FindIterable[Document]    val resultOption = FindObservable(resultDocType)      .maxScan(...)    .limit(...)    .sort(...)    .project(...) */  type FOD_TYPE = Int  object FOD_TYPE {    val FOD_FIRST = 0 //def first(): SingleObservable[TResult], return the first item    val FOD_FILTER = 1 //def filter(filter: Bson): FindObservable[TResult]    val FOD_LIMIT = 2 //def limit(limit: Int): FindObservable[TResult]    val FOD_SKIP = 3 //def skip(skip: Int): FindObservable[TResult]    val FOD_PROJECTION = 4 //def projection(projection: Bson): FindObservable[TResult]    //Sets a document describing the fields to return for all matching documents    val FOD_SORT = 5 //def sort(sort: Bson): FindObservable[TResult]    val FOD_PARTIAL = 6 //def partial(partial: Boolean): FindObservable[TResult]    //Get partial results from a sharded cluster if one or more shards are unreachable (instead of throwing an error)    val FOD_CURSORTYPE = 7 //def cursorType(cursorType: CursorType): FindObservable[TResult]    //Sets the cursor type    val FOD_HINT = 8 //def hint(hint: Bson): FindObservable[TResult]    //Sets the hint for which index to use. A null value means no hint is set    val FOD_MAX = 9 //def max(max: Bson): FindObservable[TResult]    //Sets the exclusive upper bound for a specific index. A null value means no max is set    val FOD_MIN = 10 //def min(min: Bson): FindObservable[TResult]    //Sets the minimum inclusive lower bound for a specific index. A null value means no max is set    val FOD_RETURNKEY = 11 //def returnKey(returnKey: Boolean): FindObservable[TResult]    //Sets the returnKey. If true the find operation will return only the index keys in the resulting documents    val FOD_SHOWRECORDID = 12 //def showRecordId(showRecordId: Boolean): FindObservable[TResult]    //Sets the showRecordId. Set to true to add a field `\$recordId` to the returned documents  }  case class ResultOptions(                            optType: FOD_TYPE,                            bson: Option[Bson] = None,                            value: Int = 0 ){    import FOD_TYPE._     def toFindObservable: FindObservable[Document] => FindObservable[Document] = find => {      optType match {        case  FOD_FIRST        => find        case  FOD_FILTER       => find.filter(bson.get)        case  FOD_LIMIT        => find.limit(value)        case  FOD_SKIP         => find.skip(value)        case  FOD_PROJECTION   => find.projection(bson.get)        case  FOD_SORT         => find.sort(bson.get)        case  FOD_PARTIAL      => find.partial(value != 0)        case  FOD_CURSORTYPE   => find        case  FOD_HINT         => find.hint(bson.get)        case  FOD_MAX          => find.max(bson.get)        case  FOD_MIN          => find.min(bson.get)        case  FOD_RETURNKEY    => find.returnKey(value != 0)        case  FOD_SHOWRECORDID => find.showRecordId(value != 0)      }    }  }  trait MGOCommands  object MGOCommands {    case class Count(filter: Option[Bson] = None, options: Option[Any] = None) extends MGOCommands    case class Distict(fieldName: String, filter: Option[Bson] = None) extends MGOCommands    /*  org.mongodb.scala.FindObservable    import com.mongodb.async.client.FindIterable    val resultDocType = FindIterable[Document]    val resultOption = FindObservable(resultDocType)      .maxScan(...)    .limit(...)    .sort(...)    .project(...) */    case class Find(filter: Option[Bson] = None,                       andThen: Seq[ResultOptions] = Seq.empty[ResultOptions],                       firstOnly: Boolean = false) extends MGOCommands    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands  }  object MGOAdmins {    case class DropCollection(collName: String) extends MGOCommands    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands    case class ListCollection(dbName: String) extends MGOCommands    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands  }  case class MGOContext(                         dbName: String,                         collName: String,                         actionType: MGO_ACTION_TYPE = MGO_ACTION_TYPE.MGO_QUERY,                         action: Option[MGOCommands] = None,                         actionOptions: Option[Any] = None,                         actionTargets: Seq[String] = Nil                       ) {    ctx =>    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)    def setCollName(name: String): MGOContext = ctx.copy(collName = name)    def setActionType(at: MGO_ACTION_TYPE): MGOContext = ctx.copy(actionType = at)    def setCommand(cmd: MGOCommands): MGOContext  = ctx.copy(action = Some(cmd))  }  object MGOContext {    def apply(db: String, coll: String) = new MGOContext(db, coll)  }  case class MGOBatContext(contexts: Seq[MGOContext], tx: Boolean = false) {    ctxs =>    def setTx(txopt: Boolean): MGOBatContext = ctxs.copy(tx = txopt)    def appendContext(ctx: MGOContext): MGOBatContext =      ctxs.copy(contexts = contexts :+ ctx)  }  object MGOBatContext {    def apply(ctxs: Seq[MGOContext], tx: Boolean = false) = new MGOBatContext(ctxs,tx)  }  type MGODate = java.util.Date  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {    val ca = Calendar.getInstance()    ca.set(yyyy,mm,dd)    ca.getTime()  }  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {    val ca = Calendar.getInstance()    ca.set(yyyy,mm,dd,hr,min,sec)    ca.getTime()  }  def mgoDateTimeNow: MGODate = {    val ca = Calendar.getInstance()    ca.getTime  }  def mgoDateToString(dt: MGODate, formatString: String): String = {    val fmt= new SimpleDateFormat(formatString)    fmt.format(dt)  }  type MGOBlob = BsonBinary  type MGOArray = BsonArray  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)  def mgoBlobToFile(blob: MGOBlob, fileName: String)(    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)  def mgoGetStringOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getString(fieldName))    else None  }  def mgoGetIntOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getInteger(fieldName))    else None  }  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getLong(fieldName))    else None  }  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getDouble(fieldName))    else None  }  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getBoolean(fieldName))    else None  }  def mgoGetDateOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      Some(doc.getDate(fieldName))    else None  }  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      doc.get(fieldName).asInstanceOf[Option[MGOBlob]]    else None  }  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {    if (doc.keySet.contains(fieldName))      doc.get(fieldName).asInstanceOf[Option[MGOArray]]    else None  }  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {    (arr.getValues.asScala.toList)      .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]]  }  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]}object MGOEngine extends LogSupport {  import MGOClasses._  import MGOAdmins._  import MGOCommands._  import sdp.result.DBOResult._  import com.mongodb.reactivestreams.client.MongoClients  object TxUpdateMode {    private def mgoTxUpdate(ctxs: MGOBatContext, observable: SingleObservable[ClientSession])(              implicit client: MongoClient, ec: ExecutionContext): SingleObservable[ClientSession] = {      log.info(s"mgoTxUpdate> calling ...")      observable.map(clientSession => {        val transactionOptions =          TransactionOptions.builder()            .readConcern(ReadConcern.SNAPSHOT)            .writeConcern(WriteConcern.MAJORITY).build()        clientSession.startTransaction(transactionOptions)/*        val fut = Future.traverse(ctxs.contexts) { ctx =>          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()        }        Await.ready(fut, 3 seconds) */        ctxs.contexts.foreach { ctx =>          mgoUpdateObservable[Completed](ctx).map(identity).toFuture()        }        clientSession      })    }    private def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {      log.info(s"commitAndRetry> calling ...")      observable.recoverWith({        case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {          log.warn("commitAndRetry> UnknownTransactionCommitResult, retrying commit operation ...")          commitAndRetry(observable)        }        case e: Exception => {          log.error(s"commitAndRetry> Exception during commit ...: $e")          throw e        }      })    }    private def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {      log.info(s"runTransactionAndRetry> calling ...")      observable.recoverWith({        case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {          log.warn("runTransactionAndRetry> TransientTransactionError, aborting transaction and retrying ...")          runTransactionAndRetry(observable)        }      })    }    def mgoTxBatch(ctxs: MGOBatContext)(            implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {      log.info(s"mgoTxBatch>  MGOBatContext: ${ctxs}")      val updateObservable: Observable[ClientSession] = mgoTxUpdate(ctxs, client.startSession())      val commitTransactionObservable: SingleObservable[Completed] =        updateObservable.flatMap(clientSession => clientSession.commitTransaction())      val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)      runTransactionAndRetry(commitAndRetryObservable)      valueToDBOResult(Completed())    }  }  def mgoUpdateBatch(ctxs: MGOBatContext)(implicit client: MongoClient, ec: ExecutionContext): DBOResult[Completed] = {    log.info(s"mgoUpdateBatch>  MGOBatContext: ${ctxs}")    if (ctxs.tx) {        TxUpdateMode.mgoTxBatch(ctxs)      } else {/*        val fut = Future.traverse(ctxs.contexts) { ctx =>          mgoUpdate[Completed](ctx).map(identity) }        Await.ready(fut, 3 seconds)        FastFastFuture.successful(new Completed) */        ctxs.contexts.foreach { ctx =>          mgoUpdate[Completed](ctx).map(identity) }         valueToDBOResult(Completed())      }  }  def mongoStream(ctx: MGOContext)(    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {    log.info(s"mongoStream>  MGOContext: ${ctx}")    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mongoStream> uery action cannot be null!")      throw new IllegalArgumentException("query action cannot be null!")    }    try {      ctx.action.get match {        case Find(None, Nil, false) => //FindObservable          MongoSource(ObservableToPublisher(coll.find()))        case Find(None, Nil, true) => //FindObservable          MongoSource(ObservableToPublisher(coll.find().first()))        case Find(Some(filter), Nil, false) => //FindObservable          MongoSource(ObservableToPublisher(coll.find(filter)))        case Find(Some(filter), Nil, true) => //FindObservable          MongoSource(ObservableToPublisher(coll.find(filter).first()))        case Find(None, sro, _) => //FindObservable          val next = toResultOption(sro)          MongoSource(ObservableToPublisher(next(coll.find[Document]())))        case Find(Some(filter), sro, _) => //FindObservable          val next = toResultOption(sro)          MongoSource(ObservableToPublisher(next(coll.find[Document](filter))))        case _ =>          log.error(s"mongoStream> unsupported streaming query [${ctx.action.get}]")          throw new RuntimeException(s"mongoStream> unsupported streaming query [${ctx.action.get}]")      }    }    catch { case e: Exception =>      log.error(s"mongoStream> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mongoStream> Error: ${e.getMessage}")    }  }  // T => FindIterable  e.g List[Document]  def mgoQuery[T](ctx: MGOContext, Converter: Option[Document => Any] = None)(implicit client: MongoClient): DBOResult[T] = {    log.info(s"mgoQuery>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    def toResultOption(rts: Seq[ResultOptions]): FindObservable[Document] => FindObservable[Document] = findObj =>      rts.foldRight(findObj)((a,b) => a.toFindObservable(b))    if ( ctx.action == None) {      log.error(s"mgoQuery> uery action cannot be null!")      Left(new IllegalArgumentException("query action cannot be null!"))    }    try {      ctx.action.get match {        /* count */        case Count(Some(filter), Some(opt)) => //SingleObservable          coll.countDocuments(filter, opt.asInstanceOf[CountOptions])            .toFuture().asInstanceOf[Future[T]]        case Count(Some(filter), None) => //SingleObservable          coll.countDocuments(filter).toFuture()            .asInstanceOf[Future[T]]        case Count(None, None) => //SingleObservable          coll.countDocuments().toFuture()            .asInstanceOf[Future[T]]        /* distinct */        case Distict(field, Some(filter)) => //DistinctObservable          coll.distinct(field, filter).toFuture()            .asInstanceOf[Future[T]]        case Distict(field, None) => //DistinctObservable          coll.distinct((field)).toFuture()            .asInstanceOf[Future[T]]        /* find */        case Find(None, Nil, false) => //FindObservable          if (Converter == None) coll.find().toFuture().asInstanceOf[Future[T]]          else coll.find().map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(None, Nil, true) => //FindObservable          if (Converter == None) coll.find().first().head().asInstanceOf[Future[T]]          else coll.find().first().map(Converter.get).head().asInstanceOf[Future[T]]        case Find(Some(filter), Nil, false) => //FindObservable          if (Converter == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]          else coll.find(filter).map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(Some(filter), Nil, true) => //FindObservable          if (Converter == None) coll.find(filter).first().head().asInstanceOf[Future[T]]          else coll.find(filter).first().map(Converter.get).head().asInstanceOf[Future[T]]        case Find(None, sro, _) => //FindObservable          val next = toResultOption(sro)          if (Converter == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]          else next(coll.find[Document]()).map(Converter.get).toFuture().asInstanceOf[Future[T]]        case Find(Some(filter), sro, _) => //FindObservable          val next = toResultOption(sro)          if (Converter == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]          else next(coll.find[Document](filter)).map(Converter.get).toFuture().asInstanceOf[Future[T]]        /* aggregate AggregateObservable*/        case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]        /* mapReduce MapReduceObservable*/        case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]        /* list collection */        case ListCollection(dbName) => //ListConllectionObservable          client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]      }    }    catch { case e: Exception =>      log.error(s"mgoQuery> runtime error: ${e.getMessage}")      Left(new RuntimeException(s"mgoQuery> Error: ${e.getMessage}"))    }  }  //T => Completed, result.UpdateResult, result.DeleteResult  def mgoUpdate[T](ctx: MGOContext)(implicit client: MongoClient): DBOResult[T] =    try {      mgoUpdateObservable[T](ctx).toFuture()    }    catch { case e: Exception =>      log.error(s"mgoUpdate> runtime error: ${e.getMessage}")      Left(new RuntimeException(s"mgoUpdate> Error: ${e.getMessage}"))    }  def mgoUpdateObservable[T](ctx: MGOContext)(implicit client: MongoClient): SingleObservable[T] = {    log.info(s"mgoUpdateObservable>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mgoUpdateObservable> uery action cannot be null!")      throw new IllegalArgumentException("mgoUpdateObservable> query action cannot be null!")    }    try {      ctx.action.get match {        /* insert */        case Insert(docs, Some(opt)) => //SingleObservable[Completed]          if (docs.size > 1)            coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).asInstanceOf[SingleObservable[T]]          else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).asInstanceOf[SingleObservable[T]]        case Insert(docs, None) => //SingleObservable          if (docs.size > 1) coll.insertMany(docs).asInstanceOf[SingleObservable[T]]          else coll.insertOne(docs.head).asInstanceOf[SingleObservable[T]]        /* delete */        case Delete(filter, None, onlyOne) => //SingleObservable          if (onlyOne) coll.deleteOne(filter).asInstanceOf[SingleObservable[T]]          else coll.deleteMany(filter).asInstanceOf[SingleObservable[T]]        case Delete(filter, Some(opt), onlyOne) => //SingleObservable          if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]          else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).asInstanceOf[SingleObservable[T]]        /* replace */        case Replace(filter, replacement, None) => //SingleObservable          coll.replaceOne(filter, replacement).asInstanceOf[SingleObservable[T]]        case Replace(filter, replacement, Some(opt)) => //SingleObservable          coll.replaceOne(filter, replacement, opt.asInstanceOf[ReplaceOptions]).asInstanceOf[SingleObservable[T]]        /* update */        case Update(filter, update, None, onlyOne) => //SingleObservable          if (onlyOne) coll.updateOne(filter, update).asInstanceOf[SingleObservable[T]]          else coll.updateMany(filter, update).asInstanceOf[SingleObservable[T]]        case Update(filter, update, Some(opt), onlyOne) => //SingleObservable          if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]          else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).asInstanceOf[SingleObservable[T]]        /* bulkWrite */        case BulkWrite(commands, None) => //SingleObservable          coll.bulkWrite(commands).asInstanceOf[SingleObservable[T]]        case BulkWrite(commands, Some(opt)) => //SingleObservable          coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).asInstanceOf[SingleObservable[T]]      }    }    catch { case e: Exception =>      log.error(s"mgoUpdateObservable> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mgoUpdateObservable> Error: ${e.getMessage}")    }  }  def mgoAdmin(ctx: MGOContext)(implicit client: MongoClient): DBOResult[Completed] = {    log.info(s"mgoAdmin>  MGOContext: ${ctx}")    val db = client.getDatabase(ctx.dbName)    val coll = db.getCollection(ctx.collName)    if ( ctx.action == None) {      log.error(s"mgoAdmin> uery action cannot be null!")      Left(new IllegalArgumentException("mgoAdmin> query action cannot be null!"))    }    try {      ctx.action.get match {        /* drop collection */        case DropCollection(collName) => //SingleObservable          val coll = db.getCollection(collName)          coll.drop().toFuture()        /* create collection */        case CreateCollection(collName, None) => //SingleObservable          db.createCollection(collName).toFuture()        case CreateCollection(collName, Some(opt)) => //SingleObservable          db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture()        /* list collection      case ListCollection(dbName) =>   //ListConllectionObservable        client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]]        */        /* create view */        case CreateView(viewName, viewOn, pline, None) => //SingleObservable          db.createView(viewName, viewOn, pline).toFuture()        case CreateView(viewName, viewOn, pline, Some(opt)) => //SingleObservable          db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture()        /* create index */        case CreateIndex(key, None) => //SingleObservable          coll.createIndex(key).toFuture().asInstanceOf[Future[Completed]] //   asInstanceOf[SingleObservable[Completed]]        case CreateIndex(key, Some(opt)) => //SingleObservable          coll.createIndex(key, opt.asInstanceOf[IndexOptions]).asInstanceOf[Future[Completed]] // asInstanceOf[SingleObservable[Completed]]        /* drop index */        case DropIndexByName(indexName, None) => //SingleObservable          coll.dropIndex(indexName).toFuture()        case DropIndexByName(indexName, Some(opt)) => //SingleObservable          coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture()        case DropIndexByKey(key, None) => //SingleObservable          coll.dropIndex(key).toFuture()        case DropIndexByKey(key, Some(opt)) => //SingleObservable          coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture()        case DropAllIndexes(None) => //SingleObservable          coll.dropIndexes().toFuture()        case DropAllIndexes(Some(opt)) => //SingleObservable          coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture()      }    }    catch { case e: Exception =>      log.error(s"mgoAdmin> runtime error: ${e.getMessage}")      throw new RuntimeException(s"mgoAdmin> Error: ${e.getMessage}")    }  }}object MongoActionStream {  import MGOClasses._  case class StreamingInsert[A](dbName: String,                                collName: String,                                converter: A => Document,                                parallelism: Int = 1                               ) extends MGOCommands  case class StreamingDelete[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                parallelism: Int = 1,                                justOne: Boolean = false                               ) extends MGOCommands  case class StreamingUpdate[A](dbName: String,                                collName: String,                                toFilter: A => Bson,                                toUpdate: A => Bson,                                parallelism: Int = 1,                                justOne: Boolean = false                               ) extends MGOCommands  case class InsertAction[A](ctx: StreamingInsert[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =      Flow[A].map(ctx.converter)        .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))  }  case class UpdateAction[A](ctx: StreamingUpdate[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))  }  case class DeleteAction[A](ctx: StreamingDelete[A])(    implicit mongoClient: MongoClient) {    val database = mongoClient.getDatabase(ctx.dbName)    val collection = database.getCollection(ctx.collName)    def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =      if (ctx.justOne) {        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))      } else        Flow[A]          .mapAsync(ctx.parallelism)(a =>            collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))  }}object MGOHelpers {  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {    override val converter: (Document) => String = (doc) => doc.toJson  }  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {    override val converter: (C) => String = (doc) => doc.toString  }  trait ImplicitObservable[C] {    val observable: Observable[C]    val converter: (C) => String    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)    def headResult() = Await.result(observable.head(), 10 seconds)    def printResults(initial: String = ""): Unit = {      if (initial.length > 0) print(initial)      results().foreach(res => println(converter(res)))    }    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")  }  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {    Await.result(fut, timeOut)  }  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {    Await.result(fut, timeOut)  }  import monix.eval.Task  import monix.execution.Scheduler.Implicits.global  final class FutureToTask[A](x: => Future[A]) {    def asTask: Task[A] = Task.deferFuture[A](x)  }  final class TaskToFuture[A](x: => Task[A]) {    def asFuture: Future[A] = x.runToFuture  }}

TestMongoEngine.scala

import akka.actor.ActorSystemimport akka.stream.ActorMaterializerimport org.mongodb.scala._import scala.collection.JavaConverters._import com.mongodb.client.model._import com.datatech.sdp.mongo.engine._import MGOClasses._import scala.util._object TestMongoEngine extends App {  import MGOEngine._  import MGOHelpers._  import MGOCommands._  import MGOAdmins._  // or provide custom MongoClientSettings  val settings: MongoClientSettings = MongoClientSettings.builder()    .applyToClusterSettings(b => b.hosts(List(new ServerAddress("localhost")).asJava))    .build()  implicit val client: MongoClient = MongoClient(settings)  implicit val system = ActorSystem()  implicit val mat = ActorMaterializer() // implicit val ec = system.dispatcher  val ctx = MGOContext("testdb","po").setCommand(    DropCollection("po"))  import monix.execution.Scheduler.Implicits.global  println(getResult(mgoAdmin(ctx).value.value.runToFuture))scala.io.StdIn.readLine()  val pic = fileToMGOBlob("/users/tiger/cert/MeTiger.png")  val po1 = Document (    "ponum" -> "po18012301",    "vendor" -> "The smartphone compay",    "podate" -> mgoDate(2017,5,13),    "remarks" -> "urgent, rush order",    "handler" -> pic,    "podtl" -> Seq(      Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),      Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")    )  )  val po2 = Document (    "ponum" -> "po18022002",    "vendor" -> "The Samsung compay",    "podate" -> mgoDate(2015,11,6),    "podtl" -> Seq(      Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),      Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),      Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")    )  )  val optInsert = new InsertManyOptions().ordered(true)  val ctxInsert = ctx.setCommand(    Insert(Seq(po1,po2),Some(optInsert))  )  println(getResult(mgoUpdate(ctxInsert).value.value.runToFuture))  scala.io.StdIn.readLine()  case class PO (                  ponum: String,                  podate: MGODate,                  vendor: String,                  remarks: Option[String],                  podtl: Option[MGOArray],                  handler: Option[MGOBlob]                )  def toPO(doc: Document): PO = {    PO(      ponum = doc.getString("ponum"),      podate = doc.getDate("podate"),      vendor = doc.getString("vendor"),      remarks = mgoGetStringOrNone(doc,"remarks"),      podtl = mgoGetArrayOrNone(doc,"podtl"),      handler = mgoGetBlobOrNone(doc,"handler")    )  }  case class PODTL(                    item: String,                    price: Double,                    qty: Int,                    packing: Option[String],                    payTerm: Option[String]                  )  def toPODTL(podtl: Document): PODTL = {    PODTL(      item = podtl.getString("item"),      price = podtl.getDouble("price"),      qty = podtl.getInteger("qty"),      packing = mgoGetStringOrNone(podtl,"packing"),      payTerm = mgoGetStringOrNone(podtl,"payterm")    )  }  def showPO(po: PO) = {    println(s"po number: ${po.ponum}")    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")    println(s"vendor: ${po.vendor}")    if (po.remarks != None)      println(s"remarks: ${po.remarks.get}")    po.podtl match {      case Some(barr) =>        mgoArrayToDocumentList(barr)          .map { dc => toPODTL(dc)}          .foreach { doc: PODTL =>            print(s"==>Item: ${doc.item} ")            print(s"price: ${doc.price} ")            print(s"qty: ${doc.qty} ")            doc.packing.foreach(pk => print(s"packing: ${pk} "))            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))            println("")          }      case _ =>    }    po.handler match {      case Some(blob) =>        val fileName = s"/users/tiger/${po.ponum}.png"        mgoBlobToFile(blob,fileName)        println(s"picture saved to ${fileName}")      case None => println("no picture provided")    }  }  import org.mongodb.scala.model.Projections._  import org.mongodb.scala.model.Filters._  import org.mongodb.scala.model.Sorts._  import org.mongodb.scala.bson.conversions._  import org.mongodb.scala.bson.Document  val ctxFilter = Find(filter=Some(equal("podtl.qty",100)))  val sort: Bson = (descending("ponum"))  val proj: Bson = (and(include("ponum","podate")                   ,include("vendor"),excludeId()))  val resSort = ResultOptions(FOD_TYPE.FOD_SORT,Some(sort))  val resProj = ResultOptions(FOD_TYPE.FOD_PROJECTION,Some(proj))  val ctxFind = ctx.setCommand(Find(andThen=Seq(resProj,resSort)))  val ctxFindFirst = ctx.setCommand(Find(firstOnly=true))  val ctxFindArrayItem = ctx.setCommand(    Find(filter = Some(equal("podtl.qty",100)))  )  for {    _ <- mgoQuery[List[Document]](ctxFind).value.value.runToFuture.andThen {      case Success(eold) => eold match {        case Right(old) => old match {          case Some(ld) => ld.map(toPO(_)).foreach(showPO)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }    _ <- mgoQuery[PO](ctxFindFirst,Some(toPO _)).value.value.runToFuture.andThen {      case Success(eop) => eop match {        case Right(op) => op match {          case Some(p) => showPO(_)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }    _ <- mgoQuery[List[PO]](ctxFindArrayItem,Some(toPO _)).value.value.runToFuture.andThen {      case Success(eops) => eops match {        case Right(ops) => ops match {          case Some(lp) => lp.foreach(showPO)          case None => println(s"Empty document found!")        }        case Left(err) => println(s"Error: ${err.getMessage}")      }        println("-------------------------------")      case Failure(e) => println(e.getMessage)    }  } yield()  scala.io.StdIn.readLine()  system.terminate()}

 

转载于:https://www.cnblogs.com/tiger-xc/p/11301362.html

你可能感兴趣的文章
查看linux系统版本命令
查看>>
20155302 课堂实践二
查看>>
JavaScript数值类型保留显示小数方法
查看>>
python--以1-31的数字作为结尾的列表?论英文好的重要性!
查看>>
nginx笔记---http配置
查看>>
linux命令df中df -h和df -i
查看>>
201771010130 王志成《面向对象程序设计(java)》第十二周学习总结
查看>>
百词斩
查看>>
Unity3D 开发问题记录笔记
查看>>
Linux 下的图形库介绍
查看>>
面试问我 Java 逃逸分析,瞬间被秒杀了。。
查看>>
公式/定理
查看>>
dockerfile mysql
查看>>
Linux 释放cache化缓存
查看>>
loadrunner11的移动端性能测试之场景设计
查看>>
C#颜色转Delphi颜色的C#代码
查看>>
MainFrame知识小结(20110925)--cobol中table越界
查看>>
hdu 5429 Geometric Progression(存个大数模板)
查看>>
轻松搞定面试中的二叉树题目
查看>>
How to detect when a list is scrolling (or not)
查看>>