Tags: python, apache-kafka, apache-beam, apache-beam-io

ReadFromKafka throws ValueError: Unsupported signal: 2


I'm currently trying to get the hang of Apache Beam together with Apache Kafka.

The Kafka service is running locally, and I'm writing some test messages with the kafka-console-producer.

First I wrote this Java code snippet to test Apache Beam with a language I know, and it works as expected.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main {

  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create();

    Read<Long, String> kafkaReader = KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("beam-test")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);

    // Note: Read transforms are immutable, so a bare
    // kafkaReader.withoutMetadata() call would have no effect.

    pipeline
        .apply("Kafka", kafkaReader)
        .apply("Extract words", ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            System.out.println("Key: " + c.element().getKV().getKey()
                + " | Value: " + c.element().getKV().getValue());
          }
        }));

    pipeline.run();
  }
}

My goal is to write the same thing in Python, and this is where I'm currently at:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipe():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | 'Kafka Unbounded' >> ReadFromKafka(
               consumer_config={'bootstrap.servers': 'localhost:9092'},
               topics=['beam-test'])
         | 'Test Print' >> beam.Map(print)
        )


if __name__ == '__main__':
    run_pipe()

Now to the problem. When I try to run the python code, I get the following error:

(app) λ python ArghKafkaExample.py 
Traceback (most recent call last):
  File "ArghKafkaExample.py", line 22, in <module>
    run_pipe()
  File "ArghKafkaExample.py", line 10, in run_pipe
    (p
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 1028, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 572, in __ror__
    result = p.apply(self, pvalueish, label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 648, in apply
    return self.apply(transform, pvalueish)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 691, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 198, in apply
    return m(transform, input, options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 228, in apply_PTransform
    return transform.expand(input)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 322, in expand
    self._expanded_components = self._resolve_artifacts(
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 120, in __exit__
    next(self.gen)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 372, in _service
    yield stub
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 523, in __exit__
    self._service_provider.__exit__(*args)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 74, in __exit__
    self.stop()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 133, in stop
    self.stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 179, in stop_process
    return super(JavaJarServer, self).stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 143, in stop_process
    self._process.send_signal(signal.SIGINT)
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\subprocess.py", line 1434, in send_signal
    raise ValueError("Unsupported signal: {}".format(sig))
ValueError: Unsupported signal: 2

From googling I found out that it has something to do with process signals (like Ctrl+C), but overall I have absolutely no idea what the problem is.

Any advice would be helpful!

Greetings Pascal


Solution

  • Your pipeline code seems correct here. The issue is due to the requirements of the Kafka IO in the Python SDK. From the module documentation:

    These transforms are currently supported by Beam portable runners (for example, portable Flink and Spark) as well as Dataflow runner.

    Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During the pipeline construction, Python SDK will connect to a Java expansion service to expand these transforms. To facilitate this, a small amount of setup is needed before using these transforms in a Beam Python pipeline.

    Kafka IO is implemented in Python as a cross-language transform in Java and your pipeline is failing because you haven't set up your environment to execute cross-language transforms. To explain what a cross-language transform is in layman's terms: it means that the Kafka transform is actually executing on the Java SDK rather than the Python SDK, so it can make use of the existing Kafka code on Java.

    There are two barriers preventing your pipeline from working. The easier one to fix is that only the runners I quoted above support cross-language transforms, so if you're running this pipeline with the Direct runner it won't work; you'll want to switch to either the Flink or Spark runner in local mode.
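    In Python, switching runners is just a matter of pipeline options. A minimal sketch, assuming a Beam release whose FlinkRunner can spin up an embedded local cluster (the exact flag set depends on your Beam and Flink versions):

    ```python
    from apache_beam.options.pipeline_options import PipelineOptions

    # Illustrative runner flags for a local Flink runner. LOOPBACK runs the
    # Python SDK workers inside the launching process, which is convenient
    # for local testing.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--environment_type=LOOPBACK",
    ])
    ```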

    The more tricky barrier is that you need to start up an Expansion Service to be able to add external transforms to your pipeline. The stacktrace you're getting is happening because Beam is attempting to expand the transform but is unable to connect to the expansion service, and the expansion fails.

    If you still want to try running this with cross-language transforms despite the extra setup, the documentation I linked contains instructions for running an expansion service. At the time of writing this feature is still new, and there may be blind spots in the documentation. If you run into problems, I encourage you to ask questions on the Apache Beam users mailing list or in the Apache Beam Slack channel.
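
    For reference, here is a sketch of how the Python side can point at a manually started expansion service. It assumes you have already launched one (for example with the Beam Java expansion-service jar for your Beam version) and that it is listening on port 8097; the jar name and port below are illustrative, not prescriptive:

    ```python
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    # Assumes an expansion service is already listening on localhost:8097, e.g.
    #   java -jar beam-sdks-java-io-expansion-service-<version>.jar 8097
    # and that a portable runner (Flink/Spark) is used instead of DirectRunner.
    def run_pipe():
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--environment_type=LOOPBACK",
        ])
        with beam.Pipeline(options=options) as p:
            (p
             | 'Kafka Unbounded' >> ReadFromKafka(
                   consumer_config={'bootstrap.servers': 'localhost:9092'},
                   topics=['beam-test'],
                   expansion_service='localhost:8097')  # the running service
             | 'Test Print' >> beam.Map(print)
            )
    ```

    With the `expansion_service` argument set, Beam connects to your service during pipeline construction instead of trying to start (and later signal) a Java subprocess itself, which is the code path that produced your stack trace.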