Tags: python, postgresql, boto3, amazon-rds, gzip

Copying malformed gzipped S3 object to RDS or efficiently changing line terminator


I have an S3 object from a third-party data provider who has (by whatever mechanism) written it inconsistently: the header row is terminated by a newline \n, but every subsequent row is terminated by a carriage return + line feed \r\n. The object is a gzipped CSV. I want to copy it from their S3 bucket to ours, and then load that object into a PostgreSQL RDS table using the aws_s3 extensions. These extensions can usually handle gzipped CSV files, but because the header row has a different line terminator than the remaining rows, the copy fails with the following error, even with copy options '(format csv, delimiter '','', header true)':

psycopg2.errors.BadCopyFileFormat: unquoted carriage return found in data
HINT:  Use quoted CSV field to represent carriage return.
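For reference, the import itself is invoked through the extension roughly like this (a minimal sketch: the connection details, table name, and region are placeholders of my own, and it assumes the RDS instance's IAM role already grants read access to the bucket):

import psycopg2

# Placeholder connection details and table name; only the shape of the
# aws_s3 call matters here.
conn = psycopg2.connect(host="my-rds-endpoint", dbname="mydb", user="myuser")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT aws_s3.table_import_from_s3(
            'the_target_table',   -- placeholder table name
            '',                   -- empty column list = import into all columns
            '(format csv, delimiter '','', header true)',
            aws_commons.create_s3_uri(
                'the-destination-bucket',
                'location/of/the_object.csv.gz',
                'us-east-1'       -- placeholder region
            )
        )
        """
    )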

Ultimately, I gather from the PostgreSQL source code that this is because of the inconsistency in line terminators: the engine looks at the first row to determine the line terminator, even with header true.

So my first question is: is my assessment correct, or is there a way to instruct PostgreSQL COPY to handle a CSV file whose header line ends with a newline while every other line ends with a carriage return + line feed?

Assuming that it cannot, I now need to normalize the line terminator before I write this object out to S3. From boto3's get_object I have a bunch of bytes:

resp = s3_client.get_object(Bucket="the-source-bucket", Key="location/of/the_object.csv.gz")
body_bytes = resp["Body"].read()
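A quick way to see the inconsistency in those bytes, assuming the file is laid out as described above:

import gzip

raw = gzip.decompress(body_bytes)

# The header is terminated by a bare \n, so splitting on \n leaves no \r behind
header, rest = raw.split(b"\n", 1)
print(header.endswith(b"\r"))           # False

# The data rows are terminated by \r\n, so the \r survives a split on \n
first_data_row = rest.split(b"\n", 1)[0]
print(first_data_row.endswith(b"\r"))   # True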

And I want to replace any instance of \r\n with just \n before I do something like

s3_client.put_object(
  Bucket="the-destination-bucket", 
  Key="/location/of/the/output.csv[.gz?]", 
  Body=<body_bytes with the correct line terminator everywhere>
)
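For concreteness, the straightforward version of this would be a byte-level replace between decompressing and recompressing, something like the sketch below (I've picked the .gz variant of the output key here; skipping recompression would just mean dropping gzip.compress and ContentEncoding):

import gzip

# Normalize every \r\n to \n on the decompressed payload
normalized = gzip.decompress(body_bytes).replace(b"\r\n", b"\n")

s3_client.put_object(
  Bucket="the-destination-bucket",
  Key="location/of/the/output.csv.gz",
  Body=gzip.compress(normalized),
  ContentEncoding="gzip"
)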

What is the most efficient way to do this?


Solution

  • As pointed out in the comments, RDS is not able to deal with inconsistent line terminators, so I do have to normalize them beforehand. Something like this is what I ended up doing:

    import gzip

    import boto3

    s3_client = boto3.client("s3")

    resp = s3_client.get_object(
      Bucket="the-source-bucket",
      Key="location/of/the_object.csv.gz"
    )
    body = resp["Body"].read()

    # splitlines() splits on any line terminator (\n, \r, or \r\n),
    # whether or not it is consistent between lines
    lines = gzip.decompress(body).splitlines()

    s3_client.put_object(
      ContentEncoding="gzip",
      # Now we write the object with a consistent \n line terminator
      Body=gzip.compress(b"\n".join(lines)),
      Bucket="the-destination-bucket",
      Key="location/of/the_object.csv.gz"
    )
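
  • For what it's worth, bytes.splitlines() treats \n, \r, and \r\n all as line boundaries, so re-joining on b"\n" normalizes everything in one pass. A plain gzip.decompress(body).replace(b"\r\n", b"\n") would give essentially the same result for this file (it also preserves the trailing newline that the splitlines()/join() round trip drops) without building the intermediate list of lines.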