Dagster & Sling
This integration allows you to use Sling to extract and load data from popular data sources to destinations with high performance and ease.
note
This integration is currently experimental.
Installation
pip install dagster-sling
Example
from dagster_sling import SlingConnectionResource, SlingResource, sling_assets
import dagster as dg
source = SlingConnectionResource(
name="MY_PG",
type="postgres",
host="localhost", # type: ignore
port=5432, # type: ignore
database="my_database", # type: ignore
user="my_user", # type: ignore
password=dg.EnvVar("PG_PASS"), # type: ignore
)
target = SlingConnectionResource(
name="MY_SF",
type="snowflake",
host="hostname.snowflake", # type: ignore
user="username", # type: ignore
database="database", # type: ignore
password=dg.EnvVar("SF_PASSWORD"), # type: ignore
role="role", # type: ignore
)
@sling_assets(
replication_config={
"SOURCE": "MY_PG",
"TARGET": "MY_SF",
"defaults": {
"mode": "full-refresh",
"object": "{stream_schema}_{stream_table}",
},
"streams": {
"public.accounts": None,
"public.users": None,
"public.finance_departments": {"object": "departments"},
},
}
)
def my_sling_assets(context, sling: SlingResource):
yield from sling.replicate(context=context)
defs = dg.Definitions(
assets=[my_sling_assets],
resources={
"sling": SlingResource(
connections=[
source,
target,
]
)
},
)
About dlt
Sling provides an easy-to-use YAML configuration layer for loading data from files, replicating data between databases, exporting custom SQL queries to cloud storage, and much more.