Skip to main content

Using resources in sensors

Dagster's resources system can be used with sensors to make it easier to call out to external systems and to make components of a sensor easier to plug in for testing purposes.

To specify resource dependencies, annotate the resource as a parameter to the sensor's function. Resources are provided by attaching them to your Definitions call.

Here, a resource is provided which provides access to an external API. The same resource could be used in the job or assets that the sensor triggers.

from dagster import (
sensor,
RunRequest,
SensorEvaluationContext,
ConfigurableResource,
job,
Definitions,
RunConfig,
)
import requests
from typing import List

class UsersAPI(ConfigurableResource):
url: str

def fetch_users(self) -> list[str]:
return requests.get(self.url).json()

@job
def process_user(): ...

@sensor(job=process_user)
def process_new_users_sensor(
context: SensorEvaluationContext,
users_api: UsersAPI,
):
last_user = int(context.cursor) if context.cursor else 0
users = users_api.fetch_users()

num_users = len(users)
for user_id in users[last_user:]:
yield RunRequest(
run_key=user_id,
tags={"user_id": user_id},
)

context.update_cursor(str(num_users))

defs = Definitions(
jobs=[process_user],
sensors=[process_new_users_sensor],
resources={"users_api": UsersAPI(url="https://my-api.com/users")},
)

For more information on resources, refer to the Resources documentation. To see how to test schedules with resources, refer to the section on testing sensors with resources in "Testing sensors".