Tags: apache-spark, apache-spark-sql, emr, amazon-emr

Spark: running spark-submit with the correct number of executors


I've set up a basic 3-node EMR cluster and run spark-submit with --executor-memory set to 1G and no other configuration.

The script itself is a basic benchmarking task:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
import time

conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# sample data in lineitem table:
# 3|1284483|34508|3|27|39620.34|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |
def mapper(lines):
    x = lines.split("|")
    return Row( rownum=int(x[0]),
        l_orderkey=int(x[0]),
        l_partkey=int(x[1]),
        l_suppkey=int(x[2]),
        l_linenumber=int(x[3]),
        l_quantity=int(x[4]),
        l_extendedprice=float(x[5]),
        l_discount=float(x[6]),
        l_tax=float(x[7]),
        l_returnflag=x[8],
        l_linestatus=x[9],
        l_shipdate=x[10],
        l_commitdate=x[11],
        l_receiptdate=x[12],
        l_shipinstruct=x[13],
        l_shipment=x[14],
        l_comment=x[15],
    )

# ORDERKEY
# PARTKEY
# SUPPKEY
# LINENUMBER
# QUANTITY
# EXTENDEDPRICE
# DISCOUNT
# TAX
# RETURNFLAG
# LINESTATUS
# SHIPDATE
# COMMITDATE
# RECEIPTDATE
# SHIPINSTRUCT
# SHIPMODE
# COMMENT

rdd = sc.textFile("s3://sampletpchdata/10gb/lineitem.tbl.*")

# kick off an initial count
print(rdd.count())

sample = rdd.map(mapper)

schemaSample = sqlContext.createDataFrame( sample )
schemaSample.registerTempTable("lineitem")

# run TPCH query 1
results = sqlContext.sql("""
SELECT
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= date_sub(cast('1998-12-01' as date), '60')
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus
""")

# kick off a final count of the results
print(results.count())

While that was running, I queried the executors endpoint of the Spark REST API and got this result:

[ {
  "id" : "driver",
  "hostPort" : "10.232.13.130:47656",
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 0,
  "totalTasks" : 0,
  "totalDuration" : 0,
  "totalInputBytes" : 0,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 0,
  "maxMemory" : 7975010304,
  "executorLogs" : { }
}, {
  "id" : "1",
  "hostPort" : "ip-10-232-13-123.us-west-1.compute.internal:58544",
  "rddBlocks" : 0,
  "memoryUsed" : 0,
  "diskUsed" : 0,
  "activeTasks" : 0,
  "failedTasks" : 0,
  "completedTasks" : 641,
  "totalTasks" : 641,
  "totalDuration" : 4998902,
  "totalInputBytes" : 3490792,
  "totalShuffleRead" : 0,
  "totalShuffleWrite" : 395870,
  "maxMemory" : 7790985216,
  "executorLogs" : {
    "stdout" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stdout?start=-4096",
    "stderr" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stderr?start=-4096"
  }
} ]

Unless I'm misunderstanding this result, it appears that my 3-node cluster has only 1 driver and 1 executor. Is that what is happening? If so, shouldn't there be more executors than this, and how do I make that happen?
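
For anyone wanting to check that reading of the JSON, here is a minimal sketch that separates the driver entry from the executor entries. It assumes the response has already been fetched as text; response_text is a trimmed copy of the output above (only the id and completedTasks fields are kept), and split_driver_and_executors is a hypothetical helper name, not part of Spark.

```python
import json

# Trimmed copy of the response shown above; only the fields used here are kept.
response_text = """
[ {"id": "driver", "completedTasks": 0},
  {"id": "1", "completedTasks": 641} ]
"""

def split_driver_and_executors(entries):
    """Separate the driver entry from the executor entries by their id field."""
    drivers = [e for e in entries if e["id"] == "driver"]
    executors = [e for e in entries if e["id"] != "driver"]
    return drivers, executors

entries = json.loads(response_text)
drivers, executors = split_driver_and_executors(entries)
print("drivers:", len(drivers))
print("executors:", len(executors))
```

On the output above this reports one driver and one executor, which matches the reading in the question.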


Solution

  • You also need to pass --num-executors to spark-submit to choose how many executors run your code.
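
As a rough illustration of the suggestion above, the sketch below assembles a spark-submit command line that includes --num-executors alongside the --executor-memory setting from the question. The script name benchmark.py, the executor count of 2, and the helper build_spark_submit are assumptions made for the example; pick a count that suits your own cluster.

```python
import shlex

def build_spark_submit(script, num_executors, executor_memory="1G", executor_cores=None):
    """Build a spark-submit argument list (hypothetical helper, not part of Spark)."""
    cmd = [
        "spark-submit",
        "--num-executors", str(num_executors),   # how many executors to request
        "--executor-memory", executor_memory,    # memory per executor, as in the question
    ]
    if executor_cores is not None:
        cmd += ["--executor-cores", str(executor_cores)]  # optional cores per executor
    cmd.append(script)
    return cmd

# benchmark.py and the count of 2 are placeholders for illustration only.
cmd = build_spark_submit("benchmark.py", num_executors=2)
print(" ".join(shlex.quote(part) for part in cmd))
# prints: spark-submit --num-executors 2 --executor-memory 1G benchmark.py
```

The printed line can be pasted into a shell on the master node, or the list can be handed to subprocess.run if you prefer to launch the job from Python.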