From what I found here, it is possible to use ops and graphs to generate assets.
However, I would like to use an asset as an input for an op. I am exploring it for the following use case:
- I fetch a list of country metadata from an external API and store it in my resource:
    @dagster.asset
    def country_metadata_asset() -> List[Dict]:
        ...
- I use this asset to define some downstream assets, for example:
    @dagster.asset
    def country_names_asset(country_metadata_asset) -> List[str]:
        ...
- I would like to use this asset to call another data source to retrieve and validate data, and then write the result to my resource. The source returns a huge number of rows, so I need to process it in batches, and I thought that a graph with ops would be a better choice for it. I thought of doing something like this:
@dagster.op(out=dagster.DynamicOut())
def load_country_names(country_names_asset):
    for country_index, country_name in enumerate(country_names_asset):
        yield dagster.DynamicOutput(
            country_name, mapping_key=f"{country_index} {country_name}"
        )


@dagster.graph()
def update_data_graph():
    country_names = load_country_names()
    country_names.map(retrieve_and_process_data)


@dagster.job()
def run_update_job():
    update_data_graph()
It seems that my approach does not work, and I am not sure if it is conceptually correct. My questions are:
- How do I tell Dagster that the input for load_country_names is an asset? Should I manually materialise it inside the op?
- How do I efficiently write the augmented data that I return from retrieve_and_process_data into my resource? It is not possible to keep the data in memory, so I thought of implementing it with a custom IOManager, but I am not sure how to do it.
Answer
It seems to me like the augmented data that’s returned from retrieve_and_process_data can (at least in theory) be represented by an asset.
So we can start from the standpoint that we’d like to create some asset that takes in country_names_asset, as well as the source data asset (the thing that has a bunch of rows in it, which we can call big_country_data_asset for now). I think this models the underlying relationships a bit better, independent of how we’re actually implementing things.
The question then is how to write the computation function for this asset in a way that doesn’t require loading the entire contents of big_country_data_asset into memory at any point in time. While it’s possible that you could do this with a dynamic graph, which you then wrap in a call to AssetsDefinition.from_graph, I think there’s an easier approach.
Dagster allows you to circumvent the IOManager machinery both when reading an asset as input and when writing an asset as output. In essence, when you list an AssetKey in non_argument_deps, this tells Dagster that there is some asset which is upstream of the asset you’re defining, but that it will be loaded within the body of the asset function (rather than being loaded by Dagster using IOManager machinery).
Similarly, if you set the output type of the function to None, this tells Dagster that the asset you’re defining will be persisted by the logic inside of the function, rather than by an IOManager.
Using both of these concepts, we can write an asset which at no point needs to have the entire big_country_data_asset loaded.
from dagster import AssetKey, asset


@asset(non_argument_deps={AssetKey("big_country_data_asset")})
def processed_country_data_asset(country_names_asset) -> None:
    for name in country_names_asset:
        # assuming this function actually stores data somewhere,
        # and intrinsically knows how to read from big_country_data_asset
        retrieve_and_process_data(name)
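What retrieve_and_process_data does internally is not shown above. As a rough sketch only, one way such a function could keep memory bounded is to stream rows and write them in fixed-size batches. Everything here is an assumption for illustration: the name process_country_in_batches, the fetch_rows and write_batch callables (stand-ins for the real data source and the real resource), and the placeholder validation rule.

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def batched(rows: Iterable[Row], batch_size: int) -> Iterator[List[Row]]:
    """Yield lists of at most batch_size rows without materializing the input."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_country_in_batches(
    country_name: str,
    fetch_rows: Callable[[str], Iterable[Row]],  # hypothetical: lazily yields source rows
    write_batch: Callable[[List[Row]], None],    # hypothetical: persists one batch
    batch_size: int = 1000,
) -> int:
    """Stream rows for one country, validate them, and write them batch by batch."""
    written = 0
    for batch in batched(fetch_rows(country_name), batch_size):
        # Placeholder validation: keep rows whose "value" field is present.
        valid = [row for row in batch if row.get("value") is not None]
        if valid:
            write_batch(valid)
            written += len(valid)
    return written
```

Only one batch is held in memory at a time, so the peak footprint depends on batch_size rather than on the total number of rows for a country.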
IOManagers are a very flexible concept, however, and it is possible to replicate all of this same batching behavior while using IOManagers (it is just a bit more convoluted). You’d need to do something like create a SourceAsset(key="big_country_data_asset", io_manager_def=my_custom_io_manager), where my_custom_io_manager has an unusual load_input method which itself returns a function, like:
# method on the custom IOManager class
def load_input(self, context):
    def _fn(country_name):
        # however you actually get these rows
        rows = query_source_data_for_name(country_name)
        return rows

    return _fn
Then you could define your asset like:
@asset
def processed_country_data_asset(
    country_names_asset, big_country_data_asset
) -> None:
    for name in country_names_asset:
        # big_country_data_asset has been loaded as a function
        rows = big_country_data_asset(name)
        process_data(rows)
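To make the laziness of this pattern concrete, here is a plain-Python imitation with no Dagster imports. It is a sketch under assumptions, not Dagster code: LazyRowsLoader only mimics the shape of an IOManager (a real one would subclass Dagster's IOManager), the body of query_source_data_for_name is a made-up stand-in for the external source, and QUERIES exists only to show when the source is actually hit.

```python
from typing import Callable, Dict, Iterator, List

# Hypothetical stand-in for the external source; records which queries were made.
QUERIES: List[str] = []


def query_source_data_for_name(country_name: str) -> Iterator[Dict[str, object]]:
    QUERIES.append(country_name)
    # Return a lazy iterator so rows are produced one at a time.
    return ({"country": country_name, "row": i} for i in range(3))


class LazyRowsLoader:
    """Mimics an IOManager whose load_input returns a loader function, not data."""

    def load_input(self, context: object) -> Callable[[str], Iterator[Dict[str, object]]]:
        def _fn(country_name: str) -> Iterator[Dict[str, object]]:
            return query_source_data_for_name(country_name)

        return _fn


def processed_country_data(
    country_names: List[str],
    big_country_data: Callable[[str], Iterator[Dict[str, object]]],
) -> int:
    """Same shape as the asset body: call the loaded function once per country."""
    processed = 0
    for name in country_names:
        for _row in big_country_data(name):
            processed += 1  # placeholder for process_data(row)
    return processed


loader = LazyRowsLoader().load_input(context=None)
# No query has run yet; the source is only touched inside the loop below.
total = processed_country_data(["France", "Japan"], loader)
```

The point of the design is that the value handed to the asset function is cheap (a closure), and the expensive reads happen one country at a time inside the loop.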
You can also handle writing the output of this function in an IOManager using a similar-looking trick: https://github.com/dagster-io/dagster/discussions/9772