I am working with SparkR and want to speed up processing using the arrow package in Databricks. However, I get the following error when I execute collect(df) after SparkR::dapply or gapply:
Error in readBin(con, raw(), as.integer(dataLen), endian = "big") :
  invalid 'n' argument
I am using a SalesRecords dataset of 5 million rows, just as an example. The code is as follows:
library(SparkR)
SparkR::sparkR.session(sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
library(arrow)
arrow::arrow_available()   # verify that the arrow C++ library is usable from R
dfSchema <- structType(structField("Region", "string")
,structField("Country", "string")
,structField("ItemType", "string")
,structField("SalesChannel", "string")
,structField("OrderPriority", "string")
,structField("OrderDate", "string")
,structField("OrderID", "int")
,structField("ShipDate", "string")
,structField("UnitsSold", "int")
,structField("UnitPrice", "int")
,structField("UnitCost", "int")
,structField("TotalRevenue", "int")
,structField("TotalCost", "int")
,structField("TotalProfit", "int")
)
spark_df <- SparkR::read.df(path="/FileStore/tables/SalesRecords_5m.csv", source="csv", schema=dfSchema)
# Apply an R native function to each partition.
returnSchema <- structType(structField("UnitsSold", "int"))
df <- SparkR::dapply(spark_df
, function(rdf) { data.frame(rdf$UnitsSold + 1) }
, returnSchema
)
collect(df)
When I switch arrow off by setting spark.sql.execution.arrow.sparkr.enabled to false, the entire code runs without any error. So the failure only occurs with arrow enabled; how can I fix this error?
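For completeness, one way to toggle the flag on a running session (besides passing it in sparkConfig as above) is a SQL SET command:

# disable arrow for SparkR; with this, the same collect(df) runs without error
sql("SET spark.sql.execution.arrow.sparkr.enabled=false")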
Note: I am using the following versions: Spark 3.1.1, arrow 4.0.1, R 4.0.4.
Output of sessionInfo() is:
R version 4.0.4 (2021-02-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.5 LTS
Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1
locale:
[1] LC_CTYPE=C.UTF-8 LC_NUMERIC=C LC_TIME=C.UTF-8
[4] LC_COLLATE=C.UTF-8 LC_MONETARY=C.UTF-8 LC_MESSAGES=C.UTF-8
[7] LC_PAPER=C.UTF-8 LC_NAME=C LC_ADDRESS=C
[10] LC_TELEPHONE=C LC_MEASUREMENT=C.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] SparkR_3.1.1
loaded via a namespace (and not attached):
[1] Rcpp_1.0.5 magrittr_2.0.1 tidyselect_1.1.0 bit_4.0.4
[5] xtable_1.8-4 R6_2.5.0 rlang_0.4.9 fastmap_1.0.1
[9] hwriter_1.3.2 tools_4.0.4 arrow_4.0.1 htmltools_0.5.0
[13] bit64_4.0.5 digest_0.6.27 assertthat_0.2.1 Rserve_1.8-7
[17] shiny_1.5.0 purrr_0.3.4 later_1.1.0.1 hwriterPlus_1.0-3
[21] vctrs_0.3.5 promises_1.1.1 glue_1.4.2 mime_0.9
[25] compiler_4.0.4 TeachingDemos_2.10 httpuv_1.5.4
I can offer you a partial solution, though I'm not exactly sure of the precise cause.
There is a type mismatch here: your return schema declares the field as "int", but the code actually produces a field of type "double". In R, the literal 1 is a double, so rdf$UnitsSold + 1 promotes the integer column to double, which presumably matters under arrow's stricter serialization.
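You can see the promotion in plain R, independent of Spark:

x <- c(10L, 20L)   # an integer vector, like the "int" UnitsSold column
class(x + 1)       # "numeric" -- the double literal 1 promotes the result to double
class(x + 1L)      # "integer" -- the integer literal 1L keeps the result integer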
If you add an 'L' to the value you are adding, does this help?
df <- SparkR::dapply(spark_df
, function(rdf) { data.frame(rdf$UnitsSold + 1L) }
, returnSchema
)
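Alternatively, if you want to keep the plain + 1, the other direction should also work: declare the return column as "double" so the schema matches what the function actually produces (an untested sketch against the same spark_df):

returnSchema <- structType(structField("UnitsSold", "double"))   # match the double output
df <- SparkR::dapply(spark_df
                     , function(rdf) { data.frame(UnitsSold = rdf$UnitsSold + 1) }
                     , returnSchema
                     )
collect(df)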