Long story short: Druid fails to ingest a corrupted Kafka record. How can I skip it?
I configured Druid to ingest data from Kafka with Schema Registry, and it works. Everything runs as expected until Druid hits, at some offset, a bad record that doesn't contain valid data because it is corrupted. If I look into the task logs, I find the following exception:
2022-03-14T13:46:03,647 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
org.apache.druid.java.util.common.RE: Failed to get Avro schema: 1786271608
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:144) ~[?:?]
at org.apache.druid.data.input.avro.AvroStreamReader.intermediateRowIterator(AvroStreamReader.java:69) ~[?:?]
at org.apache.druid.data.input.IntermediateRowParsingReader.read(IntermediateRowParsingReader.java:44) ~[druid-core-0.22.1.jar:0.22.1]
at org.apache.druid.segment.transform.TransformingInputEntityReader.read(TransformingInputEntityReader.java:43) ~[druid-processing-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SettableByteEntityReader.read(SettableByteEntityReader.java:78) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.StreamChunkParser.parseWithInputFormat(StreamChunkParser.java:135) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.StreamChunkParser.parse(StreamChunkParser.java:104) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:620) ~[druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:263) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:146) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:471) [druid-indexing-service-0.22.1.jar:0.22.1]
at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:443) [druid-indexing-service-0.22.1.jar:0.22.1]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:292) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659) ~[?:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291) ~[?:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276) ~[?:?]
at org.apache.druid.data.input.avro.SchemaRegistryBasedAvroBytesDecoder.parse(SchemaRegistryBasedAvroBytesDecoder.java:140) ~[?:?]
... 15 more
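As a sanity check, the 40403 in the root cause can be reproduced outside of Druid with Confluent's schema registry client (the same one the task uses, judging by the trace). This is only a rough sketch, assuming the kafka-schema-registry-client dependency is available; the class name is mine, 100 is an arbitrary cache size, and the URL is the same placeholder used in my spec below:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class SchemaIdCheck
{
  public static void main(String[] args) throws Exception
  {
    // Same registry URL placeholder as in the supervisor spec below.
    CachedSchemaRegistryClient client =
        new CachedSchemaRegistryClient("http://schema_registry_host:port", 100);
    try {
      // 1786271608 is the bogus schema id reported in the task log above.
      System.out.println(client.getSchemaById(1786271608));
    }
    catch (RestClientException e) {
      // A registry that has never seen this id answers with error code 40403
      // ("Schema not found"), which is exactly what the task log shows.
      System.out.println(e.getErrorCode() + ": " + e.getMessage());
    }
  }
}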
What the exception says is true: there is a record at some offset whose schema-id bytes decode to 1786271608, an id that does not exist in the registry, so the exception itself is correct. What I want is to skip and ignore this corrupt record so that Druid can keep ingesting the following records. Is it possible to configure Druid that way?
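To confirm where that number comes from, the record at the offending offset can be read back with a plain Kafka consumer and its Confluent wire-format header decoded (one magic byte, then the schema id as a 4-byte big-endian integer). Again just a minimal sketch; the class name, partition, and offset below are placeholders I made up:

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CorruptRecordInspector
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrap servers ips and ports");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "corrupt-record-inspector");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("my.topic.name", 0); // partition of the bad record (placeholder)
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, 123456L);                                 // offset of the bad record (placeholder)

      for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
        byte[] value = record.value();
        if (value == null || value.length < 5) {
          System.out.printf("offset=%d: value too short for the Confluent wire format%n", record.offset());
          continue;
        }
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();      // wire format: magic byte (should be 0) ...
        int schemaId = buf.getInt(); // ... followed by the schema id as a big-endian int
        System.out.printf("offset=%d magic=%d schemaId=%d%n", record.offset(), magic, schemaId);
      }
    }
  }
}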
I have found a tuning-config property called maxParseExceptions that looks like what I need, but it is already set to a huge value (2147483647, which is the default):
maxParseExceptions: The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if reportParseExceptions is set.
Here is what my spec looks like:
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "my.data.source.name",
      "timestampSpec": {
        "column": "lastOperationTime",
        "format": "millis",
        "missingValue": null
      },
      "dimensionsSpec": {
        "dimensions": [
          ... (omitted) ...
        ],
        "dimensionExclusions": [
          "__time",
          "lastOperationTime"
        ]
      },
      "metricsSpec": [],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": {
          "type": "none"
        },
        "rollup": false,
        "intervals": []
      },
      "transformSpec": {
        "filter": null,
        "transforms": []
      }
    },
    "ioConfig": {
      "topic": "my.topic.name",
      "inputFormat": {
        "type": "avro_stream",
        "flattenSpec": {
          "useFieldDiscovery": true,
          "fields": [
            ... (omitted) ...
          ]
        },
        "avroBytesDecoder": {
          "type": "schema_registry",
          "url": "http://schema_registry_host:port",
          "capacity": 2147483647,
          "urls": null,
          "config": null,
          "headers": null
        },
        "binaryAsString": true,
        "extractUnionsByType": false
      },
      "replicas": 1,
      "taskCount": 1,
      "taskDuration": "PT3600S",
      "consumerProperties": {
        "bootstrap.servers": "bootstrap servers ips and ports"
      },
      "pollTimeout": 100,
      "startDelay": "PT5S",
      "period": "PT30S",
      "useEarliestOffset": true,
      "completionTimeout": "PT1800S",
      "lateMessageRejectionPeriod": null,
      "earlyMessageRejectionPeriod": null,
      "lateMessageRejectionStartDateTime": null,
      "stream": "my.topic.name",
      "useEarliestSequenceNumber": true,
      "autoscalerConfig": null,
      "type": "kafka"
    },
    "tuningConfig": {
      "type": "kafka",
      "appendableIndexSpec": {
        "type": "onheap"
      },
      "maxRowsInMemory": 1000000,
      "maxBytesInMemory": 0,
      "skipBytesInMemoryOverheadCheck": false,
      "maxRowsPerSegment": 5000000,
      "maxTotalRows": null,
      "intermediatePersistPeriod": "PT10M",
      "basePersistDirectory": "/opt/druid/var/tmp/druid-realtime-persist349032434046494455",
      "maxPendingPersists": 0,
      "indexSpec": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "indexSpecForIntermediatePersists": {
        "bitmap": {
          "type": "roaring",
          "compressRunOnSerialization": true
        },
        "dimensionCompression": "lz4",
        "metricCompression": "lz4",
        "longEncoding": "longs",
        "segmentLoader": null
      },
      "reportParseExceptions": false,
      "handoffConditionTimeout": 0,
      "resetOffsetAutomatically": false,
      "segmentWriteOutMediumFactory": null,
      "workerThreads": null,
      "chatThreads": null,
      "chatRetries": 8,
      "httpTimeout": "PT10S",
      "shutdownTimeout": "PT80S",
      "offsetFetchPeriod": "PT30S",
      "intermediateHandoffPeriod": "P2147483647D",
      "logParseExceptions": true,
      "maxParseExceptions": 2147483647,
      "maxSavedParseExceptions": 10,
      "skipSequenceNumberAvailabilityCheck": false,
      "repartitionTransitionDuration": "PT120S"
    }
  }
}
EDIT: In the end, it was a bug. See my answer below.
In the end, it was a bug in Druid, fixed in version 0.23.
See the full story in my post on the Druid forum:
I would like to thank the Druid community for their fast response.