$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning
"Can we write to a topic that does not exist?"
Consumer (kafka-rest API on localhost:8082)
POST request to http://localhost:8082/consumers/rested
Request Body:
{
  "format": "json",
  "auto.offset.reset": "earliest",
  "auto.commit.enable": "false"
}
Response Body:
{
  "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",
  "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"
}
POST request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription
using Headers:
Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json
and Request Body:
"topics": [
returns a Response of 204 No Content
GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records
using the Headers:
Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json
returns the Response:
{
  "error_code": 50002,
  "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}
How can we fix this issue and ensure that we receive the data?
Exception (on Kafka)
The running Kafka Rest Proxy server log has the following exception:
rest-proxy | [2018-12-31 03:09:27,232] INFO - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211 341 (io.confluent.rest-utils.requests)
rest-proxy | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy | at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy | at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy | at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy | at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy | at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy | at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy | at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy | at java.lang.Thread.run(Thread.java:748)
Consumer-Groups CLI
I can view the consumer-group on the CLI but it has no active members:
$ kafka-consumer-groups --bootstrap-server broker:9092 --list
has the result:
However, when I attempt to retrieve the members:
$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members
Consumer group 'rested' has no active members.
Short Answer
You need to wrap your key in double quotes. This is not because all Kafka keys must be quoted, but because the consumer was created with the JSON format, so the key has to be valid JSON, and a string wrapped in double quotes is valid JSON.
If you really need to read this message as it is, you have to consume it in a format other than JSON.
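As a rough sketch of the "different format" route: the REST Proxy v2 API also has a binary embedded format, which returns keys and values base64-encoded instead of parsing them as JSON. The Python below only shows the decoding step and assumes the consumer was created with "format": "binary" and the records were fetched with Accept: application/vnd.kafka.binary.v2+json. The function name decode_binary_records and the sample response are hypothetical, made up for illustration.

```python
import base64
import json


def decode_binary_records(response_body: str) -> list:
    """Decode the base64 keys and values of a binary-format /records response."""
    decoded = []
    for record in json.loads(response_body):
        key = record.get("key")
        value = record.get("value")
        decoded.append({
            # Keys and values may be null, so only decode when present.
            "key": base64.b64decode(key) if key is not None else None,
            "value": base64.b64decode(value) if value is not None else None,
            "partition": record.get("partition"),
            "offset": record.get("offset"),
        })
    return decoded


# Hypothetical response body: a record whose key is the bare bytes `key`,
# which the JSON format cannot parse but the binary format passes through.
sample = json.dumps([{
    "topic": "test3",
    "key": base64.b64encode(b"key").decode("ascii"),
    "value": base64.b64encode(b'{"foo": "bar"}').decode("ascii"),
    "partition": 0,
    "offset": 0,
}])

records = decode_binary_records(sample)
print(records[0]["key"], records[0]["value"])
```

With the raw bytes in hand, you can decide per record whether the key is JSON, plain text, or something else.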
Long Answer
You have a record whose key is not wrapped in quotes, so the key is not valid JSON, and the Jackson JSON parser fails when it tries to parse it. The error message does not make this obvious: when the parser does not see a quote, a square bracket, or a curly bracket, it starts to assume the token is a boolean or null, which is why it says it was expecting 'true', 'false' or 'null'.
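The difference between the two keys can be checked with any JSON parser. The sketch below uses Python's json module rather than Jackson, so the error text differs from the one in the stack trace, but the outcome is the same: the bare bytes are rejected and the quoted ones are accepted. The helper name is_valid_json is made up for this example.

```python
import json


def is_valid_json(raw: bytes) -> bool:
    """Return True if the raw record bytes parse as a JSON document."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError alike.
        return False


bare_key = b"key"       # key written without quotes
quoted_key = b'"key"'   # the same key wrapped in double quotes

print(is_valid_json(bare_key))    # False
print(is_valid_json(quoted_key))  # True
```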
You can see where it grabs the key and tries to decode it as JSON in JsonKafkaConsumerState.deserialize, which appears in the stack trace above.
I was able to reproduce your error using this method:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
At this point I am able to read the record, but when I add a key without the quotes, I get the same error as you:
./bin/kafka-console-producer \
--broker-list :9092 \
--topic testjsontopic \
--property parse.key=true \
--property key.separator="&"
Now when I call this code:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
I receive this error:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
You can also use this to print your topic's keys along with the values:
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
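If you generate the producer input from a script, one way to avoid bare keys is to JSON-encode both the key and the value before joining them with the separator. This is a minimal sketch, assuming the "&" separator from the producer commands above; producer_line is a hypothetical helper, not part of any Kafka tooling.

```python
import json


def producer_line(key, value, separator="&"):
    """Build one console-producer input line with a JSON-encoded key and value."""
    encoded_key = json.dumps(key)      # a Python str becomes a double-quoted JSON string
    encoded_value = json.dumps(value)
    if separator in encoded_key:
        # The separator inside the key would make the line ambiguous to split.
        raise ValueError("key contains the separator; pick a different key.separator")
    return f"{encoded_key}{separator}{encoded_value}"


line = producer_line("key", {"foo": "bar"})
print(line)  # "key"&{"foo": "bar"}
```

Each line can then be piped to kafka-console-producer with parse.key=true and the matching key.separator.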