Modal Application

For this tutorial, we will create multiple pipelines that do the following:

- Download audio files
- Transcribe the audio
- Summarize the audio
- Send an email summary

Many of these steps can be done in Dagster, but the transcription step is better suited for a different environment.

An advantage of Dagster is that you are not limited to executing code within Dagster itself. In this case, we will use Modal. Modal makes it easy to manage and scale the infrastructure needed for distributed computation while maintaining a Pythonic workflow. It works especially well with Dagster: Dagster manages and orchestrates the various components of our pipeline, while Modal spins up auto-scaling infrastructure in a serverless way.

We will start by explaining the Modal application in our pipeline and then show how to use it within Dagster.

Within Modal, we need to define the image that will be used by the Modal infrastructure. As mentioned, Modal allows you to define an image and its dependencies in a Pythonic way, so you can avoid writing a separate Dockerfile. The image is then supplied to the `modal.App`, which we will use later on to decorate our Modal functions:

```python
app_image = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("git")
    .pip_install(
        "git+https://github.com/openai/whisper.git",
        "dacite",
        "jiwer",
        "ffmpeg-python",
        "gql[all]~=3.0.0a5",
        "python-multipart~=0.0.9",
        "pandas",
        "loguru==0.6.0",
        "torchaudio==2.1.0",
        "python-dotenv",
    )
    .apt_install("ffmpeg")
    .pip_install("ffmpeg-python")
)

app = modal.App(
    "whisper-pod-transcriber",
    image=app_image,
)
```

Another benefit of Modal is that it allows us to mount a Cloudflare R2 bucket as if it were a file system. R2 will serve as the staging ground between Dagster and Modal:

```python
cloud_bucket_mount = modal.CloudBucketMount(
    "dagster-modal-demo",
    bucket_endpoint_url=os.environ.get("CLOUDFLARE_R2_API"),
    secret=modal.Secret.from_dict(
        {
            "AWS_ACCESS_KEY_ID": os.environ.get("CLOUDFLARE_R2_ACCESS_KEY_ID"),
            "AWS_SECRET_ACCESS_KEY": os.environ.get("CLOUDFLARE_R2_SECRET_ACCESS_KEY"),
            "AWS_REGION": "auto",
        }
    ),
)
```
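
Any Modal function that declares this mount under `volumes` can then read and write the bucket's contents through ordinary file paths. Here is a minimal illustrative sketch (the function name and glob pattern below are not part of the project):

```python
# Illustrative only: a Modal function that lists the MP3 files staged in the
# R2 bucket, which appears as a normal directory under /mount.
@app.function(volumes={"/mount": cloud_bucket_mount})
def list_staged_audio() -> list[str]:
    import pathlib

    return [str(p) for p in pathlib.Path("/mount").glob("**/*.mp3")]
```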

With the image and R2 mount ready, we can define our Modal functions. The first function transcribes a segment of a podcast. Because Modal scales to fit the needs of the application and supports parallel processing, we can optimize our application by splitting podcasts that may be several hours long into smaller pieces. Modal will provision all of the necessary infrastructure as needed. As you can see in the decorator, all we need to provide Modal with is our image, the R2 bucket mount, and the required CPUs (we could request GPUs, but the OpenAI Whisper model is relatively small and does not need GPU acceleration the way some larger models do):

```python
@app.function(
    image=app_image,
    cpu=2,
    timeout=400,
    volumes={
        "/mount": cloud_bucket_mount,
    },
)
def transcribe_segment(
    start: float,
    end: float,
    audio_filepath: pathlib.Path,
    model: config.ModelSpec,
):
    import tempfile
    import time

    import ffmpeg
    import torch
    import whisper  # type: ignore

    t0 = time.time()
    with tempfile.NamedTemporaryFile(suffix=".mp3") as f:
        (
            ffmpeg.input(str(audio_filepath))
            .filter("atrim", start=start, end=end)
            .output(f.name)
            .overwrite_output()
            .run(quiet=True)
        )

        use_gpu = torch.cuda.is_available()
        device = "cuda" if use_gpu else "cpu"
        model = whisper.load_model(model.name, device=device, download_root=config.MODEL_DIR)
        result = model.transcribe(f.name, language="en", fp16=use_gpu)  # type: ignore

    logger.info(
        f"Transcribed segment {start:.2f} to {end:.2f} ({end - start:.2f}s duration) in {time.time() - t0:.2f} seconds."
    )

    # Add back offsets.
    for segment in result["segments"]:
        segment["start"] += start
        segment["end"] += start

    return result
```
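
Once the app is running, `transcribe_segment` can be invoked remotely in a couple of ways. The snippet below is a hypothetical usage sketch (the file path and segment times are placeholders, and it would need to execute inside a Modal context such as a local entrypoint): a single `.remote()` call runs one container, while `.starmap()` fans many `(start, end)` pairs out in parallel, which is how `transcribe_episode` uses it next.

```python
# Hypothetical usage sketch; the path and segment times are placeholders.
segment = transcribe_segment.remote(
    0.0, 30.0, pathlib.Path("/mount/data/example.mp3"), config.DEFAULT_MODEL
)

# One container per (start, end) pair, results returned in order.
segments = list(
    transcribe_segment.starmap(
        [(0.0, 30.0), (30.0, 60.0)],
        kwargs=dict(
            audio_filepath=pathlib.Path("/mount/data/example.mp3"),
            model=config.DEFAULT_MODEL,
        ),
    )
)
```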

The next function, `transcribe_episode`, splits an audio file into smaller segments and applies `transcribe_segment` to each one. After all the segments have been processed, it writes the combined transcription to a JSON file in the R2 bucket.
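
The segment boundaries come from a `split_silences` helper defined elsewhere in the project; all `transcribe_episode` needs from it is a generator of `(start, end)` pairs in seconds. As a simplified, hypothetical stand-in, a helper that yields fixed-length windows instead of silence boundaries might look like this:

```python
# Hypothetical stand-in for the project's split_silences helper: yields
# (start, end) windows in seconds. The real helper splits on silences;
# this sketch simply uses fixed-length windows.
def split_fixed_windows(path: str, window: float = 60.0):
    import ffmpeg  # from the ffmpeg-python package installed in app_image

    duration = float(ffmpeg.probe(path)["format"]["duration"])
    start = 0.0
    while start < duration:
        end = min(start + window, duration)
        yield start, end
        start = end
```

The body of `transcribe_episode` (decorator and signature omitted) then fans the segments out to `transcribe_segment` and writes the combined result: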

```python
segment_gen = split_silences(str(audio_file))

output_text = ""
output_segments = []
for result in transcribe_segment.starmap(
    segment_gen, kwargs=dict(audio_filepath=audio_file, model=model)
):
    output_text += result["text"]
    output_segments += result["segments"]

result = {
    "text": output_text,
    "segments": output_segments,
    "language": "en",
}

logger.info(f"Writing openai/whisper transcription to {result_path}")
with open(result_path, "w") as f:
    json.dump(result, f, indent=4)
```

With the Modal functions in place, we can define the entry point, `main`. This is what Dagster will use to interact with Modal. It opens a Dagster Pipes connection, reads the `audio_file_path` extra, runs `transcribe_episode` remotely, and reports the materialization back to Dagster:

```python
@app.local_entrypoint()
def main():
    from dagster_pipes import open_dagster_pipes

    model = config.DEFAULT_MODEL

    with open_dagster_pipes() as context:
        audio_path = context.extras.get("audio_file_path")
        if not audio_path:
            raise Exception("Missing `audio_file_path` extras parameter")

        audio_path = "/mount/" + audio_path
        transcription_path = audio_path.replace(".mp3", ".json")
        transcribe_episode.remote(
            audio_file=Path(audio_path),
            result_path=Path(transcription_path),
            model=model,
        )

        context.report_asset_materialization(
            metadata={
                "audio_file": audio_path,
                "transcription_file": transcription_path,
            }
        )
```
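
To make the hand-off concrete, here is a minimal, hypothetical sketch of how a Dagster asset could launch this entrypoint through Dagster Pipes, supplying the `audio_file_path` extra that `main` reads above. The file name `transcribe.py`, the asset name, and the example path are assumptions for illustration only; the rest of the tutorial shows the actual wiring used in this project.

```python
import dagster as dg


@dg.asset
def podcast_transcription(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
) -> dg.MaterializeResult:
    # Launch the Modal local entrypoint as a subprocess. Dagster Pipes injects
    # the run context so open_dagster_pipes() in main() can read the extras and
    # report the asset materialization back to Dagster.
    return pipes_subprocess_client.run(
        command=["modal", "run", "transcribe.py"],
        context=context,
        extras={"audio_file_path": "data/example_podcast/episode_1.mp3"},
    ).get_materialize_result()


defs = dg.Definitions(
    assets=[podcast_transcription],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
```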

Next steps