Kafka Connect error: com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record

I am trying to ingest data from Kafka into Aerospike. What am I missing in the Kafka message being sent?

I am sending the data below into Kafka to push it into Aerospike:

ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/ --topic phone --bootstrap-server localhost:9092

Kafka Connect gives the following error:

[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
    at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
    at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
    at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put$2$2.invokeSuspend(AerospikeSinkTask.kt:220)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(
    at java.util.concurrent.ThreadPoolExecutor$
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)

aerospike-kafka-inbound.yml file:

/home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml

# Change the configuration for your use case.
# Refer to
# for details.

# Map of Kafka topic name to its configuration.
  phone: # Kafka topic name.
    invalid-record: ignore # Do not kill the task on an invalid record.
    mapping: # Config to convert Kafka record to Aerospike record.
      namespace: # Aerospike record namespace config.
        mode: static
        value: test
      set: # Aerospike record set config.
        mode: static
        value: t1
      key-field: # Aerospike record key config.
        source: key  # Use Kafka record key as the Aerospike record key.
      bins: # Aerospike record bins config.
        type: multi-bins
        # all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
            source: value-field
            field-name: firstName
    # The Aerospike cluster connection properties.
        port: 3000


  • It looks like you are not specifying a key when you send your Kafka message. Without one, the record goes out with a null key (the log shows key=null), while your config (key-field with source: key) tells the connector to use the Kafka key as the Aerospike key. To send a Kafka key from the console producer, set parse.key to true and specify a key separator.

    see step 8 here

    kafka-console-producer \
      --topic orders \
      --bootstrap-server broker:9092 \
      --property parse.key=true \
      --property key.separator=":"

    The two properties tell the Kafka producer to expect a key in each message and which separator divides the key from the value.

    In this example there are two records, one with the key foo and the other with the key fun.


    This results in those two records being written to Aerospike with primary keys matching the Kafka keys foo and fun.
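
To make the key/value split concrete, here is a rough Python sketch of what the console producer is understood to do with each input line. It is not the producer's actual code: `split_keyed_line` is a hypothetical helper, the payloads are illustrative, and raising an error on a missing separator is an assumption about the default behaviour.

```python
from typing import Optional, Tuple


def split_keyed_line(line: str, parse_key: bool = False,
                     key_separator: str = "\t") -> Tuple[Optional[str], str]:
    """Hypothetical helper: derive (key, value) from one console input line."""
    if not parse_key:
        # Without parse.key=true the whole line is the value and the key
        # stays null, which is what leads to "user key missing from record".
        return None, line
    # Split at the FIRST occurrence of the separator only, so a value that
    # itself contains the separator (for example JSON) stays intact.
    key, sep, value = line.partition(key_separator)
    if not sep:
        # Assumption: a line without the separator is rejected.
        raise ValueError(f"no key separator {key_separator!r} in line: {line!r}")
    return key, value


# Illustrative payloads, not taken from the original post.
print(split_keyed_line('{"name": "Anuj"}'))
print(split_keyed_line('foo:{"name": "Anuj"}', parse_key=True, key_separator=":"))
```

The first call models the situation in the question (null key); the second models a line typed after enabling parse.key with ":" as the separator, where foo becomes the Kafka key and therefore the Aerospike primary key.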