scala akka akka-stream

How to pass results from one source stream to another


I have a method that processes a stream and returns a Flow. I am trying to modify it, but I can't get the modified version to return the same type:

Original

def originalMethod[as: AS, mat: MAT, ec: EC](checkType: String)
: Flow[ByteString, MyValidation[MyClass], NotUsed] = {
      collectStuff
      .map { ts =>
        val errors = MyEngine.checkAll(ts.code)
        (ts, errors)
      }
      .map { x =>
        x._2
          .leftMap(xs => {
            addInformation(x._1, xs.toList)
          })
          .toEither
      }
}

I am modifying it to run another source first and pass that source's result into the original stream, while still returning the same type:

def calculate[T: AS: MAT](source: Source[T, NotUsed]): Future[Seq[T]] = 
{
 source.runWith(Sink.seq)
}


def modifiedMethod[as: AS, mat: MAT, ec: EC](checkType: String, mySource: Source[LoanApplicationRegister, NotUsed])
: Flow[ByteString, MyValidation[MyClass], NotUsed] = {
  for {
    calc <- calculate(mySource)
    orig <-  collectStuff
        .map { ts =>
          val errors = MyEngine.checkAll(ts.code, calc)
          (ts, errors)
        }
        .map { x =>
          x._2
            .leftMap(xs => {
              addInformation(x._1, xs.toList)
            })
            .toEither
        }
  }
  yield {
    orig
  }
}

But I'm getting this compilation error: Expression of type Future[Nothing] doesn't conform to expected type Flow[ByteString, MyValidation[MyClass], NotUsed]

How can I return a Flow[ByteString, MyValidation[MyClass], NotUsed] from modifiedMethod, just as originalMethod does?


Solution

for { calc <- calculate(mySource) }
yield {
  collectStuff
    .map { ts =>
      val errors = MyEngine.checkAll(ts.code, calc)
      (ts, errors)
    }
    .map { x =>
      x._2
        .leftMap(xs => {
          addInformation(x._1, xs.toList)
        })
        .toEither
    }
}

This would give you a Future[Flow[ByteString, MyValidation[MyClass], NotUsed]] instead of a Future[Nothing]. If you want to remove the Future, you need to Await it somewhere: either when you call calculate (in which case you don't need the for) or afterwards. Usually, though, blocking like that is not the way to use Futures.
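One possible way to get a plain Flow back without blocking is sketched below. This is not from the original answer. It assumes Akka 2.6 or later, where Flow.futureFlow turns a Future[Flow[I, O, M]] into a Flow[I, O, Future[M]]. The names FutureFlowSketch and modified, and the Int/String element types, are hypothetical stand-ins for modifiedMethod, ByteString, and MyValidation[MyClass]; the string-building map stands in for the MyEngine.checkAll step.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

object FutureFlowSketch {

  // Same idea as calculate in the question: drain a source into a Future[Seq[T]].
  def calculate[T](source: Source[T, NotUsed])(implicit system: ActorSystem): Future[Seq[T]] =
    source.runWith(Sink.seq)

  // Hypothetical stand-in for modifiedMethod, with simplified element types.
  def modified(mySource: Source[Int, NotUsed])(implicit system: ActorSystem): Flow[Int, String, NotUsed] = {
    import system.dispatcher // ExecutionContext for Future.map

    // Mapping over the Future gives a Future[Flow[...]], as in the answer above.
    val futureFlow: Future[Flow[Int, String, NotUsed]] =
      calculate(mySource).map { calc =>
        Flow[Int].map(n => s"$n checked against ${calc.sum}")
      }

    // Flatten the Future into the stream itself; no Await is involved.
    // futureFlow materializes a Future[NotUsed], so it is mapped back to NotUsed.
    Flow.futureFlow(futureFlow).mapMaterializedValue(_ => NotUsed)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    import system.dispatcher

    val result = Source(1 to 3).via(modified(Source(List(10, 20)))).runWith(Sink.seq)
    // Print the outcome, then shut the actor system down.
    result.onComplete { r =>
      println(r)
      system.terminate()
    }
  }
}
```

Note that in this sketch calculate runs as soon as modified is called, not when the returned Flow is materialized. If that matters, Flow.lazyFutureFlow (also Akka 2.6+) takes a () => Future[Flow[...]] and may be a better fit.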