I am exporting data to CSV from BigQuery. By default, the export file is generated with LF as the line terminator:
EXPORT DATA OPTIONS(
    uri = 'gs://<bucket>/<file_name>.txt',
    format = 'CSV',
    overwrite = true,
    header = false,
    field_delimiter = '~'
) AS
SELECT * FROM <table> ORDER BY <col>
Is it possible to export the data with CRLF as the line terminator by passing some parameter to EXPORT DATA in BigQuery? Kindly share your thoughts.
EDIT: The requirement is to achieve this through Airflow. A direct way using a BigQuery EXPORT DATA parameter would be ideal. If there is none, a solution using the PythonOperator or BashOperator in Airflow would also help.
There are two ways to work around this issue. Since I was looking for a solution that runs within Airflow, both are written for a Python operator.
Solution one -> Python operator with Bash command
import subprocess
from datetime import datetime
from subprocess import PIPE, Popen

path = 'gs://file_path'
file_name = 'file_name'
retrieve_files_command = f'gsutil ls {path}{file_name}*.txt'
process = Popen(retrieve_files_command, stdout=PIPE, shell=True)
# Retrieve the list of files whose LF line endings must be converted to CRLF
files, _ = process.communicate()
files = files.decode().split('\n')[:-1]
# Loop through each file, replace LF with CRLF, and write a new GCS file
for file in files:
    # %f (microseconds) keeps output names unique when files finish within the same second
    dt = datetime.now().strftime("%Y%m%d%H%M%S%f")
    modify_files_command = f"gsutil cat {file} | sed -e 's/$/\\r/' | gsutil cp - gs://<destination_path>/file_name_{dt}.txt"
    try:
        # Capture stderr so it is available on the exception below
        subprocess.run(modify_files_command, shell=True, check=True, stderr=PIPE)
    except subprocess.CalledProcessError as e:
        print(f"{file} is not converted due to {e.stderr.decode('utf-8')}")
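Because solution one runs its pipeline with shell=True, an object name containing spaces or shell metacharacters would break the command. A minimal sketch of building the same gsutil/sed pipeline with quoting is shown here. The helper name build_convert_command and the example URIs are hypothetical, not part of the original code:

```python
import shlex


def build_convert_command(src_uri: str, dest_uri: str) -> str:
    """Return a shell pipeline that rewrites LF line endings as CRLF.

    Hypothetical helper: the name and signature are illustrative only.
    """
    # shlex.quote protects URIs that contain spaces or shell metacharacters
    return (
        f"gsutil cat {shlex.quote(src_uri)}"
        " | sed -e 's/$/\\r/'"
        f" | gsutil cp - {shlex.quote(dest_uri)}"
    )


# Example URIs are placeholders chosen for illustration
print(build_convert_command("gs://my-bucket/in/part 1.txt", "gs://my-bucket/out/part_1.txt"))
```

The returned string can be passed to subprocess.run(..., shell=True, check=True) in place of the hand-built f-string.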
Solution two -> Python operator with buffer size. NOTE: Retrieve the list of files as in solution one, but pass bucket.blob() the object name (the path after gs://<bucket>/), not the full gs:// URI.
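from datetime import datetime
from google.cloud import storage

# Assumption: the exported files live in this bucket
bucket = storage.Client().bucket('<bucket>')
buf_size = 4096  # number of characters read per chunk
for input_file in files:
    in_blob = bucket.blob(input_file)
    op_prefix_name = 'destination_path'
    # Microsecond timestamp keeps output names unique across files
    dt = datetime.now().strftime("%Y%m%d%H%M%S%f")
    op_fn = 'file_name' + dt
    print("input is " + input_file)
    print("output is " + op_prefix_name + op_fn)
    op_blob = bucket.blob(op_prefix_name + op_fn)
    # Stream the file in chunks so it is never held in memory in full
    with in_blob.open('r') as in_file, op_blob.open('wt') as opt_file:
        while True:
            chunk = in_file.read(buf_size)
            if not chunk:
                break
            opt_file.write(chunk.replace('\n', '\r\n'))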
In my case, solution two fits my environment: Astro may get stuck on the gsutil cat command when the files are large, whereas solution two performs well because it processes each file in fixed-size chunks. Tweak the buffer size as required.
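The chunked replacement used in solution two can be tried locally without GCS. The sketch below applies the same read/replace/write loop to in-memory text streams. The function name lf_to_crlf and the sample rows are assumptions made for illustration, and it assumes the input contains only LF line endings, as described in the question:

```python
import io


def lf_to_crlf(src, dst, buf_size=4096):
    """Copy text from src to dst in chunks, rewriting LF as CRLF.

    Hypothetical helper: src and dst are any text-mode file-like objects.
    """
    while True:
        chunk = src.read(buf_size)
        if not chunk:
            break
        # '\n' is a single character, so a chunk boundary can never split it
        dst.write(chunk.replace("\n", "\r\n"))


# Sample '~'-delimited rows standing in for an exported file
src = io.StringIO("a~1\nb~2\n")
dst = io.StringIO()
lf_to_crlf(src, dst, buf_size=3)  # tiny buffer to exercise several chunks
print(repr(dst.getvalue()))
```

If the input could already contain CRLF endings, this loop would add a second carriage return, so such files would need to be normalized first.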