I have a bucket in GCS that contains a list of JSON files. I managed to extract the list of file names using:
from google.cloud import storage


def list_blobs(bucket_name):
    # Collect the gs:// path of every object in the bucket.
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        json_paths.append(f"gs://{bucket_name}/{blob.name}")
    return json_paths
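Called on its own, this helper just returns plain Python strings; a minimal usage sketch (the bucket name here is a made-up placeholder):

# Usage sketch only; "my-json-bucket" is a hypothetical bucket name.
list_files = list_blobs("my-json-bucket")
for path in list_files:
    print(path)  # e.g. gs://my-json-bucket/some-file.json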
Now I want to pass this list of file names to Apache Beam so it can read them. I wrote this code, but it doesn't seem like a good pattern:
for i, file in enumerate(list_files):
    print("parsing file:", file)
    concat_data = (
        p | 'Data {}'.format(i) >> ReadFromText(file)
    )
    final_result.append(concat_data)
Have you faced the same issue before?
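(For reference, Beam's text IO also has ReadAllFromText, which consumes a PCollection of file paths rather than a single pattern; a rough sketch, assuming list_files is the list of gs:// paths built above:)

import apache_beam as beam

# Sketch only: feed the file paths in as a PCollection and let
# ReadAllFromText expand and read each of them line by line.
with beam.Pipeline() as p:
    lines = (
        p
        | 'File paths' >> beam.Create(list_files)
        | 'Read all files' >> beam.io.ReadAllFromText()
    )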
Answer
In the end, I came to use the google-cloud-storage client library as the reading API for this.
Listing all elements of the bucket:
def list_blobs(bucket_name):
    """Lists all the blobs in the bucket."""

    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket_name)
    json_paths = []
    for blob in blobs:
        # json_paths.append(f"gs://{bucket_name}/{blob.name}")
        json_paths.append(f"{blob.name}")
    return json_paths
And I created this ParDo for reading the content:
import apache_beam as beam
from google.cloud import storage


class ReadFileContent(beam.DoFn):

    def setup(self):
        # Called whenever the DoFn instance is deserialized on the worker.
        # This means it can be called more than once per worker because
        # multiple instances of a given DoFn subclass may be created
        # (e.g., due to parallelization, or due to garbage collection after
        # a period of disuse).
        # This is a good place to connect to database instances, open
        # network connections or other resources.
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        yield blob.download_as_string()
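Since the objects are JSON files, the same DoFn could also decode and parse them before emitting. A hypothetical variant, not part of the original pipeline: the class name ReadJsonRecords is made up, and it assumes download_as_bytes is available (the non-deprecated counterpart of download_as_string in newer google-cloud-storage releases):

import json

import apache_beam as beam
from google.cloud import storage


class ReadJsonRecords(beam.DoFn):
    # Hypothetical variant of ReadFileContent that parses each blob as JSON.

    def setup(self):
        # One client per DoFn instance, reused for every element it processes.
        self.storage_client = storage.Client()

    def process(self, file_name, bucket_name):
        bucket = self.storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)
        # download_as_bytes() replaces the deprecated download_as_string().
        yield json.loads(blob.download_as_bytes())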
And my pipeline looked like this:
list_files = list_blobs(bucket_name)

with beam.Pipeline(options=pipeline_options) as p:

    results = (
        p
        | 'Create' >> beam.Create(list_files)
        | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
        | 'next transformation' >>
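The snippet above is cut off at the last step. As a rough end-to-end sketch of how the pieces combine, assuming list_blobs and ReadFileContent are defined as above; the bucket name, pipeline options, and the counting stage standing in for 'next transformation' are all placeholders:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

bucket_name = 'my-json-bucket'        # hypothetical bucket name
pipeline_options = PipelineOptions()  # placeholder options

list_files = list_blobs(bucket_name)

with beam.Pipeline(options=pipeline_options) as p:
    results = (
        p
        | 'Create' >> beam.Create(list_files)
        | 'Read each file content' >> beam.ParDo(ReadFileContent(), bucket_name)
        | 'Count files' >> beam.combiners.Count.Globally()  # stand-in step
        | 'Print count' >> beam.Map(print)
    )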