Tags: apache-kafka, sftp, apache-kafka-connect, confluent-platform

How to make the Confluent SFTP connector repeatedly process a CSV file


Good day,

I am following this guide for the Confluent SFTP connector: https://docs.confluent.io/kafka-connect-sftp/current/source-connector/index.html

The following is my sftp.json. It is essentially the same as the example in the docs; I only put in my local info:

{
  "name": "CsvSFTP",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "cleanup.policy":"MOVE",
    "behavior.on.error":"IGNORE",
    "input.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1",
    "error.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/error",
    "finished.path": "/home/meow/Workspace/confluentPlatform/confluent-7.0.1/finished",
    "input.file.pattern": "csv-sftp-source.csv",
    "sftp.username":"meow",
    "sftp.password":"password",
    "sftp.host":"localhost",
    "sftp.port":"22",
    "kafka.topic": "sftp-testing-topic",
    "csv.first.row.as.header": "true",
    "schema.generation.enabled": "true"
  }
}
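
For reference, the input file is a plain CSV whose first row supplies the field names (csv.first.row.as.header is set to true). A minimal illustrative file with the same column names as the records consumed below could look like this (the values here are made up, not the actual tutorial data):

id,first_name,last_name,email,gender,ip_address,last_login,account_balance,country,favorite_color
1,Jane,Doe,jane@example.com,Female,10.0.0.1,2020-01-01T00:00:00Z,100.00,US,#ffffff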

After this, I run the following command to load the connector:

confluent local services connect connector load CsvSFTP --config sftp.json
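
Once loaded, the connector status can be checked with something like the following (the exact subcommand may vary with the Confluent CLI version):

confluent local services connect connector status CsvSFTP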

Next, I upload the CSV file to the input folder. I see the file disappear and get moved to finished.path.

I use the following consumer command to check the data being pushed to the topic:

[meow@localhost bin]$ ./kafka-avro-console-consumer     --bootstrap-server localhost:9092     --property schema.registry.url=http://localhost:8081     --topic sftp-testing-topic2     --from-beginning
{"id":{"string":"1"},"first_name":{"string":"Salmon"},"last_name":{"string":"Baitman"},"email":{"string":"[email protected]"},"gender":{"string":"Male"},"ip_address":{"string":"120.181.75.98"},"last_login":{"string":"2015-03-01T06:01:15Z"},"account_balance":{"string":"17462.66"},"country":{"string":"IT"},"favorite_color":{"string":"#f09bc0"}}
{"id":{"string":"2"},"first_name":{"string":"Debby"},"last_name":{"string":"Brea"},"email":{"string":"[email protected]"},"gender":{"string":"Female"},"ip_address":{"string":"153.239.187.49"},"last_login":{"string":"2018-10-21T12:27:12Z"},"account_balance":{"string":"14693.49"},"country":{"string":"CZ"},"favorite_color":{"string":"#73893a"}}

So far so good; everything is working fine at this point.

After this, I take the same CSV file and edit the first name from 'Salmon' to 'Salmon2'. Then I upload the file again, but this time it is not processed. When I check the connector status, it is running, and when I check connect.log I only see it printing that no records were produced:

[2022-03-16 17:14:22,129] INFO [CsvSFTP|task-0|offsets] WorkerSourceTask{id=CsvSFTP2-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:503)

Then I unload the connector and load it again, and I see the file disappear again and move to finished.path. I expected the consumer to print another two records, one of them containing my change to the first name ("Salmon2"), but it didn't; the consumer output stays the same.
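
For reference, unloading and reloading can be done with something like the following (subcommand names may differ slightly by Confluent CLI version):

confluent local services connect connector unload CsvSFTP
confluent local services connect connector load CsvSFTP --config sftp.json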

May I know if I made any mistake, or is this the expected result?


Solution

  • This sounds like expected behavior. Source connectors (mostly) maintain state in an offsets topic. Once the connector has processed the file, it records that fact so that it won't process it again, even if the connector is restarted or otherwise reloaded.

    You'll need to modify that stored offset, or change the name of the connector so it is unique, in order to "start over" (see the sketch below).
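
    For example, the committed source offsets can be inspected by consuming the Connect offsets topic. This is a sketch that assumes the default distributed-mode topic name connect-offsets; the actual name is whatever offset.storage.topic is set to in your worker config:

    # Show the source offsets Connect has committed; keys include the connector name
    ./kafka-console-consumer \
      --bootstrap-server localhost:9092 \
      --topic connect-offsets \
      --from-beginning \
      --property print.key=true

    If an offset keyed by CsvSFTP referencing that file is present, the connector will skip it on reload. Renaming the connector (e.g. "name": "CsvSFTP2" in sftp.json) makes it commit offsets under a new key, so the file is processed again from scratch.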