Search code examples
jsonscalaapache-kafkasbtapache-flink

I'm getting unexpected error in JSON file


While consuming JSON file from Kafka in flink I'm getting this error:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])"[{"; line: 1, column: 2]) at [Source: (byte[])"[{"; line: 1, column: 5]

This is my JSON File:

[{
        "locationID": "ASK",
        "temp": 35
    },
    {
        "locationID": "BC",
        "temp": 45
    },
    {
        "locationID":"CHD",
        "temp": 55
    },
    {
        "locationID": "RAJ",
        "temp": 65
    },
    {
        "locationID": "EGY",
        "temp": 55
}]

This is the code :

Why I'm not able to make a keyed stream out of this


Solution

  • As far as i know you always need to input one-line json strings. So in your example use this:

    [{"locationID": "ASK", "temp": 35}, {"locationID": "BC", "temp": 45}, {"locationID": "CHD", "temp": 55}, {"locationID": "RAJ", "temp": 65}, {"locationID": "EGY", "temp": 55}]

    For a more generic approach you may use something like the following to reformat your multiline json strings:

    .selectExpr("REPLACE(CAST(value as string), '\n', '') as json")