apache-kafka apache-kafka-connect aerospike

Kafka Connect error: com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record


I am trying to ingest data from Kafka into Aerospike. What am I missing in the Kafka message being sent?

I am sending the data below to Kafka so that it is pushed into Aerospike:

ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}

Kafka Connect gives the error below:

com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record

[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put$2$2.invokeSuspend(AerospikeSinkTask.kt:220)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)

aerospike-kafka-inbound.yml file:

/home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml

# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connect/streaming-to-asdb/from-kafka-to-asdb-overview.html
# for details.

# Map of Kafka topic name to its configuration.
topics:
  phone: # Kafka topic name.
    invalid-record: ignore # Ignore invalid records instead of killing the task.
    mapping: # Config to convert Kafka record to Aerospike record.
      namespace: # Aerospike record namespace config.
        mode: static
        value: test
      set: # Aerospike record set config.
        mode: static
        value: t1
      key-field: # Aerospike record key config.
        source: key  # Use Kafka record key as the Aerospike record key.
      bins: # Aerospike record bins config.
        type: multi-bins
        # all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
        map:
          name:
            source: value-field
            field-name: firstName
# The Aerospike cluster connection properties.
aerospike:
  seeds:
    - 127.0.0.1:
        port: 3000

Solution

  • It looks like you are not specifying a key when you send your Kafka message. By default the console producer sends a null key, and your config (key-field with source: key) tells the connector to use the Kafka record key as the Aerospike key. The log confirms this: the failed record shows key=null. To send a key, set parse.key to true and specify the separator with key.separator in the Kafka console producer.

    See step 8 of this tutorial:

    https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html

    kafka-console-producer \
      --topic orders \
      --bootstrap-server broker:9092 \
      --property parse.key=true \
      --property key.separator=":"
    

    These two properties tell the console producer to expect a key in each message and which separator divides the key from the value.

    In this example there are two records, one with the key foo and the other with the key fun:

    foo:bar
    fun:programming
    

    This results in those two records being written to Aerospike with primary keys matching the Kafka keys foo and fun.
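
Applied to the topic and payload from the question, a keyed input line could be built as in the sketch below. The key user1 and the separator | are hypothetical values chosen for illustration; | is used instead of : only because the JSON value itself contains colons, which makes the line easier to read. The sketch also assumes the Connect worker's key.converter can read a plain string key (for example org.apache.kafka.connect.storage.StringConverter); the worker configuration is not shown in the question, so that may need adjusting.

```shell
# Hypothetical key and separator, chosen for illustration only.
KEY='user1'
SEP='|'

# The same value payload as in the question.
VALUE='{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}'

# Build one console-producer input line: key, separator, value.
keyed_line() {
  printf '%s%s%s\n' "$1" "$2" "$3"
}

LINE=$(keyed_line "$KEY" "$SEP" "$VALUE")
printf '%s\n' "$LINE"

# To send it (needs the broker from the question to be running):
#   printf '%s\n' "$LINE" | bin/kafka-console-producer.sh \
#     --topic phone --bootstrap-server localhost:9092 \
#     --property parse.key=true --property key.separator="$SEP"
```

With parse.key=true the record should reach the connector with a non-null key, which is what the key-field setting (source: key) in aerospike-kafka-inbound.yml expects.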