I am trying to build pipelines for events with Apache Beam. What I want to do is read streaming data from GCP PubSub, look up the related metadata in MySQL using the ids in the events, then combine those two streams and write the result to my ClickHouse database.
But JdbcIO.readAll() doesn't seem to emit any output. As you can see in ClickhousePipeline, I am trying to combine two PCollections with CoGroupByKey.create(), but userMetaData comes in empty, and the ParDo chained right after UserMetadataEnricher() is never executed either.
In the withRowMapper of UserMetadataEnricher, I added a println() to check whether it runs, and it does: it prints the results from my database. However, it doesn't pass the data on to the next step of the pipeline.
I suspect the problem is related to windowing, because I confirmed that it works when I test it without windowing. But PubsubIO produces an unbounded PCollection, so I have to apply a window to use JdbcIO.readAll(), right? I have no idea how to solve this problem. I hope to get an answer soon!
MainPipeline
object MainPipeline {
    @JvmStatic
    fun run(options: MainPipelineOptions) {
        val p = Pipeline.create(options)
        val events = p
            .apply(
                "Read DetailViewEvent PubSub",
                PubsubIO.readStrings().fromSubscription(options.inputSubscription)
            )
            .apply(
                "Extract messages",
                ParseJsons.of(FoodDetailViewEvent::class.java)
                    .exceptionsInto(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings())
                    )
                    .exceptionsVia { KV.of(it.element(), it.exception().javaClass.canonicalName) }
            )
        val validEvents =
            events.output().setCoder(SerializableCoder.of(FoodDetailViewEvent::class.java))
        val invalidEvents = events.failures()
        invalidEvents.apply(FailurePipeline(options))
        validEvents.apply(ClickhousePipeline(options))
        p.run().waitUntilFinish()
    }

    @JvmStatic
    fun main(args: Array<String>) {
        val options = PipelineOptionsFactory
            .fromArgs(*args)
            .withValidation()
            .`as`(MainPipelineOptions::class.java)
        run(options)
    }
}
ClickhousePipeline
class ClickhousePipeline(private val options: MainPipelineOptions) :
    PTransform<PCollection<DetailViewEvent>, PDone>() {
    override fun expand(events: PCollection<DetailViewEvent>): PDone {
        val windowedEvents = events
            .apply(
                "Window", Window
                    .into<DetailViewEvent>(GlobalWindows())
                    .triggering(
                        Repeatedly
                            .forever(
                                AfterProcessingTime
                                    .pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardSeconds(5))
                            )
                    )
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
            )
        val userIdDetailViewEvents = windowedEvents
            .apply(
                MapElements.via(object :
                    SimpleFunction<DetailViewEvent, KV<String, DetailViewEvent>>() {
                    override fun apply(input: DetailViewEvent): KV<String, DetailViewEvent> {
                        return KV.of(input.userInfo.userId, input)
                    }
                })
            )
        val userMetaData = userIdDetailViewEvents
            .apply(
                MapElements.via(object :
                    SimpleFunction<KV<String, DetailViewEvent>, String>() {
                    override fun apply(input: KV<String, DetailViewEvent>): String {
                        return input.key!!
                    }
                })
            )
            .apply(
                UserMetadataEnricher(options)
            )
            .apply(
                ParDo.of(
                    object : DoFn<UserMetadata, KV<String, UserMetadata>>() {
                        @ProcessElement
                        fun processElement(
                            @Element data: UserMetadata,
                            out: OutputReceiver<KV<String, UserMetadata>>
                        ) {
                            println("User:: ${data}") // Not printed!!
                            out.output(KV.of(data.userId, data))
                        }
                    })
            )
        val sourceTag = object : TupleTag<DetailViewEvent>() {}
        val userMetadataTag = object : TupleTag<UserMetadata>() {}
        val joinedPipeline: PCollection<KV<String, CoGbkResult>> =
            KeyedPCollectionTuple.of(sourceTag, userIdDetailViewEvents)
                .and(userMetadataTag, userMetaData)
                .apply(CoGroupByKey.create())
        val enrichedData = joinedPipeline.apply(
            ParDo.of(object : DoFn<KV<String, CoGbkResult>, ClickHouseModel>() {
                @ProcessElement
                fun processElement(
                    @Element data: KV<String, CoGbkResult>,
                    out: OutputReceiver<ClickHouseModel>
                ) {
                    val name = data.key
                    val source = data.value.getAll(sourceTag)
                    val userMetadataSource = data.value.getAll(userMetadataTag)
                    println("==========================")
                    for (metadata in userMetadataSource.iterator()) {
                        println("Metadata:: $metadata") // This is always empty
                    }
                    for (event in source.iterator()) {
                        println("Event:: $event")
                    }
                    println("==========================")
                    val sourceEvent = source.iterator().next()
                    if (userMetadataSource.iterator().hasNext()) {
                        val userMetadataEvent = userMetadataSource.iterator().next()
                        out.output(
                            ClickHouseModel(
                                eventType = sourceEvent.eventType,
                                userMetadata = userMetadataEvent
                            )
                        )
                    }
                }
            })
        )
        val clickhouseData = enrichedData.apply(
            ParDo.of(object : DoFn<ClickHouseModel, Row>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    val data = context.element()
                    context.output(
                        data.toSchema()
                    )
                }
            })
        )
        return clickhouseData
            .setRowSchema(ClickHouseModel.schemaType())
            .apply(
                ClickHouseIO.write(
                    "jdbc:clickhouse://127.0.0.1:8123/test?password=example",
                    "clickhouse_test"
                )
            )
    }
}
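The enrichment DoFn after CoGroupByKey emits a ClickHouseModel only when userMetadataSource is non-empty, so if the metadata side contributes nothing for a given key, window, and pane, that event is silently dropped. The following plain-Java model (not Beam code; CoGroupJoinSketch, coGroup, and enrich are hypothetical names, and both inputs are assumed to be already keyed by user id) sketches that group-then-filter behavior with in-memory lists:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of CoGroupByKey followed by the enrichment DoFn; this is NOT Beam code.
class CoGroupJoinSketch {

    // Everything grouped under one user id: the events and the metadata records.
    static final class Grouped {
        final List<String> events = new ArrayList<>();
        final List<String> metadata = new ArrayList<>();
    }

    // Groups two keyed inputs by key, roughly what CoGroupByKey does for one window and pane.
    static Map<String, Grouped> coGroup(List<Map.Entry<String, String>> events,
                                        List<Map.Entry<String, String>> metadata) {
        Map<String, Grouped> grouped = new TreeMap<>(); // sorted keys for stable output
        for (Map.Entry<String, String> e : events) {
            grouped.computeIfAbsent(e.getKey(), k -> new Grouped()).events.add(e.getValue());
        }
        for (Map.Entry<String, String> m : metadata) {
            grouped.computeIfAbsent(m.getKey(), k -> new Grouped()).metadata.add(m.getValue());
        }
        return grouped;
    }

    // Like the question's DoFn, emit only when metadata exists (this version also skips keys with no events).
    static List<String> enrich(Map<String, Grouped> grouped) {
        List<String> out = new ArrayList<>();
        for (Grouped g : grouped.values()) {
            if (!g.events.isEmpty() && !g.metadata.isEmpty()) {
                out.add(g.events.get(0) + "+" + g.metadata.get(0));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> events =
            List.of(Map.entry("test-01", "view-1"), Map.entry("test-02", "view-2"));
        // Metadata side is empty (the symptom in the question): prints []
        System.out.println(enrich(coGroup(events, List.of())));
        // Metadata for both users in the same window and pane: prints [view-1+meta-01, view-2+meta-02]
        List<Map.Entry<String, String>> metadata =
            List.of(Map.entry("test-01", "meta-01"), Map.entry("test-02", "meta-02"));
        System.out.println(enrich(coGroup(events, metadata)));
    }
}
```

In this model an empty metadata input never raises an error; the join simply produces nothing, which matches the "Event" lines without any "Metadata" lines in the output below.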
UserMetadataEnricher
class UserMetadataEnricher(private val options: MainPipelineOptions) :
    PTransform<PCollection<String>, PCollection<UserMetadata>>() {
    @Throws(Exception::class)
    override fun expand(events: PCollection<String>): PCollection<UserMetadata> {
        return events
            .apply(
                JdbcIO.readAll<String, UserMetadata>()
                    .withDataSourceConfiguration(
                        JdbcIO.DataSourceConfiguration.create(
                            "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/beam-test"
                        )
                            .withUsername("root")
                            .withPassword("example")
                    )
                    .withQuery("select id,name,gender from user where id = ?")
                    .withParameterSetter { id: String, preparedStatement: PreparedStatement ->
                        preparedStatement.setString(1, id)
                    }
                    .withCoder(
                        SerializableCoder.of(
                            UserMetadata::class.java
                        )
                    )
                    .withRowMapper {
                        println("RowMapper:: ${it.getString(1)}") // printed!!
                        UserMetadata(
                            it.getString(1),
                            it.getString(2),
                            it.getString(3)
                        )
                    }
            )
    }
}
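Conceptually, JdbcIO.readAll() runs the parameterized query once per input element and emits every row the RowMapper produces, so it behaves like a flatMap from ids to rows. The sketch below models only that part in plain Java, with an in-memory map standing in for the MySQL user table (ReadAllSketch and the sample rows are hypothetical). It deliberately leaves out the reshuffle that JdbcIO applies to its output by default, which is controlled by withOutputParallelization:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the per-element lookup in JdbcIO.readAll(); NOT Beam code.
class ReadAllSketch {

    // For every input parameter, run the "query" and emit all resulting rows (a flatMap).
    static <P, R> List<R> readAll(List<P> inputs, Function<P, List<R>> runQuery) {
        List<R> out = new ArrayList<>();
        for (P param : inputs) {
            out.addAll(runQuery.apply(param)); // zero, one, or many rows per element
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical in-memory stand-in for the MySQL "user" table, keyed by id.
        Map<String, List<String>> userTable = Map.of(
            "test-01", List.of("test-01,name-01,gender-01"),
            "test-02", List.of("test-02,name-02,gender-02"));
        // Plays the role of "select id,name,gender from user where id = ?".
        Function<String, List<String>> query = id -> userTable.getOrDefault(id, List.of());
        // An id with no matching row simply contributes nothing.
        System.out.println(readAll(List.of("test-01", "test-02", "missing"), query));
    }
}
```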
output
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
I changed my window setting and added a println() to the SimpleFunction that is assigned to userIdDetailViewEvents:
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
    .triggering(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(
                Duration.standardSeconds(1)
            )
        )
    )
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes()
And it prints:
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
==========================
Event:: DetailViewEvent(...)
==========================
==========================
Event:: DetailViewEvent(...)
==========================
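With FixedWindows.of(Duration.standardSeconds(30)), each element is assigned to exactly one half-open 30-second window based on its timestamp, and CoGroupByKey groups per key and per window, so an event and its metadata can only meet if they land in the same window. For a window with no offset, the assignment is just the timestamp rounded down to a multiple of the size. Here is a plain-Java sketch of that arithmetic (FixedWindowSketch is a hypothetical name; timestamps are assumed to be epoch milliseconds):

```java
// Plain-Java sketch of fixed-window assignment with zero offset; NOT Beam code.
class FixedWindowSketch {

    // Start of the window containing the timestamp (floorMod also handles negative timestamps).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Windows are half-open intervals [start, start + size).
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 30_000L; // corresponds to FixedWindows.of(Duration.standardSeconds(30))
        for (long ts : new long[] {0L, 29_999L, 30_000L, 45_500L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

For example, timestamps 0 and 29999 share the window [0, 30000), while 30000 starts the next one.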
Then I switched the trigger to AfterWatermark:
Window.into<FoodDetailViewEvent>(FixedWindows.of(Duration.standardSeconds(30)))
    .triggering(
        Repeatedly.forever(
            AfterWatermark.pastEndOfWindow()
        )
    )
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes()
With this setting, it outputs:
userIdDetailViewEvents Called
userIdDetailViewEvents Called
RowMapper:: test-02
RowMapper:: test-01
I think using AfterWatermark is correct, but the pipeline hangs somewhere... I suspect it's JdbcIO.
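A window triggered with AfterWatermark.pastEndOfWindow() produces its on-time pane only after the watermark passes the end of the window, so nothing downstream of CoGroupByKey can appear before that, and if the watermark stops advancing the pipeline looks hung. The toy model below (plain Java, hypothetical names) captures only that firing condition for a single window; it does not model JdbcIO or how a particular runner estimates the PubSub watermark:

```java
// Toy model of AfterWatermark.pastEndOfWindow() for a single window; NOT Beam code.
class WatermarkTriggerSketch {

    // Returns the index of the first watermark update at or beyond the window end
    // (where this model fires the on-time pane), or -1 if the watermark never gets there.
    static int firingIndex(long windowEndMillis, long[] watermarkUpdates) {
        for (int i = 0; i < watermarkUpdates.length; i++) {
            if (watermarkUpdates[i] >= windowEndMillis) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long windowEnd = 30_000L; // end of the window [0, 30000)
        // Watermark advances past the window end: fires on the third update (index 2).
        System.out.println(firingIndex(windowEnd, new long[] {5_000L, 20_000L, 31_000L}));
        // Watermark stalls before the window end: never fires (-1), so nothing is emitted.
        System.out.println(firingIndex(windowEnd, new long[] {5_000L, 20_000L, 20_000L}));
    }
}
```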
You are correct: with its default configuration, JdbcIO.readAll() doesn't return results for unbounded collections.
As long as you have a working window/trigger combination, the trick is to add .withOutputParallelization(false) to the JdbcIO.readAll() call, as follows:
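// ids is the windowed PCollection<String> of lookup keys; MyRow is a placeholder output type.
ids.apply("Read from JDBC", JdbcIO.<String, MyRow>readAll()
    .withDataSourceConfiguration(getConfiguration())
    .withQuery(getStreamingQuery())
    .withParameterSetter((JdbcIO.PreparedStatementSetter<String>) (element, preparedStatement) -> {
        // Prepare statement here, e.g. preparedStatement.setString(1, element);
    })
    .withRowMapper((JdbcIO.RowMapper<MyRow>) results -> {
        // Map results here and return the mapped object.
        return new MyRow(results.getString(1));
    })
    // The key change: don't reshuffle the output.
    .withOutputParallelization(false));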
I struggled with this for hours but came across the code in this article and it finally worked.
I used
Window.<String>into(new GlobalWindows()) // String: element type of the lookup keys
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
    .discardingFiredPanes()
for my trigger to process one element at a time.
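Repeatedly.forever(AfterPane.elementCountAtLeast(1)) combined with discardingFiredPanes() fires a new pane as soon as at least one new element has arrived, and each pane contains only the elements since the previous firing. The plain-Java sketch below (CountTriggerSketch is a hypothetical name) models that for elements arriving strictly one at a time in a single window; a real runner may deliver several elements together, in which case a pane can hold more than one element even with a count of 1:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Repeatedly.forever(AfterPane.elementCountAtLeast(n)) with
// discardingFiredPanes() in a single (global) window; NOT Beam code.
class CountTriggerSketch {

    // Returns the panes that fire for elements arriving one at a time, in order.
    static <T> List<List<T>> panes(List<T> arrivals, int n) {
        List<List<T>> fired = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T element : arrivals) {
            buffer.add(element);
            if (buffer.size() >= n) {
                fired.add(new ArrayList<>(buffer)); // emit the pane
                buffer.clear();                     // discardingFiredPanes(): start the next pane empty
            }
        }
        // Elements still in the buffer wait for more input and are not emitted yet.
        return fired;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("test-01", "test-02", "test-03");
        System.out.println(panes(ids, 1)); // [[test-01], [test-02], [test-03]]
        System.out.println(panes(ids, 2)); // [[test-01, test-02]]
    }
}
```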