I want to check whether the CSV file we read in the Apache Beam pipeline satisfies the format I expect (e.g. field check, type check, null value check, etc.) before performing any transformation.
Performing these checks outside the pipeline for every file would give up the parallelism, so I wanted to know whether it is possible to perform them inside the pipeline.
An example of what the code might look like:
import apache_beam as beam

branched = beam.Pipeline()

class f_c(beam.DoFn):
    def process(self, element):
        if element == field:
            return True
        else:
            return False

input_collection = (
    branched
    | 'Read from text file' >> beam.io.ReadFromText("gs://***.csv")
    | 'Split rows' >> beam.Map(lambda line: line.split(',')))

field_check = (
    input_collection
    | 'field function returning True or False' >> beam.ParDo(f_c())
    | beam.io.WriteToText('gs://***/op'))

branched.run().wait_until_finish()
Answer
I replicated a small instance of the problem with just two rules: if the length of the third column is > 10 and the value of the fourth column is > 500, the record is good; otherwise it is bad.
import apache_beam as beam

class FilterFn(beam.DoFn):
    """
    The rules I have assumed are a length check on the third column and
    a value check on the fourth column:
    if (length of third column) > 10 and (value of fourth column) > 500,
    the record is good; otherwise it is bad.
    """
    def process(self, text):
        # ---------------------- isGoodRow BEGINS ----------------------
        def isGoodRow(a_list):
            # Both conditions must hold for a record to be tagged good.
            return len(a_list[2]) > 10 and int(a_list[3]) > 500
        # ----------------------- isGoodRow ENDS -----------------------
        a_list = text.split(",")  # all the columns of a particular input line
        if isGoodRow(a_list):
            yield beam.TaggedOutput('good', text)
        else:
            yield beam.TaggedOutput('bad', text)
with beam.Pipeline() as pipeline:
    split_me = (
        pipeline
        | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
        | 'Tag good and bad rows' >> beam.ParDo(FilterFn()).with_outputs('good', 'bad')
    )
    good_collection = (
        split_me.good
        | "write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good', file_name_suffix='.txt')
    )
    bad_collection = (
        split_me.bad
        | "write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad', file_name_suffix='.txt')
    )
I have written a ParDo, FilterFn, that tags its output based on the isGoodRow function. You can rewrite the logic to fit your requirements, e.g. null checks, type checks, and other data validation.
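For instance, a minimal sketch of an isGoodRow that performs the field, null, and type checks mentioned in the question (assuming a four-column schema; the exact layout and rules are illustrative, not prescribed):

def isGoodRow(a_list):
    # Field check: expect exactly four columns (assumed schema).
    if len(a_list) != 4:
        return False
    # Null value check: no column may be empty or whitespace-only.
    if any(col.strip() == "" for col in a_list):
        return False
    # Type check: the fourth column must parse as an integer.
    try:
        int(a_list[3])
    except ValueError:
        return False
    return True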
Another option would be to use the Partition transform for this use case, as sketched below.
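A minimal sketch of the same good/bad split with beam.Partition (partition_fn is a hypothetical name; it reuses the rule from FilterFn and returns partition index 0 for good rows and 1 for bad rows):

import apache_beam as beam

def partition_fn(text, num_partitions):
    # Reuse the rule: third column longer than 10 and fourth column > 500.
    a_list = text.split(",")
    if len(a_list[2]) > 10 and int(a_list[3]) > 500:
        return 0  # good
    return 1      # bad

with beam.Pipeline() as pipeline:
    good, bad = (
        pipeline
        | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
        | 'Partition rows' >> beam.Partition(partition_fn, 2)
    )
    good | "write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good', file_name_suffix='.txt')
    bad | "write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad', file_name_suffix='.txt')

Partition fixes the number of outputs up front, so it fits a strict good/bad split; tagged outputs from a ParDo are more flexible when a row may be dropped or routed to several sinks.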
The sample.txt I used for this example:
1,Foo,FooBarFooBarFooBar,1000
2,Foo,Bar,10
3,Foo,FooBarFooBarFooBar,900
4,Foo,FooBar,800
Output of good.txt
1,Foo,FooBarFooBarFooBar,1000
3,Foo,FooBarFooBarFooBar,900
Output of bad.txt
2,Foo,Bar,10
4,Foo,FooBar,800