Search code examples
akka-streamalpakka

Alpakka KinesisSink : Can not push messages to Stream


I am trying to use the alpakka kinesis connector to send messages to a Kinesis Stream but I have no success with it. I tried the code below but nothing in my stream.

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
  • Using a Sink.foreach(println) instead of KinesisSink prints out the PutRecordsRequestEntry every 1 second => EXPECTED
  • Using KinesisSink, the entry is generated only once.

What Am I doing wrong ?

I am checking my stream with a KinesisSource and reading is working ( tested with another stream)

Also the monitoring dashboard of AWS Kinesis doesnt show any PUT requests.

Note 1: I tried to enable the debug log of alpakka but with no effect

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>

in my logback.xml + debug on root level


Solution

  • The problem was an access denied / permission on the action on the stream.

    I had to add the akka actor config for logging

    akka {
      loggers = ["akka.event.slf4j.Slf4jLogger"]
      loglevel = "DEBUG"
      stdout-loglevel = "DEBUG"
      logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
      logger-startup-timeout = "30s"
    }
    

    to see debug lines and I actually run in debug and step in each stage.

    It required permission "PutRecords" in the IAM role