Search code examples
akka-streamreactive-kafka

connect producer flow to graph


I am new using akka streams kafka (and akka streams in general) . I am trying to construct a graph in order to publish a message to different topics. How can I connect the producer as flow in order to commit the processed messages ? I tried using Producer.flow but I can't get the commitScaladsl

object TestFoo {
  import akka.kafka.ProducerMessage.Message
  implicit val system = ActorSystem("test-kafka")
  implicit val materializer = ActorMaterializer()
  val evenNumbersTopic = "even_numbers"
  val allNumbersTopic = "all_numbers"
  lazy val consumerSettings = ConsumerSettings(system, new StringDeserializer(), new JsonDeserializer[Int])
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  lazy val source =  Consumer.committableSource(consumerSettings, Subscriptions.topics(Set(evenNumbersTopic, allNumbersTopic)))
  val producerSettings = ProducerSettings(system,  new StringSerializer(), new StringSerializer())
    .withBootstrapServers("localhost:9092")
  val flow: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
    import akka.stream.scaladsl.GraphDSL.Implicits._
    type TypedMessage =  Message[String, Int,CommittableOffset]
    val bcast = b.add(Broadcast[TypedMessage](2))
    val merge = b.add(Merge[TypedMessage](2))

    val evenFilter = Flow[TypedMessage].filter (  c => c.record.value() % 2 == 0)
    val justEven = Flow[TypedMessage].map{
      case Message(pr, offset) =>
      val r = new ProducerRecord[String, Int]("general", pr.value())
      Message(r, offset)
    }
    val allNumbers = Flow[TypedMessage].map{
      case Message(pr, offset) =>
      val r = new ProducerRecord[String, Int](allNumbersTopic, pr.value())
      Message(r, offset)
    }

    val toMsg = Flow[ConsumerMessage.CommittableMessage[String, Int]].map{ msg =>
      val r = new ProducerRecord[String, Int]("general", msg.record.value())
      Message(r, msg.committableOffset)
    }
    source ~> toMsg ~> bcast

    bcast ~> evenFilter ~> justEven ~> merge
    bcast ~> allNumbers ~> merge
    merge ~> Producer.flow(producerSettings).mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl() //this doesn't compile, cannot get the .commitScaladsl()
    }
    ClosedShape 
  })}

Solution

  • Because you are using the GraphDSL, the compiler cannot infer the PassThrough type from the previous stage. Try and explicitly pass the type parameters to the Producer.flow function, e.g.

    merge ~> Producer.flow[K, V, CommittableOffset](producerSettings).mapAsync(producerSettings.parallelism) { result =>
      result.message.passThrough.commitScaladsl()
    }
    

    I have left K and V as unbound param, please fit there whatever key/value types your Producer is bound to produce. If you want the code above to be correctly wired, you'll need to match the producerSettings types with what comes from the merge stage. You'll need something like:

    val producerSettings = ProducerSettings(system,  new StringSerializer(), new JsonSerializer[Int])
        .withBootstrapServers("localhost:9092")