scala, sbt, akka-stream

scala-akka: Passing command line arguments to sbt run to locate .csv files


I have a command line program that calculates statistics from humidity sensor data.

I also have .csv files inside src/main/scala/data.

When I run sbt "run data" or sbt "run src/main/scala/data", it doesn't seem to locate the .csv files and I get 0 for every result.

Output for sbt "run src/main/scala/data":

Looking for CSV files in directory: src/main/scala/data

Output for sbt "run data":

Looking for CSV files in directory: data

Num of processed files: 0

Num of processed measurements: 0

Num of failed measurements: 0

Sensors with highest avg humidity:

sensor-id,min,avg,max

Expected output example:

Num of processed files: 2

Num of processed measurements: 7

Num of failed measurements: 2

Sensors with highest avg humidity:

sensor-id,min,avg,max

s2,78,82,88

s1,10,54,98

s3,NaN,NaN,NaN

Code for reference:

import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global

object HumiditySensorStatistics {

  case class HumidityData(sum: Double, count: Int) {
    def avg: Option[Double] = if (count > 0) Some(sum / count) else None
  }

  case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])

  def main(args: Array[String]): Unit = {
    val directoryPath = args(0)
    implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val sensors = mutable.Map[String, HumidityData]()
    var failedMeasurements = 0

    println(s"Looking for CSV files in directory: $directoryPath")

    val fileSource = Source.fromIterator(() => new File(directoryPath).listFiles().iterator)
    val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .drop(1) // skip header line
      .map(_.utf8String)
      .map(line => {
        val fields = line.split(",")
        (fields(0), fields(1))
      })
    val sink = Sink.foreach[(String, String)](data => {
      val sensorId = data._1
      val humidity = data._2.toDoubleOption

      if (humidity.isDefined) {
        sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0)) match {
          case HumidityData(sum, count) => HumidityData(sum + humidity.get, count + 1)
        })
      } else {
        failedMeasurements += 1
      }
    })

    measurementSource.runWith(sink).onComplete(_ => {
      val numFilesProcessed = sensors.size
      val numMeasurementsProcessed = sensors.values.map(_.count).sum
      val numFailedMeasurements = failedMeasurements

      println(s"Num of processed files: $numFilesProcessed")
      println(s"Num of processed measurements: $numMeasurementsProcessed")
      println(s"Num of failed measurements: $numFailedMeasurements")

      val statsBySensor = sensors.map {
        case (sensorId, humidityData) =>
          val stats = SensorStats(
            min = Some(humidityData.sum / humidityData.count),
            avg = humidityData.avg,
            max = Some(humidityData.sum / humidityData.count)
          )
          (sensorId, stats)
      }

      println("Sensors with highest avg humidity:")
      println("sensor-id,min,avg,max")
      statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
        case (sensorId, stats) =>
          println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
      }
      
      system.terminate()
    })
  }
}

build.sbt

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.8"

lazy val root = (project in file("."))
  .settings(
    name := "sensor-task"
  )

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.6.16",
)

.csv file data:

sensor-id,humidity
s1,80
s3,NaN
s2,78
s1,98

Solution

  • Both your sbt commands (i.e. sbt "run data" and sbt "run src/main/scala/data") look correct, assuming you're running sbt from the project root, with source code under "src/main/scala/" and the csv files under "src/main/scala/data/".
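
    If in doubt where a relative path resolves to, a quick check right after reading args(0) makes the problem visible (a minimal debugging sketch, not part of the fix; it only uses java.io.File, which is already imported):

    val dir = new File(directoryPath)
    println(s"Resolved to: ${dir.getAbsolutePath}")  // shows where the relative path actually points
    if (!dir.isDirectory)
      sys.error(s"Not a directory: ${dir.getAbsolutePath}")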

    A couple of observed issues with the code:

    In creating fileSource, there is a good chance that new File().listFiles() picks up more files than you intend (e.g. non-csv files, hidden files, etc.), which can end up as a single blob after passing through Framing.delimiter() that then gets discarded by drop(1). In that case, the "sensors" Map will be empty, resulting in the all-0 output.

    I was able to reproduce the all-0 result using your exact source code and "build.sbt", apparently due to non-csv files (in my test case, the file ".DS_Store") being included by listFiles().

    Providing a specific file-selection filter to listFiles(), such as including only "*.csv" files as shown below, should fix the problem:

    val fileSource = Source.fromIterator( () =>
      new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
    )
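
    One more caveat: File.listFiles() returns null when the given path does not denote a directory (or cannot be read), which would surface as a NullPointerException inside the source. A defensive variant of the same idea (just a sketch) wraps the result in Option:

    val fileSource = Source.fromIterator { () =>
      Option(new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")))
        .map(_.iterator)
        .getOrElse(Iterator.empty)  // empty stream when the directory is missing or unreadable
    }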
    

    Another issue is that the computation logic (humidityData.sum / humidityData.count) for min and max is incorrect, essentially repeating the avg calculation. To compute them properly, one could expand the parameters of HumidityData as follows:

    case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {...}
    

    The min/max could then be updated with something like below:

    humidity match {
      case Some(h) =>
        sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
          case HumidityData(sum, count, min, max) =>
            HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
        })
      case None =>
        failedMeasurements += 1
    }
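
    On Scala 2.13, the same accumulation can also be written with mutable.Map#updateWith, which avoids the sentinel values (Double.MaxValue, 0.0) in the initial HumidityData (an equivalent alternative, sketched below):

    humidity match {
      case Some(h) =>
        sensors.updateWith(sensorId) {
          case Some(d) => Some(HumidityData(d.sum + h, d.count + 1, Math.min(h, d.min), Math.max(h, d.max)))
          case None    => Some(HumidityData(h, 1, h, h))  // first reading seeds min and max directly
        }
      case None =>
        failedMeasurements += 1
    }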
    

    As a side note, I would recommend separating data from code by moving the data files out of "src/main/scala/" and placing them under, say, "src/main/resources/data/".
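
    Relatedly, since args(0) throws when sbt run is invoked without an argument, the directory could be read with a fallback (a small sketch; the default path here is just an example):

    val directoryPath = args.headOption.getOrElse("src/main/resources/data")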

    Testing with the following csv data files ...

    File src/main/resources/data/sensor_data1.csv:
    sensor-id,humidity
    s1,80
    s3,NaN
    s2,78
    s1,98
    
    File src/main/resources/data/sensor_data2.csv:
    sensor-id,humidity
    s1,70
    s3,80
    s2,60
    
    $ sbt "run src/main/resources/data"
    [info] welcome to sbt 1.5.5 (Oracle Corporation Java 1.8.0_181)
    [info] loading settings for project global-plugins from idea.sbt ...
    [info] loading global plugins from /Users/leo/.sbt/1.0/plugins
    [info] loading project definition from /Users/leo/work/so-75459442/project
    [info] loading settings for project root from build.sbt ...
    [info] set current project to sensor-task (in build file:/Users/leo/work/so-75459442/)
    [info] running HumiditySensorStatistics src/main/resources/data
    Looking for CSV files in directory: src/main/resources/data
    Num of processed sensors: 3
    Num of processed measurements: 7
    Num of failed measurements: 1
    Sensors with highest avg humidity:
    sensor-id,min,avg,max
    s3,NaN,NaN,NaN
    s1,70.0,82.66666666666667,98.0
    s2,60.0,69.0,78.0
    [success] Total time: 2 s, completed Feb 16, 2023 10:33:35 AM
    

    Appended is the revised source code.

    import java.io.File
    import akka.actor.ActorSystem
    // import akka.stream.ActorMaterializer  // unused: not needed for Akka Streams 2.6+
    import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
    import akka.util.ByteString
    import scala.collection.mutable
    import scala.concurrent.ExecutionContext.Implicits.global
    
    object HumiditySensorStatistics {
    
      case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {
        def avg: Option[Double] = if (count > 0) Some(sum / count) else None
      }
    
      case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])
    
      def main(args: Array[String]): Unit = {
        val directoryPath = args(0)
        implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
        // implicit val materializer: ActorMaterializer = ActorMaterializer()  // Not needed for Akka Stream 2.6+
        val sensors = mutable.Map[String, HumidityData]()
        var failedMeasurements = 0
    
        println(s"Looking for CSV files in directory: $directoryPath")
    
        val fileSource = Source.fromIterator( () =>
          new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
        )
        val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
          .drop(1) // skip header line
          .map(_.utf8String)
          .map(line => {
            val fields = line.split(",")
            (fields(0), fields(1))
          })
        val sink = Sink.foreach[(String, String)](data => {
          val sensorId = data._1
          val humidity = data._2.toDoubleOption
    
          humidity match {
            case Some(h) =>
              sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
                case HumidityData(sum, count, min, max) =>
                  HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
              })
            case None =>
              failedMeasurements += 1
          }
        })
    
        measurementSource.runWith(sink).onComplete(_ => {
          val numSensorsProcessed = sensors.size
          val numMeasurementsProcessed = sensors.values.map(_.count).sum
          val numFailedMeasurements = failedMeasurements
    
          println(s"Num of processed sensors: $numSensorsProcessed")
          println(s"Num of processed measurements: $numMeasurementsProcessed")
          println(s"Num of failed measurements: $numFailedMeasurements")
    
          val statsBySensor = sensors.map {
            case (sensorId, humidityData) =>
              val stats = SensorStats(
                min = Some(humidityData.min),
                avg = humidityData.avg,
                max = Some(humidityData.max)
              )
              (sensorId, stats)
          }
    
          println("Sensors with highest avg humidity:")
          println("sensor-id,min,avg,max")
          statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
            case (sensorId, stats) =>
              println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
          }
    
          system.terminate()
        })
      }
    }