Tags: python, apache-kafka, apache-flink, apache-beam

KafkaRecord cannot be cast to [B


I'm trying to process data streaming from Apache Kafka using the Apache Beam Python SDK with the Flink runner. With Kafka 2.4.0 and Flink 1.8.3 running, I follow these steps:

1) Compile and run Beam 2.16 with Flink 1.8 runner.

git clone --single-branch --branch release-2.16.0 https://github.com/apache/beam.git beam-2.16.0
cd beam-2.16.0
nohup ./gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081 &

2) Run the Python pipeline.

from apache_beam import Pipeline
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


if __name__ == '__main__':
    pipeline = Pipeline(options=PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_version=1.8',
        '--flink_master_url=localhost:8081',
        '--environment_type=LOOPBACK',
        '--streaming'
    ]))
    (
        pipeline
        | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])  # [BEAM-3788] ???
    )
    result = pipeline.run()
    result.wait_until_finish()

3) Publish some data to Kafka.

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>{"hello":"world!"}

The Python script throws this error:

[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-USER-somejob. org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: xxx)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
        at org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:360)
        at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:310)
        at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:173)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:104)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:80)
        at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:78)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        ... 13 more
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
        at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
        at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
        at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.copy(CoderTypeSerializer.java:67)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:341)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:283)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        ... 1 more
ERROR:root:java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService - Manifest at /tmp/artifacts0k1mnin0/somejob/MANIFEST has 0 artifact locations
[flink-runner-job-invoker] INFO org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService - Removed dir /tmp/artifacts0k1mnin0/job_somejob/
Traceback (most recent call last):
  File "main.py", line 40, in <module>
    run()
  File "main.py", line 37, in run
    result.wait_until_finish()
  File "/home/USER/beam/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py", line 439, in wait_until_finish self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-USER-somejob failed in state FAILED: java.lang.ClassCastException: org.apache.beam.sdk.io.kafka.KafkaRecord cannot be cast to [B

I also tried the other deserializers available in Kafka, but they did not work either: Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer. That error originates from this piece of code.
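For reference, here is roughly how I passed the alternative deserializer (a sketch; I assume the key_deserializer/value_deserializer keyword arguments of the 2.16 external Kafka transform, which take fully qualified Java class names):

# Sketch: swapping in Kafka's StringDeserializer by fully qualified class name.
# This is the variant that produced the "Couldn't infer Coder" error above.
ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['test'],
    key_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
    value_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
)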

Am I doing something wrong?


Solution

  • Disclaimer: this is my first encounter with the Apache Beam project.

    It seems that Kafka consumer support is quite a fresh thing in Beam (at least in the Python interface), according to this JIRA. Apparently there is still a problem with the FlinkRunner combined with this new API. Even though your code is technically correct, it will not run correctly on Flink. There is a patch available, which looks to me more like a quick fix than a final solution. It requires recompiling Beam, so it is not something I would propose using in production. But if you are just getting started with the technology and don't want to be blocked, feel free to try it out.
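
    If you do try it, a quick way to check that records actually arrive afterwards is to tack a debug step onto your pipeline. A minimal sketch (beam.Map(print) is purely for inspection; with the default byte-array deserializers each element should be a key/value pair of raw bytes):

    import apache_beam as beam
    from apache_beam import Pipeline
    from apache_beam.io.external.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    if __name__ == '__main__':
        pipeline = Pipeline(options=PipelineOptions([
            '--runner=FlinkRunner',
            '--flink_version=1.8',
            '--flink_master_url=localhost:8081',
            '--environment_type=LOOPBACK',
            '--streaming'
        ]))
        (
            pipeline
            | 'read' >> ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['test'])
            | 'log' >> beam.Map(print)  # print each record as it is decoded
        )
        pipeline.run().wait_until_finish()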