
Ingesting data into a Kafka topic from XML with FilePulse - ExplodeFilter


I'm creating a Kafka topic that is fed from an XML file and written in Avro format. I'm using FilePulse to do this, and in the documentation I saw the ExplodeFilter. I tried to configure it according to the documentation, but it is not working. The Connect Docker console shows the following error:

io.streamthoughts.kafka.connect.filepulse.data.DataException: leitura is not a valid field name
 at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.lookupField(TypedStruct.java:464)
 at io.streamthoughts.kafka.connect.filepulse.data.TypedStruct.get(TypedStruct.java:226)
 at io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter.apply(ExplodeFilter.java:66)
 at io.streamthoughts.kafka.connect.filepulse.filter.AbstractMergeRecordFilter.apply(AbstractMergeRecordFilter.java:51)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline$FilterNode.apply(DefaultRecordFilterPipeline.java:159)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:131)
 at io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline.apply(DefaultRecordFilterPipeline.java:99)
 at io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer.next(DefaultFileRecordsPollingConsumer.java:169)
 at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:131)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:272)
 at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:239)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Here is the docker-compose YAML:

connect-file-pulse:
  image: streamthoughts/kafka-connect-file-pulse:latest
  container_name: connect
  depends_on:
    - cp-broker
    - cp-schema-registry
  ports:
    - "8083:8083"
    - "8001:8000"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

Here is the input XML:

<?xml version="1.0" encoding="UTF-8"?>
<playlists>
  <pai>test</pai>
  <leitura>
    <title>test</title>
    <artist>test</artist>
    <album>test</album>
    <duration>test</duration>
  </leitura>
  <leitura>
    <title>test2</title>
    <artist>test2</artist>
    <album>test2</album>
    <duration>test2</duration>
  </leitura>
</playlists>
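For context, the intent of the ExplodeFilter is to turn each repeated child element into its own record. A minimal Python sketch of that behaviour against the sample XML above (element names come from the file; the `explode` helper is illustrative, not FilePulse code):

```python
import xml.etree.ElementTree as ET

XML = """<?xml version="1.0" encoding="UTF-8"?>
<playlists>
  <pai>test</pai>
  <leitura>
    <title>test</title>
    <artist>test</artist>
    <album>test</album>
    <duration>test</duration>
  </leitura>
  <leitura>
    <title>test2</title>
    <artist>test2</artist>
    <album>test2</album>
    <duration>test2</duration>
  </leitura>
</playlists>"""

def explode(xml_text, source):
    """Emit one dict per <source> element, mimicking how the filter
    splits an array field of the parsed record into separate records."""
    root = ET.fromstring(xml_text)
    return [{child.tag: child.text for child in item}
            for item in root.findall(source)]

for record in explode(XML, "leitura"):
    print(record)  # each <leitura> becomes a separate record for the topic
```

This is why `force.array.on.fields` is set on `leitura` in the connector: with a single `<leitura>` element the reader would otherwise produce a struct rather than an array.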

Here is the connector configuration:

{
  "config": {
    "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
    "filters": "Explode",
    "filters.Explode.type": "io.streamthoughts.kafka.connect.filepulse.filter.ExplodeFilter",
    "filters.Explode.source": "leitura",
    "force.array.on.fields": "leitura",
    "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
    "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
    "fs.scan.directory.path": "/tmp/kafka-connect/examples/",
    "fs.scan.interval.ms": "10000",
    "fs.scan.filters": "io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
    "file.filter.regex.pattern": ".*\\.xml$",
    "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
    "offset.strategy": "name",
    "topic": "LeituraRaw",
    "internal.kafka.reporter.id": "xml-config-start",
    "internal.kafka.reporter.bootstrap.servers": "broker:29092",
    "internal.kafka.reporter.topic": "LeituraRaw",
    "tasks.max": 1
  },
  "name": "xml-config"
}

Solution

  • The error was caused by the ExplodeFilter, which did not support dot notation for selecting fields. This issue has been fixed since Connect FilePulse v1.5.2.
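Since the compose file above pulls the `latest` tag, one way to make sure a build containing the fix is running is to pin the image version. A sketch of the relevant fragment (the exact tag name is an assumption, based on Docker Hub tags following the plugin version):

```yaml
connect-file-pulse:
  # pin to a release >= 1.5.2, where the ExplodeFilter fix landed
  image: streamthoughts/kafka-connect-file-pulse:1.5.2
```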