I am trying to write a pipeline that reads data from JDBC (Oracle, MSSQL), does some processing, and writes the result to BigQuery.
I am struggling with the ReadFromJdbc step, where Beam is not able to convert the JDBC schema to the correct Python schema type.
My Code:
Python
x
40
40
1
import abc
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.gcp.spanner import ReadFromSpanner
from apache_beam.io.jdbc import ReadFromJdbc

from input.base_source_processor import BaseSourceProcessor
8
9
10
class Row(typing.NamedTuple):
11
COUNTRY_ID: str
12
COUNTRY_NAME: str
13
inc_col: str
14
15
16
class RdbmsProcessor(BaseSourceProcessor, abc.ABC):
17
def __init__(self, task):
18
self.task = task
19
20
def expand(self, p_input):
21
row = typing.NamedTuple('row', [('COUNTRY_ID', str), ('COUNTRY_NAME', str), ('inc_col', str)])
22
coders.registry.register_coder(Row, coders.RowCoder)
23
24
data = (p_input
25
| "Read from rdbms" >> ReadFromJdbc(
26
driver_class_name=self.task['rdbms_props']['driver_class_name'],
27
jdbc_url=self.task['rdbms_props']['jdbc_url'],
28
username=self.task['rdbms_props']['username'],
29
password=self.task['rdbms_props']['password'],
30
table_name='"dm-demo".COUNTRIES',
31
classpath=['/home/abhinav_jha_datametica_com/python_df/odbc_jars/ojdbc8.jar']
32
)
33
)
34
35
data | beam.combiners.Count.Globally() | beam.Map(print)
36
37
data | beam.Map(print)
38
39
return data
40
My data has three columns, two of which are VARCHAR and one is a TIMESTAMP.
This is the error I am facing when running on the Dataflow runner as well as the direct runner:
Text
1
20
20
1
ValueError: Failed to decode schema due to an issue with Field proto:
2
name: "COUNTRY_ID"
3
type {
4
logical_type {
5
urn: "beam:logical_type:javasdk:v1"
6
payload: "202SNAPPY00000000010000000100000223525010360U2543550005sr00=org.apache.beam.sdk.io.jdbc.LogicalTypes$VariableLengthStringr<273'6u341257020001I00tmaxt3514xr008242X0020JdbcL31i270246376361367203_313a020004L0010argumentt0022Ljava/lang/Object;L0014ar 0124334t00.Lorg/t31600/013164/sdk/schemas/S051024$Field01034;L0010base0114Dq00~0003L00nidentifier6r00t35730;xpsr00210121100.01211<.Integer223422402443672012078%07$05valuexr002031(hNumber2062542253513224340213020000xp00000007sr006N(01r26224.AutoV01N00_t27404_F21274h9304m364S243227P020010L0025collectionEle!/353230413l9\10t000216"0100L312$;L00nmapKey35S1414map052273524,10metadatat0017)25234util/Map!g(nullablet0023t35!>8/Boolean;L00trowt34310t00$2122430001T(typeNamet00-21220000$0125401/20;xr00,nu01t210Y'3413PLl[3573103%266010114sr0036%3330134204.C5|Ds$EmptyMapY624205Z33434732005300s2202r364,315 r200325234372356020001ZQ230p00p~r00+21223400213140000r0130220000xr001605225!Z20.Enumr340535$pt0005INT32sA35100t0130601t002201051024p~0107\25t0006STRINGt0007VARCHAR00000007"
7
representation {
8
atomic_type: STRING
9
}
10
argument_type {
11
atomic_type: INT32
12
}
13
argument {
14
atomic_value {
15
int32: 7
16
}
17
}
18
}
19
}
20
Any Help regarding this would be greatly appreciated.
Advertisement
Answer
JdbcIO relies on Java-only logical types (note the `beam:logical_type:javasdk:v1` URN in the error), so the Python SDK cannot deserialize the schema it returns. This limitation is tracked in https://issues.apache.org/jira/browse/BEAM-13717