apache-kafka, apache-kafka-connect, oracle-cloud-infrastructure

Why am I getting this error while creating a source connector through a POST request?


I am trying to create a source connector for my OCI stream in my Kafka Connect cluster, which runs in distributed mode on my local machine. Creating it with a POST request yields the following error:

{"error_code":500,"message":"Unrecognized field \"connector.class\" (class org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest), not marked as ignorable (2 known properties: \"config\", \"name\"])\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 142] (through reference chain: org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest[\"connector.class\"])"}

It says connector.class is unrecognized, even though it is a required property for creating the connector.

I installed Apache Kafka on my system and started the ZooKeeper server and the Kafka server using the scripts in Kafka's bin directory. I then ran Kafka Connect in distributed mode with the following connect-distributed.properties file:

bootstrap.servers={_}.streaming.{region}.oci.oraclecloud.com:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
group.id=my-example-group
config.storage.topic={harness-ocid}-config
offset.storage.topic={harness-ocid}-offset
status.storage.topic={harness-ocid}-status

The JSON file I used to create the connector is:

{
  "name": "dupe-check-source-connector",
  "connector.class":"com.oracle.oci.kafka.connect.source.OciStreamingSourceConnector",
  "tasks.max":"1"
}

Solution

  • There are two endpoints that can be used to create a connector, and each uses a slightly different payload.

    If you want to use the POST /connectors endpoint, your payload has to be in this format:

    {
      "name": "dupe-check-source-connector",
      "config": {
         "connector.class":"com.oracle.oci.kafka.connect.source.OciStreamingSourceConnector",
         "tasks.max":"1"
      }
    }
    

    The format you used is valid on the PUT /connectors/<CONNECTOR_NAME>/config endpoint.

    I personally recommend using the PUT endpoint, as it can also be used to reconfigure a connector once it has been created (and the payload is a bit simpler without the extra config field).
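
To make the difference between the two payloads concrete, here is a minimal Python sketch that builds both requests using only the standard library. The base URL http://localhost:8083 is an assumption (the default Kafka Connect REST port, not stated in the question), and the helper names build_post_request and build_put_request are made up for illustration.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the Connect worker's REST API listens on the default port 8083.
CONNECT_URL = "http://localhost:8083"

JSON_HEADERS = {"Content-Type": "application/json"}


def build_post_request(name, config, base_url=CONNECT_URL):
    """POST /connectors: the settings must be nested under "config"."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors", data=body, headers=JSON_HEADERS, method="POST"
    )


def build_put_request(name, config, base_url=CONNECT_URL):
    """PUT /connectors/<name>/config: the body is the flat settings map."""
    body = json.dumps(config).encode("utf-8")
    # The connector name travels in the URL, so it is escaped for safety.
    url = f"{base_url}/connectors/{urllib.parse.quote(name, safe='')}/config"
    return urllib.request.Request(
        url, data=body, headers=JSON_HEADERS, method="PUT"
    )


connector_name = "dupe-check-source-connector"
connector_config = {
    "connector.class": "com.oracle.oci.kafka.connect.source.OciStreamingSourceConnector",
    "tasks.max": "1",
}

post_request = build_post_request(connector_name, connector_config)
put_request = build_put_request(connector_name, connector_config)

# To actually send one of them against a running worker, uncomment:
# with urllib.request.urlopen(put_request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Both requests carry the same connector settings. Only the nesting of the body and the location of the connector name (body versus URL) differ.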