kotlin google-cloud-dataflow apache-beam dataflow

Beam JdbcIO.readAll doesn't seem to return results


I am trying to build a pipeline for events with Apache Beam. I want to read streaming data from GCP Pub/Sub, look up related metadata in MySQL using the IDs in the events, then combine the two streams and write the result to my ClickHouse database.

But JdbcIO.readAll() doesn't seem to return its output. As you can see in ClickhousePipeline, I try to combine the two PCollections with CoGroupByKey.create(), but userMetaData comes in empty, and the ParDo chained right after UserMetadataEnricher() is never executed either.

In withRowMapper in UserMetadataEnricher, I added println() to check whether it runs. It does, and it prints the results from my database, but the data is not passed on to the next step of the pipeline.

I guess the problem is related to windowing, because it works when I test it without windowing. But PubsubIO produces an unbounded PCollection, so I have to apply a window to use JdbcIO.readAll(), right? I have no clue how to solve this problem. I hope to get an answer soon!

MainPipeline

object MainPipeline {
  @JvmStatic
  fun run(options: MainPipelineOptions) {
    val p = Pipeline.create(options)

    val events = p
      .apply(
        "Read DetailViewEvent PubSub",
        PubsubIO.readStrings().fromSubscription(options.inputSubscription)
      )
      .apply(
        "Extract messages",
        ParseJsons.of(FoodDetailViewEvent::class.java)
          .exceptionsInto(
            TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
          )
          .exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
      )

    val validEvents =
      events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
    val invalidEvents = events.failures()

    invalidEvents.apply(FailurePipeline(options))
    validEvents.apply(ClickhousePipeline(options))

    p.run().waitUntilFinish()
  }

  @JvmStatic
  fun main(args: Array<String>) {
    val options = PipelineOptionsFactory
      .fromArgs(*args)
      .withValidation()
      .`as`(MainPipelineOptions::class.java)

    run(options)
  }
}

ClickhousePipeline

class ClickhousePipeline(private val options: MainPipelineOptions) :
  PTransform<PCollection<DetailViewEvent>, PDone>() {

  override fun expand(events: PCollection<DetailViewEvent>): PDone {
    val windowedEvents = events
      .apply(
        "Window", Window
          .into<DetailViewEvent>(GlobalWindows())
          .triggering(
            Repeatedly
              .forever(
                AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardSeconds(5))
              )
          )
          .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
      )

    val userIdDetailViewEvents = windowedEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
          override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
            return KV.of(input.userInfo.userId, input)
          }
        })
      )

    val userMetaData = userIdDetailViewEvents
      .apply(
        MapElements.via(object :
          SimpleFunction<KV<String, DetailViewEvent>, String>() {
          override fun apply(input: KV<String, DetailViewEvent>): String {
            return input.key!!
          }
        })
      )
      .apply(
        UserMetadataEnricher(options)
      )
      .apply(
        ParDo.of(
          object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
            @ProcessElement
            fun processElement(
              @Element data: UserMetadata,
              out: OutputReceiver<KV<String, UserMetadata>>
            ) {
              println("User:: ${data}") // Not printed!!
              out.output(KV.of(data.userId, data))
            }
          })
      )

    val sourceTag = object : TupleTag<DetailViewEvent>() {}
    val userMetadataTag = object : TupleTag<UserMetadata>() {}

    val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
      KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
        .and(userMetadataTag, userMetaData)
        .apply(CoGroupByKey.create())

    val enrichedData = joinedPipeline.apply(
      ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
        @ProcessElement
        fun processElement(
          @Element data: KV<String, CoGbkResult>,
          out: OutputReceiver<ClickHouseModel>
        ) {

          val name = data.key
          val source = data.value.getAll(sourceTag)
          val userMetadataSource = data.value.getAll(userMetadataTag)

          println("==========================")
          for (metadata in userMetadataSource.iterator()) {
            println("Metadata:: $metadata") // This is always empty
          }

          for (event in source.iterator()) {
            println("Event:: $event")
          }
          println("==========================")

          val sourceEvent = source.iterator().next()
          if (userMetadataSource.iterator().hasNext()) {
            val userMetadataEvent = userMetadataSource.iterator().next()
            out.output(
              ClickHouseModel(
                eventType = sourceEvent.eventType,
                userMetadata = userMetadataEvent
              )
            )

          }
        }
      })
    )

    val clickhouseData = enrichedData.apply(
      ParDo.of(object : DoFn<ClickHouseModel, Row>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
          val data = context.element()
          context.output(
            data.toSchema()
          )
        }
      })
    )

    return clickhouseData
      .setRowSchema(ClickHouseModel.schemaType())
      .apply(
        ClickHouseIO.write(
          "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
          "clickhouse_test"
        )
      )
  }
}

UserMetadataEnricher

class UserMetadataEnricher(private val options: MainPipelineOptions) :
  PTransform<PCollection<String>, PCollection<UserMetadata>>() {

  @Throws(Exception::class)
  override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
    return events
      .apply(
        JdbcIO.readAll<String, UserMetadata>()
          .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
              "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
            )
              .withUsername("root")
              .withPassword("example")
          )
          .withQuery("select id,name,gender from user where id = ?")
          .withParameterSetter { id: String, preparedStatement: PreparedStatement ->
            preparedStatement.setString(1, id)
          }
          .withCoder(
            SerializableCoder.of(
              UserMetadata::class.java
            )
          )
          .withRowMapper {
            println("RowMapper:: ${it.getString(1)}") // printed!!
            UserMetadata(
              it.getString(1),
              it.getString(2),
              it.getString(3)
            )
          }
      )
  }
}


output

RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Update 1 (GlobalWindow to FixedWindow)

Using AfterProcessingTime

I've changed my window settings and added a print to the SimpleFunction that is assigned to userIdDetailViewEvents.

Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
  .triggering(
    Repeatedly.forever(
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
        Duration.standardSeconds(1)
      )
    )
  )
  .withAllowedLateness(Duration.ZERO)
  .discardingFiredPanes()

And it prints:

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================

Using AfterWatermark

        Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
          .triggering(
            Repeatedly.forever(
              AfterWatermark.pastEndOfWindow()
            )
          )
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes()

outputs

userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01

I think using AfterWatermark is correct, but it hangs somewhere. I guess it's in JdbcIO.


Solution

  • You are correct: with the default configuration, JdbcIO.readAll() doesn't return results for unbounded collections.

    As long as you have a functional window/trigger combination, the trick is to add .withOutputParallelization(false) to the JdbcIO.readAll() call, as follows:

    // String and UserMetadata are the input and output types from the question.
    p.apply("Read from JDBC", JdbcIO.<String, UserMetadata>readAll()
                  .withDataSourceConfiguration(getConfiguration())
                  .withQuery(getStreamingQuery())
                  .withParameterSetter((JdbcIO.PreparedStatementSetter<String>) (element, preparedStatement) -> {
                    // Prepare statement here.
                  })
                  .withRowMapper((JdbcIO.RowMapper<UserMetadata>) results -> {
                    // Map results here.
                  })
                  .withOutputParallelization(false));
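
    Since the question's pipeline is written in Kotlin, here is a minimal sketch of the same fix applied to UserMetadataEnricher. It reuses the connection settings and query from the question. The UserMetadata data class is a stand-in, because the original class is not shown, and the options constructor parameter is dropped for brevity. According to the JdbcIO Javadoc, output parallelization is on by default and reshuffles the query results across workers, so turning it off should mainly cost some parallelism after the lookup.

    ```kotlin
    import java.io.Serializable
    import org.apache.beam.sdk.coders.SerializableCoder
    import org.apache.beam.sdk.io.jdbc.JdbcIO
    import org.apache.beam.sdk.transforms.PTransform
    import org.apache.beam.sdk.values.PCollection

    // Stand-in for the question's UserMetadata class (assumed shape: three string columns).
    data class UserMetadata(
      val userId: String,
      val name: String,
      val gender: String
    ) : Serializable

    class UserMetadataEnricher :
      PTransform<PCollection<String>, PCollection<UserMetadata>>() {

      override fun expand(ids: PCollection<String>): PCollection<UserMetadata> =
        ids.apply(
          "Read user metadata",
          JdbcIO.readAll<String, UserMetadata>()
            .withDataSourceConfiguration(
              JdbcIO.DataSourceConfiguration
                .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test")
                .withUsername("root")
                .withPassword("example")
            )
            .withQuery("select id,name,gender from user where id = ?")
            // Bind the incoming user id to the single query parameter.
            .withParameterSetter(
              JdbcIO.PreparedStatementSetter<String> { id, statement ->
                statement.setString(1, id)
              }
            )
            .withCoder(SerializableCoder.of(UserMetadata::class.java))
            // Map each result row to the stand-in UserMetadata class.
            .withRowMapper(
              JdbcIO.RowMapper<UserMetadata> { row ->
                UserMetadata(row.getString(1), row.getString(2), row.getString(3))
              }
            )
            // The fix from this answer: skip the reshuffle that follows the query.
            .withOutputParallelization(false)
        )
    }
    ```

    The rest of ClickhousePipeline can stay as it is; only the apply(UserMetadataEnricher(options)) call would become apply(UserMetadataEnricher()) in this simplified form.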
    
    

    I struggled with this for hours but came across the code in this article and it finally worked.

    I used

    Window.into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
    

    for my trigger to process one element at a time.
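
    The same window can be written in Kotlin roughly as follows. perElementWindow is a hypothetical helper name, not part of Beam; it only wraps the trigger configuration shown above so that it can be applied to any element type.

    ```kotlin
    import org.apache.beam.sdk.transforms.windowing.AfterPane
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows
    import org.apache.beam.sdk.transforms.windowing.Repeatedly
    import org.apache.beam.sdk.transforms.windowing.Window

    // Hypothetical helper: global window that fires a pane as soon as one element arrives.
    fun <T : Any> perElementWindow(): Window<T> =
      Window.into<T>(GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        // Each pane carries only the new elements, not everything seen so far.
        .discardingFiredPanes()
    ```

    It could then replace the "Window" step in ClickhousePipeline, for example events.apply("Window", perElementWindow<DetailViewEvent>()).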