python, google-cloud-platform, unix, google-bigquery, airflow

GCP: BigQuery EXPORT DATA with CRLF as line termination


I tried to export data to CSV from BigQuery; by default it generates the export file with LF as the line terminator:

export data options(uri = 'gs://<bucket>/<file_name>.txt', format = 'CSV', overwrite = true, header = false, field_delimiter = '~') as select * from <table> order by <col>

Is it possible to export the data with CRLF as the line terminator by providing some parameter to EXPORT DATA in BigQuery? Kindly share your thoughts.

EDIT: The requirement is to achieve this through Airflow. If there is a direct way to do it by adding a BigQuery EXPORT DATA parameter, that would be great; if not, a solution using the PythonOperator or BashOperator in Airflow would be helpful.


Solution

  • There are two ways to mitigate this issue, as I was looking for a solution that handles it within Airflow.

    Solution one -> PythonOperator running a Bash command (a BashOperator-only alternative is sketched after the code below)

    import subprocess
    from subprocess import Popen, PIPE
    from datetime import datetime

    path = 'gs://file_path'
    file_name = 'file_name'

    # retrieve the list of exported files whose LF terminators need to become CRLF
    retrieve_files_command = f'gsutil ls {path}{file_name}*.txt'
    process = Popen(retrieve_files_command, stdout=PIPE, shell=True)
    files, _ = process.communicate()
    files = files.decode().split('\n')[:-1]

    # loop through each file, replace LF with CRLF and write a new GCS file
    for file in files:
        filename = file.split('/')[-1]
        dt = datetime.now().strftime("%Y%m%d%H%M%S")
        modify_files_command = f"gsutil cat {file} | sed -e 's/$/\\r/' | gsutil cp - gs://<destination_path>/file_name_{dt}.txt"

        try:
            subprocess.run(modify_files_command, shell=True, check=True)
        except subprocess.CalledProcessError as e:
            print(f"{file} is not converted due to: {e}")
    
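    If the exported file name is known up front, the same gsutil/sed pipeline can also be expressed directly as an Airflow BashOperator task instead of shelling out from Python. A minimal sketch, assuming an existing DAG object; the task id and the bucket/path names are illustrative placeholders.

    from airflow.operators.bash import BashOperator

    # Hedged sketch: convert one exported file from LF to CRLF entirely in Bash.
    # `dag`, '<bucket>', '<file_name>' and '<destination_path>' are placeholders.
    convert_to_crlf = BashOperator(
        task_id='convert_to_crlf',
        bash_command=(
            "gsutil cat gs://<bucket>/<file_name>.txt "
            "| sed -e 's/$/\\r/' "
            "| gsutil cp - gs://<bucket>/<destination_path>/<file_name>_crlf.txt"
        ),
        dag=dag,  # assumes a DAG object defined elsewhere in the DAG file
    )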

    Solution two -> PythonOperator with a buffered read/write. NOTE: retrieve the list of files as in solution one (or with the storage client, as sketched after the code below).

    from datetime import datetime
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket('<bucket>')  # bucket holding the exported files

    buf_size = 4096
    # `files` holds the blob names (object paths within the bucket) retrieved as in solution one
    for input_file in files:
        in_blob = bucket.blob(input_file)
        op_prefix_name = 'destination_path/'
        dt = datetime.now().strftime("%Y%m%d%H%M%S%f")
        op_fn = 'file_name' + dt
        print("input is " + input_file)
        print("output is " + op_prefix_name + op_fn)
        op_blob = bucket.blob(op_prefix_name + op_fn)
        # stream the input blob in chunks, rewriting LF as CRLF on the way out
        with in_blob.open('r') as in_file, op_blob.open('w') as opt_file:
            while True:
                chunk = in_file.read(buf_size)
                if not chunk:
                    break
                opt_file.write(chunk.replace('\n', '\r\n'))
    
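    As a side note, since solution two already uses the google-cloud-storage client, the file list itself can be built with the same client instead of gsutil ls. A minimal sketch, assuming the exported files sit under a known prefix; the prefix below is a placeholder.

    # Hedged alternative to `gsutil ls`: build the blob-name list with the storage client.
    files = [
        blob.name
        for blob in bucket.list_blobs(prefix='file_path/file_name')
        if blob.name.endswith('.txt')
    ]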

    In my case, solution two fits my environment better: Astro may get stuck on the gsutil cat command because of file-size constraints, whereas solution two streams through a fixed-size buffer and is more performant; tweak the buffer size as required. A sketch of wiring it into an Airflow DAG follows below.
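
    For completeness, this is roughly how solution two could be wired into a DAG with the Airflow PythonOperator, as asked in the question. The DAG id, schedule and the convert_lf_to_crlf callable (which would wrap the buffered loop above) are illustrative placeholders.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def convert_lf_to_crlf():
        # wrap the buffered LF -> CRLF loop from solution two here
        ...

    with DAG(
        dag_id='bq_export_crlf',          # illustrative DAG id
        start_date=datetime(2023, 1, 1),
        schedule_interval=None,           # run on demand, e.g. after the BigQuery export
        catchup=False,
    ) as dag:
        convert_task = PythonOperator(
            task_id='convert_lf_to_crlf',
            python_callable=convert_lf_to_crlf,
        )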