Customizing Dagster's built-in loggers
Custom loggers change the structure of the logs produced by your Dagster pipelines. For example, you can produce JSON logs that are easier for log management systems to process. For a list of all built-in loggers, see the API documentation.
It's not currently possible to globally configure the logger for all jobs in a repository.
Prerequisites
To follow the steps in this guide, you'll need:
- A basic understanding of Dagster concepts such as assets, jobs and definitions
- A working knowledge of the Python logging module
Step 1: Add a prebuilt custom logger to your jobs
This step shows how to add an existing custom logger, the _loggers.json_console_logger, to your jobs. This will override the default _loggers.colored_console_logger and produce logs in JSON format.
Add the custom logger to your asset jobs
The following example shows how to add the custom logger to your code location definitions and configure an asset job to use it.
import typing

import requests

import dagster as dg
from dagster import json_console_logger

LOGGER_CONFIG = {"loggers": {"console": {"config": {"log_level": "INFO"}}}}


@dg.asset()
def hackernews_topstory_ids(context: dg.AssetExecutionContext) -> list[int]:
    """Get up to 500 top stories from the HackerNews topstories endpoint.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
    """
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    top_500_newstories = requests.get(newstories_url).json()

    # Log the number of stories fetched
    context.log.info(f"Compute Logger - Got {len(top_500_newstories)} top stories.")

    return top_500_newstories


hackernews_topstory_ids_job = dg.define_asset_job(
    name="topstory_ids_job",
    config=LOGGER_CONFIG,
)

defs = dg.Definitions(
    assets=[hackernews_topstory_ids],
    jobs=[hackernews_topstory_ids_job],
    loggers={"console": json_console_logger},
)
Add the custom logger to your ops-based jobs
Configuring an ops job to use the custom logger differs slightly from the asset job example. The following example shows how:
import requests

import dagster as dg
from dagster import json_console_logger

LOGGER_CONFIG = {"loggers": {"console": {"config": {"log_level": "INFO"}}}}


@dg.op
def get_hackernews_topstory_ids(context: dg.OpExecutionContext):
    """Get up to 500 top stories from the HackerNews topstories endpoint.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
    """
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    top_500_newstories = requests.get(newstories_url).json()

    # Log the number of stories fetched
    context.log.info(f"Compute Logger - Got {len(top_500_newstories)} top stories.")

    return top_500_newstories


@dg.job(logger_defs={"console": json_console_logger}, config=LOGGER_CONFIG)
def hackernews_topstory_ids_job():
    get_hackernews_topstory_ids()


defs = dg.Definitions(
    jobs=[hackernews_topstory_ids_job],
)
Expected json_console_logger output
The json_console_logger will emit an exhaustive single-line JSON document containing the full log record, including the Dagster metadata fields.
Here's an example of the output for reference, formatted for readability:
{
  "args": [],
  "created": 1725455358.2311811,
  "dagster_meta": {
    "dagster_event": null,
    "dagster_event_batch_metadata": null,
    "job_name": "hackernews_topstory_ids_job",
    "job_tags": {
      ".dagster/grpc_info": "{\"host\": \"localhost\", \"socket\": \"/var/folders/5b/t062dlpj3j716l4w1d3yq6vm0000gn/T/tmpds_hvzm9\"}",
      "dagster/preset_name": "default",
      "dagster/solid_selection": "*"
    },
    "log_message_id": "3850cfb8-f9fb-458a-a986-3efd26e4b859",
    "log_timestamp": "2024-09-04T13:09:18.225289",
    "op_name": "get_hackernews_topstory_ids",
    "orig_message": "Compute Logger - Got 500 top stories.",
    "resource_fn_name": null,
    "resource_name": null,
    "run_id": "11528a21-38d5-43e7-8b13-993e47ce7f7d",
    "step_key": "get_hackernews_topstory_ids"
  },
  "exc_info": null,
  "exc_text": null,
  "filename": "log_manager.py",
  "funcName": "emit",
  "levelname": "INFO",
  "levelno": 20,
  "lineno": 272,
  "module": "log_manager",
  "msecs": 231.0,
  "msg": "hackernews_topstory_ids_job - 11528a21-38d5-43e7-8b13-993e47ce7f7d - get_hackernews_topstory_ids - Compute Logger - Got 500 top stories.",
  "name": "dagster",
  "pathname": "/home/dagster/workspace/quickstart-etl/.venv/lib/python3.11/site-packages/dagster/_core/log_manager.py",
  "process": 35373,
  "processName": "SpawnProcess-2:1",
  "relativeCreated": 813.4410381317139,
  "stack_info": null,
  "thread": 8584465408,
  "threadName": "MainThread"
}
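Because each record is a single JSON document, downstream tooling can read it with any JSON parser. The following is a minimal sketch using only the Python standard library; the `summarize` helper and the abbreviated `sample_line` are hypothetical and exist only for illustration. A real record carries all of the fields shown above.

```python
import json

# Hypothetical, abbreviated log line for illustration only.
sample_line = json.dumps(
    {
        "levelname": "INFO",
        "name": "dagster",
        "dagster_meta": {
            "job_name": "hackernews_topstory_ids_job",
            "run_id": "11528a21-38d5-43e7-8b13-993e47ce7f7d",
            "step_key": "get_hackernews_topstory_ids",
            "orig_message": "Compute Logger - Got 500 top stories.",
        },
    }
)


def summarize(line: str) -> dict:
    """Pick a few fields out of one JSON log line (hypothetical helper)."""
    record = json.loads(line)
    # dagster_meta may be missing or null on records not produced by Dagster.
    meta = record.get("dagster_meta") or {}
    return {
        "level": record.get("levelname"),
        "job": meta.get("job_name"),
        "run_id": meta.get("run_id"),
        "step": meta.get("step_key"),
        "message": meta.get("orig_message"),
    }


summary = summarize(sample_line)
print(summary["job"], summary["message"])
```

Reading `dagster_meta` defensively keeps the helper usable on log lines that come from other libraries sharing the same output stream.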
Changing the logger configuration in the Dagster UI
You can also change the logger configuration in the Dagster UI. This is useful if you want to change the logger configuration without changing the code, to use the custom logger on a manual asset materialization launch, or to change the verbosity of the logs. Add the following lines to your config.yaml:
loggers:
  console:
    config:
      log_level: DEBUG
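This configuration has the same nested shape as the LOGGER_CONFIG dictionary from Step 1, with `loggers` at the top, then the logger key, then `config`. As a rough sketch, a small helper can build that dictionary for any logger key and level. The `build_logger_config` function and the `VALID_LEVELS` set are hypothetical, and the sketch assumes your logger accepts the standard Python level names.

```python
# Standard Python logging level names; assumed (not verified here) to be
# the values your logger's log_level option accepts.
VALID_LEVELS = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"}


def build_logger_config(logger_key: str, log_level: str) -> dict:
    """Build the nested logger config dictionary (hypothetical helper)."""
    level = log_level.upper()
    if level not in VALID_LEVELS:
        raise ValueError(f"Unknown log level: {log_level!r}")
    return {"loggers": {logger_key: {"config": {"log_level": level}}}}


debug_config = build_logger_config("console", "debug")
print(debug_config)
```

The logger key (`console` here) has to match the key under which the logger was registered, as in the examples from Step 1.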
Step 2: Write your custom logger
In this example, we'll create a logger implementation that produces comma-separated values from selected fields in the log record. Other examples can be found in the codebase, in the built-in loggers such as json_console_logger.
import logging
import sys
from datetime import datetime

import dagster as dg


@dg.logger(
    {
        "log_level": dg.Field(str, is_required=False, default_value="INFO"),
        "name": dg.Field(str, is_required=False, default_value="dagster"),
    },
    description="A comma separated console logger.",
)
def readable_console_logger(init_context: dg.InitLoggerContext) -> logging.Logger:
    level = init_context.logger_config["log_level"]
    name = init_context.logger_config["name"]

    klass = logging.getLoggerClass()
    logger_ = klass(name, level=level)

    handler = logging.StreamHandler(stream=sys.stdout)

    class CommaSeparatedRecordFormatter(logging.Formatter):
        def format(self, record: logging.LogRecord) -> str:
            dagster_meta = record.__dict__["dagster_meta"]
            fields = [
                datetime.fromtimestamp(record.created).isoformat(),
                record.name,
                record.levelname,
                dagster_meta.get("run_id", "-"),
                dagster_meta.get("job_name", "-"),
                dagster_meta.get("op_name", "-"),
                dagster_meta.get("orig_message", record.msg),
            ]
            # Metadata values may be None, so fall back to "-" and
            # coerce everything to str before joining.
            return ",".join("-" if field is None else str(field) for field in fields)

    handler.setFormatter(CommaSeparatedRecordFormatter())
    logger_.addHandler(handler)

    return logger_
Sample output:
2024-09-04T09:29:33.643818,dagster,INFO,cc76a116-4c8f-400f-9c4d-c42b66cdee3a,topstory_ids_job,hackernews_topstory_ids,Compute Logger - Got 500 top stories.
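A plain comma join keeps the formatter short, but a message that itself contains a comma would add an extra column for any consumer that splits on commas. If that matters for your pipeline, one option is to let the standard library `csv` module handle quoting. The following is only a sketch: the `to_csv_line` helper and the sample message are hypothetical and are not part of Dagster.

```python
import csv
import io


def to_csv_line(fields: list[str]) -> str:
    """Serialize fields as one CSV line, quoting values where needed."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(fields)
    # csv.writer appends a line terminator; logging handlers add their own.
    return buffer.getvalue().rstrip("\r\n")


# Hypothetical fields; the last one contains a comma.
fields = [
    "2024-09-04T09:29:33.643818",
    "dagster",
    "INFO",
    "Got 500 top stories, skipped 3",
]

line = to_csv_line(fields)
parsed = next(csv.reader([line]))
print(line)
```

Parsing the line back with `csv.reader` yields the original four fields, which a naive split on commas would not.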
The available fields emitted by the logger are defined in the LogRecord object. In addition, Dagster-specific information can be found in the dagster_meta attribute of the log record. The previous example already uses some of these attributes.
It contains the following fields:
- dagster_event: string
- dagster_event_batch_metadata: string
- job_name: string
- job_tags: a dictionary of strings
- log_message_id: string
- log_timestamp: string
- op_name: string
- run_id: string
- step_key: string
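Some of these fields can be empty, as the null values in the JSON output earlier show, and records from other libraries will not carry a dagster_meta attribute at all. The sketch below shows one defensive way to read the fields in a formatter, using only the standard library. The `MetaAwareFormatter` class and the sample records are hypothetical and only approximate what Dagster attaches to a record.

```python
import logging


class MetaAwareFormatter(logging.Formatter):
    """Format a record using dagster_meta when present (hypothetical example)."""

    def format(self, record: logging.LogRecord) -> str:
        # Fall back to an empty dict if the attribute is missing or None.
        meta = getattr(record, "dagster_meta", None) or {}
        run_id = meta.get("run_id") or "-"
        step_key = meta.get("step_key") or "-"
        message = meta.get("orig_message") or record.getMessage()
        return f"{record.levelname} run={run_id} step={step_key} {message}"


formatter = MetaAwareFormatter()

# Hand-built records standing in for what a handler would receive.
with_meta = logging.makeLogRecord(
    {
        "name": "dagster",
        "levelno": logging.INFO,
        "levelname": "INFO",
        "msg": "job - run - step - Got 500 top stories.",
        "dagster_meta": {
            "run_id": "abc123",
            "step_key": "my_step",
            "orig_message": "Got 500 top stories.",
        },
    }
)
without_meta = logging.makeLogRecord(
    {"name": "other", "levelno": logging.INFO, "levelname": "INFO", "msg": "plain message"}
)

formatted_with_meta = formatter.format(with_meta)
formatted_without_meta = formatter.format(without_meta)
print(formatted_with_meta)
print(formatted_without_meta)
```

Using `or "-"` rather than a `.get` default covers both a missing key and a key that is present with a None value.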