I have a Spark Structured Streaming job that does the following:
- Streams files from an S3 folder; each file contains JSON lines (a lot of them, around 12 million)
- Filters them to exclude a couple of million records
- Calls an external HTTP API for each JSON record (using concurrency)
- Writes the response data to a Kafka topic
My source S3 folder can contain 48 or more files, therefore I am using:
.option("maxFilesPerTrigger", 1)
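For context, the job looks roughly like this (a minimal sketch; the S3 paths, Kafka broker/topic, filter predicate and callApi client are placeholders, and the real job makes the HTTP calls concurrently inside each partition):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("s3-to-kafka").getOrCreate()
import spark.implicits._

// Stand-in for the real HTTP client (hypothetical).
def callApi(json: String): String = json

// Stream JSON-lines files from S3, one file per micro-batch.
val input = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("s3://my-bucket/input/")

// Drop the records that should not be sent to the API (placeholder predicate).
val filtered = input.filter(!col("value").contains("\"skip\":true"))

// Call the external HTTP API for each remaining record.
val responses = filtered.as[String]
  .mapPartitions(rows => rows.map(callApi))
  .toDF("value")

// Write the API responses to a Kafka topic.
val query = responses.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "responses")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/")
  .start()

query.awaitTermination()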
My EMR cluster has 1 master + 2 slave nodes, each of type m5.2xlarge (8 cores and 32 GB of memory each).
In my Spark job, I want to know what these options should be:
spark-submit \
--master yarn \
--conf spark.dynamicAllocation.enabled=false \
--executor-memory ??g \
--driver-memory ??g \
--executor-cores ?? \
--num-executors ?? \
--queue default \
--deploy-mode cluster \
....
I want to distribute the load evenly, because from playing around with the settings, the transactions per second I see on the HTTP endpoint go up and down, and I think that is a direct result of my parameters. I also don't want to take the WHOLE cluster. Any ideas?
(The graph shows transactions per minute on the HTTP endpoint being called.)
It depends on your time requirements, other jobs sharing the cluster, etc. First, you could try with the full cluster.
1 master + 2 slaves = 3 nodes.
cores: 3 * 8 = 24
memory: 3 * 32 = 96 GB
Recommended number of cores per executor: 5; we decrease it to 4 so that no cores are left over.
--executor-cores 4
Number of executor slots: 24 / 4 = 6 (1 slot reserved for the driver/ApplicationMaster, 5 actual executors).
--num-executors 5
executor-memory / driver-memory: (96 / 6) - ~10% ≈ 14g
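The same arithmetic spelled out (a quick sketch; the ~10% is a rough allowance for the YARN memory overhead):

// Cluster totals: 3 nodes (1 master + 2 slaves), each m5.2xlarge.
val totalCores = 3 * 8                              // 24 cores
val totalMemGb = 3 * 32                             // 96 GB

val coresPerExecutor = 4
val executorSlots = totalCores / coresPerExecutor   // 6 slots
val numExecutors  = executorSlots - 1               // 5: one slot is kept for the driver / AM

val memPerSlotGb  = totalMemGb / executorSlots      // 16 GB per slot
val executorMemGb = (memPerSlotGb * 0.9).toInt      // ~14 GB after the ~10% allowance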
Final parameters:
spark-submit \
--master yarn \
--conf spark.dynamicAllocation.enabled=false \
--executor-memory 14g \
--driver-memory 14g \
--executor-cores 4 \
--num-executors 5 \
--queue default \
--deploy-mode cluster \
....
You can easily take a few GB from the driver and give them to the executors.