I have Avro files in an S3 bucket and I am trying to stream and parse them into a case class. I have the schema to parse them with, but I don't know how to proceed.
I use s3.download to download and stream the file from the S3 bucket and then convert it to a UTF-8 string.
Please assist: how can I parse the input stream I am getting with the schema we have?
I am going to answer this question based on the part of your question about using the schema to (de)serialize messages with Avro:

I have the schema to parse but don't know how to proceed with it.

and I will assume that you have already downloaded the messages from your S3 bucket. I am going to use my own example, which persists messages to PostgreSQL, just for the sake of a working example, but you can swap in your S3 bucket connection.
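Since your question is specifically about the parsing step, here is first a minimal sketch of how you could feed the bytes you already downloaded from S3 into avro4s. It assumes you have collected the object into an Array[Byte]; the PaymentEvent case class and the ParseAvroFromS3Bytes object are only placeholders for whatever matches your schema. If your files are Avro container files (schema embedded in the file header), avro4s also has an AvroInputStream.data variant you may want to look at instead of binary.

import com.sksamuel.avro4s.{AvroInputStream, AvroSchema}

// hypothetical case class: replace it with one that matches your Avro schema
case class PaymentEvent(id: String, amount: Double, currency: String)

object ParseAvroFromS3Bytes {
  def parse(bytes: Array[Byte]): List[PaymentEvent] = {
    // schema derived from the case class by avro4s
    val schema = AvroSchema[PaymentEvent]
    // same builder pattern as the serializer below, reading from the downloaded bytes
    val avroInputStream = AvroInputStream.binary[PaymentEvent].from(bytes).build(schema)
    val events = avroInputStream.iterator.toList
    avroInputStream.close()
    events
  }
}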
I am using the com.sksamuel.avro4s lib to create my Avro serializer. Here are the necessary libs to put in the build.sbt:
val akkaVersion = "2.6.10"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence" % akkaVersion,
  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.4",
  "org.xerial.snappy" % "snappy-java" % "1.1.8.2",
  "org.postgresql" % "postgresql" % "42.2.2",
  "com.github.dnvriend" %% "akka-persistence-jdbc" % "3.4.0"
)
Then you create your serializer, which in my case is MyFirstAvroSerializer, extending akka.serialization.Serializer. It holds a schema, which in my case is derived from the case class CompanyRegistry. Basically, you have to implement the method identifier, which must return a unique ID, toBinary and fromBinary to convert the messages, and includeManifest, which is false because I don't need a manifest.
import akka.serialization.Serializer
import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream, AvroSchema}
import com.typesafe.config.ConfigFactory

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class BankAccount(iban: String, bankCode: String, amount: Double, currency: String)
case class CompanyRegistry(name: String, accounts: Seq[BankAccount], activityCode: String, marketCap: Double)

class MyFirstAvroSerializer extends Serializer {

  // Avro schema derived from the case class by avro4s
  val schema = AvroSchema[CompanyRegistry]

  // unique ID of this serializer
  override def identifier: Int = 454874

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case c: CompanyRegistry =>
      // write the message as Avro binary into a byte array
      val baos = new ByteArrayOutputStream()
      val avroOutputStream = AvroOutputStream.binary[CompanyRegistry].to(baos).build()
      avroOutputStream.write(c)
      avroOutputStream.flush()
      avroOutputStream.close()
      baos.toByteArray
    case _ => throw new IllegalArgumentException(s"we only support CompanyRegistry for Avro")
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    // read the Avro binary back into a CompanyRegistry using the schema
    val avroInputStream = AvroInputStream.binary[CompanyRegistry].from(new ByteArrayInputStream(bytes)).build(schema)
    val companyRegistryIterator: Iterator[CompanyRegistry] = avroInputStream.iterator
    val companyRegistry = companyRegistryIterator.next()
    avroInputStream.close()
    companyRegistry
  }

  override def includeManifest: Boolean = false
}
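If you want to sanity-check the serializer in isolation before wiring it into Akka, a quick round trip could look like this (a hypothetical snippet, not part of my project):

object AvroRoundTripCheck extends App {
  val serializer = new MyFirstAvroSerializer

  val original = CompanyRegistry(
    "ACME",
    Seq(BankAccount("DE-0001", "acme-bank", 42.0, "EUR")),
    "manufacturing",
    1000000
  )

  // serialize to Avro binary and back again through the serializer above
  val bytes = serializer.toBinary(original)
  val restored = serializer.fromBinary(bytes, None)

  println(s"round trip ok: ${restored == original}")
}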
Then you have to configure your project to call this serializer when exchanging Akka messages among actors. Configure it in the application.conf by adding a specific config, which in my case is avroSerializable. You set MyFirstAvroSerializer under the serializers scope and the case classes under the serialization-bindings scope. I also configured it for Akka remote, but you can ignore that part.
avroSerializable {
  akka {
    actor {
      provider = remote
      #allow-java-serialization = off
      serializers {
        java = "akka.serialization.JavaSerializer"
        avro = "org.github.felipegutierrez.explore.akka.classic.remote.serialization.MyFirstAvroSerializer"
      }
      serialization-bindings {
        "org.github.felipegutierrez.explore.akka.classic.remote.serialization.CompanyRegistry" = avro
        "java.io.Serializable" = java
      }
    }
    remote {
      artery {
        enabled = on
        transport = aeron-udp
        canonical.hostname = "localhost"
      }
    }
  }
}
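To verify that Akka actually picks this serializer for CompanyRegistry, you can ask the SerializationExtension. This is a hypothetical quick check, not part of my project; it overrides the remote provider with local so it runs without the remoting artifacts, and assumes the avroSerializable block above lives in application.conf.

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.typesafe.config.ConfigFactory

object CheckSerializerBinding extends App {
  val config = ConfigFactory
    .parseString("akka.actor.provider = local")
    .withFallback(ConfigFactory.load().getConfig("avroSerializable"))
  val system = ActorSystem("checkSystem", config)

  val sample = CompanyRegistry("ACME", Seq.empty, "test", 1.0)
  // should print MyFirstAvroSerializer if the binding is picked up
  println(SerializationExtension(system).findSerializerFor(sample).getClass)

  system.terminate()
}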
Like I said in the beginning, I am using PostgreSQL, but in your case it will be your S3 bucket storage configuration. I am leaving it here just for the sake of completeness and because I refer to this configuration when creating the actor system.
postgresStore {
  akka.persistence.journal.plugin = "jdbc-journal"
  akka.persistence.snapshot-store.plugin = "jdbc-snapshot-store"
  akka.actor.allow-java-serialization = on

  # JDBC configuration for Akka persistence
  akka-persistence-jdbc {
    shared-databases {
      slick {
        profile = "slick.jdbc.PostgresProfile$"
        db {
          numThreads = 10
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/rtjvm"
          user = "docker"
          password = "docker"
        }
      }
    }
  }

  # bind the JDBC plugins to the configuration created above
  jdbc-journal {
    use-shared-db = "slick"
  }
  jdbc-snapshot-store {
    use-shared-db = "slick"
  }
}
Now it is time to create the actor system and a simple actor, SimplePersistentActor, and send a message over the wire. SimplePersistentActor is just a very simple actor that accepts the messages I am sending, nothing very fancy.
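For reference, a minimal version of such a persistent actor could look roughly like this (only an illustrative sketch using classic Akka persistence; the actual actor may differ):

import akka.actor.{ActorLogging, Props}
import akka.persistence.PersistentActor

object SimplePersistentActor {
  def props(persistenceId: String): Props = Props(new SimplePersistentActor(persistenceId))
}

class SimplePersistentActor(id: String) extends PersistentActor with ActorLogging {
  override def persistenceId: String = id

  // persist whatever message arrives, which is when the Avro serializer kicks in
  override def receiveCommand: Receive = {
    case message =>
      persist(message) { _ =>
        log.info(s"persisted $message")
      }
  }

  // replay persisted events on recovery
  override def receiveRecover: Receive = {
    case event => log.info(s"recovered $event")
  }
}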
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object AvroSerialization_Persistence {
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.load().getConfig("postgresStore")
      .withFallback(ConfigFactory.load().getConfig("avroSerializable"))
    val system = ActorSystem("postgresStoreSystem", config)
    val simplePersistentActor = system.actorOf(SimplePersistentActor.props("avro-actor"), "personAvroActor")
    val companyRegistryMsg = CompanyRegistry(
      "Google",
      Seq(
        BankAccount("US-1234", "google-bank", 4.3, "gazillion dollars"),
        BankAccount("GB-4321", "google-bank", 0.5, "trillion pounds")
      ),
      "ads",
      523895
    )
    simplePersistentActor ! companyRegistryMsg
  }
}