I am trying to ingest data from Kafka into Aerospike. What is missing from the Kafka message I am sending?
I am sending the data below to Kafka so that it is pushed into Aerospike:
ubuntu@ubuntu-VirtualBox:/opt/kafka_2.13-2.8.1$ bin/kafka-console-producer.sh --topic phone --bootstrap-server localhost:9092
>{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}
Kafka Connect gives the error below:
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
[2021-12-13 21:33:34,747] ERROR failed to put record SinkRecord{kafkaOffset=13, timestampType=CreateTime} ConnectRecord{topic='phone', kafkaPartition=0, key=null, keySchema=null, value=Struct{name=Anuj}, valueSchema=Schema{STRUCT}, timestamp=1639411413702, headers=ConnectHeaders(headers=)} (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:288)
com.aerospike.connect.inbound.aerospike.exception.ConvertToAerospikeException: user key missing from record
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractUserKey(AerospikeRecordConverter.kt:131)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractKey(AerospikeRecordConverter.kt:68)
at com.aerospike.connect.inbound.converter.AerospikeRecordConverter.extractRecord(AerospikeRecordConverter.kt:41)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:69)
at com.aerospike.connect.kafka.inbound.KafkaInboundDefaultMessageTransformer.transform(KafkaInboundDefaultMessageTransformer.kt:25)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.applyTransform(AerospikeSinkTask.kt:341)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.toAerospikeOperation(AerospikeSinkTask.kt:315)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.putRecord(AerospikeSinkTask.kt:239)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask.access$putRecord(AerospikeSinkTask.kt:47)
at com.aerospike.connect.kafka.inbound.AerospikeSinkTask$put$2$2.invokeSuspend(AerospikeSinkTask.kt:220)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-12-13 21:33:35,458] INFO 1 errors for topic phone (com.aerospike.connect.kafka.inbound.AerospikeSinkTask:552)
aerospike-kafka-inbound.yml file (/home/ubuntu/ib-zipaerospikesink/aerospike-kafka-inbound-2.2.0/lib/etc/aerospike-kafka-inbound/aerospike-kafka-inbound.yml):
# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connect/streaming-to-asdb/from-kafka-to-asdb-overview.html
# for details.
# Map of Kafka topic name to its configuration.
topics:
  phone: # Kafka topic name.
    invalid-record: ignore # not Kill task on invalid record.
    mapping: # Config to convert Kafka record to Aerospike record.
      namespace: # Aerospike record namespace config.
        mode: static
        value: test
      set: # Aerospike record set config.
        mode: static
        value: t1
      key-field: # Aerospike record key config.
        source: key # Use Kafka record key as the Aerospike record key.
      bins: # Aerospike record bins config.
        type: multi-bins
        # all-value-fields: true # Convert all values in Kafka record to Aerospike record bins.
        map:
          name:
            source: value-field
            field-name: firstName
# The Aerospike cluster connection properties.
aerospike:
  seeds:
    - 127.0.0.1:
        port: 3000
It looks like you are not specifying a key when you send your Kafka message. By default the console producer sends a null key (the log shows key=null), and your config says to use the Kafka key as the Aerospike key. To send a Kafka key, set parse.key to true and specify a key separator in the Kafka producer.
See step 8 of this tutorial:
https://kafka-tutorials.confluent.io/kafka-console-consumer-producer-basics/kafka.html
kafka-console-producer \
--topic orders \
--bootstrap-server broker:9092 \
--property parse.key=true \
--property key.separator=":"
The two properties tell the Kafka producer to expect a key in each message and which separator divides the key from the value.
In this example there are two records, one with the key foo and the other with the key fun:
foo:bar
fun:programming
This will result in those two records being written to Aerospike with primary keys matching the Kafka keys foo and fun.
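Applied to the phone topic from the question, a minimal sketch might look like the following. The key anuj and the "|" separator are assumptions chosen for illustration; a separator that does not appear in the JSON value avoids any ambiguity, since the value itself contains ":" characters. The producer call is left commented out so the snippet can be run without a broker.

```shell
#!/bin/sh
# Hypothetical record key (an assumption for this sketch). With
# "source: key" in the connector config, it is used as the Aerospike key.
KEY='anuj'

# Separator assumed for this sketch; it must match key.separator below.
SEP='|'

# Same value as in the question.
VALUE='{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"name","type":"string","optional":true}]},"payload":{"name":"Anuj"}}'

# One line per record: key, then separator, then value.
LINE="${KEY}${SEP}${VALUE}"
printf '%s\n' "$LINE"

# To send it, pipe the line into the console producer from the Kafka directory:
# printf '%s\n' "$LINE" | bin/kafka-console-producer.sh \
#   --topic phone \
#   --bootstrap-server localhost:9092 \
#   --property parse.key=true \
#   --property "key.separator=${SEP}"
```

This assumes the Connect worker's key.converter accepts a plain string key (for example org.apache.kafka.connect.storage.StringConverter). If the key converter is the JSON converter with schemas enabled, the key would need the same schema/payload envelope as the value. Separately, the bins mapping in the question reads field-name: firstName while the payload carries name, so that mapping may also need adjusting.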