Search code examples
Tags: json, scala, akka, akka-stream, akka-http

How to unmarshal a JSON response into a case class containing only the needed fields, using Akka HTTP


I'm new to Akka HTTP. I want to drop the fields I don't need from a JSON response and keep only the ones I do need. For example, I call the npm registry endpoint (`https://registry.npmjs.org/<package-name>`), and its response contains many fields, but for now I only need `name` and `versions`. How can I deserialize the response into a case class that contains only `name` and `versions`? So far I have written the following code, which fetches each response as a string:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.{Failure, Success}

/** Reads npm package names from a file and, at most one every three seconds, fetches each
  * package's document from the npm registry and prints the raw response body.
  */
object SoftwareRegistry extends App {

  implicit val system = ActorSystem("NPMRegistry")
  implicit val materializer = ActorMaterializer()

  import system.dispatcher

  /** A package name to look up in the npm registry. */
  case class NPMPackage(name: String)

  // reading the packages: one name per line, trimmed. Blank lines are skipped so they do not
  // turn into requests for the registry root, and the file is closed even if reading fails.
  val filename = "B:\\Scala\\NPMRegistry\\src\\main\\resources\\packages.txt"
  val bufferedSource = scala.io.Source.fromFile(filename)
  val listOfPackages: List[NPMPackage] =
    try bufferedSource.getLines().map(_.trim).filter(_.nonEmpty).map(NPMPackage(_)).toList
    finally bufferedSource.close()

  // source
  val sourceList = Source(listOfPackages)

  // sink: one HTTPS request per package; the response body is printed as a string
  val sink = Sink.foreach[NPMPackage] { p =>
    val responseFuture: Future[HttpResponse] =
      Http().singleRequest(HttpRequest(uri = s"https://registry.npmjs.org/${p.name}"))
    val body: Future[String] = responseFuture
      .flatMap(_.entity.toStrict(2.seconds))
      .map(_.data.utf8String)
    body.onComplete {
      case Success(res) => println(res)
      // Throwing inside an onComplete callback fails no Future and discards the original
      // exception, so report the actual cause instead.
      case Failure(ex) => System.err.println(s"Request for ${p.name} failed: ${ex.getMessage}")
    }
  }

  // flow to slow things down: buffer up to 10 elements (backpressuring upstream when full)
  // and let at most one package through every 3 seconds
  val bufferedFlow = Flow[NPMPackage]
    .buffer(10, overflowStrategy = OverflowStrategy.backpressure)
    .throttle(1, 3.seconds)

  sourceList.async
    .via(bufferedFlow).async
    .to(sink)
    .run()
}

It prints the full JSON response for each package as a string (the original post showed this output in a screenshot, which is not available here).


Solution

  • To parse JSON you need a JSON library; the Akka HTTP documentation uses spray-json. Add the following dependency to your build.sbt, setting akkaHttpVersion to the Akka HTTP version you use:

    "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion
    

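    Next, define JSON formats (serializers and deserializers) for your model. The model below is deliberately simple; adapt it as needed. Because a spray-json case-class format reads only the fields that the case class declares, `Package` picks up just `name` and `versions`, and every other field in the response is ignored.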

    /** spray-json formats for the trimmed-down npm package model. */
    trait Formatter extends DefaultJsonProtocol {

      /** Reads a `versions` JSON object as the list of its keys (the version strings).
        *
        * `write` produces an object whose keys are those versions and whose values are empty
        * objects, so `write` followed by `read` yields the same set of versions.
        * NOTE(review): list order follows the parsed object's key iteration order, which may
        * not match the order in the document.
        */
      implicit object jsonFormat extends JsonFormat[Versions] {
        override def read(json: JsValue): Versions = json match {
          case JsObject(fields) =>
            Versions(fields.keys.toList)
          // Fail with spray-json's deserialization error rather than a bare MatchError.
          case other =>
            deserializationError(s"Expected a JSON object for versions, but got: $other")
        }

        // The previous JsonParser(obj.toString) parsed "Versions(List(...))", which is not JSON.
        override def write(obj: Versions): JsValue =
          JsObject(obj.versions.map(version => version -> (JsObject.empty: JsValue)).toMap)
      }

      // Only `name` and `versions` are declared, so all other response fields are ignored.
      implicit val formatterPackage: RootJsonFormat[Package] = jsonFormat2(Package)

      case class Package(name: String, versions: Versions)

      case class Versions(versions: List[String])
    }
    

    Finally, update the sink so that each response is parsed into a Package:

     //needed import with others
     import spray.json._
    
     object SoftwareRegistry extends App with Formatter {

       //existing code
       //---------


       /** For each package, fetches its npm registry document and decodes it into a `Package`,
         * keeping only the fields `Package` declares (`name` and `versions`).
         */
       val sink = Sink.foreach[NPMPackage] { p =>
         // https request
         val responseFuture: Future[HttpResponse] =
           Http().singleRequest(HttpRequest(uri = s"https://registry.npmjs.org/${p.name}"))

         // The registry returns one JSON object per package, so the body is buffered and parsed
         // once. This avoids unbounded JsonFraming (Int.MaxValue) and Keep/JsonFraming imports.
         val packageFuture: Future[Package] = responseFuture.flatMap { response =>
           if (response.status.isSuccess())
             response.entity
               .toStrict(2.seconds)
               .map(_.data.utf8String.parseJson.convertTo[Package])
           else {
             // Drain the unused body so the connection is not left with unread data.
             response.discardEntityBytes()
             Future.failed(
               new RuntimeException(s"Registry returned ${response.status} for ${p.name}"))
           }
         }

         packageFuture.onComplete {
           case Success(res) => println(res)
           // Report the cause instead of throwing from inside the callback.
           case Failure(ex) => System.err.println(s"Fetching ${p.name} failed: ${ex.getMessage}")
         }
       }

       //existing code
       //---------
     }