
Merge two Stream based on Same ID in Akka Stream


I have two input streams. I would like to merge elements of the two streams based on the same ID. Here are the code details:

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Source
  import scala.util.Random

  implicit val system = ActorSystem("sourceDemo")
  implicit val materializer = ActorMaterializer()

  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  val sourceOne = Source(List.fill(100)(Foo(Random.nextInt(100), value = "foo")))
  val sourceTwo = Source(List.fill(100)(Bar(Random.nextInt(100), value = "bar")))

The result I would like to get is a MergeResult, built from a Foo and a Bar that share the same id.

Also, any Foo or Bar whose id has not been matched yet should be kept in memory. I wonder if there is a clean way to do this, since it is stateful.

More importantly, the elements in each source are ordered. If duplicate IDs occur, the strategy should be first come, first served. For example, given Foo(1, "foo-1"), Foo(1, "foo-2") and Bar(1, "Bar-1"), the match should be MergeResult(1, "foo-1", "Bar-1").
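To make the first-come, first-served rule concrete, here is a minimal, framework-free sketch of the matching logic as a pure fold. It assumes the two inputs have already been interleaved into one sequence of Either[Foo, Bar] (Left for Foo, Right for Bar); the names FifoJoin, Pending, step and run are hypothetical and not part of any library. Unmatched elements are buffered per id in an immutable Queue, so the oldest waiting element is always the one that gets matched.

```scala
import scala.collection.immutable.Queue

// Hypothetical helper, not a library API: inputs are assumed to be
// already interleaved into a single Either[Foo, Bar] sequence.
object FifoJoin {
  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Elements still waiting for a partner, per id, oldest first.
  // Invariant: queues stored in the maps are never empty.
  final case class Pending(foos: Map[Int, Queue[Foo]], bars: Map[Int, Queue[Bar]])

  object Pending {
    val empty: Pending = Pending(Map.empty, Map.empty)
  }

  // Processes one element (Left = Foo, Right = Bar). If the other side has an
  // element waiting with the same id, the oldest one is consumed and a
  // MergeResult is emitted; otherwise the element is buffered.
  def step(pending: Pending, in: Either[Foo, Bar]): (Pending, Option[MergeResult]) =
    in match {
      case Left(foo) =>
        pending.bars.get(foo.id) match {
          case Some(q) =>
            val (bar, rest) = q.dequeue
            val bars = if (rest.isEmpty) pending.bars - foo.id else pending.bars.updated(foo.id, rest)
            (pending.copy(bars = bars), Some(MergeResult(foo.id, foo.value, bar.value)))
          case None =>
            val q = pending.foos.getOrElse(foo.id, Queue.empty[Foo]).enqueue(foo)
            (pending.copy(foos = pending.foos.updated(foo.id, q)), None)
        }
      case Right(bar) =>
        pending.foos.get(bar.id) match {
          case Some(q) =>
            val (foo, rest) = q.dequeue
            val foos = if (rest.isEmpty) pending.foos - bar.id else pending.foos.updated(bar.id, rest)
            (pending.copy(foos = foos), Some(MergeResult(bar.id, foo.value, bar.value)))
          case None =>
            val q = pending.bars.getOrElse(bar.id, Queue.empty[Bar]).enqueue(bar)
            (pending.copy(bars = pending.bars.updated(bar.id, q)), None)
        }
    }

  // Runs step over a whole interleaved sequence, collecting matches in order.
  def run(inputs: Seq[Either[Foo, Bar]]): (Pending, Vector[MergeResult]) =
    inputs.foldLeft((Pending.empty, Vector.empty[MergeResult])) {
      case ((pending, out), in) =>
        val (next, result) = step(pending, in)
        (next, out ++ result.toList)
    }
}
```

Running run on Left(Foo(1, "foo-1")), Left(Foo(1, "foo-2")), Right(Bar(1, "Bar-1")) yields the single match MergeResult(1, "foo-1", "Bar-1") and leaves Foo(1, "foo-2") in pending.foos, waiting for a second Bar with id 1.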

I am looking at Akka Streams solutions at the moment. Good solutions in other frameworks such as Spark or Flink would be helpful as well.

Thanks in advance.


Solution

  • You are precisely describing a join operation.

    Akka Streams has no built-in join operator. You may find a way to do it by windowing each stream and using an actor or a stateful transformation to do the lookup between them, but the last time I searched for this (not long ago) I found nothing, so you are probably in uncharted waters.

    You will only find stream joins in more heavyweight frameworks: Flink, Spark Streaming, Kafka Streams. The reason is that a join is fundamentally a lookup of one stream against another, which requires more complex machinery (state management) than the designers of Akka Streams wanted to deal with.
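If you do want to stay within Akka Streams, the "stateful transformation" route can be sketched by tagging each side with Left/Right, merging the two sources, and keeping unmatched elements in per-id FIFO queues inside statefulMapConcat. This is a minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem supplies the Materializer); the names StatefulJoinSketch, fifoJoin and join are hypothetical. It does no windowing or eviction, so unmatched elements are kept forever.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

object StatefulJoinSketch {
  case class Foo(id: Int, value: String)
  case class Bar(id: Int, value: String)
  case class MergeResult(id: Int, fooValue: String, barValue: String)

  // Hypothetical helper: builds a fresh matcher per materialization.
  // Unmatched elements wait in a FIFO queue per id (never left empty),
  // so the oldest waiting element is matched first.
  def fifoJoin(): Either[Foo, Bar] => List[MergeResult] = {
    val pendingFoos = mutable.Map.empty[Int, mutable.Queue[Foo]]
    val pendingBars = mutable.Map.empty[Int, mutable.Queue[Bar]]
    val step: Either[Foo, Bar] => List[MergeResult] = {
      case Left(foo) =>
        pendingBars.get(foo.id) match {
          case Some(q) =>
            val bar = q.dequeue() // oldest waiting Bar with this id
            if (q.isEmpty) pendingBars -= foo.id
            List(MergeResult(foo.id, foo.value, bar.value))
          case None =>
            pendingFoos.getOrElseUpdate(foo.id, mutable.Queue.empty[Foo]).enqueue(foo)
            Nil
        }
      case Right(bar) =>
        pendingFoos.get(bar.id) match {
          case Some(q) =>
            val foo = q.dequeue() // oldest waiting Foo with this id
            if (q.isEmpty) pendingFoos -= bar.id
            List(MergeResult(bar.id, foo.value, bar.value))
          case None =>
            pendingBars.getOrElseUpdate(bar.id, mutable.Queue.empty[Bar]).enqueue(bar)
            Nil
        }
    }
    step
  }

  // Tags each side, merges them (per-source order is preserved) and runs the matcher.
  def join(foos: Source[Foo, NotUsed], bars: Source[Bar, NotUsed]): Source[MergeResult, NotUsed] =
    foos
      .map(foo => Left(foo): Either[Foo, Bar])
      .merge(bars.map(bar => Right(bar): Either[Foo, Bar]))
      .statefulMapConcat(() => fifoJoin())

  def main(args: Array[String]): Unit = {
    // Assumes Akka 2.6+: the implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("joinSketch")
    try {
      val foos = Source(List(Foo(1, "foo-1"), Foo(1, "foo-2"), Foo(2, "foo-3")))
      val bars = Source(List(Bar(1, "Bar-1"), Bar(3, "Bar-2")))
      val results = Await.result(join(foos, bars).runWith(Sink.seq), 10.seconds)
      results.foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

merge interleaves the two sources nondeterministically, but because each side's waiting elements are consumed in FIFO order, the i-th Foo with a given id always pairs with the i-th Bar with that id. Here the only match is MergeResult(1, "foo-1", "Bar-1"), while Foo(1, "foo-2"), Foo(2, "foo-3") and Bar(3, "Bar-2") stay pending. On an unbounded stream those maps grow without limit, which is where windowing or some eviction policy would have to come in.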