Search code examples
airflowgoogle-cloud-composerairflow-2.xairflow-taskflow

GCSToGCSOperator not moving files in same bucket


I am trying to move files from the "new" folder in my-bucket to the "test" folder in the same bucket using GCSToGCSOperator. I pass a list of file names as source_objects, but when I run the DAG the files are not moved. I used xcom_pull to get the list of file names in the "new" folder of my bucket.

def segregate_files(ti):
    """Move 'abc' files from the new/ folder to the test/ folder of my-bucket.

    Pulls the object listing pushed by the 'list_file' task via XCom,
    keeps the names containing 'abc', and moves the matches with
    GCSToGCSOperator.
    """
    source_bucket = "my-bucket"
    destination_bucket = "my-bucket"

    # Listing produced by the upstream GCSListObjectsOperator task.
    source_files = ti.xcom_pull(task_ids='list_file')
    print(source_files)  # e.g. ['new/abc.txt', 'new/bcd.txt', 'new/abc_bcd.txt']

    # BUG FIX: the original filtered an undefined name `data` (NameError);
    # the pulled listing is `source_files`. (Unused `storage.Client` /
    # `get_bucket` locals were removed — nothing below used them.)
    new_files = [file for file in source_files if 'abc' in file]
    print(new_files)  # e.g. ['new/abc.txt', 'new/abc_bcd.txt']

    if new_files:
        move_abc_files = GCSToGCSOperator(
            task_id='move_new_files',
            source_bucket=source_bucket,
            source_objects=new_files,
            destination_bucket=destination_bucket,
            destination_object="test/",
            move_object=True,
            dag=dag
        )
        # BUG FIX: merely instantiating an operator inside a running task
        # creates the object but never runs it — Airflow only schedules
        # operators that are part of the parsed DAG. Execute it explicitly
        # so the files actually move.
        move_abc_files.execute(context={})

# List GCS objects under new/ in my-bucket and push the result to XCom
# so downstream tasks can pull it with task_ids='list_file'.
# NOTE(review): `delimiter='.txt'` appears intended as a suffix filter;
# in the GCS list API, delimiter is a path separator — confirm it filters
# as expected (newer Airflow providers offer `match_glob` for this).
bucket_files = GCSListObjectsOperator(
        task_id='list_file',
        bucket='my-bucket',
        prefix='new/',
        delimiter='.txt',
        do_xcom_push=True,
        dag=dag
    )

Solution

  • As a workaround, instead of instantiating GCSToGCSOperator inside a Python callable (which only creates the operator object and never executes it), call the Cloud Storage client library directly, as in the example below:

    import os
    from google.cloud import storage
    
    # Copy objects from the new/ folder of my-bucket into another folder
    # of the same bucket.
    source_bucket_name = 'my-bucket'
    source_folder_name = 'new'
    destination_bucket_name = 'my-bucket'
    # NOTE(review): the question asked for a 'test' folder; this example
    # writes to 'old' — adjust to match your target folder.
    destination_folder_name = 'old'
    
    
    def copy_files(ti):
        """Copy the listed objects into `<destination_folder_name>/<basename>`.

        Pulls the object listing pushed by the 'listing_files' task via
        XCom and copies each object within/between the configured buckets.
        """
        file_list = ti.xcom_pull(task_ids='listing_files')
        storage_client = storage.Client()
        source_bucket = storage_client.get_bucket(source_bucket_name)
        destination_bucket = storage_client.get_bucket(destination_bucket_name)
    
        for filename in file_list:
            # Skip folder placeholder entries such as 'new/'.
            if filename.endswith('/'):
                continue
            # BUG FIX: the listing already yields the full object path
            # (e.g. 'new/abc.txt'), so use it directly as the blob name —
            # the original line contained a garbled f-string here.
            source_blob = source_bucket.blob(filename)
            output_filename = os.path.basename(filename)
            destination_blob = destination_bucket.blob(f'{destination_folder_name}/{output_filename}')
            # NOTE(review): download_as_string() is deprecated in newer
            # google-cloud-storage releases in favour of download_as_bytes().
            destination_blob.upload_from_string(source_blob.download_as_string())
    
        print(f'Files copied successfully.{file_list}')
        
    
    # Wire the tasks into a simple linear pipeline:
    # started -> listing_files -> copying_files -> end
    with DAG(dag_id='final',start_date=datetime(2021, 4, 5, 15, 0),schedule_interval='@daily',catchup=False) as dag:
        t1 = BashOperator(task_id='started', bash_command='echo starting…')
        # Lists objects under new/ (delimiter='.txt'); result goes to XCom.
        t2 = GoogleCloudStorageListOperator(task_id='listing_files',bucket='my-bucket',prefix='new/',delimiter='.txt')
        # Runs copy_files(), which pulls the listing from XCom.
        t3 = PythonOperator(task_id="copying_files",python_callable=copy_files)
        t4 = BashOperator(task_id='end', bash_command='echo end')
    
        t1>>t2>>t3>>t4    
    

    DAG :

    (screenshot: DAG graph view showing started >> listing_files >> copying_files >> end)

    NB: I tested the above code for txt and py files. It worked fine for me.