I have a bucket in GCS that contains a list of JSON files. I managed to extract the list of file names using
from google.cloud import storage


def list_blobs(bucket_name):
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)

    json_paths = []
    for blob in blobs:
        json_paths.append(f"gs://{bucket_name}/{blob.name}")
    return json_paths
Now I want to pass this list of file names to Apache Beam so it can read them. I wrote this code, but it doesn’t seem like a good pattern:
final_result = []
for i, file in enumerate(list_files):
    print("parsing file:", file)
    concat_data = (p | 'Data {}'.format(i) >> ReadFromText(file))
    final_result.append(concat_data)
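To end up with a single collection I would then still have to flatten everything afterwards, roughly like this (just a sketch, with apache_beam imported as beam; the 'Merge all files' label is only illustrative):

merged = tuple(final_result) | 'Merge all files' >> beam.Flatten()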
Have you faced the same issue before?
Answer
In the end I ended up using the google-cloud-storage client as the reading API for this.
Listing all the elements of the bucket:
def list_blobs(bucket_name): """Lists all the blobs in the bucket.""" storage_client = storage.Client() blobs = storage_client.list_blobs(bucket_name) json_paths = [] for blob in blobs: #json_paths.append(f"gs://{bucket_name}/{blob.name}") json_paths.append(f"{blob.name}") return json_paths
and I created this ParDo to read the content:
import apache_beam as beam
from google.cloud import storage


class ReadFileContent(beam.DoFn):

    def setup(self):
        # Called whenever the DoFn instance is deserialized on the worker.
        # It can run more than once per worker, because multiple instances of a
        # given DoFn subclass may be created (e.g. due to parallelization, or to
        # garbage collection after a period of disuse).
        # This is a good place to connect to databases, open network connections
        # or acquire other resources.
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        # Returns the file content as bytes (newer google-cloud-storage versions
        # call this download_as_bytes()).
        yield blob.download_as_string()
And my pipeline looked like this:
list_files = list_blobs(bucket_name)

with beam.Pipeline(options=pipeline_options) as p:
    results = (
        p
        | 'Create' >> beam.Create(list_files)
        | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
        | 'next transformation' >> ...
    )
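If it helps, a typical next transformation here would simply decode each downloaded blob as JSON. A minimal sketch, assuming each file holds a single JSON object (the 'Parse JSON' label and the parsed variable are only illustrative, not part of my actual pipeline):

import json

parsed = (
    p
    | 'Create' >> beam.Create(list_files)
    | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
    # json.loads accepts the raw bytes yielded by ReadFileContent directly.
    | 'Parse JSON' >> beam.Map(json.loads)
)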