Search code examples
scalaapache-flinkamazon-kinesis

Flink - producing to kinesis not working


I'm trying to run a simple program which reads from one kinesis stream, does a trivial transformation, and writes the result to another kinesis stream.

Running locally on Flink 1.4.0 (this is the version supported on EMR currently, so no way of upgrading).

Here is the code:

def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val consumerConfig = new Properties()
  consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

  val kinesisMaps = env.addSource(new FlinkKinesisConsumer[String](
    "source-stream", new SimpleStringSchema, consumerConfig))

  val jsonMaps = kinesisMaps.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, String]] }
  val values = jsonMaps.map(jsonMap => jsonMap("field_name"))

  values.print()

  val producerConfig = new Properties()
  producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")

  val kinesisProducer = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
  kinesisProducer.setFailOnError(true)
  kinesisProducer.setDefaultStream("target-stream")
  kinesisProducer.setDefaultPartition("0")

  values.addSink(kinesisProducer)

  // execute program
  env.execute("Flink Kinesis")
}

If I comment out the producing code, the program runs as expected and prints the correct values.

As soon as I add the producer code, I get the following exception:

org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:608)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:569)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:264)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:210)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Any idea what's the cause of this?


Solution

  • Apparently, this is an issue with the old version of Amazon KPL which is used in Flink 1.4.

    There are at least two possible solutions for this:

    1. Upgrade to Flink version 1.5. You can still use it on EMR, if you install it as described here, under the section Custom EMR Installation: https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html

    2. When building the Kinesis connector for Flink 1.4, you can build it with newer AWS dependencies: I've cherry-picked the aws dependency changes in pom.xml of the connector from 1.5, and built the connector with them. Looks like it's working as expected.