windowsapache-kafkadocker-composeapache-kafka-connect

Unable to create kafka connect source & sink connector for CSV to Kafka using SpoolDirCsvSourceConnector in Windows 11


I want to upload my testdata.csv to Kafka topic using this repo. Now curl commands are not working to create custom connector curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config -d @sample.json

This is my docker compose

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run : 
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:2.0.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.60
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.21.0
    container_name: ksqldb
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb:8088"
      # The advertised URL needs to be the URL on which the browser
      #  can access the KSQL server (e.g. http://localhost:8088/info)
      CONTROL_CENTER_KSQL_KSQLDB_ADVERTISED_URL: "http://localhost:8088"
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
    command:
      - bash
      - -c 
      - |
        echo "Waiting two minutes for Kafka brokers to start and 
               necessary topics to be available"
        sleep 120  
        /etc/confluent/docker/run

# Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
    volumes:
     - ${PWD}/data/mysql:/docker-entrypoint-initdb.d
     - ${PWD}/data:/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - 9200:9200
    environment:
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.1
    container_name: kibana
    hostname: kibana
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      xpack.security.enabled: "false"
      discovery.type: "single-node"
    command:
      - bash
      - -c
      - |
        /usr/local/bin/kibana-docker &
        echo "Waiting for Kibana to be ready ⏳"
        while [ $$(curl -H 'kbn-xsrf: true' -s -o /dev/null -w %{http_code} http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) -ne 200 ] ; do 
          echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) " (waiting for 200)"
          sleep 5  
        done

        echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*)

        echo -e "\n--\n+> Pre-creating index pattern"
        curl -s -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/mysql-debezium-asgard.demo.orders' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'Content-Type: application/json' \
          -d '{"attributes":{"title":"mysql-debezium-asgard.demo.orders","timeFieldName":"CREATE_TS"}}'

        echo -e "\n--\n+> Setting the index pattern as default"
        curl -s -XPOST 'http://localhost:5601/api/kibana/settings' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -d '{"changes":{"defaultIndex":"mysql-debezium-asgard.demo.orders"}}'

        echo -e "\n--\n+> Opt out of Kibana telemetry"
        curl 'http://localhost:5601/api/telemetry/v2/optIn' \
            -H 'kbn-xsrf: nevergonnagiveyouup' \
            -H 'content-type: application/json' \
            -H 'accept: application/json' \
            --data-binary '{"enabled":false}' \
            --compressed

        sleep infinity

  neo4j:
    image: neo4j:4.2.3
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

and this is my sample.json

{
        "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
        "topic": "orders_spooldir_00",
        "input.path": "/data/unprocessed",
        "finished.path": "/data/processed",
        "error.path": "/data/error",
        "input.file.pattern": ".*\\.csv",
        "schema.generation.enabled":"true",
        "csv.first.row.as.header":"true"
        }

And output of

C:\Users\vinee\Downloads\kafka-connect\demo-scene-master\kafka-connect-zero-to-hero>curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config  -d @csv-connector.json
HTTP/1.1 400 Bad Request
Date: Sat, 04 Nov 2023 10:14:54 GMT
Content-Type: application/json
Content-Length: 381
Server: Jetty(9.4.40.v20210413)

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value '/data/unprocessed' must be a directory. for configuration input.path\nInvalid value '/data/error' must be a directory. for configuration error.path\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

or

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value File 'C:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/unprocessed' is not an absolute path. for configuration input.path\nInvalid value File 'C:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/error' is not an absolute path. for configuration error.path\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

I have tried changing to everything that came to my mind for getting valid values of "input.path","output.path" and "error.path" but output doesn't change except what is in quotes after Invalid value

C:\Users\vinee\Downloads\kafka-connect\demo-scene-master\kafka-connect-zero-to-hero\data\unprocessed 
C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed 
dir://C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed
/c:/Users/vinee/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero/data/unprocessed
.\data\unprocessed

This is the data folder

enter image description here


Solution

  • Changing PWD to C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero in docker-compose.yaml and C://Users//vinee//Downloads//kafka-connect//demo-scene-master//kafka-connect-zero-to-hero//data//unprocessed to /data/unprocessed worked and I got status 201. Thanks @OneCricketeer for your assistance.