
JSONParseException reading data using Kafka rest API


KAFKA TOPIC (test3)

$ kafka-console-consumer --bootstrap-server broker:9092 --topic test3 --from-beginning

"Can we write to a topic that does not exist?"
"Can we write to a topic that does not exist?"
{"foo":"bar"}
["foo","bar"]
confluent
confluent
confluent
kafka
logs


0

0

Consumer (kafka-rest API on localhost:8082)

  1. Create a consumer POST request to http://localhost:8082/consumers/rested

Request Body:

 {
   "format": "json",
   "auto.offset.reset": "earliest",
   "auto.commit.enable": "false"
 }

Response Body:

{
   "instance_id": "rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff",

   "base_uri": "http://rest-proxy:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff"

}
  2. Create a subscription using POST to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/subscription

using Headers:

Host: http://localhost:8082
Content-Type: application/vnd.kafka.v2+json

and Request Body:

{
    "topics": [
      "test3"
    ]
}

returns a Response of 204 No Content.

  3. Read records by making a GET request to http://localhost:8082/consumers/rested/instances/rest-consumer-dfa6ee0e-4f24-46dc-b0dc-dda3b80866ff/records

using the Headers:

Host: http://localhost:8082
Accept: application/vnd.kafka.json.v2+json

returns the Response:

{
    "error_code": 50002,
    "message": "Kafka error: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')\n at [Source: (byte[])\"key\"; line: 1, column: 7]"
}

How can we fix this issue and ensure that we receive the data?

Exception (on Kafka)

The running Kafka Rest Proxy server log has the following exception:

rest-proxy         | [2018-12-31 03:09:27,232] INFO 172.25.0.1 - - [31/Dec/2018:03:09:26 +0000] "GET /consumers/rest-consumer/instances/rest-consumer-8e49873e-13ce-46a5-be1f-0237a0369efe/records HTTP/1.1" 500 211  341 (io.confluent.rest-utils.requests)
rest-proxy         | [2018-12-31 03:09:27,235] ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@59611e28  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
rest-proxy         | org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         | Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')
rest-proxy         |  at [Source: (byte[])"key"; line: 1, column: 7]
rest-proxy         |    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
rest-proxy         |    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
rest-proxy         |    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
rest-proxy         |    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.deserialize(JsonKafkaConsumerState.java:79)
rest-proxy         |    at io.confluent.kafkarest.v2.JsonKafkaConsumerState.createConsumerRecord(JsonKafkaConsumerState.java:64)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.maybeAddRecord(KafkaConsumerReadTask.java:158)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.addRecords(KafkaConsumerReadTask.java:142)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerReadTask.doPartialRead(KafkaConsumerReadTask.java:99)
rest-proxy         |    at io.confluent.kafkarest.v2.KafkaConsumerManager$RunnableReadTask.run(KafkaConsumerManager.java:370)
rest-proxy         |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
rest-proxy         |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
rest-proxy         |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
rest-proxy         |    at java.lang.Thread.run(Thread.java:748)

Consumer-Groups CLI

I can view the consumer-group on the CLI but it has no active members:

$ kafka-consumer-groups --bootstrap-server broker:9092 --list

has the result:

console-consumer-60695
console-consumer-62259
console-consumer-19307
console-consumer-47906
console-consumer-40838
rested

However, when I attempt to retrieve the members:

$ kafka-consumer-groups --bootstrap-server localhost:29092 --group rest-consumer --describe --members

Consumer group 'rested' has no active members.

Solution

  • TL;DR

    You need to wrap your key in double quotes. Kafka keys in general do not need quotes, but a JSON consumer runs the key through a JSON parser, so the key has to be valid JSON, and a string wrapped in double quotes is valid JSON.

    If you really need to process this message as it is, you would need to read it in a format other than JSON.
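    One such format is the REST Proxy v2 "binary" embedded format, which returns keys and values base64-encoded instead of parsing them. The following is a minimal sketch, not a tested recipe for your setup: the group name, instance name, and function names are hypothetical, and the host is assumed to match the one in the question.

```shell
# Hypothetical names; point REST_URL at your own REST Proxy.
REST_URL=${REST_URL:-http://localhost:8082}
GROUP=my_binary_consumer
INSTANCE=my_binary_instance

# Create a consumer instance that returns raw bytes instead of parsing JSON.
create_binary_consumer() {
  curl -s -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data "{\"name\": \"$INSTANCE\", \"format\": \"binary\", \"auto.offset.reset\": \"earliest\"}" \
    "$REST_URL/consumers/$GROUP"
}

# Subscribe the instance to the topic given as the first argument.
subscribe_binary_consumer() {
  curl -s -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
    --data "{\"topics\": [\"$1\"]}" \
    "$REST_URL/consumers/$GROUP/instances/$INSTANCE/subscription"
}

# Fetch records; "key" and "value" come back base64-encoded.
read_binary_records() {
  curl -s -X GET -H "Accept: application/vnd.kafka.binary.v2+json" \
    "$REST_URL/consumers/$GROUP/instances/$INSTANCE/records"
}

# Decode a single base64 field copied from the response.
decode_b64() {
  printf '%s' "$1" | base64 --decode
}
```

    Calling create_binary_consumer, then subscribe_binary_consumer test3, then read_binary_records should return the records without a parse error, because nothing is parsed as JSON. An unquoted key such as key would appear as a2V5, and decode_b64 a2V5 turns it back into key.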

    Long Answer

    You have a record whose key is not wrapped in quotes, which makes the key invalid JSON, so the Jackson JSON parser fails when it tries to parse it. The error message does not make this obvious: when the parser does not see a quote, a square bracket, or a curly brace, it starts to assume the token is a boolean or null, hence "was expecting ('true', 'false' or 'null')".
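    If the producer side is under your control, one way to avoid the problem is to quote plain-string keys before they are sent. The helper below is only a sketch: json_quote_key is a hypothetical name, and it escapes just backslashes and double quotes, so keys containing newlines or control characters would need a real JSON encoder.

```shell
# Wrap a plain string in double quotes so it parses as a JSON string.
# Only backslashes and double quotes are escaped (assumption: keys are
# single-line text without control characters).
json_quote_key() {
  escaped=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '"%s"\n' "$escaped"
}

# Example: build a "key&value" line for a console producer started with
# --property parse.key=true --property key.separator="&"
# printf '%s&%s\n' "$(json_quote_key key)" '{"foo":"bar"}'
```

    The commented example line produces "key"&{"foo":"bar"}, which the JSON consumer can read.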

    You can see where it grabs the key and tries to decode it as JSON here:

    https://github.com/confluentinc/kafka-rest/blob/a9b7cc527a26fdf09db27d148f2e71bfe3d87a6a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/JsonKafkaConsumerState.java#L69

    I was able to reproduce your error with the following steps:

    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
          --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "latest"}' \
          http://localhost:8082/consumers/my_json_consumer
    
    curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["testjsontopic"]}' \
     http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
    
    
    ./bin/kafka-console-producer \
      --broker-list :9092 \
      --topic testjsontopic \
      --property parse.key=true \
      --property key.separator="&"
    
    >"key"&{"foo":"bar"}
    
    *Ctrl-C
    
    curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
          http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

    At this point I am able to read the record, but when I add a record whose key has no quotes, I get the same error as you:

    ./bin/kafka-console-producer \
      --broker-list :9092 \
      --topic testjsontopic \
      --property parse.key=true \
      --property key.separator="&"
    
    >key&{"foo":"bar"}
    

    When I then read the records again:

    curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
          http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
    

    I receive this error:

    com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'key': was expecting ('true', 'false' or 'null')

    Use this to print your topic's keys as well as its values:

    ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testjsontopic --property print.key=true --from-beginning
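    To spot the offending records in that output, a rough filter such as the one below may help. It is a heuristic sketch, not a JSON validator: find_suspect_keys is a hypothetical name, it assumes each line is key, tab, value (a tab is assumed to be the console consumer's key separator), and it only checks how each key starts.

```shell
# Print keys that cannot be the start of a JSON document.
# Reads "key<TAB>value" lines on stdin.
find_suspect_keys() {
  awk -F '\t' '
    $1 ~ /^["{[]/    { next }   # string, object, or array
    $1 ~ /^-?[0-9]/  { next }   # number
    $1 == "true" || $1 == "false" || $1 == "null" { next }
    { print $1 }                # anything else would fail to parse
  '
}
```

    Piping the console consumer output through find_suspect_keys lists keys such as key that the JSON consumer would reject. A key that starts correctly but is malformed further on (for example an unterminated string) is not detected.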