Tags: python, pickle, apache-beam, dataflow, dill

File being overwritten after being closed when running a Beam pipeline on Dataflow


I have created a Beam pipeline p to run on Dataflow, and I want to write something to a file before running the pipeline. My code is:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import time

pipeline_options = PipelineOptions(runner='DirectRunner')
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)

myString = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."

# Write the string to a file and close it before the pipeline runs.
myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt', 'w+')
myFile3984573498534.write(myString * 100)
myFile3984573498534.close()

time.sleep(1)

r = p.run()

The file is written correctly, but it is then overwritten and left blank as soon as p.run() is called. Can anyone explain why this is happening?

NOTES:

  • Changing the file name and the file's variable name doesn't affect the outcome.
  • I inserted the time.sleep(1) so that the file can be observed to be written before p.run() is called and the file is blanked. The sleep is not necessary and can be changed or removed.

Solution

  • The problem is caused by the pipeline_options.view_as(SetupOptions).save_main_session = True line.

    When the pipeline runs, Beam uses dill.dump_session to serialize the main session and save it to a file. It then uses dill.load_session to load and deserialize that same file, recreating the main session, and finally reserializes the recreated session with dill.dump_session to send it to the runner. The point of serializing, deserializing, and then reserializing the main session is to work around an inconsistency in dill's serialization, as raised in https://github.com/uqfoundation/dill/issues/195. This means that all runners have this issue, not just Dataflow.
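    You can reproduce the mechanism outside of Beam with a minimal sketch (assuming dill's default file-handle pickling, which records a file's name and mode and re-opens it on load; demo.txt and session.pkl are illustrative names):

    import dill

    f = open('demo.txt', 'w+')
    f.write('hello')
    f.close()

    dill.dump_session('session.pkl')   # pickles __main__, including f, by name and mode
    dill.load_session('session.pkl')   # re-opens demo.txt in 'w+' mode, truncating it

    print(repr(open('demo.txt').read()))   # '' -- the file is now empty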

    The main session in this case contains the myFile3984573498534 file object. When the session is deserialized, dill re-opens the file exactly the way you opened it initially, in w+ mode, which immediately truncates the file. The file object is then closed again, and the pipeline ends with the file blank.

    The best fix is to open the file in r+ mode instead, so that deserialization of the main session re-opens the file without truncating it, leaving its contents unmodified.
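    A sketch of that approach (note that r+ cannot create a file, so it is created here first if needed, and truncate() trims any leftover bytes if the file previously held longer content):

    import os

    path = 'myfile2398457erity348t67349856734986739846.txt'
    if not os.path.exists(path):
        open(path, 'w').close()   # ensure the file exists; 'r+' cannot create it

    myFile3984573498534 = open(path, 'r+')
    myFile3984573498534.write(myString * 100)
    myFile3984573498534.truncate()   # drop anything left over past the new content
    myFile3984573498534.close()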

    If you need to open the file in w+ mode, delete the variable holding the file after closing it, i.e. del myFile3984573498534 after myFile3984573498534.close() but before running the pipeline. The variable then no longer exists when the main session is serialized, so the file is never re-opened and is left unmodified.
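    Applied to the code from the question, that workaround would look like this:

    myFile3984573498534 = open('myfile2398457erity348t67349856734986739846.txt', 'w+')
    myFile3984573498534.write(myString * 100)
    myFile3984573498534.close()
    del myFile3984573498534   # remove the file object from the main session

    r = p.run()   # nothing in the session re-opens the file now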