Tags: amazon-web-services, scala, akka, avro, akka-stream

How can I parse a stream of Avro ByteStrings in Akka Streams?


I have Avro files in an S3 bucket and am trying to stream them and parse them into a case class. I have the schema to parse them, but I don't know how to proceed.

I use S3.download to download and stream the file from the S3 bucket and then convert it to a String with utf8String.

How can I parse the stream I am getting with the schema I have?


Solution

  • I am going to answer the part of your question about using the schema to (de)serialize messages with Avro:

    I have the schema to parse but don't know how to proceed with it.

    I also assume that you have already downloaded the messages from the S3 bucket. For the sake of a working example, I use my own project, in which I persist messages in PostgreSQL; just substitute your S3 bucket connection for it.

    I use the com.sksamuel.avro4s library to create my Avro serializer. These are the libraries to put in build.sbt:

    val akkaVersion = "2.6.10"

    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaVersion,
      "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
      "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
      "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
      "org.postgresql" % "postgresql" % "42.2.2",
      "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"
    )
    

    Then create your serializer, in my case MyFirstAvroSerializer, which extends akka.serialization.Serializer. Its schema is derived from the case class CompanyRegistry. You have to implement four members: identifier, which must return a unique ID; toBinary and fromBinary, which convert messages to and from bytes; and includeManifest, which is false here because I don't need a manifest.

    package org.github.felipegutierrez.explore.akka.classic.remote.serialization

    import akka.serialization.Serializer
    import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
    case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

    class MyFirstAvroSerializer extends Serializer {
      // Avro schema derived by avro4s from the CompanyRegistry case class
      val schema = AvroSchema[CompanyRegistry]

      // Must be unique among all serializers registered in the actor system
      override def identifier: Int = 454874

      override def toBinary(o: AnyRef): Array[Byte] = o match {
        case c: CompanyRegistry =>
          val baos = new ByteArrayOutputStream()
          // Binary encoding: only the record bytes are written, not the schema
          val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build()
          try {
            avroOutputStream.write(c)
            avroOutputStream.flush()
          } finally avroOutputStream.close()
          baos.toByteArray
        case other =>
          throw new IllegalArgumentException(s"We only support CompanyRegistry for Avro, got ${other.getClass}")
      }

      override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
        // Binary-encoded data carries no schema, so the writer schema is passed to build
        val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
        try avroInputStream.iterator.next() // one message = one record
        finally avroInputStream.close()
      }

      // No manifest needed: this serializer handles a single type
      override def includeManifest: Boolean = false
    }
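The serializer above decodes one record from one byte array. To connect it back to the original question, here is a minimal sketch of going from the Source[ByteString, _] that an S3 download yields to case classes. It assumes that the S3 object holds CompanyRegistry records binary-encoded back to back with the known schema (no Avro container-file header), that the object fits in memory, and that akka-stream (same version as Akka) is on the classpath. AvroByteStringParsing, encodeAll, decodeAll and parse are hypothetical names. Note that it keeps the chunks as bytes: Avro binary is not text, so converting it with utf8String replaces invalid UTF-8 sequences and corrupts the data.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

object AvroByteStringParsing {
  private val schema = AvroSchema[CompanyRegistry]

  // Binary-encodes records back to back (no container-file header); used here to fake an S3 body.
  def encodeAll(records: Seq[CompanyRegistry]): Array[Byte] = {
    val baos = new ByteArrayOutputStream()
    val out = AvroOutputStream.binary[CompanyRegistry].to(baos).build()
    try { records.foreach(r => out.write(r)); out.flush() } finally out.close()
    baos.toByteArray
  }

  // Decodes every binary-encoded record in `bytes` with the known schema.
  def decodeAll(bytes: ByteString): List[CompanyRegistry] = {
    val in = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes.toArray)).build(schema)
    try in.iterator.toList finally in.close()
  }

  // Concatenates the raw chunks (no utf8String conversion), then decodes them.
  def parse(body: Source[ByteString, _])(implicit system: ActorSystem): Future[List[CompanyRegistry]] =
    body.runFold(ByteString.empty)(_ ++ _).map(decodeAll)(system.dispatcher)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("avro-bytestring")
    val records = List(
      CompanyRegistry("Google", Seq(BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars")), "ads", 523895),
      CompanyRegistry("Acme", Seq.empty, "tools", 42))
    // Stand-in for the S3 object body: the encoded bytes split into small chunks
    val body = Source(encodeAll(records).grouped(16).map(ByteString(_)).toList)
    try println(Await.result(parse(body), 10.seconds) == records)
    finally system.terminate()
  }
}
```

If your objects are Avro container files instead (they start with the magic bytes "Obj" followed by 0x01 and embed their schema), avro4s's AvroInputStream.data is the reader to use rather than binary. Buffering the whole object with runFold keeps the sketch simple; for very large objects you would want to decode incrementally instead.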
    

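    Next, configure the actor system to use this serializer for these messages. Akka looks up the serializer whenever it has to serialize a bound message, for example when sending it to a remote actor or when the persistence journal writes it as an event. Add a dedicated section to application.conf, in my case avroSerializable: register MyFirstAvroSerializer under serializers and bind the case class under serialization-bindings. I also configured Akka Remote, but you can ignore it; if you leave it out, also drop provider = remote and the remote block, because the remote provider requires the akka-remote module.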

    avroSerializable {
      akka {
        actor {
          provider = remote
          #allow-java-serialization = off
          serializers {
            java = "akka.serialization.JavaSerializer"
            avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
          }
          serialization-bindings {
            "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
            "java.io.Serializable" = java
          }
        }
        remote {
          artery {
            enabled = on
            transport = aeron-udp
            canonical.hostname = "localhost"
          }
        }
      }
    }
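To check that a binding like this really routes a class to your serializer, SerializationExtension can report which serializer it picked and run a round trip. Below is a self-contained sketch, assuming Akka 2.6 and avro4s 4.0.x as above; Greeting, GreetingAvroSerializer, BindingCheck and the identifier 454875 are hypothetical, and the config is built with parseString instead of application.conf and leaves out remoting.

```scala
import akka.actor.ActorSystem
import akka.serialization.{SerializationExtension, Serializer}
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message type standing in for CompanyRegistry
case class Greeting(text: String)

// Hypothetical serializer built the same way as MyFirstAvroSerializer
class GreetingAvroSerializer extends Serializer {
  private val schema = AvroSchema[Greeting]
  override def identifier: Int = 454875 // arbitrary, but unique in the actor system
  override def includeManifest: Boolean = false

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case g: Greeting =>
      val baos = new ByteArrayOutputStream()
      val out = AvroOutputStream.binary[Greeting].to(baos).build()
      try { out.write(g); out.flush() } finally out.close()
      baos.toByteArray
    case other => throw new IllegalArgumentException(s"Unsupported type: ${other.getClass}")
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val in = AvroInputStream.binary[Greeting].from(new ByteArrayInputStream(bytes)).build(schema)
    try in.iterator.next() finally in.close()
  }
}

object BindingCheck {
  // Same keys as the avroSerializable section (minus remoting), with class names taken from the code
  private val config = ConfigFactory.parseString(
    s"""akka.actor {
       |  serializers.avro = "${classOf[GreetingAvroSerializer].getName}"
       |  serialization-bindings { "${classOf[Greeting].getName}" = avro }
       |}""".stripMargin)

  // Returns the serializer Akka selected for `msg` and the result of serialize + deserialize
  def roundTrip(msg: Greeting): (Serializer, Greeting) = {
    val system = ActorSystem("binding-check", config)
    try {
      val serialization = SerializationExtension(system)
      val bytes = serialization.serialize(msg).get
      (serialization.findSerializerFor(msg), serialization.deserialize(bytes, classOf[Greeting]).get)
    } finally Await.result(system.terminate(), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (serializer, back) = roundTrip(Greeting("hello"))
    println(s"${serializer.getClass.getSimpleName} -> $back")
  }
}
```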
    

    As I said at the beginning, I am using PostgreSQL, whereas in your case this would be the S3 bucket storage configuration. I leave it here for completeness, and because I load this configuration when I create the actor system.

    postgresStore {
      akka.persistence.journal.plugin = "jdbc-journal"
      akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
      akka.actor.allow-java-serialization = on
      # create JDBC configuration to Akka persistence
      akka-persistence-jdbc {
        shared-databases {
          slick {
            profile = "slick.jdbc.PostgresProfile$"
            db {
              numThreads = 10
              driver = "org.postgresql.Driver"
              url = "jdbc:postgresql://localhost:5432/rtjvm"
              user = "docker"
              password = "docker"
            }
          }
        }
      }
      # bind the JDBC plugins to the shared database configuration above
      jdbc-journal {
        use-shared-db = "slick"
      }
      jdbc-snapshot-store {
        use-shared-db = "slick"
      }
    }
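Because postgresStore and avroSerializable are both top-level sections of one application.conf, each one is selected with getConfig, and the two are combined with withFallback, where keys in the receiving config take precedence over the fallback. A small sketch follows, with ConfigFactory.parseString standing in for the file and only a few of the keys above (ConfigSections is a hypothetical name). Note that ConfigFactory.load("avroSerializable") would not select a section: it looks for a separate classpath resource named avroSerializable.conf.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigSections {
  // Stand-in for application.conf, trimmed to a few keys from each section
  val root: Config = ConfigFactory.parseString(
    """postgresStore {
      |  akka.persistence.journal.plugin = "jdbc-journal"
      |  akka.actor.allow-java-serialization = on
      |}
      |avroSerializable {
      |  akka.actor.serializers.avro = "MyFirstAvroSerializer"
      |}""".stripMargin)

  // Select each section by path, then merge them; postgresStore wins on conflicting keys
  val merged: Config = root.getConfig("postgresStore").withFallback(root.getConfig("avroSerializable"))

  def main(args: Array[String]): Unit = {
    println(merged.getString("akka.persistence.journal.plugin"))      // jdbc-journal
    println(merged.getString("akka.actor.serializers.avro"))          // MyFirstAvroSerializer
    println(merged.getBoolean("akka.actor.allow-java-serialization")) // true
  }
}
```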
    

    Now create the actor system and a simple persistent actor, SimplePersistentActor, and send it a message. SimplePersistentActor is a very simple actor that accepts the messages I send it, nothing fancy.

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object AvroSerialization_Persistence {
      def main(args: Array[String]): Unit = {
        // postgresStore and avroSerializable are both sections of application.conf,
        // so select each one with getConfig (load("avroSerializable") would look for a separate file)
        val rootConfig = ConfigFactory.load()
        val config = rootConfig.getConfig("postgresStore")
          .withFallback(rootConfig.getConfig("avroSerializable"))
        val system = ActorSystem("postgresStoreSystem", config)
        val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
        val companyRegistryMsg = CompanyRegistry(
          "Google",
          Seq(
            BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
            BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
          ),
          "ads",
          523895
        )
        simplePersistentActor ! companyRegistryMsg
      }
    }
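SimplePersistentActor itself is not shown above. A hypothetical minimal version consistent with how main uses it (a props(persistenceId) factory), assuming classic Akka Persistence, could look like the following: it persists every command it receives, which is the step where the journal serializes CompanyRegistry with the bound Avro serializer.

```scala
import akka.actor.{ActorLogging, Props}
import akka.persistence.PersistentActor

// Hypothetical minimal SimplePersistentActor; the original implementation is not shown
class SimplePersistentActor(id: String) extends PersistentActor with ActorLogging {
  override def persistenceId: String = id

  // Persists every command as an event; the journal serializes it via Akka serialization
  override def receiveCommand: Receive = {
    case msg => persist(msg)(event => log.info(s"Persisted $event"))
  }

  // Logs replayed events (and the final RecoveryCompleted signal) during recovery
  override def receiveRecover: Receive = {
    case event => log.info(s"Recovered $event")
  }
}

object SimplePersistentActor {
  def props(persistenceId: String): Props = Props(new SimplePersistentActor(persistenceId))
}
```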