
Apache Beam Python: returning conditional statement using ParDo class

I want to check whether the CSV file read in the Apache Beam pipeline satisfies the format I expect [e.g. field check, type check, null value check, etc.] before performing any transformation.

Performing these checks outside the pipeline for every file would defeat the purpose of parallelism, so I wanted to know whether it is possible to perform them inside the pipeline.

An example of what the code might look like:

import apache_beam as beam

branched=beam.Pipeline()

class f_c(beam.DoFn):
    def process(self, element):
        if element == field:
            return True
        else:
            return False

input_collection = ( 
branched 
    | 'Read from text file' >> beam.io.ReadFromText("gs://***.csv")
    | 'Split rows' >> beam.Map(lambda line: line.split(',')))

field_check=(input_collection
    | 'field function returning True or False' >> beam.ParDo(f_c())
    | beam.io.WriteToText('gs://***/op'))

branched.run().wait_until_finish()


Answer

I replicated a small instance of the problem with just two rules: if the length of the third column is > 10 and the value of the fourth column is > 500, the record is good; otherwise it is a bad record.

    from apache_beam.io.textio import WriteToText
    import apache_beam as beam
    
    class FilterFn(beam.DoFn):
      """
      Assumed rules: a row is good when the length of the third column is > 10
      and the value of the fourth column is > 500; otherwise it is bad.
      """
      def process(self, text):
        def isGoodRow(a_list):
          return len(a_list[2]) > 10 and int(a_list[3]) > 500

        a_list = text.split(",")  # all the columns of one input line
        if isGoodRow(a_list):
          yield beam.TaggedOutput('good', text)
        else:
          yield beam.TaggedOutput('bad', text)
    
    with beam.Pipeline() as pipeline:
      split_me = (
          pipeline
          | 'Read from text file' >> beam.io.ReadFromText("/content/sample.txt")
          | 'Tag good and bad rows' >> beam.ParDo(FilterFn()).with_outputs('good','bad')
          )
      good_collection = (
          split_me.good 
          |"write good o/p" >> beam.io.WriteToText(file_path_prefix='/content/good',file_name_suffix='.txt')
          ) 
      bad_collection = (
          split_me.bad 
          |"write bad o/p" >> beam.io.WriteToText(file_path_prefix='/content/bad',file_name_suffix='.txt')
          )

I have written a ParDo, FilterFn, that tags its output based on the isGoodRow function. You can rewrite the logic for your requirements, e.g. null checks, data validation, etc. Another option for this use case is a Partition function.

The sample.txt I used for this example:

1,Foo,FooBarFooBarFooBar,1000
2,Foo,Bar,10
3,Foo,FooBarFooBarFooBar,900
4,Foo,FooBar,800

Output of good.txt

1,Foo,FooBarFooBarFooBar,1000
3,Foo,FooBarFooBarFooBar,900

Output of bad.txt

2,Foo,Bar,10
4,Foo,FooBar,800
User contributions licensed under: CC BY-SA