Using Dagster with Snowflake

This tutorial focuses on how to store and load Dagster's software-defined assets (SDAs) in Snowflake.

By the end of the tutorial, you will:

  • Configure a Snowflake I/O manager
  • Create a table in Snowflake using a Dagster asset
  • Make a Snowflake table available in Dagster
  • Load Snowflake tables in downstream assets

Prerequisites

To complete this tutorial, you'll need:

  • To install the dagster-snowflake and dagster-snowflake-pandas libraries:

    pip install dagster-snowflake dagster-snowflake-pandas
    
  • To gather the following information, which is required to use the Snowflake I/O manager:

    • Snowflake account name: You can find this by logging in to Snowflake and reading the account name from the URL.

    • Snowflake credentials: This includes a username and a password. The Snowflake I/O manager can read these values from environment variables. In this guide, we store the username and password as SNOWFLAKE_USER and SNOWFLAKE_PASSWORD, respectively.

      export SNOWFLAKE_USER=<your username>
      export SNOWFLAKE_PASSWORD=<your password>
      

      Refer to the Using environment variables and secrets guide for more info.
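Because the I/O manager reads the credentials from the environment, a missing variable may only surface once a run starts. The following is a minimal sketch of a check you could run beforehand. The helper name `missing_snowflake_env_vars` is hypothetical and is not part of Dagster; only the two variable names come from this guide.

```python
import os

# Names used in this guide for the Snowflake credentials.
REQUIRED_ENV_VARS = ("SNOWFLAKE_USER", "SNOWFLAKE_PASSWORD")


def missing_snowflake_env_vars(environ=None):
    """Return the required variable names that are unset or empty.

    Hypothetical helper for illustration; it is not part of Dagster.
    Pass a dict to check something other than the real environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


# Example with explicit dictionaries instead of the real environment.
print(missing_snowflake_env_vars({"SNOWFLAKE_USER": "alice"}))
# -> ['SNOWFLAKE_PASSWORD']
```

Calling `missing_snowflake_env_vars()` with no argument checks the current process environment, so it could be run in the same shell where you exported the variables.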


Step 1: Configure the Snowflake I/O manager

The Snowflake I/O manager requires some configuration to connect to your Snowflake instance. The account, user, and password are required to connect to Snowflake. Additionally, you need to specify the database where all tables should be stored.

You can also provide optional configuration to further customize the Snowflake I/O manager: the warehouse to use, the schema where data should be stored, and a role for the I/O manager.

from dagster_snowflake_pandas import snowflake_pandas_io_manager

from dagster import repository, with_resources

# iris_dataset is the asset defined in Step 2; define or import it before the repository.


@repository
def flowers_analysis_repository():
    return with_resources(
        [iris_dataset],
        resource_defs={
            "io_manager": snowflake_pandas_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",  # required
                    "user": {"env": "SNOWFLAKE_USER"},  # required
                    "password": {"env": "SNOWFLAKE_PASSWORD"},  # required
                    "database": "FLOWERS",  # required
                    "role": "writer",  # optional, defaults to the default role for the account
                    "warehouse": "PLANTS",  # optional, defaults to the default warehouse for the account
                    "schema": "IRIS",  # optional, defaults to PUBLIC
                }
            )
        },
    )

With this configuration, if you materialized an asset called iris_dataset, the Snowflake I/O manager would use the role writer and store the data in the FLOWERS.IRIS.IRIS_DATASET table, using the PLANTS warehouse.

Finally, in the @repository, we assign the snowflake_pandas_io_manager to the io_manager key. io_manager is a reserved key to set the default I/O manager for your assets.

For more info about each of the configuration values, refer to the build_snowflake_io_manager API documentation.
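To make the naming in this example concrete, here is a small sketch of how the fully qualified table name follows from the database, the schema, and the asset name. The function `qualified_table_name` is hypothetical and only mirrors the behavior described in this step; it is not the I/O manager's implementation. The upper-casing is an assumption based on the table names shown in this guide.

```python
def qualified_table_name(database, asset_name, schema=None):
    """Build DATABASE.SCHEMA.TABLE for an asset (illustrative helper only).

    Falls back to the PUBLIC schema when none is configured, matching the
    default noted in the configuration comments above.
    """
    schema = schema or "PUBLIC"
    return ".".join(part.upper() for part in (database, schema, asset_name))


print(qualified_table_name("FLOWERS", "iris_dataset", schema="IRIS"))
# -> FLOWERS.IRIS.IRIS_DATASET
print(qualified_table_name("FLOWERS", "iris_dataset"))
# -> FLOWERS.PUBLIC.IRIS_DATASET
```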


Step 2: Create tables in Snowflake

The Snowflake I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing Snowflake tables available to Dagster.

Store a Dagster asset as a table in Snowflake

To store data in Snowflake using the Snowflake I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the Snowflake I/O manager in your repository, as in Step 1: Configure the Snowflake I/O manager, and Dagster will handle storing and loading your assets in Snowflake.

import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )

In this example, we first define our asset. Here, we are fetching the Iris dataset as a Pandas DataFrame and renaming the columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.

When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the Snowflake I/O manager, the Snowflake I/O manager will create the table FLOWERS.IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.
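The create-if-missing and replace-on-materialize behavior can be illustrated without a Snowflake account. The sketch below uses Python's built-in sqlite3 module as a stand-in database. The `store_table` helper is hypothetical and shows only the semantics described above, not how the Snowflake I/O manager is implemented.

```python
import sqlite3


def store_table(conn, table, columns, rows):
    """Create `table` if needed, then replace its contents with `rows`.

    Illustrative stand-in only: `table` and `columns` are treated as
    trusted identifiers and are not escaped.
    """
    column_list = ", ".join(f'"{name}"' for name in columns)
    placeholders = ", ".join("?" for _ in columns)
    with conn:  # commit on success, roll back on error
        conn.execute(f'CREATE TABLE IF NOT EXISTS "{table}" ({column_list})')
        conn.execute(f'DELETE FROM "{table}"')  # discard the previous materialization
        conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', rows)


conn = sqlite3.connect(":memory:")
columns = ["Sepal length (cm)", "Species"]

# The first materialization creates the table.
store_table(conn, "IRIS_DATASET", columns, [(5.1, "Iris-setosa"), (7.0, "Iris-versicolor")])
# The second materialization replaces the rows instead of appending to them.
store_table(conn, "IRIS_DATASET", columns, [(6.3, "Iris-virginica")])

rows = conn.execute('SELECT * FROM "IRIS_DATASET"').fetchall()
print(rows)
# -> [(6.3, 'Iris-virginica')]
```

After the second call only the newest rows remain, which is the practical consequence of the replace behavior: each materialization overwrites the previous contents of the table.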


Step 3: Load Snowflake tables in downstream assets

Once you have created an asset or source asset that represents a table in Snowflake, you will likely want to create additional assets that work with the data. Dagster and the Snowflake I/O manager allow you to load the data stored in Snowflake tables into downstream assets.

import pandas as pd

from dagster import asset

# this example uses the iris_dataset asset from Step 2


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()

In this example, we want to provide the iris_dataset asset from the Store a Dagster asset as a table in Snowflake example to the iris_cleaned asset.

In iris_cleaned, the iris_dataset parameter tells Dagster that the value for the iris_dataset asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.

When materializing these assets, Dagster will use the snowflake_pandas_io_manager to fetch the FLOWERS.IRIS.IRIS_DATASET table as a Pandas DataFrame and pass this DataFrame as the iris_dataset parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the snowflake_pandas_io_manager to store the DataFrame as the FLOWERS.IRIS.IRIS_CLEANED table in Snowflake.
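The parameter-name matching and the load/store round trip described above can be sketched in plain Python. This toy model is an illustration under stated assumptions, not Dagster's implementation: `ASSETS`, `TABLES`, `toy_asset`, and `materialize` are hypothetical names, a dict stands in for Snowflake, and lists of tuples stand in for DataFrames.

```python
import inspect

ASSETS = {}  # asset name -> function that computes it
TABLES = {}  # table name -> stored value (stand-in for Snowflake)


def toy_asset(fn):
    """Register a function as an asset under its own name."""
    ASSETS[fn.__name__] = fn
    return fn


def materialize(name):
    """Compute one asset, loading each input from storage by parameter name."""
    fn = ASSETS[name]
    inputs = {}
    for param in inspect.signature(fn).parameters:
        if param.upper() not in TABLES:
            materialize(param)  # the upstream asset has not been stored yet
        inputs[param] = TABLES[param.upper()]  # "load" step
    TABLES[name.upper()] = fn(**inputs)  # "store" step
    return TABLES[name.upper()]


@toy_asset
def iris_dataset():
    return [(5.1, "Iris-setosa"), (5.1, "Iris-setosa"), (None, "Iris-virginica")]


@toy_asset
def iris_cleaned(iris_dataset):
    # Mimic dropna().drop_duplicates() on a list of tuples, keeping order.
    complete = [row for row in iris_dataset if None not in row]
    return list(dict.fromkeys(complete))


materialize("iris_cleaned")
print(sorted(TABLES))
# -> ['IRIS_CLEANED', 'IRIS_DATASET']
print(TABLES["IRIS_CLEANED"])
# -> [(5.1, 'Iris-setosa')]
```

In real Dagster code, the @asset decorator and the configured I/O manager play these roles; the sketch only shows why naming a parameter iris_dataset is enough to wire the two assets together.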


Completed code example

When finished, your code should look like the following:

import pandas as pd
from dagster_snowflake_pandas import snowflake_pandas_io_manager

from dagster import SourceAsset, asset, repository, with_resources

# An existing Snowflake table made available to Dagster as a source asset.
iris_harvest_data = SourceAsset(key="iris_harvest_data")


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data",
        names=[
            "Sepal length (cm)",
            "Sepal width (cm)",
            "Petal length (cm)",
            "Petal width (cm)",
            "Species",
        ],
    )


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()


@repository
def flowers_analysis_repository():
    return with_resources(
        [iris_dataset, iris_harvest_data, iris_cleaned],
        resource_defs={
            "io_manager": snowflake_pandas_io_manager.configured(
                {
                    "account": "abc1234.us-east-1",
                    "user": {"env": "SNOWFLAKE_USER"},
                    "password": {"env": "SNOWFLAKE_PASSWORD"},
                    "database": "FLOWERS",
                    "schema": "IRIS",
                }
            )
        },
    )

For more information on software-defined assets, refer to the Assets tutorial or the Assets concept documentation.

For more information on I/O managers, refer to the I/O manager concept documentation.