I have a command-line program that calculates statistics from humidity sensor data. I also have .csv files inside src/main/scala/data.
When I run sbt "run data" or sbt "run src/main/scala/data", it seems unable to locate the .csv files and I get all-zero results.
Output for sbt "run src/main/scala/data":
Looking for CSV files in directory: src/main/scala/data
Output for sbt "run data":
Looking for CSV files in directory: data
Num of processed files: 0
Num of processed measurements: 0
Num of failed measurements: 0
Sensors with highest avg humidity:
sensor-id,min,avg,max
Expected output example:
Num of processed files: 2
Num of processed measurements: 7
Num of failed measurements: 2
Sensors with highest avg humidity:
sensor-id,min,avg,max
s2,78,82,88
s1,10,54,98
s3,NaN,NaN,NaN
Code for reference:
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
object HumiditySensorStatistics {
case class HumidityData(sum: Double, count: Int) {
def avg: Option[Double] = if (count > 0) Some(sum / count) else None
}
case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])
def main(args: Array[String]): Unit = {
val directoryPath = args(0)
implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val sensors = mutable.Map[String, HumidityData]()
var failedMeasurements = 0
println(s"Looking for CSV files in directory: $directoryPath")
val fileSource = Source.fromIterator(() => new File(directoryPath).listFiles().iterator)
val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
.drop(1) // skip header line
.map(_.utf8String)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})
val sink = Sink.foreach[(String, String)](data => {
val sensorId = data._1
val humidity = data._2.toDoubleOption
if (humidity.isDefined) {
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0)) match {
case HumidityData(sum, count) => HumidityData(sum + humidity.get, count + 1)
})
} else {
failedMeasurements += 1
}
})
measurementSource.runWith(sink).onComplete(_ => {
val numFilesProcessed = sensors.size
val numMeasurementsProcessed = sensors.values.map(_.count).sum
val numFailedMeasurements = failedMeasurements
println(s"Num of processed files: $numFilesProcessed")
println(s"Num of processed measurements: $numMeasurementsProcessed")
println(s"Num of failed measurements: $numFailedMeasurements")
val statsBySensor = sensors.map {
case (sensorId, humidityData) =>
val stats = SensorStats(
min = Some(humidityData.sum / humidityData.count),
avg = humidityData.avg,
max = Some(humidityData.sum / humidityData.count)
)
(sensorId, stats)
}
println("Sensors with highest avg humidity:")
println("sensor-id,min,avg,max")
statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
case (sensorId, stats) =>
println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
}
system.terminate()
})
}
}
build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.8"
lazy val root = (project in file("."))
.settings(
name := "sensor-task"
)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.6.16",
)
.csv file data:
sensor-id,humidity
s1,80
s3,NaN
s2,78
s1,98
Both your sbt commands (i.e. sbt "run data" and sbt "run src/main/scala/data") look correct, assuming you're running sbt from the Scala project root with source code under "src/main/scala/" and the csv files under "src/main/scala/data/".
A couple of observed issues with the code:
In creating fileSource, there is a good possibility that new File().listFiles() picks up more files than you intend (e.g. non-csv files, hidden files, etc.). Such a file can come through Framing.delimiter() as a single blob that is then discarded by drop(1), in which case the "sensors" Map ends up empty, resulting in all-0's in the output.
I was able to reproduce the all-0's result using your exact source code and "build.sbt", apparently due to non-csv files (in my test case, ".DS_Store") being included by listFiles().
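To check what is actually being picked up in your environment, a throwaway debugging line (my addition, not part of the fix) printing the result of listFiles() before the stream is built makes the problem visible:
val found = new File(directoryPath).listFiles()
println(found.map(_.getName).mkString("listFiles() returned: [", ", ", "]"))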
Providing specific file-selection criteria for listFiles(), such as including only "*.csv" like below, should fix the problem:
val fileSource = Source.fromIterator( () =>
new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
)
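A related hardening (optional, and my own suggestion rather than part of the fix): File.listFiles() returns null when the path does not exist or is not a directory, which would make the .iterator call throw a NullPointerException. A minimal guard:
val fileSource = Source.fromIterator { () =>
  // listFiles() returns null for a missing or non-directory path;
  // Option(...) turns that into an empty file list instead of an NPE
  Option(new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")))
    .getOrElse(Array.empty[File])
    .iterator
}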
Another issue is that the computation logic (humidityData.sum / humidityData.count) for min and max is incorrect, essentially repeating the avg calculation. To calculate them properly, one could expand the parameters of HumidityData as follows:
case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {...}
The min/max could then be updated with something like below:
humidity match {
case Some(h) =>
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, Double.MinValue)) match {
case HumidityData(sum, count, min, max) =>
HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
})
case None =>
failedMeasurements += 1
}
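As a stylistic alternative (just a sketch of the same logic), Scala 2.13's mutable.Map#updateWith expresses the read-modify-write in a single call:
sensors.updateWith(sensorId) {
  // existing entry: fold the new reading into the running stats
  case Some(HumidityData(sum, count, min, max)) =>
    Some(HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max)))
  // first reading for this sensor: seed every field from it
  case None =>
    Some(HumidityData(h, 1, h, h))
}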
As a side note, I would recommend separating data from code by moving the data files out of "src/main/scala/" and placing them under, say, "src/main/resources/data/". Note also that sensors.size counts distinct sensors rather than files, so the corresponding output label in the revised code below has been changed to "Num of processed sensors".
Testing with the following csv data files ...
File src/main/resources/data/sensor_data1.csv:
sensor-id,humidity
s1,80
s3,NaN
s2,78
s1,98
File src/main/resources/data/sensor_data2.csv:
sensor-id,humidity
s1,70
s3,80
s2,60
$ sbt "run src/main/resources/data"
[info] welcome to sbt 1.5.5 (Oracle Corporation Java 1.8.0_181)
[info] loading settings for project global-plugins from idea.sbt ...
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading project definition from /Users/leo/work/so-75459442/project
[info] loading settings for project root from build.sbt ...
[info] set current project to sensor-task (in build file:/Users/leo/work/so-75459442/)
[info] running HumiditySensorStatistics src/main/resources/data
Looking for CSV files in directory: src/main/resources/data
Num of processed sensors: 3
Num of processed measurements: 7
Num of failed measurements: 1
Sensors with highest avg humidity:
sensor-id,min,avg,max
s3,NaN,NaN,NaN
s1,70.0,82.66666666666667,98.0
s2,60.0,69.0,78.0
[success] Total time: 2 s, completed Feb 16, 2023 10:33:35 AM
Two details in the output above are worth noting: "NaN".toDoubleOption parses successfully as Some(Double.NaN), so s3's NaN reading counts as a processed (not failed) measurement and propagates NaN through its sum/min/max (and, under Scala 2.13's total ordering for Double, NaN sorts greatest, which is why s3 appears first); and since drop(1) is applied after flatMapConcat, only the first file's header is skipped, so the second file's header line is the 1 failed measurement.
Appended is the revised source code.
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
object HumiditySensorStatistics {
case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {
def avg: Option[Double] = if (count > 0) Some(sum / count) else None
}
case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])
def main(args: Array[String]): Unit = {
val directoryPath = args(0)
implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
// implicit val materializer: ActorMaterializer = ActorMaterializer() // Not needed for Akka Stream 2.6+
val sensors = mutable.Map[String, HumidityData]()
var failedMeasurements = 0
println(s"Looking for CSV files in directory: $directoryPath")
val fileSource = Source.fromIterator( () =>
new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
)
val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
.drop(1) // skip header line
.map(_.utf8String)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})
val sink = Sink.foreach[(String, String)](data => {
val sensorId = data._1
val humidity = data._2.toDoubleOption
humidity match {
case Some(h) =>
// Seed min/max with extreme values so the first reading always replaces them
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, Double.MinValue)) match {
case HumidityData(sum, count, min, max) =>
HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
})
case None =>
failedMeasurements += 1
}
})
measurementSource.runWith(sink).onComplete(_ => {
val numSensorsProcessed = sensors.size
val numMeasurementsProcessed = sensors.values.map(_.count).sum
val numFailedMeasurements = failedMeasurements
println(s"Num of processed sensors: $numSensorsProcessed")
println(s"Num of processed measurements: $numMeasurementsProcessed")
println(s"Num of failed measurements: $numFailedMeasurements")
val statsBySensor = sensors.map {
case (sensorId, humidityData) =>
val stats = SensorStats(
min = Some(humidityData.min),
avg = humidityData.avg,
max = Some(humidityData.max)
)
(sensorId, stats)
}
println("Sensors with highest avg humidity:")
println("sensor-id,min,avg,max")
statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
case (sensorId, stats) =>
println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
}
system.terminate()
})
}
}