python, google-cloud-dataflow, apache-beam

Dataflow pipeline is not starting after defining the pipeline options


I have the code below, which has three methods: zip_extract, get_file_path and data_restructure.

The code should first execute zip_extract, which extracts any zip files present in the GCP bucket and does not return anything.

Next it should execute get_file_path, which traverses the entire bucket, collects all the file paths present in it into a list, and returns that list to data_restructure.

data_restructure takes each file path in that list and checks whether it is a DICOM file. If it is, it stores the file in a Study-Series-SOP structure in the destination bucket; if it is not, it stores it under a different hierarchy in the destination bucket.

I wrote a Dataflow pipeline for this code, as shown below:

    with beam.Pipeline(options=pipeline_options) as p:
        file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
        file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))

However, this throws the following error message in the Dataflow log:
The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

Main code:

def zip_extract():
    '''
    Function to unzip a folder in a bucket under a specific hierarchy
    '''
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)   
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))
    for file_name in blobs_specific:        
    
        file_extension = pathlib.Path(file_name.name).suffix 

        try:

            if file_extension==".zip":    
                destination_blob_pathname = file_name.name        
                blob = bucket.blob(destination_blob_pathname)
                zipbytes = io.BytesIO(blob.download_as_string())

                if is_zipfile(zipbytes):
                    with ZipFile(zipbytes, 'r') as myzip:
                        for contentfilename in myzip.namelist():
                            contentfile = myzip.read(contentfilename)             
                            blob = bucket.blob(f'{file_name.name.replace(".zip","")}/{contentfilename}')
                            blob.upload_from_string(contentfile)
                            
            logging.info("Unzip completed")


        except Exception:
            logging.info('Skipping : {} file format found.'.format(file_extension))
            continue

    client.close()


def get_file_path():
    '''
    Function to store all the file paths present in landing bucket into a list 
    '''
    zip_extract()
    file_paths = []
    from google.cloud import storage
    client = storage.Client()
    bucket = client.bucket(landing_bucket)
    blobs_specific = list(bucket.list_blobs(prefix=data_folder))

    try:

        for blob in blobs_specific:         
            file_paths.append("gs://{}/".format(landing_bucket)+blob.name)     
        client.close()
        logging.info("List is ready with data")
        return file_paths  
    except Exception as err:
        logging.error("Error while appending data to list : {}".format(err))
        raise       


def data_restructure(line):
    '''
    params line: String which has the file path
    Function to read each file and check if it is a DICOM file or not, if yes,
    store it in Study-Series-SOP hierarchy else store it in Descriptive folder in Intermediate bucket.
    '''
    from google.cloud import storage
    InstanceUID={}
    client = storage.Client()
    destination_bucket = client.bucket(intermediate_bucket)
    cmd = "gsutil cp {} .\local_folder".format(line)
    result = subprocess.run(cmd,shell=True,capture_output=True,text=True)
    file_name=os.listdir(".\local_folder").pop(0) 

    try:
        dicom_data = dcmread(".\local_folder\{}".format(file_name))
        logging.info("Started reading Dicom file")
            
        for element in dicom_data:

            if element.name in ("Study Instance UID","Series Instance UID","SOP Instance UID","Modality"):                   
                InstanceUID[element.name]=element.value

        destination_bucket = client.bucket(intermediate_bucket)
        blob = destination_bucket.blob('Client/Test/DICOM/{}/{}/{}/{}.dcm'.format(list(InstanceUID.values())[1],list(InstanceUID.values())[2],list(InstanceUID.values())[3],list(InstanceUID.values())[0]))
        blob.upload_from_filename(".\local_folder\{}".format(file_name))
        InstanceUID.clear()   
        logging.info("DICOM file {} uploaded into Intermediate Bucket".format(file_name))        
        os.remove(".\local_folder\{}".format(file_name))    

    except Exception as e:
        
        file_extension = file_name.split("/")[-1].split(".")[-1]

        if file_extension != "zip" and "report" not in file_name and file_extension != "":

            blob = destination_bucket.blob('Test/Descriptive/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored file into Descriptive folder")
            os.remove(".\local_folder\{}".format(file_name))

        else:

            blob = destination_bucket.blob('Test/Reports/{}'.format(file_name))
            blob.upload_from_filename(".\local_folder\{}".format(file_name))
            logging.info("Stored Report file into Reports folder")
            os.remove(".\local_folder\{}".format(file_name))

    client.close()

def call_main():
    
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    setup_options= pipeline_options.view_as(SetupOptions)
    setup_options.setup_file='./setup.py'
    setup_options.save_main_session=True
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = "dataflow"+re.sub("[^0-9]+", "-", str(datetime.datetime.now()))
    google_cloud_options.service_account_email = "service_email"
    pipeline_options.view_as(StandardOptions).runner = "DataflowRunner"
    google_cloud_options.staging_location = config["staging_location"]
    google_cloud_options.temp_location = config["temp_location"]
    google_cloud_options.region = config["region"]
    pipeline_options.view_as(WorkerOptions).num_workers = 2
    pipeline_options.view_as(WorkerOptions).machine_type = "n1-standard-2"
    pipeline_options.view_as(WorkerOptions).disk_size_gb = 1024
    pipeline_options.view_as(WorkerOptions).network = vpc_name
    pipeline_options.view_as(WorkerOptions).subnetwork = f'regions/{config["region"]}/subnetworks/{subnet_name}'
    pipeline_options.view_as(WorkerOptions).use_public_ips=False
    

    with beam.Pipeline(options=pipeline_options) as p:
        file_paths = (p | "Get File Paths" >> beam.Create(get_file_path()))
        file_paths | "Data Restructure" >> beam.Map(lambda x: data_restructure(x))



if __name__ == '__main__':
    call_main()

setup.py file:


import setuptools

setuptools.setup(
    name='Installing Packages',
    version='1.0.0',
    install_requires=[
        'google-cloud-datastore==1.15.3',
        'google.cloud.storage==1.16.1',
        'apache-beam[gcp]==2.31.0',
        'google-api-core==1.33.2',
        'google-cloud-core==1.7.3',
        'google-cloud-logging == 1.15.1',
        'pydicom == 2.3.1',
        'uuid == 1.30',
        'google-cloud-secret-manager',
        'psycopg2-binary',
    ],
    packages=setuptools.find_packages(),
)

I'm new to Apache Beam and Dataflow. Please help me with this. I have tried other ways of writing the Dataflow pipeline, but nothing worked.

Please correct me if I have done anything wrong here.

Kindly tell me whether the way I wrote the transformations is right or not. If not, please show me the right way. I'm stuck on this and not able to progress.

Thanks in advance


Solution

  • This error

    The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. Please check the worker logs in Stackdriver Logging. You can also get help with Cloud Dataflow at https://cloud.google.com/dataflow/support.

    usually happens due to issues with installing dependencies (and not due to the transforms themselves):

    • You can debug this by looking at the worker-startup logs in Cloud Logging; you are likely to see pip errors while installing the dependencies (a query sketch follows this list).
    • You can try other forms of dependency management (https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/) - custom containers would be less error-prone (a sketch of both options is also shown after this list).
    • As a side note, there is no need to pin the Beam SDK version. It is picked up automatically, and pinning one version while using a different version locally can cause errors.
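
    For illustration, here is a minimal sketch of pulling those worker-startup logs with the google-cloud-logging client. The project id and job id are placeholders, and the filter assumes the log id Dataflow normally uses for worker startup; adjust both to your job.

        from google.cloud import logging as cloud_logging

        client = cloud_logging.Client(project="my-project")  # placeholder project id

        # Filter for the startup logs of a single Dataflow job; replace <JOB_ID>
        # with the id shown in the Dataflow console for the stuck job.
        log_filter = (
            'resource.type="dataflow_step" '
            'AND resource.labels.job_id="<JOB_ID>" '
            'AND log_id("dataflow.googleapis.com/worker-startup")'
        )

        # Print the most recent startup entries; pip install failures show up here.
        for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING):
            print(entry.payload)

    The same filter string can also be pasted directly into the Logs Explorer.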
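
    And a minimal sketch of what the dependency-related options in call_main could look like under the two alternatives. The requirements.txt path and the container image URI are placeholders, and sdk_container_image assumes a reasonably recent Beam SDK; treat this as an example rather than a drop-in fix.

        from apache_beam.options.pipeline_options import (
            PipelineOptions,
            SetupOptions,
            WorkerOptions,
        )

        pipeline_args = []  # or the list parsed by parse_known_args() in call_main
        pipeline_options = PipelineOptions(pipeline_args)

        # Option A: hand the workers a requirements file instead of setup.py.
        # Each worker runs pip against this file at startup, so any resolution
        # failure shows up directly in the worker-startup logs.
        setup_options = pipeline_options.view_as(SetupOptions)
        setup_options.requirements_file = "requirements.txt"  # placeholder path
        setup_options.save_main_session = True

        # Option B: point the workers at a prebuilt custom container that already
        # has every dependency installed, so nothing is pip-installed at startup.
        # The image URI below is a placeholder.
        worker_options = pipeline_options.view_as(WorkerOptions)
        worker_options.sdk_container_image = (
            "us-central1-docker.pkg.dev/my-project/my-repo/beam-worker:latest"
        )

        # Per the side note above: do not pin apache-beam itself in
        # requirements.txt or setup.py; the SDK version is picked up automatically.

    Either option on its own is usually enough; with a custom container there is normally nothing left for pip to install on the workers, which removes this whole class of stuck-at-startup failures.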