apache-kafka, apache-kafka-connect, confluent-platform

Confluent HttpSinkConnector is giving an error that the replication factor is 3 even though it is set to 1 everywhere


I am using the Confluent HttpSinkConnector, following the steps described on this page: [https://docs.confluent.io/current/connect/kafka-connect-http/index.html][1]

I have updated my sink connector properties file after checking all the property details shown on the Control Center page. My final properties file is as follows:

    name = HttpSink
    connector.class = io.confluent.connect.http.HttpSinkConnector
    tasks.max = 1
    value.converter = org.apache.kafka.connect.storage.StringConverter
    topics = http-messages
    http.api.url = http://localhost:8080/api/messages
    request.method = post
    auth.type = none
    reporter.result.topic.replication.factor = 1
    reporter.result.topic.partitions = 1
    reporter.bootstrap.servers = localhost:9092
    confluent.topic.bootstrap.servers = localhost:9092
    confluent.topic = http-messages
    confluent.topic.replication.factor = 1
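
For reference, the same configuration can be registered as JSON through the Connect REST API; this is a sketch, assuming the Connect worker runs at localhost:8083 as in the status requests later in this question:

    # Register the connector with the equivalent JSON config (sketch)
    curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d '{
      "name": "HttpSink",
      "config": {
        "connector.class": "io.confluent.connect.http.HttpSinkConnector",
        "tasks.max": "1",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topics": "http-messages",
        "http.api.url": "http://localhost:8080/api/messages",
        "request.method": "post",
        "auth.type": "none",
        "reporter.result.topic.replication.factor": "1",
        "reporter.result.topic.partitions": "1",
        "reporter.bootstrap.servers": "localhost:9092",
        "confluent.topic.bootstrap.servers": "localhost:9092",
        "confluent.topic": "http-messages",
        "confluent.topic.replication.factor": "1"
      }
    }'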

I have also checked that the "http-messages" topic already exists; its configuration is shown on Control Center as follows:

    name                                     http-messages
    partitions                               1
    compression.type                         producer
    leader.replication.throttled.replicas
    message.downconversion.enable            true
    min.insync.replicas                      1
    segment.jitter.ms                        0
    cleanup.policy                           delete
    flush.ms                                 9223372036854775807
    follower.replication.throttled.replicas
    segment.bytes                            1073741824
    retention.ms                             604800000
    flush.messages                           9223372036854775807
    message.format.version                   2.5-IV0
    file.delete.delay.ms                     60000
    max.compaction.lag.ms                    9223372036854775807
    max.message.bytes                        1048588
    min.compaction.lag.ms                    0
    message.timestamp.type                   CreateTime
    preallocate                              false
    min.cleanable.dirty.ratio                0.5
    index.interval.bytes                     4096
    unclean.leader.election.enable           false
    retention.bytes                          -1
    delete.retention.ms                      86400000
    segment.ms                               604800000
    message.timestamp.difference.max.ms      9223372036854775807
    segment.index.bytes                      10485760
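
To double-check the partition count and replication factor outside Control Center, the topic can also be described from the command line; a sketch, using the kafka-topics CLI bundled with Confluent Platform and the broker address from the properties file above:

    # Describe the topic to verify partitions and replication factor (sketch)
    kafka-topics --bootstrap-server localhost:9092 --describe --topic http-messages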

However, when I try to run the HttpSink connector task, it fails. The status returned by the REST request curl -X GET localhost:8083/connectors/HttpSink/tasks/0/status contains the following error:

{"id":0,"state":"FAILED","worker_id":"127.0.0.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:\n\tat io.confluent.connect.reporter.ReporterAdminClient.handleExecutionException(ReporterAdminClient.java:109)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:57)\n\tat io.confluent.connect.reporter.Reporter.createDestinationTopicsIfNeeded(Reporter.java:433)\n\tat io.confluent.connect.reporter.Reporter.configure(Reporter.java:80)\n\tat io.confluent.connect.http.HttpSinkTask.start(HttpSinkTask.java:49)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:53)\n\t... 12 more\nCaused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.\n"}

Also, I get the following response to the REST request curl -X GET localhost:8083/connectors/HttpSink/topics:

{"HttpSink":{"topics":[]}}

Please help me solve this issue.

[1]: https://docs.confluent.io/current/connect/kafka-connect-http/index.html


Solution

  • To overcome this exception, in addition to reporter.result.topic.replication.factor and confluent.topic.replication.factor, you also need to set reporter.error.topic.replication.factor to 1, as listed in the kafka-connect-http configuration properties: https://docs.confluent.io/current/connect/kafka-connect-http/connector_config.html. The reporter creates an error topic as well as a result topic, and the error topic's replication factor defaults to 3; on a single-broker cluster that topic cannot be created, which is exactly the InvalidReplicationFactorException in the trace.

    "confluent.topic.replication.factor": 1,
    "reporter.result.topic.replication.factor": 1,
    "reporter.error.topic.replication.factor": 1