Search code examples
apache-sparkcassandraspark-cassandra-connector

why I got the error: "Size exceed Integer.MAX_VALUE" when using spark+cassandra?


I have 7 cassandra nodes (5 nodes with 32 cores and 32G memory, and 4 nodes with 4 cores and 64G memory), and deployed the spark workers on this cluster and spark's master was in the 8th node. And I used spark-cassandra-connector for them. Now my cassandra has almost 1 billion records with 30 fields, I write the scala including the following snippet:

def startOneCache(): DataFrame = {
val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "192.168.0.184")
  .set("spark.cassandra.auth.username", "username")
  .set("spark.cassandra.auth.password", "password")
  .set("spark.driver.maxResultSize", "4G")
  .set("spark.executor.memory", "12G")
  .set("spark.cassandra.input.split.size_in_mb","64")

val sc = new SparkContext("spark://192.168.0.131:7077", "statistics", conf)
val cc = new CassandraSQLContext(sc)
val rdd: DataFrame = cc.sql("select user_id,col1,col2,col3,col4,col5,col6
,col7,col8 from user_center.users").limit(100000192)
val rdd_cache: DataFrame = rdd.cache()

rdd_cache.count()
return rdd_cache
}

In the spark's master I use spark-submit to run the above code,when executing the statement: rdd_cache.count(), I got a ERROR in the one worker node: 192.168.0.185:

16/03/08 15:38:57 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 221 ms
16/03/08 15:43:49 WARN MemoryStore: Not enough space to cache rdd_6_0 in memory! (computed 4.6 GB so far)
16/03/08 15:43:49 INFO MemoryStore: Memory use = 61.9 KB (blocks) + 4.6 GB (scratch space shared across 1 tasks(s)) = 4.6 GB. Storage limit = 6.2 GB.
16/03/08 15:43:49 WARN CacheManager: Persisting partition rdd_6_0 to disk instead.
16/03/08 16:13:11 ERROR Executor: Managed memory leak detected; size = 4194304 bytes, TID = 24002
16/03/08 16:13:11 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 24002)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

I simply thought the final error Size exceeds Integer.MAX_VALUE is caused by the warn: 16/03/08 15:43:49 WARN MemoryStore: Not enough space to cache rdd_6_0 in memory! (computed 4.6 GB so far) before it, but I don't know why, or whether I should set a larger than .set("spark.executor.memory", "12G") , what should I do for correcting this?


Solution

  • No Spark shuffle block can be greater than 2 GB.

    Spark uses ByteBuffer as abstraction for storing blocks and its size is limited by Integer.MAX_VALUE (2 billions).

    Low number of partitions can lead to high shuffle block size. To solve this issue try to increase the number of partitions using rdd.repartition() or rdd.coalesce() or.

    If this doesn't help, it means that at least one of your partitions is still too big and you may need to use some more sophisticated approach to make it smaller - for example use randomness to equalize distribution of RDD data between individual partitions.