python, json, pyspark

Reading a multi-line JSON file with PySpark


I can't manage to read a JSON file in Python with PySpark because it contains multiple records, each with its fields spread over several lines.

Example:

{
  "id" : "id001",
  "name" : "NAME001",
  "firstname" : "FIRSTNAME001"
}
{
  "id" : "NNI002",
  "name" : "NAME002",
  "firstname" : "FIRSTNAME002"
}
{
  "id" : "NNI003",
  "name" : "NAME003",
  "firstname" : "FIRSTNAME003"
}

I want to load it like this:

+------------+------+-------+
|   firstname|    id|   name|
+------------+------+-------+
|FIRSTNAME001| id001|NAME001|
|FIRSTNAME002|NNI002|NAME002|
|FIRSTNAME003|NNI003|NAME003|
+------------+------+-------+

I get errors if I try spark.read.json("file.json").
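
For reference, here is roughly what I ran (the SparkSession setup is just the standard boilerplate):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Default mode expects JSON Lines: one complete JSON object per physical line.
# Here no single line is a valid JSON document, so everything ends up in the
# internal _corrupt_record column and, depending on the Spark version,
# trying to show it raises an AnalysisException.
df = spark.read.json("file.json")
df.show()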

And when I use spark.read.option("multiline","true").json("file.json"), I get only the first record:

+------------+-----+-------+
|   firstname|   id|   name|
+------------+-----+-------+
|FIRSTNAME001|id001|NAME001|
+------------+-----+-------+
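
From what I understand, the multiline option makes Spark parse each file as a single JSON document, so with several top-level objects concatenated back to back only the first one is kept. A file holding one JSON array of objects would be read correctly, for example (the array file name here is just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 'file_as_array.json' is a hypothetical file with the three records wrapped
# in '[' ']'; a top-level array is one valid document, so multiline mode
# returns one row per element.
df = spark.read.option("multiline", "true").json("file_as_array.json")
df.show()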

I can read it with spark.read.json("file.json") if I put every record on its own line:

{    "id" : "id001",    "name" : "NAME001",    "firstname" : "FIRSTNAME001"  }
{    "id" : "NNI002",    "name" : "NAME002",    "firstname" : "FIRSTNAME002"  }
{    "id" : "NNI003",    "name" : "NAME003",    "firstname" : "FIRSTNAME003"  }

But as the file has 10M lines, that's not really an option.

If anyone has ideas to help me, I would really appreciate it.

Thanks a lot.


Solution

  • @Niveditha S and @blackhorse0101 pointing out the bad JSON formatting made me search for a way to correct the formatting of a very large file.

    Here is the code I used to correct the file in a few seconds.

    (Note that I use nest_asyncio because I'm in a Jupyter Notebook; for a plain Python script you only need aiofiles.)

    import aiofiles
    import nest_asyncio
    nest_asyncio.apply()
    
    async def process_file(input_file, output_file):
        async with aiofiles.open(input_file, mode='r') as f:
            content = await f.read()
    
        # Add ',' after '}' if there is none
        lines = content.split('\n')
        for i in range(len(lines) - 1):
            if '}' in lines[i] and lines[i].strip().endswith('}') and not lines[i+1].strip().startswith(','):
                lines[i] = lines[i].rstrip() + ','
        
        # Remove the ',' that the loop just added after the last record
        # (lines[-1] is the empty string left by the file's trailing newline)
        last_line = lines[-2]
        if last_line.endswith('},'):
            lines[-2] = last_line.rstrip(',')
    
        content = '\n'.join(lines)
        
        # Wrap everything in '[' ']' so the whole file becomes one valid JSON array
        content = '[' + content + ']'
        
        async with aiofiles.open(output_file, mode='w') as f:
            await f.write(content)
    
    input_file = 'file.json'
    output_file = 'file_corrected.json'
    
    await process_file(input_file, output_file)
    

    Then use spark.read.option("multiline","true").json("file_corrected.json") to read the corrected file.
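
    For completeness, here is a minimal end-to-end sketch of that final read (standard SparkSession setup; the columns are the ones shown in the expected output above):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # file_corrected.json is now a single JSON array, so multiline mode
    # returns one row per record
    df = spark.read.option("multiline", "true").json("file_corrected.json")
    df.show()
    # +------------+------+-------+
    # |   firstname|    id|   name|
    # +------------+------+-------+
    # |FIRSTNAME001| id001|NAME001|
    # |FIRSTNAME002|NNI002|NAME002|
    # |FIRSTNAME003|NNI003|NAME003|
    # +------------+------+-------+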