How to merge two IO Streams in Python?

General Tech · Bugs & Fixes · Posted on 16 Aug 2022

Answers (2)

manpreet · Best Answer · 2 years ago

I have created a wrapper around the spark-submit command so that I can generate real-time events by parsing its logs. The purpose is to build a real-time interface showing the detailed progress of a Spark job.

So the wrapper will look like this:

  submitter = SparkSubmitter()
  submitter.submit('/path/to/spark-code.py')
  for log_event in submitter:
    if log_event:
      print('Event:', log_event)

And the output will look like the following:

  Event: StartSparkContextEvent()
  Event: StartWorkEvent()
  Event: FinishWorkEvent()
  Event: StopSparkContextEvent()

Internally, the SparkSubmitter class launches the spark-submit command as a subprocess.Popen process, then iterates over the stdout stream and returns events by parsing the logs generated by the process, like this:

  from subprocess import Popen, PIPE

  class SparkSubmitter:
    def submit(self, path):
      command = self.build_spark_submit_command(path)
      self.process = Popen(command, stdout=PIPE, stderr=PIPE)

    def __iter__(self):
      return self

    def __next__(self):
      # note: this is an I/O-blocking call
      log = self.process.stdout.readline().decode('utf-8')
      return self.parse_log_and_return_event(log)

This implementation works well on a Spark standalone cluster, but I am having an issue when running on a YARN cluster.

On a YARN cluster, the Spark-related logs are written to stderr instead of stdout, so my class cannot parse the Spark-generated logs because it only reads stdout.

Question 1: Is it possible to read Popen's stdout and stderr as a single stream?

Question 2: Since stdout and stderr are both streams, is it possible to merge the two streams and read them as one?

Question 3: Is it possible to redirect all the logs to only stdout?

manpreet · 2 years ago

The answer to all three of your questions is yes: pass stderr=subprocess.STDOUT to Popen to redirect the subprocess's stderr into its stdout:

self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)
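
Applied to the wrapper above, only the Popen call in submit() needs to change. A minimal sketch, assuming the same class layout and the build_spark_submit_command / parse_log_and_return_event helpers from the question:

  from subprocess import Popen, PIPE, STDOUT

  class SparkSubmitter:
    def submit(self, path):
      command = self.build_spark_submit_command(path)
      # stderr=STDOUT folds the child's stderr into the same pipe as stdout,
      # so the existing __next__ sees the YARN-side Spark logs as well.
      self.process = Popen(command, stdout=PIPE, stderr=STDOUT)

If you ever need to keep the two pipes separate but still consume them as a single stream (Question 2 taken literally), a common pattern is one reader thread per pipe feeding a shared queue. This is a rough sketch, not part of the original wrapper, and merge_pipes is just an illustrative name:

  from queue import Queue
  from threading import Thread

  def merge_pipes(process):
    # 'process' is a subprocess.Popen started with stdout=PIPE and stderr=PIPE.
    # Yields decoded lines from both pipes as a single stream, in arrival order.
    q = Queue()

    def pump(pipe):
      for raw in iter(pipe.readline, b''):
        q.put(raw.decode('utf-8'))
      q.put(None)  # sentinel: this pipe is exhausted

    for pipe in (process.stdout, process.stderr):
      Thread(target=pump, args=(pipe,), daemon=True).start()

    finished = 0
    while finished < 2:  # wait for both sentinels
      line = q.get()
      if line is None:
        finished += 1
      else:
        yield line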
