RSS Assets

The Modal application is ready, so we can return to Dagster and define the upstream and downstream assets that tie our pipeline together. First, we need to download the audio file referenced in the RSS feed and upload it to the R2 bucket:

    audio_asset_name = f"{feed_definition.name}_audio"

    @dg.asset(
        name=audio_asset_name,
        partitions_def=rss_entry_partition,
        compute_kind="python",
    )
    def _podcast_audio(context: dg.AssetExecutionContext, config: AudioRunConfig, s3: S3Resource):
        """Download the podcast audio file from the RSS feed and upload it to R2."""
        context.log.info("downloading audio file %s", config.audio_file_url)
        audio_key = get_destination(context.partition_key)

        metadata = {}

        # Skip the download when the audio file is already in the bucket.
        if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=audio_key):
            context.log.info("audio file already exists... skipping")
            metadata["status"] = "cached"
            metadata["key"] = audio_key
        else:
            audio_bytes = download_bytes(config.audio_file_url)
            s3.get_client().put_object(Body=audio_bytes, Bucket=R2_BUCKET_NAME, Key=audio_key)
            metadata["status"] = "uploaded"
            metadata["size"] = file_size(len(audio_bytes))

        return dg.MaterializeResult(metadata=metadata)
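
The asset above relies on helpers such as object_exists and file_size whose implementations are not shown here. The following is a minimal sketch of what they might look like, assuming a boto3-style S3 client whose head_object call raises an error carrying a response dictionary with a "404" code when the key is missing. Both function bodies are illustrative assumptions, not the project's actual code.

```python
def object_exists(client, bucket: str, key: str) -> bool:
    """Return True if `key` exists in `bucket` (hypothetical implementation)."""
    try:
        # head_object fetches only the object's metadata, not its contents.
        client.head_object(Bucket=bucket, Key=key)
    except Exception as exc:
        # boto3's ClientError exposes the service error in `exc.response`.
        error = getattr(exc, "response", {}).get("Error", {})
        if error.get("Code") in ("404", "NoSuchKey", "NotFound"):
            return False
        raise  # anything else (for example a permissions error) is not hidden
    return True


def file_size(num_bytes: int) -> str:
    """Format a byte count as a human-readable string (hypothetical implementation)."""
    units = ["B", "KB", "MB", "GB"]
    size = float(num_bytes)
    index = 0
    # Divide by 1024 until the value fits the current unit or units run out.
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    return f"{size:.1f} {units[index]}"
```

Checking for the object before downloading is what makes the asset safe to re-run: a second materialization of the same partition only records the "cached" status.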

Now that the data is in R2, as Modal expects, we can invoke the Modal application from Dagster using Dagster Pipes. Pipes provides a wrapper around a subprocess, which makes it well suited to executing code in other environments. It also lets us pass any necessary Dagster context or environment variables to Modal, which is particularly helpful for things like the access keys for the R2 bucket. Using the ModalClient from the dagster-modal integration, we can invoke the Modal application:

    @dg.asset(
        name=f"{feed_definition.name}_transcript",
        partitions_def=rss_entry_partition,
        deps=[_podcast_audio],
        compute_kind="modal",
    )
    def _podcast_transcription(
        context: dg.AssetExecutionContext, modal: ModalClient, s3: S3Resource
    ) -> dg.MaterializeResult:
        """Podcast transcription using OpenAI's Whisper model on Modal."""
        context.log.info("transcript %s", context.partition_key)
        audio_key = get_destination(context.partition_key)
        transcription_key = audio_key.replace(".mp3", ".json")

        # Skip the Modal run when the transcript is already in the bucket.
        if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=transcription_key):
            return dg.MaterializeResult(metadata={"status": "cached"})

        # Forward only the R2 credentials to the Modal process.
        included_env_vars = [
            "CLOUDFLARE_R2_API",
            "CLOUDFLARE_R2_ACCESS_KEY_ID",
            "CLOUDFLARE_R2_SECRET_ACCESS_KEY",
        ]
        env = {k: v for k, v in os.environ.items() if k in included_env_vars}

        return modal.run(
            func_ref="modal_project.transcribe",
            context=context,
            env=env,
            extras={"audio_file_path": audio_key},
        ).get_materialize_result()

Using Pipes, Modal emits events back to Dagster, so Dagster can monitor the Modal application and wait for it to finish running. Dagster then continues orchestrating our assets, moving on to the next step once the transcription files are uploaded to R2.
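
To make this event flow concrete, here is a rough sketch of what the Modal side of the exchange might look like. It assumes the context object yielded by open_dagster_pipes() from the dagster-pipes package. The function name run_transcription, the transcribe_audio callable, and the metadata keys are hypothetical and stand in for the Modal application's real code.

```python
from typing import Callable


def run_transcription(context, transcribe_audio: Callable[[str], str]) -> str:
    """Transcribe the file named in the Pipes extras and report back to Dagster.

    `context` is assumed to be the object yielded by
    `dagster_pipes.open_dagster_pipes()`; `transcribe_audio` is a hypothetical
    callable that takes an R2 key and returns the key of the transcript.
    """
    # Matches extras={"audio_file_path": audio_key} on the Dagster side.
    audio_key = context.get_extra("audio_file_path")
    context.log.info(f"transcribing {audio_key}")

    transcription_key = transcribe_audio(audio_key)

    # Metadata reported here is expected to surface in the result that
    # Dagster builds from the finished Pipes invocation.
    context.report_asset_materialization(
        metadata={"status": "transcribed", "key": transcription_key}
    )
    return transcription_key


# Inside the Modal entrypoint this might be wired up as follows (assumption):
#
#     from dagster_pipes import open_dagster_pipes
#
#     with open_dagster_pipes() as context:
#         run_transcription(context, transcribe_audio=my_transcribe_function)
```

Keeping the Pipes calls in a small function that receives the context makes the reporting logic easy to exercise locally with a stand-in context object.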

The next asset will take those new files and summarize them with OpenAI. After a summary has been created, we can use MaterializeResult to record the summary text and the associated R2 key within the Dagster Catalog:

    @dg.asset(
        name=f"{feed_definition.name}_summary",
        partitions_def=rss_entry_partition,
        deps=[_podcast_transcription],
        compute_kind="openai",
    )
    def _podcast_summary(
        context: dg.AssetExecutionContext, s3: S3Resource, openai: OpenAIResource
    ) -> dg.MaterializeResult:
        audio_key = get_destination(context.partition_key)
        transcription_key = audio_key.replace(".mp3", ".json")
        summary_key = audio_key.replace(".mp3", "-summary.txt")

        if object_exists(s3.get_client(), bucket=R2_BUCKET_NAME, key=summary_key):
            return dg.MaterializeResult(metadata={"summary": "cached", "summary_key": summary_key})

        response = s3.get_client().get_object(Bucket=R2_BUCKET_NAME, Key=transcription_key)

        data = json.loads(response.get("Body").read())

        with openai.get_client(context) as client:
            summary = summarize(client, data.get("text"))

        s3.get_client().put_object(
            Body=summary.encode("utf-8"), Bucket=R2_BUCKET_NAME, Key=summary_key
        )
        return dg.MaterializeResult(metadata={"summary": summary, "summary_key": summary_key})
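
The summarize helper is not shown in this section. One possible shape, assuming the client follows the OpenAI Python SDK's chat.completions.create interface, is sketched below. The model name, the prompt, and the character limit are placeholder assumptions rather than the values the project uses.

```python
def summarize(client, text: str, model: str = "gpt-4o-mini", max_chars: int = 100_000) -> str:
    """Ask a chat model for a short summary of a transcript (illustrative sketch)."""
    response = client.chat.completions.create(
        model=model,  # assumed model name; substitute whichever model you use
        messages=[
            {
                "role": "system",
                "content": "Summarize this podcast transcript in a few paragraphs.",
            },
            # Crude guard against very long transcripts: keep the first max_chars.
            {"role": "user", "content": text[:max_chars]},
        ],
    )
    # The generated text lives on the first choice of the response.
    return response.choices[0].message.content.strip()
```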

The final step in our DAG pipeline emails the summary to the user with yagmail.
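
The email asset itself is not shown here. As a rough sketch, sending the summary could look like the following, assuming a sender object with the same send(to=..., subject=..., contents=...) interface as yagmail's SMTP class. The function name, the subject line, and the message layout are hypothetical.

```python
def email_summary(sender, recipient: str, feed_name: str, summary: str, summary_key: str) -> dict:
    """Send a podcast summary by email and return metadata about the message.

    `sender` is assumed to behave like `yagmail.SMTP(user, password)`.
    """
    subject = f"New podcast summary: {feed_name}"
    # Each list item becomes a separate part of the message body.
    contents = [summary, f"Stored in R2 at: {summary_key}"]
    sender.send(to=recipient, subject=subject, contents=contents)
    return {"to": recipient, "subject": subject}
```

Returning a small dictionary keeps the function easy to plug into a MaterializeResult's metadata, in the same way the earlier assets record their status.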

Sensor

We have defined all the assets needed to transcribe and summarize a podcast. Now we want to make sure we are notified of new podcasts as soon as they are available. We can define a sensor in Dagster that checks the podcast URL and determines whether any new podcasts have been uploaded. To do this, we check the ETag of the RSS feed. The sensor is responsible for storing the most recent ETag as its cursor and determining whether any podcasts have not yet been processed. This way, the first time we start our Dagster sensor, it will execute a run for each podcast (up to the feed's maximum backfill size), but after that initial execution, the sensor will only look for newly uploaded podcasts:

    @dg.sensor(
        name=f"rss_sensor_{feed_definition.name}",
        minimum_interval_seconds=DEFAULT_POLLING_INTERVAL,
        default_status=dg.DefaultSensorStatus.RUNNING,
        job=_job,
    )
    def _sensor(context: dg.SensorEvaluationContext):
        etag = context.cursor
        context.log.info("querying feed with cursor etag: %s", etag)
        feed = feedparser.parse(feed_definition.url, etag=etag)

        num_entries = len(feed.entries)
        context.log.info("total number of entries: %s", num_entries)

        if num_entries > feed_definition.max_backfill_size:
            context.log.info("truncating entries to %s", feed_definition.max_backfill_size)
            entries = feed.entries[: feed_definition.max_backfill_size]
        else:
            entries = feed.entries

        partition_key_audio_files = [
            (sanitize(entry.id), get_entry_audio_url(entry)) for entry in entries
        ]

        return dg.SensorResult(
            run_requests=[
                dg.RunRequest(
                    partition_key=partition_key,
                    run_config=dg.RunConfig(
                        ops={audio_asset_name: AudioRunConfig(audio_file_url=audio_file_url)}
                    ),
                )
                for (partition_key, audio_file_url) in partition_key_audio_files
            ],
            dynamic_partitions_requests=[
                rss_entry_partition.build_add_request(
                    [key for (key, _) in partition_key_audio_files]
                )
            ],
            cursor=feed.etag,
        )
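
The sensor depends on two helpers, sanitize and get_entry_audio_url, that are defined elsewhere. A minimal sketch of both follows, assuming that partition keys should contain only letters, digits, underscores, and hyphens, and that feedparser exposes each enclosure as an item in entry.links with rel set to "enclosure". The real helpers may differ.

```python
import re


def sanitize(text: str) -> str:
    """Turn an RSS entry id into a partition-key-friendly string (hypothetical)."""
    # Collapse every run of other characters into a single underscore.
    return re.sub(r"[^A-Za-z0-9_-]+", "_", text).strip("_")


def get_entry_audio_url(entry) -> str:
    """Return the URL of the first audio enclosure on a feed entry (hypothetical)."""
    for link in entry.get("links", []):
        is_enclosure = link.get("rel") == "enclosure"
        is_audio = str(link.get("type", "")).startswith("audio/")
        if is_enclosure or is_audio:
            return link["href"]
    raise ValueError("entry has no audio enclosure")
```

Sanitizing the entry id matters because the same key is used both as the dynamic partition key and, through get_destination, to derive the object keys in R2.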

Factory method

That is all we need for our podcast workflow. We now want to apply this to multiple podcasts. Luckily, the code has already accounted for that. You may have noticed that the assets and sensors we have created are parameterized. This is because all of this work exists within a factory function:

    def rss_pipeline_factory(feed_definition: RSSFeedDefinition) -> dg.Definitions:
        ...

We use this factory to reuse the components and apply them to multiple RSS feeds. In the next section, we will discuss some of the best practices around the factory pattern in Dagster.
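
To illustrate why this parameterization matters, the sketch below models RSSFeedDefinition as a dataclass carrying the three attributes the code above reads (name, url, and max_backfill_size) and shows how each feed yields its own asset and sensor names. The dataclass layout, the default backfill size, the pipeline_names helper, and the example feed URLs are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RSSFeedDefinition:
    name: str
    url: str
    max_backfill_size: int = 1  # assumed default, not taken from the project


def pipeline_names(feed: RSSFeedDefinition) -> dict:
    """Mirror the f-strings used for the asset and sensor names above."""
    return {
        "audio": f"{feed.name}_audio",
        "transcript": f"{feed.name}_transcript",
        "summary": f"{feed.name}_summary",
        "sensor": f"rss_sensor_{feed.name}",
    }


# Hypothetical feeds; each one would be passed to rss_pipeline_factory.
feeds = [
    RSSFeedDefinition(name="podcast_a", url="https://example.com/a/rss"),
    RSSFeedDefinition(name="podcast_b", url="https://example.com/b/rss", max_backfill_size=3),
]
names_by_feed = {feed.name: pipeline_names(feed) for feed in feeds}
```

Because every name embeds feed_definition.name, the definitions produced for different feeds do not collide when they are combined, for example with dg.Definitions.merge.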

Next steps