Search code examples
pythongoogle-cloud-dataflowapache-beamapache-beam-io

How to Read data from Jdbc and write to bigquery using Apache Beam Python Sdk


I am trying to write a pipeline which will read data from JDBC (Oracle, MSSQL), do some processing, and write to BigQuery.

I am struggling with the ReadFromJdbc step, where the Python SDK is not able to convert the source columns to the correct schema types.

My Code:

import abc
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.io.jdbc import ReadFromJdbc

from input.base_source_processor import BaseSourceProcessor


class Row(typing.NamedTuple):
    """Expected schema of one record read from the JDBC COUNTRIES table.

    Registered with a RowCoder so Beam's cross-language JDBC read can
    decode Java rows into this type.
    """

    COUNTRY_ID: str    # VARCHAR in the source table
    COUNTRY_NAME: str  # VARCHAR in the source table
    inc_col: str       # timestamp column, carried as a string here


class RdbmsProcessor(BaseSourceProcessor, abc.ABC):
    """PTransform that reads rows from a JDBC source (Oracle/MSSQL).

    The connection settings are taken from ``task['rdbms_props']``, which
    is expected to contain ``driver_class_name``, ``jdbc_url``,
    ``username`` and ``password``.  # assumes this dict shape — confirm against caller
    """

    def __init__(self, task):
        # task: configuration dict; only 'rdbms_props' is read here.
        self.task = task

    def expand(self, p_input):
        """Expand the transform: read the table and return the PCollection.

        Registers a RowCoder for ``Row`` so the cross-language JDBC read
        can map the Java-side schema onto the NamedTuple. Two debug-only
        branches print the element count and the raw rows to stdout.
        """
        # Register the coder before the read is expanded; without it the
        # Python SDK cannot decode the rows produced by the Java expansion.
        coders.registry.register_coder(Row, coders.RowCoder)

        # Hoist the repeated nested lookups out of the transform construction.
        props = self.task['rdbms_props']

        data = (p_input
                | "Read from rdbms" >> ReadFromJdbc(
                    driver_class_name=props['driver_class_name'],
                    jdbc_url=props['jdbc_url'],
                    username=props['username'],
                    password=props['password'],
                    table_name='"dm-demo".COUNTRIES',
                    classpath=['/home/abhinav_jha_datametica_com/python_df/odbc_jars/ojdbc8.jar'],
                ))

        # Debug sinks: element count and raw rows. Side-effect branches only;
        # they do not alter the returned PCollection.
        data | beam.combiners.Count.Globally() | beam.Map(print)
        data | beam.Map(print)

        return data

My data has three columns, two of which are VARCHAR and one is a timestamp. Here is what my data looks like:

This is the error I am facing when running on Dataflow as well as on the direct runner:

ValueError: Failed to decode schema due to an issue with Field proto:
name: "COUNTRY_ID"
type {
  logical_type {
    urn: "beam:logical_type:javasdk:v1"
    payload: "\202SNAPPY\000\000\000\000\001\000\000\000\001\000\000\002\235\250\010\360U\254\355\000\005sr\000=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthString\r<\273\'6u\341\257\002\000\001I\000\tmax\t\035\014xr\0008\242X\000\020JdbcL\031i\270\246\376\361\367\203_\313a\002\000\004L\000\010argumentt\000\022Ljava/lang/Object;L\000\014a\r \001\243\034t\000.Lorg/\t\316\000/\001\3164/sdk/schemas/S\005\010\024$Field\0010\034;L\000\010base\001\014Dq\000~\000\003L\000\nidentifier6r\000\t\357\030;xpsr\000\021\001\211\000.\001\211<.Integer\022\342\240\244\367\201\2078%\007$\005valuexr\000\020\031(hNumber\206\254\225\035\013\224\340\213\002\000\000xp\000\000\000\007sr\0006N(\001\r\262\024.AutoV\001N\000_\t\274\004_F\021\274h9\304m\364S\243\227P\002\000\010L\000\025collectionEle!/\035\323\004\013l9\\\010t\0000\216\"\001\000L\0312$;L\000\nmapKey\035S\014\014map\005\227\035\024,\010metadatat\000\017)\252\034util/Map!g(nullablet\000\023\t\035!>8/Boolean;L\000\trow\t\343\010t\000$\212\243\000\001T(typeNamet\000-\2122\000\000$\001\254\001/\020;xr\000,nu\001\t\210Y\'\034\013PLl[\357\3103%\266\001\001\014sr\000\036%\333\001\342\004.C5|Ds$EmptyMapY6\024\205Z\334\347\320\0053\000s22\002\r\364,\315 r\200\325\234\372\356\002\000\001ZQ2\030p\000p~r\000+\212\234\000\021\314\000\000\r\001\030\022\000\000xr\000\016\005\225!Z\020.Enum\r\034\005\035$pt\000\005INT32sA\351\000\t\001\306\001\t\000\022\001\005\010\024p~\001\007\\\025t\000\006STRINGt\000\007VARCHAR\000\000\000\007"
    representation {
      atomic_type: STRING
    }
    argument_type {
      atomic_type: INT32
    }
    argument {
      atomic_value {
        int32: 7
      }
    }
  }
}

Any Help regarding this would be greatly appreciated.


Solution

  • JdbcIO relies on Java-only logical types (e.g. `LogicalTypes$VariableLengthString` for VARCHAR columns), which the Python SDK cannot deserialize — hence the "Failed to decode schema" error. This limitation is tracked in https://issues.apache.org/jira/browse/BEAM-13717