Search code examples
scalatransactionsreactivemongo

Transactions With JSONCollection instead of BSONCollection


I have a problem making a transaction via JSONCollection; I am getting the following error:

JsResultException(errors:List((,List(JsonValidationError(List(CommandError[code=14, errmsg=BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long', doc: {"operationTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"ok":0,"errmsg":"BSON field 'OperationSessionInfo.txnNumber' is the wrong type 'int', expected type 'long'","code":14,"codeName":"TypeMismatch","$clusterTime":{"clusterTime":{"$time":1596894245,"$i":5,"$timestamp":{"t":1596894245,"i":5}},"signature":{"hash":{"$binary":"0000000000000000000000000000000000000000","$type":"00"},"keyId":0}}}]),WrappedArray())))))

I tried to switch my project to BSONCollection but ran into trouble; maybe there is a solution to overcome the above error with JSONCollection. Also, the exception occurs when testing the update method, but testing insertOneViaTransaction and setRuleAsInactiveViaTransaction finishes successfully. This is my code for the transaction. Update:

 /**
  * Replaces a rule inside a single transaction: the updated version is inserted
  * first, then the rule identified by `oldRule.id` is marked inactive.
  *
  * @param oldRule the existing rule to supersede
  * @return the deactivated rule paired with the newly inserted one
  */
 def update(oldRule: ExistRuleDto): Future[UpdateResult] = {
    // Both writes receive the same transactional collection handed in by makeTransaction.
    def replaceRule(txCollection: JSONCollection): Future[UpdateResult] =
      dao.insertOneViaTransaction(txCollection, oldRule.toUpdatedRule).flatMap { inserted =>
        // insert new with ref to old, then retire the old one
        dao.setRuleAsInactiveViaTransaction(txCollection, oldRule.id).map { deactivated =>
          UpdateResult(deactivated, inserted)
        }
      }

    makeTransaction[UpdateResult](replaceRule)
  }

makeTransaction:

/**
 * Runs `block` against the DAO collection inside a MongoDB session and transaction.
 *
 * The transaction is committed when `block` succeeds. When `block` or the commit
 * fails, the transaction is aborted instead of being left open. The session is
 * ended in both cases. Abort and end-session are best-effort cleanup steps: their
 * own failure never replaces the primary outcome.
 *
 * @param block operations to run; they must use the collection handed to them
 * @tparam Out type of the value produced by `block`
 * @return the value produced by `block`, or a failed future carrying the original error
 */
def makeTransaction[Out](block: JSONCollection => Future[Out]): Future[Out] = {
  import scala.util.control.NonFatal

  // Best-effort cleanup: swallow the step's own failure so it cannot mask the real result.
  def quietly(step: => Future[Any]): Future[Unit] =
    Future.successful(()).flatMap(_ => step).map(_ => ()).recover { case NonFatal(_) => () }

  dao.collection.db.startSession().flatMap { dbWithSession =>
    val attempt: Future[Out] = dbWithSession.startTransaction(None).flatMap { dbWithTx =>
      val coll = dbWithTx.collection[JSONCollection](dao.collection.name)

      // Started from a completed future so that a synchronous throw inside `block`
      // is also routed through the abort path below.
      val committed = for {
        res <- Future.successful(coll).flatMap(block)
        _ <- dbWithTx.commitTransaction()
      } yield res

      committed.recoverWith {
        case NonFatal(cause) =>
          quietly(dbWithTx.abortTransaction()).flatMap(_ => Future.failed[Out](cause))
      }
    }

    attempt
      .flatMap(res => quietly(dbWithSession.endSession()).map(_ => res))
      .recoverWith {
        case NonFatal(cause) =>
          quietly(dbWithSession.endSession()).flatMap(_ => Future.failed[Out](cause))
      }
  }
}

insertOneViaTransaction:

  /**
   * Inserts `rule` through the given (transactional) collection.
   *
   * @param collection collection handle to write through
   * @param rule the rule to insert
   * @return the same `rule` once the write result reports exactly one successful write;
   *         fails with `DuplicationError` on write-result code 11000 and with
   *         `GeneralDBError` for every other failure
   */
  def insertOneViaTransaction(collection: JSONCollection, rule: Rule): Future[Rule] = {
    val inserted: Future[Rule] = collection.insert.one(rule).map {
      case DefaultWriteResult(true, 1, _, _, _, _) => rule
      case unexpected =>
        throw GeneralDBError(s"$rule was not inserted, something went wrong: $unexpected")
    }

    // Translate driver failures (and the error raised above) into domain errors.
    inserted.recover {
      case WriteResult.Code(11000) => throw DuplicationError(s"$rule exist on DB")
      case failure => throw GeneralDBError(failure.getMessage)
    }
  }

setRuleAsInactiveViaTransaction:

 /**
  * Marks the rule with the given id as inactive and removes its
  * "last version exists" metadata field, through the given collection.
  *
  * @param collection collection handle to write through
  * @param ruleId id of the rule to deactivate
  * @return the rule as read back from the find-and-update result
  *         (`fetchNewObject = true`); fails with `GeneralUpdateError`
  *         when no rule can be read from the result
  */
 def setRuleAsInactiveViaTransaction(collection: JSONCollection, ruleId: BSONObjectID): Future[Rule] = {
    val selector = Json.obj(s"${Rule.ID}" -> ruleId)
    val modifier = Json.obj(
      "$set" -> Json.obj(s"${Rule.Metadata}.${Metadata.Active}" -> false),
      "$unset" -> Json.obj(s"${Rule.Metadata}.${Metadata.LastVersionExists}" -> ""))

    // Logs and raises when the result carries no document readable as a Rule.
    def failUpdate(): Nothing = {
      val msg = s"Operation fail for updating ruleId = $ruleId"
      logger.error(msg)
      throw GeneralUpdateError(msg)
    }

    collection.findAndUpdate(
      selector,
      modifier,
      fetchNewObject = true,
      upsert = false,
      sort = None,
      fields = None,
      bypassDocumentValidation = false,
      writeConcern = WriteConcern.Acknowledged,
      maxTime = None,
      collation = None,
      arrayFilters = Nil
    ).map(_.result[Rule].getOrElse(failUpdate()))
  }

I'm using the following dependencies: Play:

    "com.typesafe.play" % "sbt-plugin" % "2.7.2"

Reactivemongo:

    "org.reactivemongo" %% "play2-reactivemongo" % "0.18.8-play27"

Solution

  • Solved it (not with compact) by using BSONCollection with the following serializers:

      /**
       * Implicit BSON handler that lets Play JSON `JsValue`s be read from and
       * written to BSON values by delegating to `BSONFormats`.
       */
      implicit object JsValueHandler extends BSONHandler[BSONValue, JsValue] {
        // BSON -> JSON, delegated to BSONFormats.toJSON.
        implicit override def read(bson: BSONValue): JsValue = BSONFormats.toJSON(bson)
        // JSON -> BSON. NOTE(review): `.get` presumably unwraps a JsResult and throws
        // when the value cannot be converted - confirm against BSONFormats.toBSON.
        implicit override def write(j: JsValue): BSONValue = BSONFormats.toBSON(j).get
      }
    

    asTransaction:

    /**
     * Runs `block` against this DAO's collection inside a MongoDB session and transaction.
     *
     * The transaction is committed when `block` succeeds. When `block` or the commit
     * fails, the transaction is explicitly aborted, so the "rollback" announced in the
     * log message actually happens. The session is ended in both cases. Abort and
     * end-session are best-effort cleanup steps: their own failure never replaces the
     * primary outcome.
     *
     * @param block operations to run; they must use the collection handed to them
     * @tparam Out type of the value produced by `block`
     * @return the value produced by `block`; any `Exception` is logged and rethrown
     *         as `GeneralDBErrorOnTx` carrying the original message
     */
    def asTransaction[Out](block: BSONCollection => Future[Out]): Future[Out] = {
      import scala.util.control.NonFatal

      // Best-effort cleanup: swallow the step's own failure so it cannot mask the real result.
      def quietly(step: => Future[Any]): Future[Unit] =
        Future.successful(()).flatMap(_ => step).map(_ => ()).recover { case NonFatal(_) => () }

      val outcome: Future[Out] = collection.db.startSession().flatMap { dbWithSession =>
        val attempt: Future[Out] = dbWithSession.startTransaction(None).flatMap { dbWithTx =>
          val collectionWithTx = dbWithTx.collection[BSONCollection](collection.name)

          // Started from a completed future so that a synchronous throw inside `block`
          // is also routed through the abort path below.
          val committed = for {
            out <- Future.successful(collectionWithTx).flatMap(block)
            _ <- dbWithTx.commitTransaction()
          } yield out

          committed.recoverWith {
            case NonFatal(cause) =>
              quietly(dbWithTx.abortTransaction()).flatMap(_ => Future.failed[Out](cause))
          }
        }

        attempt
          .flatMap(out => quietly(dbWithSession.endSession()).map(_ => out))
          .recoverWith {
            case NonFatal(cause) =>
              quietly(dbWithSession.endSession()).flatMap(_ => Future.failed[Out](cause))
          }
      }

      outcome.recover {
        case ex: Exception =>
          logger.warn(s"asTransaction failed with ex: ${ex.getMessage}, rollback to previous state...")
          throw GeneralDBErrorOnTx(ex.getMessage)
      }
    }
    

    transaction example:

     /**
      * Sets the `active` flag on every given rule through a transactional collection,
      * then records one `Update` event per rule.
      *
      * Note that the event insertion is not handed the transactional collection.
      * NOTE(review): confirm whether `eventsService.addEvents` is meant to take part
      * in the same transaction.
      *
      * @param oldRules rules whose visibility changes
      * @param active the new value of the rules' active flag
      * @return a future completing once both steps have finished
      */
     def `change visibility of ExistsRules and insert UpdateEvents`(oldRules: List[Rule], active: Boolean): Future[Unit] =
        ruleDao.asTransaction { collectionTx =>
          // (1) - $active old Rules
          val matchOldRules = BSONDocument(s"${Rule.ID}" -> BSONDocument("$in" -> oldRules.map(_._id)))
          val setActiveFlag = BSONDocument("$set" -> BSONDocument(s"${Rule.Metadata}.${Metadata.Active}" -> active))

          ruleDao
            .updateManyWithBsonCollection(collectionTx, filter = matchOldRules, update = setActiveFlag)
            .flatMap { _ =>
              // (2) - Sync Cache with Update Events
              val updateEvents = oldRules.map(rule => RuleEvent(rule.metadata.cacheKey, Update))
              eventsService.addEvents(updateEvents).map(_ => ())
            }
        }
    

    Enjoy!