apache-kafka, avro, apache-kafka-connect, confluent-platform, ksqldb

How to get a stream from a Kafka topic into Elasticsearch with Confluent?


I'm reading data from a machine and streaming it as JSON to a Kafka topic. I would like to read this topic and store the stream data in Elasticsearch with Confluent.

My steps: 1. Create KSQL streams to convert from JSON to Avro.

JSON stream:

 CREATE STREAM source_json_pressure 
 (
  timestamp BIGINT, 
  opcuaObject VARCHAR, 
  value DOUBLE
  ) 
 WITH (KAFKA_TOPIC='7d12h100mbpressure',
   VALUE_FORMAT='JSON');

Avro stream:

 CREATE STREAM target_avro_pressure 
  WITH (
     KAFKA_TOPIC='7d12h100mbpressure_avro', 
     VALUE_FORMAT='AVRO'
  ) AS 
  SELECT * FROM source_json_pressure;

After this I get this Avro stream:

 ksql> print "7d12h100mbpressure_avro";

 Format:AVRO
 23.04.19 19:29:58 MESZ,   jK?C, {"TIMESTAMP": 1556040449728, "OPCUAOBJECT": "DatLuDrUeb.EinDru", "VALUE": 7.42}
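
To verify that KSQL registered the value schema (a sanity check, assuming the default Schema Registry at http://localhost:8081 and the default <topic>-value subject naming), the registry can be queried directly:

 curl http://localhost:8081/subjects
 curl http://localhost:8081/subjects/7d12h100mbpressure_avro-value/versions/latest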

My elasticsearch.properties:

 name=elasticsearch-sink
 connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
 tasks.max=1
 topics=7d12h100mbpressure_avro
 key.ignore=true
 connection.url=http://localhost:9200
 type.name=kafka-connect
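
For context, a connector config like this is loaded into the Connect worker along these lines (a sketch using the old confluent CLI bundled with the platform; newer setups POST an equivalent JSON config to the Connect REST API on port 8083 instead):

 confluent load elasticsearch-sink -d elasticsearch.properties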

After this I expect the stream in ES, but I get the indices without the stream data.

Where is my mistake?

Errors from confluent log connect:

 [2019-04-24 11:01:29,316] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Setting newly assigned partitions: 7d12h100mbpressure_avro-3, 7d12h100mbpressure_avro-2, 7d12h100mbpressure_avro-1, 7d12h100mbpressure_avro-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)
 [2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-3 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
 [2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-2 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
 [2019-04-24 11:01:29,327] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-1 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
 [2019-04-24 11:01:29,328] INFO [Consumer clientId=consumer-4, groupId=connect-elasticsearch-sink] Resetting offset for partition 7d12h100mbpressure_avro-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)
 [2019-04-24 11:01:29,667] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic 7d12h100mbpressure_avro to Avro:
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more 
 Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 92747
 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:145)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 [2019-04-24 11:01:29,668] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

My connect-avro-distributed.properties:

 # Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
 bootstrap.servers=localhost:9092

 key.converter=io.confluent.connect.avro.AvroConverter
 key.converter.schema.registry.url=http://localhost:8081
 value.converter=io.confluent.connect.avro.AvroConverter
 value.converter.schema.registry.url=http://localhost:8081

 config.storage.topic=connect-configs
 offset.storage.topic=connect-offsets
 status.storage.topic=connect-statuses

 config.storage.replication.factor=1
 offset.storage.replication.factor=1
 status.storage.replication.factor=1

 internal.key.converter=org.apache.kafka.connect.json.JsonConverter
 internal.value.converter=org.apache.kafka.connect.json.JsonConverter
 internal.key.converter.schemas.enable=false
 internal.value.converter.schemas.enable=false
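
Given this worker config, the Connect REST API (assuming the default port 8083) shows which converters the running connector actually picked up and whether its task has failed:

 curl http://localhost:8083/connectors/elasticsearch-sink
 curl http://localhost:8083/connectors/elasticsearch-sink/status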

Solution

  • You set key.ignore=true for the Elasticsearch sink; however, that doesn't stop Connect from trying to deserialize the record.

    When you just do confluent start, it always uses AvroConverter for both the key and value converters.

    It's worth mentioning that VALUE_FORMAT='AVRO' in KSQL only makes the value Avro, I believe, not the key.

    One of those reasons might explain why you see one of:

    • subject not found
    • schema not found
    • Error retrieving Avro schema for id

    To work around this, in your elasticsearch.properties you can override key.converter to something else, such as org.apache.kafka.connect.storage.StringConverter, as in the sketch below.
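
    A minimal sketch of the override (the value.converter lines just restate the worker defaults and assume the default Schema Registry at http://localhost:8081):

     key.converter=org.apache.kafka.connect.storage.StringConverter
     value.converter=io.confluent.connect.avro.AvroConverter
     value.converter.schema.registry.url=http://localhost:8081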


    Also, rather than debugging with Connect+KSQL, I suggest using kafka-avro-console-consumer and including the --property print.key=true option to see if you get a similar error.
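
    For example (assuming the default broker and Schema Registry addresses):

     kafka-avro-console-consumer \
       --bootstrap-server localhost:9092 \
       --topic 7d12h100mbpressure_avro \
       --property schema.registry.url=http://localhost:8081 \
       --property print.key=true \
       --from-beginning

    If the key is not Avro, this should fail with the same kind of deserialization error, but on the key rather than the value.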