Tags: scala, apache-spark, binaryfiles

Reading a binary file of doubles in Spark produces wrong output


I have a binary file that was generated in C. The file has a fixed record length of 26000 doubles per record, contains 1067 records, and has no separators. I need to read it in Spark and get the double values. I also have Python code that reads the double values correctly, and using Files.readAllBytes from Java in the spark-shell I get those same values, so based on the Python output I expect the first 1000 doubles of the first record to have the value -3509.580466022612. Taken from the spark-shell:

import java.nio.ByteBuffer
import java.nio.ByteOrder
val doubleByteSize = 8
val recordLength = 1000 * 26 * doubleByteSize
val bytesRdd = sc.binaryRecords("file:///myBinaryFile.val", recordLength)
val arrayOfRecords = bytesRdd.collect
val firstRecord = arrayOfRecords(0)
// group 8 bytes together to transform them to doubles
val listOfDoubles = firstRecord.grouped(doubleByteSize).toList
// I get the same double 1000 times, but it isn't -3509.580466022612, it's 1.1848107264484659E181
val result = listOfDoubles.map(arrayOfBytes => ByteBuffer.wrap(arrayOfBytes).getDouble)
// trying little endian, the value is wrong again: -6.045003065652023E-27
val result2 = listOfDoubles.map(arrayOfBytes => ByteBuffer.wrap(arrayOfBytes).order(ByteOrder.LITTLE_ENDIAN).getDouble)
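
As an aside, the same decoding can be done in one pass through a DoubleBuffer instead of grouping 8-byte chunks by hand; a minimal sketch (assuming the data is little-endian):

import java.nio.{ByteBuffer, ByteOrder}

// Sketch: decode a whole record at once via a DoubleBuffer view of the bytes
def decodeRecord(record: Array[Byte]): Array[Double] = {
  val buf = ByteBuffer.wrap(record).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer
  val doubles = new Array[Double](buf.remaining)
  buf.get(doubles)   // bulk read of all doubles in the record
  doubles
}

// decodeRecord(firstRecord).take(5) gives the first few decoded values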

The number of records looks correct (arrayOfRecords.length = 1068), the number of bytes per record looks right (firstRecord.length = 208000), and the first 1000 doubles all contain the same value, so everything is as expected except that the double value (1.1848107264484659E181) is not the expected one (-3509.580466022612). I tried switching to little endian, but the number is still wrong (-6.045003065652023E-27). Python code:

from array import array

SIZEOFDOUBLE = 8  # a C double is 8 bytes

def str_all(data):
    ret_str = ""
    for d in data:
        ret_str += " " + str(d)
    return ret_str

def main():

    num_sim = 1000
    num_ts = 26

    record_index = 0

    deal_data = array('d')
    with open("/myBinaryFile.val", "rb") as data_file:
        data_file.seek(SIZEOFDOUBLE * record_index * num_sim * num_ts)
        deal_data.fromfile(data_file, num_sim * num_ts)
    ts_index = 0
    deal_range = slice(ts_index * num_sim, (ts_index + 1) * num_sim)
    # it prints 1000 times -3509.580466022612
    print(str_all(deal_data[deal_range]))

main()

Simple code using Java NIO to read the binary file (run from the spark-shell) gets the expected value:

import java.nio.{ByteBuffer, ByteOrder}
import java.nio.file.{Files, Paths}

val byteArray = Files.readAllBytes(Paths.get("/mybinaryFile.val"))
// gets the correct value -3509.580466022612
ByteBuffer.wrap(byteArray).order(ByteOrder.LITTLE_ENDIAN).getDouble
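
For completeness, the assumed record length can also be sanity-checked against the raw file size (a sketch):

import java.nio.file.{Files, Paths}

val recordLength = 1000 * 26 * 8                      // 26000 doubles of 8 bytes each
val fileSize = Files.size(Paths.get("/mybinaryFile.val"))
fileSize / recordLength                               // should match the record count
fileSize % recordLength                               // should be 0 for a clean split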

Does anyone have an idea what's going on here?

Thanks in advance.

Spark version 1.6.0, Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67), Python version 2.6.


Solution

  • The problem was not related to the binary data itself. When I was doing:

    val arrayOfRecords = bytesRdd.collect
    val firstRecord = arrayOfRecords(0)
    

    I wasn't getting the array in order; changing that solved the issue:

    val firstRecord = bytesRdd.first
    

    It looks like collect doesn't preserve the order. Thanks to The Archetypal Paul for your time and help.
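
    Putting it together, a sketch of the corrected read (same record length and little-endian decoding as in the question):

    import java.nio.{ByteBuffer, ByteOrder}

    val doubleByteSize = 8
    val recordLength = 1000 * 26 * doubleByteSize
    val bytesRdd = sc.binaryRecords("file:///myBinaryFile.val", recordLength)

    // take the first record straight from the RDD instead of collecting
    val firstRecord = bytesRdd.first
    val doubles = ByteBuffer.wrap(firstRecord).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer
    // per the Python output, the first 1000 values should all be -3509.580466022612
    val firstThousand = (0 until 1000).map(i => doubles.get(i))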