I'm trying to read from a PostgreSQL table using Apache Beam's Python SDK. I have installed the Java SDK as the documentation says, and I'm using the latest release. My code is as follows:
import logging
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    beam_options = PipelineOptions(argv)
    # Row schema for the table; on Python 3 the built-in str replaces past.builtins.unicode.
    ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', str)])
    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=beam_options) as p:
        result = (
            p
            # Cross-language transform: the read itself is executed by the Java JdbcIO.
            | 'Read from jdbc' >> ReadFromJdbc(
                table_name='jdbc_external_test_read',
                driver_class_name='org.postgresql.Driver',
                jdbc_url='jdbc:postgresql://localhost:5432/example',
                username='postgres',
                password='postgres')
        )


if __name__ == '__main__':
    logging.getLogger(__name__).setLevel(logging.INFO)
    run()
But when I run it, I get the following error: ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'
This issue occurs because the VARCHAR field is returned as an Apache Beam logical type in its schema. Logical types are identified by their URNs, in this case beam:logical_type:javasdk:v1. To read the value of a logical type, a "decoder" has to be registered for the corresponding URN. You can do that as follows:
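from apache_beam.typehints.schemas import LogicalType


# Register a decoder for the Java SDK logical-type URN so that
# the Python SDK maps these VARCHAR values to Python strings.
@LogicalType.register_logical_type
class db_str(LogicalType):
    @classmethod
    def urn(cls):
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return str

    def to_language_type(self, value):
        # Decoded representation -> Python value.
        return str(value)

    def to_representation_type(self, value):
        # Python value -> representation used for encoding.
        return str(value)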
This has to be done before the pipeline is constructed and run, so that the logical type is recognized and its values are converted to Python strings.