
How to read multiple JSON files from a GCS bucket in Google Dataflow (Apache Beam, Python)

I have a bucket in GCS that contains a list of JSON files. I managed to extract the list of file names using

from google.cloud import storage


def list_blobs(bucket_name):
    """Lists all the blobs in the bucket and returns their gs:// paths."""
    storage_client = storage.Client()

    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        json_paths.append(f"gs://{bucket_name}/{blob.name}")
    return json_paths
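
For example, calling it returns the full gs:// URIs (my-bucket below is just a placeholder bucket name):

list_files = list_blobs("my-bucket")  # placeholder bucket name
# e.g. ['gs://my-bucket/data/file1.json', 'gs://my-bucket/data/file2.json', ...]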

Now I want to pass this list of file names to Apache Beam to read them. I wrote this code, but it doesn't seem like a good pattern:

for i, file in enumerate(list_files):
    print("parsing file:", file)
    concat_data = (p | 'Data {}'.format(i) >> ReadFromText(file))
    final_result.append(concat_data)
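
The per-file PCollections collected in final_result then still have to be merged into a single PCollection, e.g. with beam.Flatten. A minimal sketch, assuming final_result holds the ReadFromText outputs from the loop above:

# Merge the per-file PCollections into one PCollection.
merged = tuple(final_result) | 'Merge files' >> beam.Flatten()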

Have you faced the same issue before?


Answer

In the end I ended up using the google-cloud-storage client as the reading API for this.

Listing all elements of the bucket

from google.cloud import storage


def list_blobs(bucket_name):
    """Lists all the blobs in the bucket."""
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        # json_paths.append(f"gs://{bucket_name}/{blob.name}")
        json_paths.append(f"{blob.name}")
    return json_paths
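
Note that this version keeps only the object names, without the gs://bucket-name/ prefix, because the ParDo below re-opens each blob through the storage client. For example (my-bucket is a placeholder name):

list_files = list_blobs("my-bucket")  # placeholder bucket name
# e.g. ['data/file1.json', 'data/file2.json', ...]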

And I created this ParDo for reading the content:

import apache_beam as beam
from google.cloud import storage


class ReadFileContent(beam.DoFn):

    def setup(self):
        # Called whenever the DoFn instance is deserialized on the worker.
        # It can run more than once per worker, because multiple instances of
        # a given DoFn subclass may be created (e.g. due to parallelization,
        # or due to garbage collection after a period of disuse).
        # This is a good place to connect to databases, open network
        # connections or acquire other resources.
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        yield blob.download_as_string()
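
Since the files are JSON, a variant that parses each blob directly in the DoFn could look like this. This is only a sketch and assumes each blob holds exactly one JSON document (download_as_text returns a str instead of bytes):

import json

import apache_beam as beam
from google.cloud import storage


class ReadJsonContent(beam.DoFn):
    """Like ReadFileContent, but yields parsed JSON instead of raw bytes."""

    def setup(self):
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        # Assumption: each blob contains exactly one JSON document.
        yield json.loads(blob.download_as_text())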

And my pipeline looked like this:

list_files = list_blobs(bucket_name)

with beam.Pipeline(options=pipeline_options) as p:

    results = (
        p | 'Create' >> beam.Create(list_files)
          | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
          | 'next transformation' >> ...
    )
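
The 'next transformation' is left open above; a hypothetical continuation, assuming each element is the raw bytes returned by download_as_string() for a single JSON document, could decode and parse it:

import json

import apache_beam as beam

with beam.Pipeline(options=pipeline_options) as p:

    results = (
        p | 'Create' >> beam.Create(list_files)
          | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
          # Hypothetical next step: decode the downloaded bytes and parse one
          # JSON document per file.
          | 'Parse JSON' >> beam.Map(lambda raw: json.loads(raw.decode('utf-8')))
    )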