Tags: scala, apache-kafka, akka, akka-stream

Add custom logic/callback/handler on EOF for JsonFraming in akka-streams


I have a flow where I consume file paths in small batches from Kafka path topics, read the files themselves (big JSON arrays), and write them back to Kafka data topics.

It looks like this:

      val fileFlow = Flow[Path].flatMapConcat(HdfsSource.data(fs, _))
        .via(JsonFraming.objectScanner(Int.MaxValue))

      Consumer
        .committableSource(newConsumerSettings, Subscriptions.topics(inputTopicNames))
        .map(value => value)
        .grouped(kafkaConsumerBatch)
        .flatMapConcat(paths => Source(paths))
        .map(path => new Path(path.record.value().get("fullPath").asInstanceOf[String]))
        //Based on: https://github.com/akka/alpakka/blob/v3.0.0/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala#L72-L92
        .via(PassThroughFlow(fileFlow))
        .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
        .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
        .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
        .map { case (record, entityConfig) =>
          producerMessagesToTopic.mark()
          ProducerMessage.single(
            new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
            passThrough = entityConfig)
        }
        .via {
          akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
        }
        // ... more logic for logging and running/materializing the flow
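
For context, the PassThroughFlow used above follows the pattern from the linked Alpakka example. Roughly (I've sketched it from that example, so treat the exact signatures as approximate):

    import akka.NotUsed
    import akka.stream.{FlowShape, Graph}
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, ZipWith}

    object PassThroughFlow {
      // pair each output of processingFlow with the element that produced it
      def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, (T, A)], NotUsed] =
        apply[A, T, (T, A)](processingFlow, Keep.both)

      def apply[A, T, O](processingFlow: Flow[A, T, NotUsed], output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] =
        Flow.fromGraph(GraphDSL.create() { implicit builder =>
          import GraphDSL.Implicits._

          val broadcast = builder.add(Broadcast[A](2))
          val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right)))

          // one copy of each element goes through the processing flow,
          // the other is zipped back together with its result
          // (the example as linked assumes a 1:1 processing flow)
          broadcast.out(0) ~> processingFlow ~> zip.in0
          broadcast.out(1) ~> zip.in1

          FlowShape(broadcast.in, zip.out)
        })
    }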

Now, the problem is that, as I said, those JSON files are big, so I can't take the entire file content, frame it into separate objects, store them all to Kafka and commit only after that. Or rather, that is essentially what I need to do, but I also need to control the offset commit based on an EOF event.

I want to let the Producer send data over to Kafka at its own pace, regardless of its configs, but somehow inject my custom logic on an EOF event. Maybe something like a passThrough field to signify that the file has been fully consumed and we can now commit the offset for the upstream path topic.
objectScanner has a GraphStageLogic inside its definition with an onUpstreamFinish callback, but there's no direct access to it for overriding, and classes like SimpleLinearGraphStage and JsonObjectParser are marked as internal APIs.
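
For illustration, the kind of per-file marker I'm imagining would look roughly like the sketch below. This is purely hypothetical (the Either-based wrapper is not something I have working); fs and HdfsSource are the same as in the snippet above:

    import akka.NotUsed
    import akka.stream.scaladsl.{Flow, JsonFraming, Source}
    import akka.util.ByteString
    import org.apache.hadoop.fs.Path

    // hypothetical variant of fileFlow: emit Right(framedObject) for every framed JSON object
    // and append a single Left(path) marker once the per-file source has completed (i.e. on EOF)
    val fileFlowWithEof: Flow[Path, Either[Path, ByteString], NotUsed] =
      Flow[Path].flatMapConcat { path =>
        HdfsSource
          .data(fs, path)
          .via(JsonFraming.objectScanner(Int.MaxValue))
          .map(obj => Right(obj): Either[Path, ByteString])
          .concat(Source.single(Left(path))) // only emitted after the file is fully framed
      }

Downstream, the Right values would go to the producer as before, and a Left(path) would be the signal that the offset for that path can be committed.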


Solution

  • I'm struck by

    ...I can't take the entire file content, frame it into separate objects, store them all to Kafka and commit only after that

    Since it seems (and you can comment if I'm getting this wrong) that the offset commit is effectively an acknowledgement that you've fully processed a file, there's no way around it: the offset can't be committed until all the objects in the file referenced by the message at that offset have been produced to Kafka.

    The downside of Source.via(Flow.flatMapConcat.via(...)).map.via(...) is that it's a single stream: everything between the first and second via, inclusive, happens sequentially, and for a big file that takes a while.

    If you're OK with objects from different files being interleaved in the output topic, and OK with an unavoidable chance of an object from a given file being produced twice to the output topic (both of these may or may not impose meaningful constraints/difficulties on downstream consumers of that topic), you can parallelize processing across files. The mapAsync stream stage is especially useful for this:

    import akka.Done
    import scala.concurrent.Future

    // assuming there's an implicit Materializer/ActorSystem (depending on the version of
    // Akka Streams you're running) and an ExecutionContext in scope
    def process(path: Path): Future[Done] =
      Source.single(path)
        .via(PassThroughFlow(fileFlow))
        .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
        .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
        .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
        .map { case (record, entityConfig) =>
          producerMessagesToTopic.mark()
          ProducerMessage.single(
            new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
            passThrough = entityConfig)
        }
        .via {
          akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
        }
        .runWith(Sink.ignore)
    
     // then starting right after .flatMapConcat(paths => Source(paths))
     .mapAsync(parallelism) { committableMsg =>
       val p = new Path(committableMsg.record.value().get("fullPath").asInstanceOf[String])
       process(p).map { _ => committableMsg.committableOffset }
     }
     // now have the committable offsets
    

    parallelism then limits how many paths you're processing at any given time. Ordering into the committer is maintained (i.e. an offset never reaches the committer before all messages before it have been fully processed), since mapAsync emits its results in the order the incoming elements arrived.
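
    To make that last step concrete, here's a minimal sketch of wiring those committable offsets into Alpakka Kafka's Committer.sink, reusing the names from your snippet (system is the implicit ActorSystem, and an ExecutionContext is needed for the Future.map):

    import akka.kafka.{CommitterSettings, Subscriptions}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.stream.scaladsl.Source
    import org.apache.hadoop.fs.Path
    import scala.concurrent.ExecutionContext.Implicits.global // or a dedicated dispatcher

    val committerSettings = CommitterSettings(system)

    Consumer
      .committableSource(newConsumerSettings, Subscriptions.topics(inputTopicNames))
      .grouped(kafkaConsumerBatch)
      .flatMapConcat(paths => Source(paths))
      .mapAsync(parallelism) { committableMsg =>
        val p = new Path(committableMsg.record.value().get("fullPath").asInstanceOf[String])
        process(p).map(_ => committableMsg.committableOffset)
      }
      // offsets arrive here in order, each one only after its file has been fully produced
      .runWith(Committer.sink(committerSettings))

    Committer.sink batches the offsets according to committerSettings before committing, which keeps commit traffic low without changing the ordering guarantee above.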