I have a binary file that I can read with numpy and pandas using:
import numpy
import pandas

dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
df = pandas.DataFrame(
    numpy.fromfile(file, dtype=dt),
    columns=dt.names
)
I want to use PySpark instead, without first creating a pandas DataFrame, since the file may be larger than memory.
I saw that one recommended way was:
df = spark.read.format("binaryFile").load(file)
df.printSchema()
df.show()
But this doesn't allow me to specify the types of each column.
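For context, the binaryFile source always reads whole files into a fixed schema (there is no way to declare the record layout), so df.printSchema() prints roughly:

root
 |-- path: string
 |-- modificationTime: timestamp
 |-- length: long
 |-- content: binary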
Also, even with that test file I got a java.lang.OutOfMemoryError, presumably because the entire file is read into a single content value.
So now I'm trying to load it into an RDD:
rdd = spark.sparkContext.binaryFiles(file)
And then apply a map as suggested here with Scala:
import java.nio.ByteBuffer
val result = YourRDD.map(x => (ByteBuffer.wrap(x.take(4)).getInt,
  ByteBuffer.wrap(x.drop(4).take(2)).getShort,
  ByteBuffer.wrap(x.drop(6)).getLong))
But I am having trouble getting it to work. For example, when I try rdd.first(), I get the whole file.
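(This turns out to be expected: binaryFiles yields one (path, content) pair per file, so first() returns the entire file as a single record; see the fix further down.)

rdd = spark.sparkContext.binaryFiles(file)
path, content = rdd.first()  # one (path, bytes) record per file
len(content)                 # size of the entire file in bytes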
Here is what I tried:
rdd = spark.sparkContext.binaryFiles(file)

def func1(x):
    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )
    return (df.col1, df.col2, df.col3)

result = rdd.mapValues(func1)
result.first()
But this gives me a single entry holding the full columns, since func1 is applied once to the entire file's content:
('file',
(0 2317613314222
...
4026940 7317606063913
Name: col1, Length: 4026941, dtype: int64,
0 1.551823
...
4026940 2.379845
Name: col2, Length: 4026941, dtype: float32,
0 556
...
4026940 131336
Name: col3, Length: 4026941, dtype: int32))
How can I load this file?
EDIT: Small excerpt of the file:
with open(file, mode="rb") as open_file:
    contents = open_file.readlines()
contents[0:5]
Results in:
[b'\xae\xb0\x84\x9c\x1b\x02\x00\x00 \xa2\xc6?,\x02\x00\x00\x0cB\x95\x9c\x1b\x02\x00\x00\xe0a\x9a?\x19\x02\x02\x00\x0f\xf7\xa4\x9c\x1b\x02\x00\x00`\xe9\x82?0\x03\x02\x00\x96@\x03\x9d\x1b\x02\x00\x00\xd0H\x05@;\x02\x02\x00\xd5\n',
b'\n',
b'\x9d\x1b\x02\x00\x00\x00^\xa1?\x0f\x01\x00\x00nq,\x9d\x1b\x02\x00\x00\xe0\x89\xad?\xae\x03\x02\x00F\x8e\xb6\x9d\x1b\x02\x00\x00@U\xd1?<\x03\x02\x00\xc3_\xfa\x9d\x1b\x02\x00\x00@}\x87?)\x02\x02\x00\xac\x92K\x9e\x1b\x02\x00\x00P/\x1f@\n',
b"\x02\x04\x00\x07Q\x9a\x9e\x1b\x02\x00\x00PI\x04@,\x01\x02\x00\x04-\xb2\x9e\x1b\x02\x00\x00\x80\xdc\xf0?\x1d\x00\x04\x00\x0cw\xbd\x9e\x1b\x02\x00\x00\xa0-\xef?\x0c\x02\x02\x00\xb0\x86\xcf\x9e\x1b\x02\x00\x00 \xc2\xb4?,\x02\x00\x00\x12\x03\x1e\x9f\x1b\x02\x00\x00\x80\xb6\x85?)\x02\x02\x00E\xc9w\x9f\x1b\x02\x00\x000\xf3\x03@\x13\x00\x04\x00P\x1b\x91\x9f\x1b\x02\x00\x00\x00\xea\x06@%\x00\x00\x00\x9b:\x9c\x9f\x1b\x02\x00\x00\xe0T\x0b@\x06\x03\x00\x00\x9b\x9f\xa4\x9f\x1b\x02\x00\x00\xc0/\xf4?\x06\x03\x00\x00Z\xcb\xb8\x9f\x1b\x02\x00\x00\x00A\xe1?!\x02\x02\x00\xbcJ\xbd\x9f\x1b\x02\x00\x00\xe0\xc9\xd2?!\x02\x04\x00\x06]\xd0\x9f\x1b\x02\x00\x00`D\xda?\x1d\x00\x04\x00hB\xde\x9f\x1b\x02\x00\x00\xe0\x10\xff?\x1d\x01\x02\x00\x9fi0\xa0\x1b\x02\x00\x00\xa0f\xec?\x86\x03\x00\x00\xf5Ws\xa0\x1b\x02\x00\x00 \xd5\xca?\x1d\x00\x04\x00L\xa0\x8d\xa0\x1b\x02\x00\x00\xc0#\xda?\x1d\x00\x04\x00|<,\xa1\x1b\x02\x00\x00 \xbc\xd1?\x1d\x00\x04\x00\x8b\xfb2\xa1\x1b\x02\x00\x00\xa0\xbf\xcb?\x08\x02\x02\x00d\xd2X\xa1\x1b\x02\x00\x00 \xc6\xb4?5\x00\x04\x00\xae\x1fc\xa1\x1b\x02\x00\x00@\x07\x90?1\x03\x02\x00\xf3\x80g\xa1\x1b\x02\x00\x00`\xbd\xde?4\x00\x04\x00g\x1dm\xa1\x1b\x02\x00\x00@7\x96?\x98\x03\x00\x00\xb8@|\xa1\x1b\x02\x00\x00PK\x11@\x06\x03\x00\x00\xedj\x83\xa1\x1b\x02\x00\x00\xc0\x11\xdd?,\x02\x00\x00\xb1\xbd\x8e\xa1\x1b\x02\x00\x00\xa0\xc7\xc5?\r\x02\x02\x00\xbd\x0f\xba\xa1\x1b\x02\x00\x00 \xe3\xc1?\x1f\x01\x02\x00\xf5\xa6\xf5\xa1\x1b\x02\x00\x00\x80\xdf\xcd?\x06\x01\x00\x00'\xb5 \xa2\x1b\x02\x00\x00\x00\x02\xb6?\x1d\x00\x04\x00\xfas/\xa2\x1b\x02\x00\x00\xc0\xb2\xbb?\x98\x03\x02\x00=\xaan\xa2\x1b\x02\x00\x00`\xaf\xe8?\x08\x02\x02\x00\xa2\x83\x8f\xa2\x1b\x02\x00\x00\x00\x02\xcd?\x1d\x00\x04\x00\xb2\xce\xcb\xa2\x1b\x02\x00\x00`\x9e\xc1?\x1a\x03\x00\x00\x95\x9f\xef\xa2\x1b\x02\x00\x00\xe0\xa4\x8c?)\x02\x02\x005\x86\xfa\xa2\x1b\x02\x00\x00 \x86\xc8?\x98\x03\x02\x00bH\x12\xa3\x1b\x02\x00\x00\xf0\x1b\x1e@\x8c\x02\x02\x00\xa6\xfa\x1b\xa3\x1b\x02\x00\x00\xe0\n",
b'\xaf?\x1d\x00\x04\x00\xfb\xcd3\xa3\x1b\x02\x00\x00`\xd2\xde?\x84\x03\x00\x00\x81\xcaQ\xa3\x1b\x02\x00\x00 \xc0\xdb?8\x01\x02\x00t\x01\x9a\xa3\x1b\x02\x00\x00\x803\xf8?#\x01\x02\x00@\xdb\xa8\xa3\x1b\x02\x00\x00@k\x02@\x84\x03\x00\x00r\x8e&\xa4\x1b\x02\x00\x00\xe0\x96\xc8?\x1d\x01\x02\x00\xa3\x05?\xa4\x1b\x02\x00\x00\xa0v\xd2?\x1d\x00\x04\x00E\xc3\x8c\xa4\x1b\x02\x00\x000#\x0c@\x02\x01\x02\x00\xf3n\x9f\xa4\x1b\x02\x00\x00\xf0\x06\x13@8\x01\x02\x00\xac<\xc5\xa4\x1b\x02\x00\x00\x80A\x9f?\x8a\x03\x00\x00}\xfc\xe5\xa4\x1b\x02\x00\x00\x80\x00\xc4?\x19\x02\x02\x00\x126\xff\xa4\x1b\x02\x00\x00 \x1d\xac?\n']
And the head(5) of the DataFrame:
col1 col2 col3
0 2317613314222 1.551823 556
1 2317614400012 1.206112 131609
2 2317615429391 1.022747 131888
3 2317621608598 2.082569 131643
4 2317622053589 1.260681 271
One problem was that spark.sparkContext.binaryFiles() is meant for reading several files, one record per file. For a single flat file of fixed-length records, spark.sparkContext.binaryRecords() should be used instead.
For my case:
rdd = spark.sparkContext.binaryRecords(file, 16)  # 16 = 8 (int64) + 4 (float32) + 4 (int32) bytes per record

def func1(x):
    dt = numpy.dtype([('col1', numpy.int64), ('col2', numpy.float32), ('col3', numpy.int32)])
    df = pandas.DataFrame(
        numpy.frombuffer(x, dtype=dt),
        columns=dt.names
    )
    return df.to_dict("index")[0]

result = rdd.map(func1)
from pyspark.sql import types

columns = types.StructType([
    types.StructField('col1', types.LongType(), True),
    types.StructField('col2', types.FloatType(), True),
    types.StructField('col3', types.IntegerType(), True)
])
df = result.toDF(schema=columns)
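A quick sanity check that this matches the pandas head(5) shown above:

df.printSchema()
df.show(5)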
The struct module can also be used instead of numpy.
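A minimal sketch of that variant, assuming little-endian data (as the excerpt above suggests); the format string '<qfi' maps to int64, float32, int32:

import struct

rdd = spark.sparkContext.binaryRecords(file, 16)
# '<qfi': little-endian int64 + float32 + int32 = 8 + 4 + 4 = 16 bytes
result = rdd.map(lambda rec: struct.unpack('<qfi', rec))
df = result.toDF(schema=columns)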