Search code examples
rapache-sparkdatabrickssparkrapache-arrow

How to fix Error in readBin() using arrow package with SparkR?


I am working on SparkR and want to speed up processing using arrow package in Databricks. However, I am getting following error when I execute collect(df) after SparkR::dapply or gapply:

Error in readBin(con, raw(), as.integer(dataLen), endian = "big") : Error in readBin(con, raw(), as.integer(dataLen), endian = "big") : 
  invalid 'n' argument

I am using SalesRecords data of 5 million, just for example. The code is as below:

library(SparkR)

SparkR::sparkR.session(sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

library(arrow)

arrow::arrow_available()

dfSchema <- structType(structField("Region", "string")
                       ,structField("Country", "string")
                       ,structField("ItemType", "string")
                       ,structField("SalesChannel", "string")
                       ,structField("OrderPriority", "string")
                       ,structField("OrderDate", "string")
                       ,structField("OrderID", "int")
                       ,structField("ShipDate", "string")
                       ,structField("UnitsSold", "int")
                       ,structField("UnitPrice", "int")
                       ,structField("UnitCost", "int")
                       ,structField("TotalRevenue", "int")
                       ,structField("TotalCost", "int")
                       ,structField("TotalProfit", "int")
                      )
spark_df <- SparkR::read.df(path="/FileStore/tables/SalesRecords_5m.csv", source="csv", schema=dfSchema)

# Apply an R native function to each partition.
returnSchema <-  structType(structField("UnitsSold", "int"))

df <- SparkR::dapply(spark_df
                    , function(rdf) { data.frame(rdf$UnitsSold + 1) }
                    , returnSchema
                   )

collect(df)

When I switch off arrow by setting spark.sql.execution.arrow.sparkr.enabled as false then the entire code runs without any error. Therefore, arrow is not working and how can I fix this error?

Note: I am using following versions: Spark 3.1.1, arrow 4.0.1, R version 4.0.4

Output of sessionInfo() is:

R version 4.0.4 (2021-02-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.5 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1

locale:
 [1] LC_CTYPE=C.UTF-8       LC_NUMERIC=C           LC_TIME=C.UTF-8       
 [4] LC_COLLATE=C.UTF-8     LC_MONETARY=C.UTF-8    LC_MESSAGES=C.UTF-8   
 [7] LC_PAPER=C.UTF-8       LC_NAME=C              LC_ADDRESS=C          
[10] LC_TELEPHONE=C         LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C   

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] SparkR_3.1.1

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.5         magrittr_2.0.1     tidyselect_1.1.0   bit_4.0.4         
 [5] xtable_1.8-4       R6_2.5.0           rlang_0.4.9        fastmap_1.0.1     
 [9] hwriter_1.3.2      tools_4.0.4        arrow_4.0.1        htmltools_0.5.0   
[13] bit64_4.0.5        digest_0.6.27      assertthat_0.2.1   Rserve_1.8-7      
[17] shiny_1.5.0        purrr_0.3.4        later_1.1.0.1      hwriterPlus_1.0-3 
[21] vctrs_0.3.5        promises_1.1.1     glue_1.4.2         mime_0.9          
[25] compiler_4.0.4     TeachingDemos_2.10 httpuv_1.5.4 

Solution

  • I can offer you a partial solution though I'm not exactly sure of the precise cause.

    There is something happening here with a mismatch of variable types - you are trying to create a field of type "int" but the code there actually creates a field of type "double".

    If you add an 'L' to the value you are adding, does this help?

    df <- SparkR::dapply(spark_df
                        , function(rdf) { data.frame(rdf$UnitsSold + 1L) }
                        , returnSchema
                       )