Using dbt with Dagster, part three: Create and materialize upstream assets

This is part three of the Using dbt with Dagster software-defined assets tutorial.

By this point, you've set up a dbt project and loaded dbt models into Dagster as assets.

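In this step, you'll:

  • Create upstream Dagster assets
  • Add an I/O manager to the Dagster repository
  • Materialize the assets using Dagit
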
Step 1: Create upstream Dagster assets

To fetch the data the dbt models require, we'll write two Dagster assets: one for customers and one for orders.

In /tutorial_template/tutorial_dbt_dagster/assets/__init__.py:

  1. Replace the import section at the top of the file with the following:

    import pandas as pd
    from dagster_dbt import load_assets_from_dbt_project
    
    from dagster import asset, file_relative_path
    

    In this step, we've added the pandas library and Dagster's asset decorator.

  2. Add the following code before load_assets_from_dbt_project:

    @asset(key_prefix=["jaffle_shop"], group_name="staging")
    def customers_raw() -> pd.DataFrame:
        data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
        return data
    
    @asset(key_prefix=["jaffle_shop"], group_name="staging")
    def orders_raw() -> pd.DataFrame:
        data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
        return data
    

Let's take a closer look at the arguments we've provided:

  • key_prefix - When the assets are materialized, Dagster will store them in DuckDB in the schema defined by the last value in key_prefix. In this case, that's jaffle_shop. The tables will have the same names as the assets that produced them, which are customers_raw and orders_raw.

    Because these tables will become the source data for the stg_customers.sql and stg_orders.sql models in the dbt project, the names of the assets must match the table names specified in /tutorial_template/jaffle_shop/models/sources.yml, which you configured in part one of this tutorial.

  • group_name - When Dagster loads the dbt models as assets, the assets will be placed in an asset group based on the name of the folder (staging) containing the models. Because we want the assets we add to be included in the same group, we defined this as staging.
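
To make the naming rule concrete, here is a small illustrative sketch in plain Python. It is not Dagster's API: the helper duckdb_table_for is hypothetical and only mirrors the rule described above, where the last value in key_prefix becomes the schema and the asset name becomes the table. The source_tables set is an assumed stand-in for the table names in sources.yml.

```python
from typing import List, Tuple


def duckdb_table_for(key_prefix: List[str], asset_name: str) -> Tuple[str, str]:
    """Hypothetical helper: return (schema, table) following the rule above."""
    # The last value in key_prefix is used as the schema name.
    schema = key_prefix[-1]
    # The table takes the name of the asset that produced it.
    return schema, asset_name


# Assumed stand-in for the table names listed in sources.yml.
source_tables = {"customers_raw", "orders_raw"}

for name in ("customers_raw", "orders_raw"):
    schema, table = duckdb_table_for(["jaffle_shop"], name)
    # Each asset name has to match a source table so dbt can find it.
    assert table in source_tables
    print(f"{schema}.{table}")
```

If an asset were renamed without updating sources.yml, the assertion in this sketch would fail, which is the same mismatch that would leave the dbt staging models without their source tables.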

At this point, the /tutorial_template/tutorial_dbt_dagster/assets/__init__.py file should look like this:

import pandas as pd
from dagster_dbt import load_assets_from_dbt_project

from dagster import asset, file_relative_path


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data


@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data


DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)

Step 2: Add an I/O manager to the Dagster repository

To materialize the assets, we need to tell Dagster how to handle the assets' inputs and outputs. We can do this using an I/O manager.

In this step, we'll update the Dagster repository to supply the duckdb_pandas_io_manager to the assets in the repository. This resource contains an I/O manager that, when assets are materialized, allows:

  • Upstream assets (customers_raw, orders_raw) to load data into DuckDB. In this example, the duckdb_pandas_io_manager uses DuckDBPandasTypeHandler to store the pandas DataFrames used in our assets as CSVs and load them into DuckDB.
  • Downstream assets to read data from DuckDB. We'll add the downstream asset in the next section.
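
The division of labor described above can be pictured with a toy example. This is a conceptual sketch only, not how the DuckDB I/O manager is implemented: the class InMemoryIOManager is hypothetical, a dictionary stands in for DuckDB, and the method names merely echo handle_output and load_input from Dagster's I/O manager interface with simplified signatures.

```python
from typing import Any, Dict, Tuple


class InMemoryIOManager:
    """Toy stand-in for an I/O manager; a dict plays the role of DuckDB."""

    def __init__(self) -> None:
        self._tables: Dict[Tuple[str, ...], Any] = {}

    def handle_output(self, asset_key: Tuple[str, ...], obj: Any) -> None:
        # Called after an asset function returns: persist its result.
        self._tables[asset_key] = obj

    def load_input(self, asset_key: Tuple[str, ...]) -> Any:
        # Called before a downstream asset runs: fetch the stored upstream result.
        return self._tables[asset_key]


io_manager = InMemoryIOManager()

# Upstream asset: its return value is handed to the I/O manager.
rows = [{"id": 1, "name": "example"}]  # made-up placeholder data
io_manager.handle_output(("jaffle_shop", "customers_raw"), rows)

# Downstream asset: receives whatever the I/O manager loads.
loaded = io_manager.load_input(("jaffle_shop", "customers_raw"))
print(loaded)
```

The point of the pattern is that asset functions only return and receive Python objects, while the storage details live in one swappable resource.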

In /tutorial_template/tutorial_dbt_dagster/repository.py, replace its contents with the following:

import os

from dagster_dbt import dbt_cli_resource
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH

from dagster_duckdb_pandas import duckdb_pandas_io_manager

from dagster import load_assets_from_package_module, repository, with_resources


@repository
def tutorial_dbt_dagster():
    return with_resources(
        load_assets_from_package_module(assets),
        {
            "dbt": dbt_cli_resource.configured(
                {
                    "project_dir": DBT_PROJECT_PATH,
                    "profiles_dir": DBT_PROFILES,
                },
            ),
            "io_manager": duckdb_pandas_io_manager.configured(
                {"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
            ),
        },
    )
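
If it's unclear where the tutorial.duckdb file ends up, the following sketch traces the path arithmetic using only the standard library. It assumes that file_relative_path joins the relative path onto the directory containing the given file; the helper file_relative is a hypothetical stand-in written for illustration, and posixpath is used so the result is the same on any operating system.

```python
import posixpath


def file_relative(dunder_file: str, relative_path: str) -> str:
    """Assumed equivalent of file_relative_path: resolve relative to the file's folder."""
    return posixpath.join(posixpath.dirname(dunder_file), relative_path)


# Location of the assets module as given in this tutorial.
assets_file = "/tutorial_template/tutorial_dbt_dagster/assets/__init__.py"

project_dir = file_relative(assets_file, "../../jaffle_shop")
database = posixpath.join(project_dir, "tutorial.duckdb")

# normpath collapses the "../.." segments for readability.
print(posixpath.normpath(database))
```

Under these assumptions, the database file sits inside the jaffle_shop project folder, next to the dbt project it serves.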


Step 3: Materialize the assets using Dagit

Now that you've created assets, resources, and a repository, it's time to materialize the assets! Materializing an asset runs the op it contains and saves the results to persistent storage. In this tutorial, we're saving asset outputs to DuckDB.

  1. In Dagit, click the Reload definitions button. This ensures that Dagit picks up the changes you made in the previous steps.

    At this point, the customers_raw and orders_raw assets should display above stg_customers and stg_orders as upstream dependencies:

    Asset graph in Dagit, showing dbt models and unmaterialized assets
  2. Click the Materialize all button near the top right corner of the page, which will launch a run to materialize the assets. When finished, the Materialized and Latest Run attributes in the asset will be populated:

    Asset graph in Dagit, showing materialized assets
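
The upstream and downstream relationship in the asset graph implies an ordering: each raw asset has to be materialized before the staging model that reads it. The sketch below illustrates that idea with Python's standard graphlib module. It is a simplified picture that covers only the four assets named in this section, and it is not a description of how Dagster plans a run.

```python
from graphlib import TopologicalSorter

# Each key maps to the set of assets it depends on (its upstream assets).
dependencies = {
    "customers_raw": set(),
    "orders_raw": set(),
    "stg_customers": {"customers_raw"},
    "stg_orders": {"orders_raw"},
}

# static_order() yields every asset after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Any order this produces places customers_raw before stg_customers and orders_raw before stg_orders.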

After the run completes, you can:

  • Click the asset to open a sidebar containing info about the asset, including its last materialization stats and a link to view the Asset details page.
  • In an asset, click the ID of the Latest Run (651489a2 in the above image) to view the Run details page. This page contains detailed info about the run, including timing information, errors, and logs.

What's next?

At this point, you've built and materialized two upstream Dagster assets, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.