Tags: apache-kafka, ftp, apache-kafka-connect, exactly-once

Kafka distributed Connect produces duplicated messages


Operation Environment


  • Three servers
  • Three Kafka brokers, Connect workers, and Schema Registry instances (Confluent 7.1.0)
  • One FTP connector for testing (3 tasks)

Problem


  • Connect produces duplicated messages. However, I expect the FTP connector to issue one message per file.

Distributed Connect produces the same message three times (one message per Connect task)

  • Below is the log printed when a connector task produces a message.
  • The same log appears in each Connect process's log:
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)

The consumer receives the same message three times

(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1

hello

hello

hello

FTP connector configuration and status

{
    "ftp-test-conn": {
        "info": {
            "name": "ftp-test-conn",
            "config": {
                "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
                "connect.ftp.address": "<FTP HOST IP>",
                "connect.ftp.keystyle": "string",
                "compression.type": "gzip",
                "connect.ftp.user": "ftpusername",
                "connect.ftp.refresh": "PT1M",
                "tasks.max": "3",
                "connect.ftp.file.maxage": "P7D",
                "name": "ftp-test-conn",
                "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
                "connect.ftp.timeout": "3000000",
                "connect.ftp.password": "<PASSWORD>"
            },
            "tasks": [
                {
                    "connector": "ftp-test-conn",
                    "task": 0
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 1
                },
                {
                    "connector": "ftp-test-conn",
                    "task": 2
                }
            ],
            "type": "source"
        },
        "status": {
            "name": "ftp-test-conn",
            "connector": {
                "state": "RUNNING",
                "worker_id": "<BROKER 1 IP>:8083"
            },
            "tasks": [
                {
                    "id": 0,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 1 IP>:8083"
                },
                {
                    "id": 1,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 2 IP>:8083"
                },
                {
                    "id": 2,
                    "state": "RUNNING",
                    "worker_id": "<BROKER 3 IP>:8083"
                }
            ],
            "type": "source"
        }
    }
}

Solution

  • Each task is most likely reading the same file. Try setting tasks.max=1. More specifically, there is no filesystem locking between FTP clients (each task opens its own connection to the server), so you are limited to a single reader task.

    Look closer at the logs: you can see the task ID in [ftp-test-conn|task-0].

    Also, it's not recommended to run Connect on the same hosts as the brokers.
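
To apply the fix, one approach is to resubmit the connector config with tasks.max lowered to 1 via the Connect REST API's PUT /connectors/&lt;name&gt;/config endpoint, which updates an existing connector in place and restarts its tasks. This is a sketch using the config from the question; the placeholders (&lt;FTP HOST IP&gt;, &lt;PASSWORD&gt;, &lt;WORKER IP&gt;) must be filled in for a real cluster, so the curl call itself is shown commented out:

```shell
# Same config as above, but with tasks.max reduced from 3 to 1 so only
# one task (one FTP client) reads the directory.
cat > /tmp/ftp-test-conn.json <<'EOF'
{
  "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
  "connect.ftp.address": "<FTP HOST IP>",
  "connect.ftp.keystyle": "string",
  "compression.type": "gzip",
  "connect.ftp.user": "ftpusername",
  "connect.ftp.refresh": "PT1M",
  "tasks.max": "1",
  "connect.ftp.file.maxage": "P7D",
  "name": "ftp-test-conn",
  "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
  "connect.ftp.timeout": "3000000",
  "connect.ftp.password": "<PASSWORD>"
}
EOF

# PUT on /connectors/<name>/config updates the connector if it exists
# (or creates it if it doesn't); Connect then rebalances the tasks.
# curl -s -X PUT -H "Content-Type: application/json" \
#      --data @/tmp/ftp-test-conn.json \
#      http://<WORKER IP>:8083/connectors/ftp-test-conn/config
```

After the update, GET /connectors/ftp-test-conn/status should list a single RUNNING task, and each new file should produce exactly one message on the topic.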