python, google-cloud-platform, apache-beam, dataflow, data-pipeline

Implement a custom coder for apache_beam Python SDK version > 2.24


I've been working on my data engineering pipelines using the apache_beam SDK for Python, version 2.24. After upgrading apache_beam to 2.31, I have an issue with a custom coder class I created, named IgnoreUnicode. This is my pipeline code:

branchessap_data = (
    p
    | 'ReadData branchessap' >> beam.io.ReadFromText(branchessap, skip_header_lines=1, coder=IgnoreUnicode())
    | 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))
    | 'FormatToDict branchessap' >> beam.Map(lambda x: {"branch_id": x[0], "branch_sap": x[1], "branch_name": x[2], "branch_profile": x[3]})
    | 'ChangeDataType branchessap' >> beam.Map(convert_types_branchessap)
    | 'DELETE UNWANTED DATA BRANCHESSAP' >> beam.Map(del_unwanted_cols_branchessap)
)

And this is the IgnoreUnicode class I use to override the default coder from apache_beam:

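from apache_beam.coders import Coder

# Custom coder that skips anything that cannot be handled as UTF-8
# (used here for files containing French characters).
class IgnoreUnicode(Coder):
    def encode(self, value):
        # str -> bytes, dropping characters that cannot be encoded
        return value.encode('utf-8', 'ignore')

    def decode(self, value):
        # bytes -> str, dropping byte sequences that are not valid UTF-8
        return value.decode('utf-8', 'ignore')

    def is_deterministic(self):
        return True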

This code works fine with apache_beam 2.24. However, if I upgrade to a version above 2.24, it gives me an error like this (in this case I was using version 2.31):

[Screenshot of the error message, not available]

Is there an alternative way to implement a custom coder in versions above 2.24?


Solution

  • It looks like this is an unfortunate combination of the way sources were restructured and having your coder defined in __main__. I suggest one of two workarounds:

    (1) Move the definition of IgnoreUnicode to a proper module that gets imported rather than __main__, or

    (2) Read the file with BytesCoder, and follow that with a `beam.Map(lambda line: line.decode('utf-8','ignore'))`.

    (Personally, I prefer the latter, since coders ideally should not mutate the data.)
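
    As a rough illustration of workaround (2), here is a minimal sketch, assuming the same pipe-delimited text file with one header line as in the question. The names decode_ignoring_errors and read_decoded_lines are hypothetical helpers, not part of Beam; ReadFromText, BytesCoder and beam.Map are the only Beam calls used. The decoding happens in an ordinary Map step, so the coder itself hands the raw bytes through untouched.

```python
def decode_ignoring_errors(line: bytes) -> str:
    """Decode raw bytes as UTF-8, silently dropping invalid byte sequences."""
    return line.decode('utf-8', 'ignore')


def read_decoded_lines(pipeline, file_pattern):
    """Read text lines as raw bytes, then decode them in a separate step.

    Hypothetical helper: `pipeline` is an existing beam.Pipeline and
    `file_pattern` is the input path, both supplied by the caller.
    """
    # Imported here so the decode helper above can be used and tested
    # on its own, without Beam installed.
    import apache_beam as beam
    from apache_beam.coders import BytesCoder

    return (
        pipeline
        # BytesCoder passes each line through as bytes, with no decoding.
        | 'ReadRawBytes' >> beam.io.ReadFromText(
            file_pattern, skip_header_lines=1, coder=BytesCoder())
        # The lossy decoding is now an explicit pipeline step.
        | 'DecodeIgnoringErrors' >> beam.Map(decode_ignoring_errors)
    )
```

    In the question's pipeline this would stand in for the first step, for example `read_decoded_lines(p, branchessap) | 'SplitData branchessap' >> beam.Map(lambda x: x.split('|'))`, with the remaining steps left as they are.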