Skip to main content

Creating sensors that react to run statuses

If you want to act on the status of a run, Dagster provides a way to create a sensor that reacts to run statuses. You can use run_status_sensor with a specified DagsterRunStatus to decorate a function that will run when the given status occurs. This can be used to launch other runs, send alerts to a monitoring service on run failure, or report a run success.

Here is an example of a run status sensor that launches a run of status_reporting_job if a run is successful:

@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
request_job=status_reporting_job,
)
def report_status_sensor(context):
# this condition prevents the sensor from triggering status_reporting_job again after it succeeds
if context.dagster_run.job_name != status_reporting_job.name:
run_config = {
"ops": {
"status_report": {"config": {"job_name": context.dagster_run.job_name}}
}
}
return RunRequest(run_key=None, run_config=run_config)
else:
return SkipReason("Don't report status of status_reporting_job")

request_job is the job that will be run when the RunRequest is returned.

Note that in report_status_sensor we conditionally return a RunRequest. This ensures that when report_status_sensor runs status_reporting_job it doesn't enter an infinite loop where the success of status_reporting_job triggers another run of status_reporting_job, which triggers another run, and so on.

Here is an example of a sensor that reports job success in a Slack message:

from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus


@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_slack_on_run_success(context: RunStatusSensorContext):
slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

slack_client.chat_postMessage(
channel="#alert-channel",
text=f'Job "{context.dagster_run.job_name}" succeeded.',
)

When a run status sensor is triggered by a run but doesn't return anything, Dagster will report an event back to the run to indicate that the sensor ran.

Once you have written your sensor, you can add the sensor to a Definitions object so it can be enabled and used the same as other sensors:

from dagster import Definitions


defs = Definitions(jobs=[my_sensor_job], sensors=[my_slack_on_run_success])