I want to check whether the CSV file we read in an Apache Beam pipeline satisfies the format I expect (e.g. field check, type check, null value check, etc.) before performing any transformations.
Performing these checks outside the pipeline for every file would give up the parallelism, so I wanted to know whether it is possible to perform them inside the pipeline.
An example of what the code might look like:
import apache_beam as beam

branched = beam.Pipeline()

class f_c(beam.DoFn):
    def process(self, element):
        # 'field' is a placeholder for whatever value the row is checked against
        if element == field:
            return True
        else:
            return False

input_collection = (
    branched
    | 'Read from text file' >> beam.io.ReadFromText("gs://***.csv")
    | 'Split rows' >> beam.Map(lambda line: line.split(','))
)

field_check = (
    input_collection
    | 'field function returning True or False' >> beam.ParDo(f_c())
    | beam.io.WriteToText('gs://***/op')
)

branched.run().wait_until_finish()
Answer
I replicated a small instance of the problem with just two rules: if the length of the third column is > 10 and the value of the fourth column is > 500, then the record is good; otherwise it is a bad record.
import apache_beam as beam

class FilterFn(beam.DoFn):
    """Tags each input row as 'good' or 'bad'.

    The rules assumed here are a length check on the third column and a
    value check on the fourth column: if (length of third column) > 10
    and (value of fourth column) > 500, then the record is good.
    """
    def process(self, text):
        def isGoodRow(a_list):
            return (len(a_list[2]) > 10) and (int(a_list[3]) > 500)

        a_list = text.split(",")  # all the columns of a particular input line
        if isGoodRow(a_list):
            yield beam.TaggedOutput('good', text)
        else:
            yield beam.TaggedOutput('bad', text)

with beam.Pipeline() as pipeline:
    split_me = (
        pipeline
        | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
        | 'Tag good and bad rows' >> beam.ParDo(FilterFn()).with_outputs('good', 'bad')
    )
    good_collection = (
        split_me.good
        | "write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good', file_name_suffix='.txt')
    )
    bad_collection = (
        split_me.bad
        | "write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad', file_name_suffix='.txt')
    )
I have written a ParDo FilterFn that tags its output based on the isGoodRow function. You can adapt the logic to your own requirements, e.g. null checks, type checks, and other data validation, as sketched below.
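For instance, here is a minimal sketch of an extended isGoodRow (the expected_columns count and the "N/A" sentinel are assumptions for illustration, not part of the original rules) that also rejects rows with a wrong field count, empty values, or a non-numeric fourth column:

def isGoodRow(a_list, expected_columns=4):
    # Field check: the row must have exactly the expected number of columns.
    if len(a_list) != expected_columns:
        return False
    # Null check: no column may be empty or a literal "N/A".
    if any(col.strip() in ("", "N/A") for col in a_list):
        return False
    # Type check: the fourth column must parse as an integer.
    try:
        value = int(a_list[3])
    except ValueError:
        return False
    # Original rules: length check on the third column, value check on the fourth.
    return len(a_list[2]) > 10 and value > 500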
Another option would be to use a Partition function for this use case.
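A minimal sketch of that approach, assuming the same two rules and file paths as the example above (beam.Partition routes each element to the partition index returned by the function):

import apache_beam as beam

def partition_fn(line, num_partitions):
    # Index 0 = good rows, index 1 = bad rows, using the same two rules.
    cols = line.split(',')
    is_good = len(cols[2]) > 10 and int(cols[3]) > 500
    return 0 if is_good else 1

with beam.Pipeline() as pipeline:
    good, bad = (
        pipeline
        | 'Read from text file' >> beam.io.ReadFromText('/content/sample.txt')
        | 'Partition rows' >> beam.Partition(partition_fn, 2)
    )
    good | 'write good o/p' >> beam.io.WriteToText('/content/good', file_name_suffix='.txt')
    bad | 'write bad o/p' >> beam.io.WriteToText('/content/bad', file_name_suffix='.txt')

Note that Partition needs the number of outputs fixed up front, whereas tagged outputs let a DoFn emit to tags more flexibly.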
The sample.txt I used for this example:
1,Foo,FooBarFooBarFooBar,1000
2,Foo,Bar,10
3,Foo,FooBarFooBarFooBar,900
4,Foo,FooBar,800
Output of good.txt
1,Foo,FooBarFooBarFooBar,1000
3,Foo,FooBarFooBarFooBar,900
Output of bad.txt
2,Foo,Bar,10
4,Foo,FooBar,800