We are trying to publish deltas from a Hive table to Kafka. The table in question is a single-partition, single-block Parquet file of 244 MB. Our cluster is configured with a 256 MB block size, so the file just about fills a single block.
Each time the table is updated, a copy is archived and then we run our delta process.
In the code below, we have isolated the different joins and confirmed that the inner join performs acceptably (about 3 minutes), but the two anti-join DataFrames will not complete; we keep throwing more resources at the Spark job but continue to see the errors below.
Is there a practical limit on dataframe sizes for this kind of join?
private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
  extends BasePublisher(spark, sink, source) with Serializable {

  val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object

  def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {
    val current = spark.read.parquet(dataLocation)
    val previous = spark.read.parquet(archiveLocation)

    // `keys` holds the join key columns; it is defined outside this snippet
    val inserts = current.join(previous, keys, "leftanti")
    val updates = current.join(previous, keys)
      .where(current.col(deltaColumn) =!= previous.col(deltaColumn))
    val deletes = previous.join(current, keys, "leftanti")

    val upsertCounter = spark.sparkContext.longAccumulator("upserts")
    val deleteCounter = spark.sparkContext.longAccumulator("deletes")

    logInfo("sending inserts to kafka")
    sink.sendDeltasToKafka(inserts, "U", upsertCounter)

    logInfo("sending updates to kafka")
    sink.sendDeltasToKafka(updates, "U", upsertCounter)

    logInfo("sending deletes to kafka")
    sink.sendDeltasToKafka(deletes, "D", deleteCounter)

    (upsertCounter.value, deleteCounter.value)
  }
}
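For what it's worth, this is roughly how we isolated and timed the three joins (a sketch only; the count() calls just force each join independently and are not part of the publish path, and current, previous, keys, and deltaColumn are the same values as in publishDeltaRun above):

// Force each join on its own to see which stage struggles.
val innerTimed = current.join(previous, keys)
  .where(current.col(deltaColumn) =!= previous.col(deltaColumn))
  .count()                                                           // about 3 minutes
val insertsTimed = current.join(previous, keys, "leftanti").count()  // does not complete
val deletesTimed = previous.join(current, keys, "leftanti").count()  // does not complete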
The errors we're seeing seem to indicate that the driver is losing contact with the executors. We have increased executor memory to 24 GB, the network timeout as high as 900s, and the heartbeat interval as high as 120s.
17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at ...
Later in the logs:
17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
The config switches we have been manipulating (without success) are: --executor-memory 24G, --conf spark.network.timeout=900s, and --conf spark.executor.heartbeatInterval=120s.
The option I failed to consider was increasing my driver resources. I added --driver-memory 4G and --driver-cores 2 and saw my job complete in about 9 minutes.
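For anyone who hits the same thing, the full submit ended up looking roughly like this (the class name and jar are placeholders, not our real job):

spark-submit \
  --driver-memory 4G \
  --driver-cores 2 \
  --executor-memory 24G \
  --conf spark.network.timeout=900s \
  --conf spark.executor.heartbeatInterval=120s \
  --class com.example.DeltaColumnPublisherJob \
  delta-publisher.jar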
It appears that an inner join of these two files (or using the built-in except() method) puts memory pressure on the executors. Partitioning on one of the key columns seems to ease that memory pressure, but it increases overall time because more shuffling is involved.
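The partitioning experiment looked something like this (a sketch only; picking the first join key as the partitioning column is an assumption about which key you choose):

// Repartition both sides on one of the join key columns before the anti-joins.
val keyColumn = keys.head
val currentByKey  = current.repartition(current.col(keyColumn))
val previousByKey = previous.repartition(previous.col(keyColumn))
val inserts = currentByKey.join(previousByKey, keys, "leftanti")
val deletes = previousByKey.join(currentByKey, keys, "leftanti")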
Doing the left-anti joins between these two files, on the other hand, requires more driver resources. I didn't expect that.