Tags: python, jdbc, apache-beam

Error "No logical type registered for URN 'beam:logical_type:javasdk:v1'" when using Apache Beam's io.jdbc.ReadFromJdbc


I'm trying to read from a PostgreSQL table using Apache Beam's Python SDK. I have installed the Java SDK as the documentation says, and I'm using the latest release. My code is as follows:

import logging
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    beam_options = PipelineOptions(argv)

    # On Python 3, plain str replaces the past.builtins.unicode shim.
    ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])

    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=beam_options) as p:
        result = (
            p
            | 'Read from jdbc' >> ReadFromJdbc(
                                    table_name='jdbc_external_test_read',
                                    driver_class_name='org.postgresql.Driver',
                                    jdbc_url='jdbc:postgresql://localhost:5432/example',
                                    username='postgres',
                                    password='postgres')
        )


if __name__ == '__main__':
    # Set the root logger level so that Beam's own INFO logs are shown too.
    logging.getLogger().setLevel(logging.INFO)
    run()

But when I run it, I get the following error:

ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'


Solution

  • This error occurs because the VARCHAR column is returned as an Apache Beam logical type in the schema that comes back from the Java side. Logical types are identified by URNs, in this case beam:logical_type:javasdk:v1, and to read such a value in Python a "decoder" (a LogicalType implementation) has to be registered for the corresponding URN. You can do that as follows:

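    from apache_beam.typehints.schemas import LogicalType

    # Register a Python decoder for the URN attached to the VARCHAR column,
    # so the schema returned by ReadFromJdbc can be decoded in Python.
    @LogicalType.register_logical_type
    class db_str(LogicalType):
        @classmethod
        def urn(cls):
            return "beam:logical_type:javasdk:v1"

        @classmethod
        def language_type(cls):
            # The Python type the decoded values will have.
            return str

        def to_language_type(self, value):
            # Underlying representation -> Python value.
            return str(value)

        def to_representation_type(self, value):
            # Python value -> underlying representation.
            return str(value)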

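    The class definition has to run before the pipeline is constructed (for example, at module level), because the decorator registers the type at the moment the class is defined. The logical type is then recognized when the output schema of ReadFromJdbc is decoded, and its values are converted to str. Note that beam:logical_type:javasdk:v1 is not specific to VARCHAR: it is the URN used for Java logical types that have no portable equivalent, so this decoder converts every such column to a string.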