scala · apache-spark · optimization · geotools

Spark wholeTextFiles - many small files


I want to ingest many small text files via Spark into Parquet. Currently, I use wholeTextFiles and then perform some additional parsing.

To be more precise: these small text files are ESRI ASCII Grid files, each with a maximum size of around 400 KB. GeoTools is used to parse them, as outlined below.

Do you see any optimization possibilities? Maybe something to avoid the creation of unnecessary objects, or something to handle the small files better? I wonder if it would be better to only get the paths of the files and read them manually, instead of going through String -> ByteArrayInputStream.

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

import scala.collection.mutable

import org.apache.spark.sql.SparkSession
import org.geotools.gce.arcgrid.ArcGridReader
import org.geotools.process.raster.PolygonExtractionProcess
// For GeoTools < 20, the JTS classes live under com.vividsolutions.jts instead
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKTWriter

case class RawRecords(path: String, content: String)
case class GeometryId(idPath: String, value: Double, geo: String)

// @transient + lazy: not serialized with the closure, re-created on each executor
@transient lazy val extractor = new PolygonExtractionProcess()
@transient lazy val writer = new WKTWriter()

def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
    import spark.implicits._
    spark.sparkContext
      .wholeTextFiles(path, parallelism)
      .toDF("path", "content")
      .as[RawRecords]
      .mapPartitions(mapToSimpleTypes)
  }

def mapToSimpleTypes(iterator: Iterator[RawRecords]): Iterator[GeometryId] = iterator.flatMap(r => {
    // http://docs.geotools.org/latest/userguide/library/coverage/arcgrid.html
    val readRaster = new ArcGridReader(new ByteArrayInputStream(r.content.getBytes(StandardCharsets.UTF_8))).read(null)

    // TODO maybe consider optimization of known size instead of using growable data structure
    val vectorizedFeatures = extractor.execute(readRaster, 0, true, null, null, null, null).features
    val result = mutable.Buffer[GeometryId]()

    while (vectorizedFeatures.hasNext) {
      val vectorizedFeature = vectorizedFeatures.next()
      val geomWKTLineString = vectorizedFeature.getDefaultGeometry match {
        case g: Geometry => writer.write(g)
      }
      val geomUserdata = vectorizedFeature.getAttribute(1).asInstanceOf[Double]
      result += GeometryId(r.path, geomUserdata, geomWKTLineString)
    }
    result
  })

Solution

  • I have a few suggestions:

    1. Use wholeTextFiles -> mapPartitions -> convert to Dataset. Why? If you call mapPartitions on a Dataset, every row is deserialized from Spark's internal binary format into an object and the results are serialized back afterwards; doing the parsing on the RDD first avoids that extra round trip.
    2. Run Java Mission Control and sample your application. It will show you JIT compilations and the execution times of your methods, so you can see where the time actually goes.
    3. Maybe you can use binaryFiles. It gives you a PortableDataStream per file, so you can open and parse the stream directly in mapPartitions instead of materializing each file's content as a String first.
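A minimal sketch of suggestion 1, reusing the GeometryId case class from the question. `parseOne` is a placeholder name standing in for the ArcGridReader/PolygonExtractionProcess logic; the point is only the ordering: parse on the RDD, convert to a Dataset once at the end.

```scala
import org.apache.spark.sql.SparkSession

// Heavy parsing stays on the RDD; the encoder runs exactly once, at toDS().
def readRawFiles(path: String, parallelism: Int, spark: SparkSession) = {
  import spark.implicits._
  spark.sparkContext
    .wholeTextFiles(path, parallelism)                  // RDD[(path, content)]
    .mapPartitions(_.flatMap { case (p, content) =>
      parseOne(p, content)                              // plain Scala objects here
    })
    .toDS()                                             // encode to internal format once
}

// Placeholder: plug in the ArcGrid/PolygonExtractionProcess parsing from the question.
def parseOne(path: String, content: String): Iterator[GeometryId] =
  Iterator.empty
```

Compared with the question's version, this never pays the Dataset -> object -> Dataset deserialization cost, because mapPartitions runs before any encoder is involved.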
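And a sketch of suggestion 3, again reusing GeometryId from the question. `parseStream` is an illustrative placeholder for the GeoTools parsing; the relevant part is that binaryFiles hands each file to the executor as a PortableDataStream, so a reader can consume the InputStream directly instead of round-tripping through String -> getBytes -> ByteArrayInputStream.

```scala
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SparkSession

def readViaBinaryFiles(path: String, parallelism: Int, spark: SparkSession) = {
  spark.sparkContext
    .binaryFiles(path, parallelism)                     // RDD[(path, PortableDataStream)]
    .mapPartitions(_.flatMap { case (p, stream) =>
      val in = stream.open()                            // opened lazily, on the executor
      try parseStream(p, in).toVector                   // materialize before closing
      finally in.close()
    })
}

// Placeholder for the GeoTools parsing, e.g. new ArcGridReader(in).read(null) ...
def parseStream(path: String, in: java.io.InputStream): Iterator[GeometryId] =
  Iterator.empty
```

Note the `.toVector` before `in.close()`: if the parser were lazy, closing the stream first would break iteration. The question's current parser is eager (it fills a Buffer), so this matches its behavior.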