I am trying to use the Alpakka Kinesis connector to send messages to a Kinesis stream, without success. I tried the code below, but nothing arrives in my stream.
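import java.nio.ByteBuffer
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kinesis.KinesisFlowSettings
import akka.stream.alpakka.kinesis.scaladsl.KinesisSink
import akka.stream.scaladsl.{Flow, Source}
import com.amazonaws.services.kinesis.{AmazonKinesisAsync, AmazonKinesisAsyncClientBuilder}
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry
import scala.concurrent.duration._
import scala.util.Random

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
  println(reqEntry)
  reqEntry
}
val entry = new PutRecordsRequestEntry()
  .withData(ByteBuffer.wrap("Hello World".getBytes))
  .withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second, entry).via(debug).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()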
Using Sink.foreach(println) instead of KinesisSink prints out the PutRecordsRequestEntry every second, as expected. With KinesisSink, the entry is generated only once. What am I doing wrong?
I am checking my stream with a KinesisSource, and reading works (tested with another stream).
Also, the AWS Kinesis monitoring dashboard doesn't show any PUT requests.
Note 1: I tried to enable debug logging for Alpakka, with no effect, by adding
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
to my logback.xml, plus DEBUG on the root level.
The problem was an access-denied (permission) error for the action on the stream.
I had to add the following Akka logging config
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}
to see the debug lines. I also ran the code in a debugger and stepped through each stage.
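Besides logging and stepping through with a debugger, a failure like this can also be surfaced from the stream itself. With .to(sink).run() the materialized value is the one from the source, so a stage that fails just stops the stream. The sketch below is a minimal illustration using plain Akka Streams only: failingPublish is a hypothetical stand-in for the Kinesis publishing stage (it is not part of Alpakka), and the error message is made up. It uses runWith(Sink.ignore), which materializes a Future[Done] that fails when the stream fails.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SurfaceStreamFailure {

  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("surface-failure")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Hypothetical stand-in for the publishing stage: it throws on the first
    // element, the way a rejected request would fail the stream.
    val failingPublish: Flow[String, String, NotUsed] =
      Flow[String].map[String](_ => throw new RuntimeException("simulated access denied"))

    // runWith(Sink.ignore) returns the sink's Future[Done], which is failed
    // with the stage's exception when the stream fails.
    val done: Future[Done] =
      Source.tick(100.millis, 100.millis, "Hello World")
        .via(failingPublish)
        .runWith(Sink.ignore)

    // Report the outcome instead of letting the stream stop silently.
    done.onComplete {
      case Success(_)  => println("stream completed")
      case Failure(ex) => println(s"stream failed: ${ex.getMessage}")
    }

    // Block only for the sake of this small demo, then shut down.
    val outcome = Try(Await.result(done, 5.seconds))
    system.terminate()
    outcome
  }

  def main(args: Array[String]): Unit = run()
}
```

In the real pipeline the stand-in would be replaced by the actual Kinesis stage, provided the connector version in use offers a flow variant that can be followed by Sink.ignore; that part is an assumption and should be checked against the Alpakka documentation for your version.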
The fix was to grant the "PutRecords" permission (the kinesis:PutRecords action) in the IAM role.
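For illustration, a minimal IAM policy statement granting that action might look like the sketch below. REGION and ACCOUNT_ID are placeholders, and the stream name is taken from the question; adjust the resource ARN to your own stream.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["kinesis:PutRecords"],
      "Resource": "arn:aws:kinesis:REGION:ACCOUNT_ID:stream/myStreamName"
    }
  ]
}
```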