I’ve been mulling over how to solve a particular problem in Beam and thought I’d reach out to a larger audience for some advice. At present things seem to work only sporadically, and I'm curious whether someone could act as a sounding board and tell me if this workflow makes sense.
The primary high-level goal is to read records from Kafka that may be out of order, window them in event time according to another property found on the records, and eventually emit the contents of those windows by writing them out to GCS.
The current pipeline looks roughly like the following:
val partitionedEvents = pipeline
    .apply("Read Events from Kafka",
        KafkaIO
            .read<String, Log>()
            .withBootstrapServers(options.brokerUrl)
            .withTopic(options.incomingEventsTopic)
            .withKeyDeserializer(StringDeserializer::class.java)
            .withValueDeserializerAndCoder(
                SpecificAvroDeserializer<Log>()::class.java,
                AvroCoder.of(Log::class.java)
            )
            .withReadCommitted()
            .commitOffsetsInFinalize()
            // Set the watermark to use a specific field for event time
            .withTimestampPolicyFactory { _, previousWatermark -> WatermarkPolicy(previousWatermark) }
            .withConsumerConfigUpdates(
                ImmutableMap.of<String, Any?>(
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
                    ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
                    "schema.registry.url", options.schemaRegistryUrl
                )
            ).withoutMetadata()
    )
    .apply("Logging Incoming Logs", ParDo.of(Events.log()))
    .apply("Rekey Logs by Tenant", ParDo.of(Events.key()))
    .apply("Partition Logs by Source",
        // This is a custom function that will partition incoming records by a specific
        // datasource field
        Partition.of(dataSources.size, Events.partition<KV<String, Log>>(dataSources))
    )
dataSources.forEach { dataSource ->
    // Store a reference to the data source name to avoid serialization issues
    val sourceName = dataSource.name
    val tempDirectory = Directories.resolveTemporaryDirectory(options.output)
    // Grab all of the events for this specific partition and apply the source-specific windowing
    // strategies
    partitionedEvents[dataSource.partition]
        .apply(
            "Building Windows for $sourceName",
            SourceSpecificWindow.of<KV<String, Log>>(dataSource)
        )
        .apply("Group Windowed Logs by Key for $sourceName", GroupByKey.create())
        .apply("Log Events After Windowing for $sourceName", ParDo.of(Events.logAfterWindowing()))
        .apply(
            "Writing Windowed Logs to Files for $sourceName",
            FileIO.writeDynamic<String, KV<String, MutableIterable<Log>>>()
                .withNumShards(1)
                .by { row -> "${row.key}/${sourceName}" }
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(SerializableFunction { logs -> Files.stringify(logs.value) }), TextIO.sink())
                .to(options.output)
                .withNaming { partition -> Files.name(partition) }
                .withTempDirectory(tempDirectory)
        )
}
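In a simpler, bulleted form, it might look like this:
- Read records from a single Kafka topic, using a custom timestamp policy for event time
- Log each record and rekey it by tenant
- Partition the stream by data source
- For each data source partition:
  - Apply that source's windowing rules (window duration and allowed lateness)
  - Group the windowed records by key (tenant)
  - Write each group out to GCS via FileIO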
The problem is that the incoming Kafka topic contains out-of-order data across multiple tenants (e.g. events for tenant1 might be streaming in now, but a few minutes later you’ll get events for tenant2 in the same partition, etc.). Since the timestamps of incoming records are not guaranteed to increase continually, the watermark would bounce back and forth in time, which sounds like it would be a problem, but I'm not certain. What I do see is that while data is flowing through, some files are simply never emitted.
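To make the concern concrete, here is a minimal plain-Kotlin sketch (no Beam involved) of a single shared watermark. It assumes the watermark trails the largest event time seen so far by a fixed bound and never moves backwards; that is an assumption for illustration, not a description of the `WatermarkPolicy` class above, whose body is not shown. `InterleavedEvent`, `SharedWatermark`, and `lateTenants` are hypothetical names.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical record shape: only the fields needed for this illustration.
data class InterleavedEvent(val tenant: String, val eventTime: Instant)

/**
 * Simplified stand-in for a watermark policy (an assumption, not Beam's API):
 * the watermark is the largest event time seen so far minus a fixed bound,
 * so it only ever moves forward.
 */
class SharedWatermark(private val maxOutOfOrderness: Duration) {
    private var maxSeen: Instant? = null

    // Current watermark, or null before the first record has been observed.
    fun current(): Instant? = maxSeen?.minus(maxOutOfOrderness)

    // Records the event and reports whether it arrived behind the watermark.
    fun observe(event: InterleavedEvent): Boolean {
        val watermark = current()
        val late = watermark != null && event.eventTime.isBefore(watermark)
        val seen = maxSeen
        if (seen == null || event.eventTime.isAfter(seen)) maxSeen = event.eventTime
        return late
    }
}

// Returns the tenant of every event that arrived behind the shared watermark.
fun lateTenants(events: List<InterleavedEvent>, bound: Duration): List<String> {
    val watermark = SharedWatermark(bound)
    return events.filter { watermark.observe(it) }.map { it.tenant }
}

fun main() {
    val base = Instant.parse("2021-01-01T12:00:00Z")
    val events = listOf(
        InterleavedEvent("tenant1", base),
        InterleavedEvent("tenant1", base.plusSeconds(60)),
        InterleavedEvent("tenant2", base.minusSeconds(2 * 3600)), // two hours behind tenant1
        InterleavedEvent("tenant1", base.plusSeconds(120)),
    )
    println(lateTenants(events, Duration.ofMinutes(5)))
}
```

Under these assumptions the watermark does not bounce; instead, the up-to-date tenant pushes it forward and the tenant that is two hours behind is reported as late, which is one possible explanation for output that never appears.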
The custom windowing function is extremely simple and is meant to emit each window a single time, once the window duration and allowed lateness have elapsed:
object SourceSpecificWindow {
    fun <T> of(dataSource: DataSource): Window<T> {
        return Window.into<T>(FixedWindows.of(dataSource.windowDuration()))
            .triggering(Never.ever())
            .withAllowedLateness(dataSource.allowedLateness(), Window.ClosingBehavior.FIRE_ALWAYS)
            .discardingFiredPanes()
    }
}
However, the behavior seemed inconsistent: we'd see logging come out after a window closed, but not necessarily files being written out to GCS.
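As a rough mental model of that windowing configuration, the following plain-Kotlin sketch shows how a timestamp maps to an epoch-aligned fixed window and when that window would be considered closed. It is a simplification rather than Beam's implementation: `TimeWindow`, `windowFor`, `isExpired`, and `wouldBeDropped` are hypothetical names, and the expiry rule (watermark at or past window end plus allowed lateness) is an approximation.

```kotlin
import java.time.Duration
import java.time.Instant

// Half-open event-time interval [start, end).
data class TimeWindow(val start: Instant, val end: Instant)

// Assigns a timestamp to its fixed window, aligned to the epoch with no offset.
fun windowFor(eventTime: Instant, size: Duration): TimeWindow {
    val sizeMs = size.toMillis()
    val startMs = Math.floorDiv(eventTime.toEpochMilli(), sizeMs) * sizeMs
    return TimeWindow(Instant.ofEpochMilli(startMs), Instant.ofEpochMilli(startMs + sizeMs))
}

// Simplified expiry rule (an assumption): the window closes once the watermark
// reaches the window end plus the allowed lateness.
fun isExpired(window: TimeWindow, watermark: Instant, allowedLateness: Duration): Boolean =
    !watermark.isBefore(window.end.plus(allowedLateness))

// In this model, a record whose window has already expired when it arrives is dropped.
fun wouldBeDropped(
    eventTime: Instant,
    watermark: Instant,
    size: Duration,
    allowedLateness: Duration,
): Boolean = isExpired(windowFor(eventTime, size), watermark, allowedLateness)

fun main() {
    val size = Duration.ofHours(1)
    val lateness = Duration.ofMinutes(10)
    val window = windowFor(Instant.parse("2021-01-01T10:17:00Z"), size)
    println(window)
    println(isExpired(window, Instant.parse("2021-01-01T11:05:00Z"), lateness))
    println(isExpired(window, Instant.parse("2021-01-01T11:10:00Z"), lateness))
}
```

With a never-firing trigger, nothing is emitted for a window until it expires, so in this model output for a window depends entirely on the watermark advancing past the window end plus the allowed lateness.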
Does anything seem blatantly wrong or incorrect with this approach? The data can come in out of order within the source (i.e. right now, 2 hours ago, 5 minutes from now) and covers multiple tenants, and the aim is to ensure that one tenant that keeps up to date won't drown out tenants whose data arrives from further in the past.
Would we potentially need another Beam application or something similar to "split" this single stream of events into sub-streams that are each processed independently (so that each watermark advances on its own)? Is that where a SplittableDoFn would come in? I'm running on the SparkRunner, which doesn't appear to support that, but it seems as though it'd be a valid use case.
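For illustration only, this is roughly what "each watermark advancing on its own" could mean if event time were tracked per tenant instead of per source. It is a plain-Kotlin sketch, not a Beam, Spark, or Flink API; `PerKeyWatermarks` is a hypothetical helper, and the fixed out-of-orderness bound is an assumption.

```kotlin
import java.time.Duration
import java.time.Instant

/**
 * Hypothetical helper (not a Beam or Flink API): keeps one watermark per key,
 * each trailing that key's largest observed event time by a fixed bound.
 */
class PerKeyWatermarks(private val maxOutOfOrderness: Duration) {
    private val maxSeen = mutableMapOf<String, Instant>()

    // Watermark for a key, or null if the key has not been seen yet.
    fun watermarkFor(key: String): Instant? = maxSeen[key]?.minus(maxOutOfOrderness)

    // Records the event and reports whether it was behind its own key's watermark.
    fun observe(key: String, eventTime: Instant): Boolean {
        val watermark = watermarkFor(key)
        val late = watermark != null && eventTime.isBefore(watermark)
        val seen = maxSeen[key]
        if (seen == null || eventTime.isAfter(seen)) maxSeen[key] = eventTime
        return late
    }
}

fun main() {
    val base = Instant.parse("2021-01-01T12:00:00Z")
    val watermarks = PerKeyWatermarks(Duration.ofMinutes(5))
    println(watermarks.observe("tenant1", base))                    // tenant1 is current
    println(watermarks.observe("tenant2", base.minusSeconds(7200))) // tenant2 is two hours behind
    println(watermarks.watermarkFor("tenant1"))
    println(watermarks.watermarkFor("tenant2"))
}
```

Here a tenant that is hours behind is only judged against its own progress, so an up-to-date tenant cannot make its records late. The trade-off is that this state has to be kept and expired per key by hand.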
Any advice would be greatly appreciated or even just another set of eyes. I'd be happy to provide any additional details that I could.
While this may not be the most helpful response, I'll be transparent about the end result. Eventually the logic required for this specific use case extended far beyond the built-in capabilities of Apache Beam, primarily around windowing and the governance of time.
The solution we landed on was to switch our streaming technology from Apache Beam to Apache Flink, which as you might imagine was quite a leap. The state-centric nature of Flink let us handle our use cases more easily and define custom eviction criteria (and ordering) around windowing, at the cost of losing the layer of abstraction that Beam provides.