Skip to main content

Defining dependencies between partitioned assets

Now that you've seen how to model partitioned assets in different ways, you may want to define dependencies between the partitioned assets, or even between unpartitioned assets.

Partitioned assets in Dagster can have dependencies on other partitioned assets, allowing you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:

  • A downstream asset can depend on one or more partitions of an upstream asset
  • The partitioning schemes don't need to be identical, but they should be compatible

Default partition dependency rules

A few rules govern default partition-to-partition dependencies:

  • When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset will depend on the same partition in the upstream asset.
  • When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset will depend on all partitions in the upstream asset that intersect its time window.

For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2024-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2024-04-12-00:00 through 2024-04-12-23:00.

Overriding default dependency rules

Default partition dependency rules can be overridden by providing a PartitionMapping when specifying a dependency on an asset. How this is accomplished depends on the type of dependency the asset has.

Basic asset dependencies

To override partition dependency rules for basic asset dependencies, you can use AssetDep to specify the partition dependency on an upstream asset:

from dagster import (
AssetDep,
DailyPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events(): ...


@asset(
partitions_def=partitions_def,
deps=[
AssetDep(
events,
partition_mapping=TimeWindowPartitionMapping(
start_offset=-1, end_offset=-1
),
)
],
)
def yesterday_event_stats(): ...

Managed-loading asset dependencies

To override partition dependency rules for managed-loading asset dependencies, you can use a PartitionMapping to specify that each partition of an asset should depend on a partition in an upstream asset.

In the following code, we use a TimeWindowPartitionMapping to specify that each partition of a daily-partitioned asset should depend on the prior day's partition in an upstream asset:

from dagster import (
AssetIn,
DailyPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events(): ...


@asset(
partitions_def=partitions_def,
ins={
"events": AssetIn(
partition_mapping=TimeWindowPartitionMapping(
start_offset=-1, end_offset=-1
),
)
},
)
def yesterday_event_stats(events): ...

For a list of available PartitionMappings, see the API docs.

Examples

Dependencies between different time-based partitions

The following example creates two partitions: daily_sales_data and daily_sales_summary, which can be executed at the same time in a single schedule.

Show example
import datetime
import os

import pandas as pd

import dagster as dg

# Create the PartitionDefinition,
# which will create a range of partitions from
# 2024-01-01 to the day before the current time
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
date = context.partition_key
# Simulate fetching daily sales data
df = pd.DataFrame(
{
"date": [date] * 10,
"sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
}
)

os.makedirs("data/daily_sales", exist_ok=True)
filename = f"data/daily_sales/sales_{date}.csv"
df.to_csv(filename, index=False)

context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
partitions_def=daily_partitions, # Use the daily partitioning scheme
deps=[daily_sales_data], # Define dependency on `daily_sales_data` asset
)
def daily_sales_summary(context):
partition_date_str = context.partition_key
# Read the CSV file for the given partition date
filename = f"data/daily_sales/sales_{partition_date_str}.csv"
df = pd.read_csv(filename)

# Summarize daily sales
summary = {
"date": partition_date_str,
"total_sales": df["sales"].sum(),
}

context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")


# Create a partitioned asset job
daily_sales_job = dg.define_asset_job(
name="daily_sales_job",
selection=[daily_sales_data, daily_sales_summary],
)


# Create a schedule to run the job daily
@dg.schedule(
job=daily_sales_job,
cron_schedule="0 1 * * *", # Run at 1:00 AM every day
)
def daily_sales_schedule(context):
"""Process previous day's sales data."""
# Calculate the previous day's date
previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
date = previous_day.strftime("%Y-%m-%d")
return dg.RunRequest(
run_key=date,
partition_key=date,
)


# Define the Definitions object
defs = dg.Definitions(
assets=[daily_sales_data, daily_sales_summary],
jobs=[daily_sales_job],
schedules=[daily_sales_schedule],
)

However, sometimes you might want to define dependencies between different time-based partitions. For example, you might want to aggregate daily data into a weekly report.

Consider the following example:

import datetime
import os

import pandas as pd

import dagster as dg

# Create the PartitionDefinition
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")
weekly_partitions = dg.WeeklyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(
partitions_def=daily_partitions,
automation_condition=dg.AutomationCondition.on_cron(cron_schedule="0 1 * * *"),
)
def daily_sales_data(context: dg.AssetExecutionContext):
date = context.partition_key
# Simulate fetching daily sales data
df = pd.DataFrame({"date": [date], "sales": [1000]})

os.makedirs("data/daily_sales", exist_ok=True)
filename = f"data/daily_sales/sales_{date}.csv"
df.to_csv(filename, index=False)

context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
partitions_def=weekly_partitions,
automation_condition=dg.AutomationCondition.eager(),
deps=[daily_sales_data],
)
def weekly_sales_summary(context: dg.AssetExecutionContext):
week = context.partition_key
partition_key_range = context.asset_partition_key_range_for_input(
"daily_sales_data"
)
start_date = partition_key_range.start
end_date = partition_key_range.end
context.log.info(f"start_date: {start_date}, end_date: {end_date}")

df = pd.DataFrame()
for date in pd.date_range(start_date, end_date):
filename = f"data/daily_sales/sales_{date.strftime('%Y-%m-%d')}.csv"
df = pd.concat([df, pd.read_csv(filename)])
context.log.info(f"df: {df}")

weekly_summary = {
"week": week,
"total_sales": df["sales"].sum(),
}

context.log.info(f"weekly sales summary for {week}: {weekly_summary}")


# Define the Definitions object
defs = dg.Definitions(
assets=[daily_sales_data, weekly_sales_summary],
)

In this example:

  • We have a daily_sales_data asset partitioned by day, which will be executed daily.

  • The weekly_sales_summary asset depends on the daily_sales_data asset, which will be executed weekly.

    • In this asset, the weekly partition depends on all its parent partitions (all seven days of the week). We use context.asset_partition_key_range_for_input("daily_sales_data") to get a range of partition keys, which includes the start and end of the week.
  • To automate the execution of these assets:

    • First, we specify automation_condition=AutomationCondition.eager() to the weekly_sales_summary asset. This ensures it runs weekly after all seven daily partitions of daily_sales_data are up-to-date.
    • Second, we specify automation_condition=AutomationCondition.cron(cron_schedule="0 1 * * *") to the daily_sales_data asset. This ensures it runs daily.
tip

We recommend using automation conditions instead of schedules for code with complex dependency logic, such as the example above. Automation conditions specify when an asset should run, which allows you to define execution criteria without needing to add custom scheduling logic.