@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data
Let's take a closer look at the arguments we've provided:
key_prefix - When the assets are materialized, Dagster will store them in DuckDB in the schema defined by the last value in key_prefix. In this case, that's jaffle_shop. The tables will have the same names as the assets that produced them, which are customers_raw and orders_raw (a quick check of the resulting asset keys follows this list).
Because these tables will become the source data for the stg_customers.sql and stg_orders.sql models in the dbt project, the names of the assets must match the table names specified in /tutorial_template/jaffle_shop/models/sources.yml, which you configured in part one of this tutorial.
group_name - When Dagster loads the dbt models as assets, the assets will be placed in an asset group based on the name of the folder (staging) containing the models. Because we want the assets we add to be included in the same group, we defined this as staging.
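If you want to double-check how these two arguments are applied, the decorated functions are Dagster AssetsDefinition objects whose keys and groups you can inspect from a Python shell. This is a minimal sketch, assuming a Dagster release that exposes the key and group_names_by_key properties; it isn't part of the tutorial files:

from tutorial_dbt_dagster.assets import customers_raw

# key_prefix plus the function name form the asset key, which also determines
# the DuckDB schema and table name (jaffle_shop.customers_raw).
print(customers_raw.key)  # AssetKey(["jaffle_shop", "customers_raw"])

# group_name places the asset in the same "staging" group as the dbt models.
print(customers_raw.group_names_by_key)  # {AssetKey([...]): "staging"}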
At this point, the /tutorial_template/tutorial_dbt_dagster/assets/__init__.py file should look like this:
import pandas as pd
from dagster_dbt import load_assets_from_dbt_project

from dagster import asset, file_relative_path


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data


DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)
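A note on the two file_relative_path calls: they resolve the dbt project and profiles directories relative to assets/__init__.py rather than the current working directory, so the paths keep working no matter where Dagit is launched from. As a rough, purely illustrative equivalent (not the tutorial's code), the helper behaves like:

import os

def relative_to_this_file(dunder_file: str, relative_path: str) -> str:
    # Roughly what dagster.file_relative_path does: join the relative path
    # onto the directory that contains the file passing in __file__.
    return os.path.join(os.path.dirname(dunder_file), relative_path)

# From assets/__init__.py, "../../jaffle_shop" resolves to /tutorial_template/jaffle_shop.
print(relative_to_this_file(__file__, "../../jaffle_shop"))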
Step 2: Add an I/O manager to the Dagster repository
To materialize the assets, we need to tell Dagster how to handle the assets' inputs and outputs. We can do this using an I/O manager.
In this step, we'll update the Dagster repository to supply the duckdb_io_manager to the assets in the repository. This resource contains an I/O manager that, when assets are materialized, allows:
Upstream assets (customers_raw, orders_raw) to load data into DuckDB. In this example, the duckdb_io_manager uses DuckDBPandasTypeHandler to store the pandas DataFrames used in our assets as CSVs and load them into DuckDB.
Downstream assets to read data from DuckDB. We'll add the downstream asset in the next section.
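You won't write this I/O manager yourself; it's imported from the dagster_duckdb_pandas package in the next code block. For context, though, every Dagster I/O manager implements the same two-method interface. Here is a minimal, illustrative sketch that is not used in this tutorial:

from dagster import IOManager, io_manager

class SketchIOManager(IOManager):
    def handle_output(self, context, obj):
        # Called when an asset is materialized: persist the value the asset returned.
        # The DuckDB I/O manager writes the pandas DataFrame to a DuckDB table here.
        ...

    def load_input(self, context):
        # Called when a downstream asset needs that value: read it back.
        # The DuckDB I/O manager queries the table into a pandas DataFrame here.
        ...

@io_manager
def sketch_io_manager():
    return SketchIOManager()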
Replace the contents of /tutorial_template/tutorial_dbt_dagster/repository.py with the following:
import os

from dagster_dbt import dbt_cli_resource
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH
from dagster_duckdb_pandas import duckdb_pandas_io_manager

from dagster import load_assets_from_package_module, repository, with_resources


@repository
def tutorial_dbt_dagster():
    return with_resources(
        load_assets_from_package_module(assets),
        {
            "dbt": dbt_cli_resource.configured(
                {
                    "project_dir": DBT_PROJECT_PATH,
                    "profiles_dir": DBT_PROFILES,
                },
            ),
            "io_manager": duckdb_pandas_io_manager.configured(
                {"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
            ),
        },
    )
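One design detail worth noting: the "database" value points the I/O manager at a DuckDB file inside the dbt project directory, the same database your dbt profile from part one points at, so the tables Dagster materializes and the tables dbt builds live side by side. A quick, purely illustrative way to print the resolved location, reusing the constant defined in assets/__init__.py:

import os

from tutorial_dbt_dagster.assets import DBT_PROJECT_PATH

# DBT_PROJECT_PATH points at /tutorial_template/jaffle_shop, so materializations
# are written to /tutorial_template/jaffle_shop/tutorial.duckdb.
print(os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb"))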
Now that you've created assets, resources, and a repository, it's time to materialize the assets! Materializing an asset runs the op it contains and saves the results to persistent storage. In this tutorial, we're saving asset outputs to DuckDB.
In Dagit, click the Reload definitions button. This ensures that Dagit picks up the changes you made in the previous steps.
At this point, the customers_raw and orders_raw assets should display above stg_customers and stg_orders as upstream dependencies:
Click the Materialize all button near the top right corner of the page, which will launch a run to materialize the assets. When finished, the Materialized and Latest Run attributes in the asset will be populated:
After the run completes, you can:
Click the asset to open a sidebar containing info about the asset, including its last materialization stats and a link to view the Asset details page.
Click the ID of the Latest Run in an asset (651489a2 in the image above) to view the Run details page. This page contains detailed info about the run, including timing information, errors, and logs.
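If you want to confirm that the source data actually landed in DuckDB, you can also query the database directly with the duckdb Python package. This optional check is not part of the tutorial; it assumes the database file lives at tutorial_template/jaffle_shop/tutorial.duckdb as configured above, and that no other process currently has the file open:

import duckdb

# Open the same file the DuckDB I/O manager writes to; adjust the path to
# wherever your tutorial_template directory lives.
con = duckdb.connect("tutorial_template/jaffle_shop/tutorial.duckdb")

# The raw assets are stored in the jaffle_shop schema, named after the assets.
print(con.execute("SELECT COUNT(*) FROM jaffle_shop.customers_raw").fetchall())
print(con.execute("SELECT * FROM jaffle_shop.orders_raw LIMIT 5").df())

con.close()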
At this point, you've built and materialized two upstream Dagster assets, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.