# Defining dependencies between partitioned assets
Now that you've seen how to model partitioned assets in different ways, you may want to define dependencies between them, or between partitioned and unpartitioned assets.
Partitioned assets in Dagster can have dependencies on other partitioned assets, allowing you to create complex data pipelines where the output of one partitioned asset feeds into another. Here's how it works:
- A downstream asset can depend on one or more partitions of an upstream asset
- The partitioning schemes don't need to be identical, but they should be compatible
## Default partition dependency rules
A few rules govern default partition-to-partition dependencies:
- When the upstream asset and downstream asset have the same `PartitionsDefinition`, each partition in the downstream asset will depend on the same partition in the upstream asset.
- When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset will depend on all partitions in the upstream asset that intersect its time window.
For example, if an asset with a `DailyPartitionsDefinition` depends on an asset with an `HourlyPartitionsDefinition`, then partition `2024-04-12` of the daily asset would depend on 24 partitions of the hourly asset: `2024-04-12-00:00` through `2024-04-12-23:00`.
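
To make the default rule concrete, here is a minimal sketch (the asset names are hypothetical; no `PartitionMapping` is supplied, so the default rules apply):

```python
import dagster as dg

hourly_partitions = dg.HourlyPartitionsDefinition(start_date="2024-04-01-00:00")
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-04-01")


@dg.asset(partitions_def=hourly_partitions)
def hourly_events(): ...


# No PartitionMapping is given, so the default rule applies: each daily
# partition depends on the 24 hourly partitions in its time window.
@dg.asset(partitions_def=daily_partitions, deps=[hourly_events])
def daily_event_stats(): ...
```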
## Overriding default dependency rules
Default partition dependency rules can be overridden by providing a `PartitionMapping` when specifying a dependency on an asset. How this is accomplished depends on the type of dependency the asset has.
### Basic asset dependencies
To override partition dependency rules for basic asset dependencies, you can use `AssetDep` to specify the partition dependency on an upstream asset:
```python
from dagster import (
    AssetDep,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events(): ...


@asset(
    partitions_def=partitions_def,
    deps=[
        AssetDep(
            events,
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    ],
)
def yesterday_event_stats(): ...
```
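
With `start_offset=-1` and `end_offset=-1`, the mapped time window is shifted back by one partition, so each daily partition of `yesterday_event_stats` depends on the previous day's partition of `events`.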
### Managed-loading asset dependencies
To override partition dependency rules for managed-loading asset dependencies, you can use a `PartitionMapping` to specify that each partition of an asset should depend on a partition in an upstream asset.

In the following code, we use a `TimeWindowPartitionMapping` to specify that each partition of a daily-partitioned asset should depend on the prior day's partition in an upstream asset:
```python
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events(): ...


@asset(
    partitions_def=partitions_def,
    ins={
        "events": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    },
)
def yesterday_event_stats(events): ...
```
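
Because this is a managed-loading dependency, Dagster loads the mapped upstream partition via the I/O manager and passes its value to the `events` parameter, so the asset body doesn't need to read the data itself.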
For a list of available `PartitionMapping`s, see the API docs.
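
For example (a hedged sketch with hypothetical asset and partition names), `StaticPartitionMapping` can describe explicit correspondences between statically partitioned assets:

```python
import dagster as dg

regions = dg.StaticPartitionsDefinition(["us", "eu"])
countries = dg.StaticPartitionsDefinition(["us", "fr", "de"])


@dg.asset(partitions_def=regions)
def regional_data(): ...


# Map each upstream region partition to one or more downstream country
# partitions: "fr" and "de" both depend on the "eu" partition.
@dg.asset(
    partitions_def=countries,
    deps=[
        dg.AssetDep(
            regional_data,
            partition_mapping=dg.StaticPartitionMapping(
                {"us": "us", "eu": ["fr", "de"]}
            ),
        )
    ],
)
def country_data(): ...
```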
## Examples
### Dependencies between different time-based partitions
The following example creates two daily-partitioned assets, `daily_sales_data` and `daily_sales_summary`, which can be executed at the same time in a single schedule.
```python
import datetime
import os

import pandas as pd

import dagster as dg

# Create the PartitionsDefinition,
# which will create a range of partitions from
# 2024-01-01 to the day before the current time
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    # Simulate fetching daily sales data
    df = pd.DataFrame(
        {
            "date": [date] * 10,
            "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
        }
    )

    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
    partitions_def=daily_partitions,  # Use the daily partitioning scheme
    deps=[daily_sales_data],  # Define dependency on `daily_sales_data` asset
)
def daily_sales_summary(context):
    partition_date_str = context.partition_key
    # Read the CSV file for the given partition date
    filename = f"data/daily_sales/sales_{partition_date_str}.csv"
    df = pd.read_csv(filename)

    # Summarize daily sales
    summary = {
        "date": partition_date_str,
        "total_sales": df["sales"].sum(),
    }

    context.log.info(f"Daily sales summary for {partition_date_str}: {summary}")


# Create a partitioned asset job
daily_sales_job = dg.define_asset_job(
    name="daily_sales_job",
    selection=[daily_sales_data, daily_sales_summary],
)


# Create a schedule to run the job daily
@dg.schedule(
    job=daily_sales_job,
    cron_schedule="0 1 * * *",  # Run at 1:00 AM every day
)
def daily_sales_schedule(context):
    """Process previous day's sales data."""
    # Calculate the previous day's date
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(
        run_key=date,
        partition_key=date,
    )


# Define the Definitions object
defs = dg.Definitions(
    assets=[daily_sales_data, daily_sales_summary],
    jobs=[daily_sales_job],
    schedules=[daily_sales_schedule],
)
```
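
When this schedule fires at 1:00 AM, the returned `RunRequest` targets the previous day's partition, so both assets materialize yesterday's data in a single run.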
However, sometimes you might want to define dependencies between different time-based partitions. For example, you might want to aggregate daily data into a weekly report.
Consider the following example:
```python
import datetime
import os

import pandas as pd

import dagster as dg

# Create the PartitionsDefinitions
daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")
weekly_partitions = dg.WeeklyPartitionsDefinition(start_date="2024-01-01")


# Define the partitioned asset
@dg.asset(
    partitions_def=daily_partitions,
    automation_condition=dg.AutomationCondition.on_cron(cron_schedule="0 1 * * *"),
)
def daily_sales_data(context: dg.AssetExecutionContext):
    date = context.partition_key
    # Simulate fetching daily sales data
    df = pd.DataFrame({"date": [date], "sales": [1000]})

    os.makedirs("data/daily_sales", exist_ok=True)
    filename = f"data/daily_sales/sales_{date}.csv"
    df.to_csv(filename, index=False)

    context.log.info(f"Daily sales data written to {filename}")


@dg.asset(
    partitions_def=weekly_partitions,
    automation_condition=dg.AutomationCondition.eager(),
    deps=[daily_sales_data],
)
def weekly_sales_summary(context: dg.AssetExecutionContext):
    week = context.partition_key
    partition_key_range = context.asset_partition_key_range_for_input(
        "daily_sales_data"
    )
    start_date = partition_key_range.start
    end_date = partition_key_range.end
    context.log.info(f"start_date: {start_date}, end_date: {end_date}")

    df = pd.DataFrame()
    for date in pd.date_range(start_date, end_date):
        filename = f"data/daily_sales/sales_{date.strftime('%Y-%m-%d')}.csv"
        df = pd.concat([df, pd.read_csv(filename)])

    context.log.info(f"df: {df}")
    weekly_summary = {
        "week": week,
        "total_sales": df["sales"].sum(),
    }
    context.log.info(f"weekly sales summary for {week}: {weekly_summary}")


# Define the Definitions object
defs = dg.Definitions(
    assets=[daily_sales_data, weekly_sales_summary],
)
```
In this example:

- We have a `daily_sales_data` asset partitioned by day, which will be executed daily.
- The `weekly_sales_summary` asset depends on the `daily_sales_data` asset, and will be executed weekly.
  - In this asset, the weekly partition depends on all its parent partitions (all seven days of the week). We use `context.asset_partition_key_range_for_input("daily_sales_data")` to get the range of partition keys, which spans the start and end of the week.
- To automate the execution of these assets:
  - First, we specify `automation_condition=AutomationCondition.eager()` on the `weekly_sales_summary` asset. This ensures it runs weekly, once all seven daily partitions of `daily_sales_data` are up to date.
  - Second, we specify `automation_condition=AutomationCondition.on_cron(cron_schedule="0 1 * * *")` on the `daily_sales_data` asset. This ensures it runs daily.
We recommend using automation conditions instead of schedules for code with complex dependency logic, such as the example above. Automation conditions specify when an asset should run, which allows you to define execution criteria without needing to add custom scheduling logic.
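
As an illustration (a minimal sketch; operator support may vary by Dagster version), automation conditions can also be composed with boolean operators to express custom criteria without writing a schedule:

```python
import dagster as dg

# Run on the daily cron tick, but not while a run targeting this
# partition is already in progress.
condition = dg.AutomationCondition.on_cron(
    "0 1 * * *"
) & ~dg.AutomationCondition.in_progress()
```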