Popen: redirect stderr and stdout to single stream


I have written a wrapper around the spark-submit command that generates real-time events by parsing its logs. The goal is to build a real-time interface showing the detailed progress of a Spark job.

The wrapper is used like this:

  submitter = SparkSubmitter()
  submitter.submit('/path/to/spark-code.py')
  for log_event in submitter:
    if log_event:
      print('Event:', log_event)

And the output will look like the following:

  Event: StartSparkContextEvent()
  Event: StartWorkEvent()
  Event: FinishWorkEvent()
  Event: StopSparkContextEvent()

Internally, the SparkSubmitter class launches the spark-submit command as a subprocess.Popen process, then iterates over the stdout stream and returns events by parsing the logs the process generates, like this:

  from subprocess import Popen, PIPE

  class SparkSubmitter:
      def submit(self, path):
          command = self.build_spark_submit_command(path)
          # stdout and stderr go to two separate pipes; only stdout is read below
          self.process = Popen(command, stdout=PIPE, stderr=PIPE)

      def __iter__(self):
          return self

      def __next__(self):
          # Note: readline() blocks until a full line is available
          log = self.process.stdout.readline().decode('utf-8')
          return self.parse_log_and_return_event(log)

This implementation works well with a Spark standalone cluster, but I am having an issue when running on a YARN cluster.

On the YARN cluster, the Spark-related logs are written to stderr instead of stdout. My class cannot parse them because it only reads stdout.

Question 1: Is it possible to read Popen's stdout and stderr as a single stream?

Question 2: Since stdout and stderr are both streams, is it possible to merge them and read them as one?

Question 3: Is it possible to redirect all the logs to only stdout?


Solution

  • The answer to all three of your questions is yes. Pass stderr=subprocess.STDOUT as an argument to Popen to redirect the child's stderr into the same pipe as its stdout:

    # subprocess.STDOUT requires `import subprocess` in addition to Popen and PIPE
    self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)
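
As a rough, self-contained illustration of the redirect, the sketch below reads a child's stdout and stderr through a single pipe. The helper name `iter_merged_output` and the small Python child process are hypothetical stand-ins chosen for this example; they are not part of the asker's SparkSubmitter class or of spark-submit itself.

```python
import subprocess
import sys


def iter_merged_output(command):
    """Yield decoded lines from a child's stdout and stderr as one stream.

    Hypothetical helper for illustration only.
    """
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # send stderr into the stdout pipe
    )
    # readline() returns b"" at end of file, which stops iter()
    for raw_line in iter(process.stdout.readline, b""):
        yield raw_line.decode("utf-8", errors="replace").rstrip("\r\n")
    process.stdout.close()
    process.wait()


# Stand-in child process that writes one line to each stream
child_code = (
    "import sys\n"
    "print('to stdout')\n"
    "sys.stdout.flush()\n"
    "print('to stderr', file=sys.stderr)\n"
)

lines = list(iter_merged_output([sys.executable, "-c", child_code]))
print(lines)
```

Once the streams are merged, the reader can no longer tell which stream a given line came from, and the interleaving of lines depends on how the child buffers its output. For log parsing where only the content of each line matters, that is usually acceptable.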