Search code examples
scala, apache-spark, amazon-s3, emr

Spark and Amazon EMR: S3 Connections not being closed


My application loops through the lines of a text file containing S3 directories, reads them in, performs ETL processes, and then writes the results back out to S3. It has failed in the same place several times (after about 80 loops), so I suspect that Spark is not closing my S3 connections and that my allocated connection pool is being exhausted. The error is:

16/09/14 19:30:49 INFO AmazonHttpClient: Unable to execute HTTP request: Timeout waiting for connection from pool
com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:226)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:195)
    at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.getConnection(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:423)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1015)
    at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:991)
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:212)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1428)
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:313)
    at org.apache.spark.sql.execution.datasources.DataSource.hasMetadata(DataSource.scala:289)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:324)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:452)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:458)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:451)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
    at com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:26)
    at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:116)
    at com.webpt.SparkMaxwell$$anonfun$main$1.apply(SparkMaxwell.scala:105)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at com.webpt.SparkMaxwell$.main(SparkMaxwell.scala:105)
    at com.webpt.SparkMaxwell.main(SparkMaxwell.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  

Relevant code:

// Merges yesterday's master snapshot of every table listed in the keys file with the
// newly arrived (encrypted) input rows and appends the result to today's master path.
//
// Each line of the keys file has the form: tableName <TAB> key,key,key
// For every table one of four things happens:
//   * master only      -> yesterday's data is copied to today with lower-cased column names
//   * master and input -> both are merged; per primary key the input row wins over master
//   * input only       -> the decrypted input is written as the first master snapshot
//   * neither          -> nothing is written
val primaryKeyFile = sparkSession.sparkContext.textFile(keys)

    // Returns `df` with every column renamed to its lower-case form.
    def withLowerCaseColumns(df: DataFrame): DataFrame =
      df.select(df.columns.map(name => col(name).as(name.toLowerCase())): _*)

    // Keeps only the input rows of `tableName`, replaces the encrypted data column with a
    // decrypted one, turns each row into a string so read.json can parse it into a
    // DataFrame, and lower-cases the resulting column names.
    def decryptedInputFor(tableName: String): DataFrame = {
      val soloDF = testFreeDF.filter(lower(col("table")).equalTo(tableName))
      val decryptedDF = sparkSession.sqlContext.read.json(
        soloDF.withColumn("data", decryptUDF(soloDF("data"))).select("data").rdd.map(row => row.toString()))
      withLowerCaseColumns(decryptedDF)
    }

    // Selects `allColumns` from `df` in exactly that order, substituting a null column for
    // every name `df` does not have. `df` must already have lower-case column names.
    def projectOnto(df: DataFrame, allColumns: Seq[String]): DataFrame = {
      val present = df.columns.toSet
      df.select(allColumns.map(field =>
        if (present.contains(field)) col(field) else lit(null).alias(field)
      ): _*)
    }

    for (line <- primaryKeyFile.collect()) {

      // split once; field 0 is the table name, field 1 the comma-separated primary keys
      val fields = line.split("\t")
      val tableName = fields(0).toLowerCase()
      val tableKeys = fields(1).toLowerCase.split(",").toSeq

      val masterPath = s"$lake/$tableName/$yesterday"
      val todayPath = s"$lake/$tableName/$today"

      val existsInMaster = FileSystem
        .get(new URI(masterPath), sparkSession.sparkContext.hadoopConfiguration)
        .exists(new Path(masterPath))
      val existsInInput = distinctTables.contains(tableName)

      if (existsInMaster && !existsInInput) {
        // exists only in master: copy yesterday's data to today with lower-case columns
        val masterDF = sparkSession.sqlContext.read.avro(s"$masterPath/*").repartition(1500)
        withLowerCaseColumns(masterDF).write.mode("append").avro(todayPath)
      }
      else if (existsInMaster && existsInInput) {
        // exists both in master and input: merge them, keeping every column of either side
        val inputDF = decryptedInputFor(tableName)
        val masterDF = withLowerCaseColumns(
          sparkSession.sqlContext.read.avro(s"$masterPath/*").repartition(1500))

        // One shared, ordered column list for both sides. union() matches columns by
        // position, not by name, so both frames are projected onto this exact order. This
        // also covers the case where the schemas hold the same columns in a different order.
        val allColumns = inputDF.columns.union(masterDF.columns).distinct

        // Tag input rows "2" and master rows "1". The window below keeps the highest tag
        // per primary key, so a new input row always replaces its master counterpart,
        // whether or not the two schemas differ.
        val taggedInput = projectOnto(inputDF, allColumns).withColumn("old/new", lit("2"))
        val taggedMaster = projectOnto(masterDF, allColumns).withColumn("old/new", lit("1"))
        val combined: DataFrame = taggedInput.union(taggedMaster).repartition(1500)

        // partition by the primary keys and order by old/new descending; row 1 of each
        // partition is the row to keep
        val mergeWindowFunction = Window.partitionBy(tableKeys.head, tableKeys.tail: _*).orderBy(desc("old/new"))

        combined.withColumn("rownum", row_number.over(mergeWindowFunction)).where("rownum = 1")
          .drop("rownum").drop("old/new").write.mode("append").avro(todayPath)
      }
      else if (existsInInput) {
        // exists only in input: the decrypted, lower-cased input becomes the master data
        decryptedInputFor(tableName).write.mode("append").avro(todayPath)
      }
    }

The line that it's failing on is:

masterDF.select(masterDF.columns.map(name => col(name).as(name.toLowerCase())): _*).write.mode("append").avro(s"$lake/$tableName/$today")  

So what can I do? Is there any way to ensure that Spark is closing connections after reads/writes to S3?


Solution

  • This is caused by a bug in spark-avro, which should be fixed in the newly-published 3.0.1 release.