python, pandas, apache-spark, pyspark, binaryfiles

Reading binary file in PySpark


I have a binary file that I can read with numpy and pandas using:

import numpy
import pandas

dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])

df = pandas.DataFrame(
    numpy.fromfile(file, dtype=dt),
    columns=dt.names
)

I want to use PySpark instead, without first creating a pandas DataFrame, since the file may be bigger than memory.

I saw that one recommended way was:

df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()

But this doesn't let me specify the type of each column. Also, even with this test file I got a java.lang.OutOfMemoryError.

So now I'm trying to load it into an RDD:

rdd = spark.sparkContext.binaryFiles(file)

And then apply a map, as suggested here in Scala:

import java.nio.ByteBuffer

val result = YourRDD.map(x=>(ByteBuffer.wrap(x.take(4)).getInt,
             ByteBuffer.wrap(x.drop(4).take(2)).getShort,
             ByteBuffer.wrap(x.drop(6)).getLong))
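
For reference, a rough Python counterpart of that Scala mapping (using the standard struct module, and keeping the snippet's int/short/long layout rather than my file's actual layout) would be something like:

import struct

# big-endian int32 (4 bytes), int16 (2 bytes), int64 (8 bytes), mirroring the ByteBuffer calls
result = rdd.mapValues(lambda b: struct.unpack(">ihq", b[:14]))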

But I am having trouble getting it to work. For example, when I call rdd.first() I get the whole file back. Here is what I tried:

rdd = spark.sparkContext.binaryFiles(file)

def func1(x):

    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])

    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )

    return (df.col1, df.col2, df.col3)

result = rdd.mapValues(lambda x: func1(x))

result.first()

But this gives me a single entry containing the full columns:

('file',
 (0          2317613314222
                 ...      
  4026940    7317606063913
  Name: col1, Length: 4026941, dtype: int64,
  0          1.551823
               ...   
  4026940    2.379845
  Name: col2, Length: 4026941, dtype: float32,
  0             556
              ...  
  4026940    131336
  Name: col3, Length: 4026941, dtype: int32))

How can I load this file?

EDIT: Small excerpt of the file:

with open(file, mode="rb") as open_file:
    contents = open_file.readlines()
    
contents[0:5]

Results in:

[b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a?\x19\x02\x02\x00\x0f\xf7\xa4\x9c\x1b\x02\x00\x00`\xe9\x82?0\x03\x02\x00\x96@\x03\x9d\x1b\x02\x00\x00\xd0H\x05@;\x02\x02\x00\xd5\n',
 b'\n',
 b'\x9d\x1b\x02\x00\x00\x00^\xa1?\x0f\x01\x00\x00nq,\x9d\x1b\x02\x00\x00\xe0\x89\xad?\xae\x03\x02\x00F\x8e\xb6\x9d\x1b\x02\x00\x00@U\xd1?<\x03\x02\x00\xc3_\xfa\x9d\x1b\x02\x00\x00@}\x87?)\x02\x02\x00\xac\x92K\x9e\x1b\x02\x00\x00P/\x1f@\n',
 b"\x02\x04\x00\x07Q\x9a\x9e\x1b\x02\x00\x00PI\x04@,\x01\x02\x00\x04-\xb2\x9e\x1b\x02\x00\x00\x80\xdc\xf0?\x1d\x00\x04\x00\x0cw\xbd\x9e\x1b\x02\x00\x00\xa0-\xef?\x0c\x02\x02\x00\xb0\x86\xcf\x9e\x1b\x02\x00\x00 \xc2\xb4?,\x02\x00\x00\x12\x03\x1e\x9f\x1b\x02\x00\x00\x80\xb6\x85?)\x02\x02\x00E\xc9w\x9f\x1b\x02\x00\x000\xf3\x03@\x13\x00\x04\x00P\x1b\x91\x9f\x1b\x02\x00\x00\x00\xea\x06@%\x00\x00\x00\x9b:\x9c\x9f\x1b\x02\x00\x00\xe0T\x0b@\x06\x03\x00\x00\x9b\x9f\xa4\x9f\x1b\x02\x00\x00\xc0/\xf4?\x06\x03\x00\x00Z\xcb\xb8\x9f\x1b\x02\x00\x00\x00A\xe1?!\x02\x02\x00\xbcJ\xbd\x9f\x1b\x02\x00\x00\xe0\xc9\xd2?!\x02\x04\x00\x06]\xd0\x9f\x1b\x02\x00\x00`D\xda?\x1d\x00\x04\x00hB\xde\x9f\x1b\x02\x00\x00\xe0\x10\xff?\x1d\x01\x02\x00\x9fi0\xa0\x1b\x02\x00\x00\xa0f\xec?\x86\x03\x00\x00\xf5Ws\xa0\x1b\x02\x00\x00 \xd5\xca?\x1d\x00\x04\x00L\xa0\x8d\xa0\x1b\x02\x00\x00\xc0#\xda?\x1d\x00\x04\x00|<,\xa1\x1b\x02\x00\x00 \xbc\xd1?\x1d\x00\x04\x00\x8b\xfb2\xa1\x1b\x02\x00\x00\xa0\xbf\xcb?\x08\x02\x02\x00d\xd2X\xa1\x1b\x02\x00\x00 \xc6\xb4?5\x00\x04\x00\xae\x1fc\xa1\x1b\x02\x00\x00@\x07\x90?1\x03\x02\x00\xf3\x80g\xa1\x1b\x02\x00\x00`\xbd\xde?4\x00\x04\x00g\x1dm\xa1\x1b\x02\x00\x00@7\x96?\x98\x03\x00\x00\xb8@|\xa1\x1b\x02\x00\x00PK\x11@\x06\x03\x00\x00\xedj\x83\xa1\x1b\x02\x00\x00\xc0\x11\xdd?,\x02\x00\x00\xb1\xbd\x8e\xa1\x1b\x02\x00\x00\xa0\xc7\xc5?\r\x02\x02\x00\xbd\x0f\xba\xa1\x1b\x02\x00\x00 \xe3\xc1?\x1f\x01\x02\x00\xf5\xa6\xf5\xa1\x1b\x02\x00\x00\x80\xdf\xcd?\x06\x01\x00\x00'\xb5 \xa2\x1b\x02\x00\x00\x00\x02\xb6?\x1d\x00\x04\x00\xfas/\xa2\x1b\x02\x00\x00\xc0\xb2\xbb?\x98\x03\x02\x00=\xaan\xa2\x1b\x02\x00\x00`\xaf\xe8?\x08\x02\x02\x00\xa2\x83\x8f\xa2\x1b\x02\x00\x00\x00\x02\xcd?\x1d\x00\x04\x00\xb2\xce\xcb\xa2\x1b\x02\x00\x00`\x9e\xc1?\x1a\x03\x00\x00\x95\x9f\xef\xa2\x1b\x02\x00\x00\xe0\xa4\x8c?)\x02\x02\x005\x86\xfa\xa2\x1b\x02\x00\x00 \x86\xc8?\x98\x03\x02\x00bH\x12\xa3\x1b\x02\x00\x00\xf0\x1b\x1e@\x8c\x02\x02\x00\xa6\xfa\x1b\xa3\x1b\x02\x00\x00\xe0\n",
 b'\xaf?\x1d\x00\x04\x00\xfb\xcd3\xa3\x1b\x02\x00\x00`\xd2\xde?\x84\x03\x00\x00\x81\xcaQ\xa3\x1b\x02\x00\x00 \xc0\xdb?8\x01\x02\x00t\x01\x9a\xa3\x1b\x02\x00\x00\x803\xf8?#\x01\x02\x00@\xdb\xa8\xa3\x1b\x02\x00\x00@k\x02@\x84\x03\x00\x00r\x8e&\xa4\x1b\x02\x00\x00\xe0\x96\xc8?\x1d\x01\x02\x00\xa3\x05?\xa4\x1b\x02\x00\x00\xa0v\xd2?\x1d\x00\x04\x00E\xc3\x8c\xa4\x1b\x02\x00\x000#\x0c@\x02\x01\x02\x00\xf3n\x9f\xa4\x1b\x02\x00\x00\xf0\x06\x13@8\x01\x02\x00\xac<\xc5\xa4\x1b\x02\x00\x00\x80A\x9f?\x8a\x03\x00\x00}\xfc\xe5\xa4\x1b\x02\x00\x00\x80\x00\xc4?\x19\x02\x02\x00\x126\xff\xa4\x1b\x02\x00\x00 \x1d\xac?\n']

And the head(5) of the dataframe:

            col1      col2    col3
0  2317613314222  1.551823     556
1  2317614400012  1.206112  131609
2  2317615429391  1.022747  131888
3  2317621608598  2.082569  131643
4  2317622053589  1.260681     271

Solution

  • One problem was that spark.sparkContext.binaryFiles() reads each file as a single (path, bytes) record, so it is meant for loading several whole files at once. For a single flat file made of fixed-length records, spark.sparkContext.binaryRecords() should be used instead.

    For my case:

    import numpy
    import pandas
    from pyspark.sql import types
    
    # each record is 16 bytes: int64 (8) + float32 (4) + int32 (4)
    rdd = spark.sparkContext.binaryRecords(file, 16)
    
    def func1(x):
    
        dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
    
        # x is a single 16-byte record, so this frame has exactly one row
        df = pandas.DataFrame(
            numpy.frombuffer(x, dtype=dt),
            columns=dt.names
        )
    
        return df.to_dict("index")[0]
    
    result = rdd.map(func1)
    
    columns = types.StructType([
        types.StructField('col1', types.LongType(), True),
        types.StructField('col2', types.FloatType(), True),
        types.StructField('col3', types.IntegerType(), True)
    ])
    
    df = result.toDF(schema=columns)
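
    A quick sanity check that the declared types were applied:

    df.printSchema()
    df.show(5)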
    

    The module struct can also be used instead of numpy.
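
    For instance, a minimal sketch of the same parsing step with struct (the func2 name and the "<qfi" format string are mine; "<qfi" assumes little-endian int64/float32/int32 with no padding, which matches the excerpt above):

    import struct

    def func2(record):
        # one 16-byte record: int64, float32, int32 (little-endian)
        return struct.unpack("<qfi", record)

    df = spark.sparkContext.binaryRecords(file, 16).map(func2).toDF(schema=columns)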