Snowflake with PySpark (dagster-snowflake-pyspark)

This library provides an integration between the Snowflake data warehouse and the PySpark data processing library.

To use this library, you should first ensure that you have an appropriate Snowflake user configured to access your data warehouse.

dagster_snowflake_pyspark.SnowflakePySparkIOManager IOManagerDefinition[source]

Config Schema:
database (dagster.StringSource):

Name of the database to use.

account (dagster.StringSource):

Your Snowflake account name. For more details, see the Snowflake documentation.

user (dagster.StringSource):

User login name.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the warehouse to use.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the role to use.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Path to the private key. See the Snowflake documentation for details.

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

The password of the private key. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

store_timestamps_as_strings (Union[dagster.BoolSource, None], optional):

If using Pandas DataFrames, whether to convert time data to strings. If True, time data will be converted to strings when storing the DataFrame and converted back to time data when loading the DataFrame. If False, time data without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False.

Default Value: False

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

additional_snowflake_connection_args (Union[dict, None], optional):

Additional keyword arguments to pass to the snowflake.connector.connect function. For a full list of available arguments, see the Snowflake documentation. This config will be ignored if using the sqlalchemy connector.

Default Value: None
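
As a sketch of the base64 step mentioned under private_key, the following Python does roughly what the shell command cat rsa_key.p8 | base64 does, using only the standard library. The helper name encode_private_key is a hypothetical name chosen for illustration and is not part of this library. Unlike some base64 command-line tools, b64encode does not wrap its output across lines.

```python
import base64
from pathlib import Path


def encode_private_key(path: str) -> str:
    """Return the contents of a key file as a single-line base64 string.

    Hypothetical helper for illustration; not part of dagster-snowflake-pyspark.
    """
    raw = Path(path).read_bytes()
    # b64encode returns bytes with no line breaks; decode to str for use in config
    return base64.b64encode(raw).decode("ascii")
```

The resulting string could then be stored in an environment variable and supplied as the private_key config value.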

An I/O manager definition that reads inputs from and writes PySpark DataFrames to Snowflake. When using the SnowflakePySparkIOManager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

from dagster_snowflake_pyspark import SnowflakePySparkIOManager
from pyspark.sql import DataFrame
from dagster import Definitions, EnvVar, asset

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePySparkIOManager(
            database="my_database",
            warehouse="my_warehouse", # required for SnowflakePySparkIOManager
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            ...
        )
    }
)

Note that the warehouse configuration value is required when using the SnowflakePySparkIOManager.

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": SnowflakePySparkIOManager(database="my_database", schema="my_schema", ...)
    }
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: DataFrame) -> DataFrame:
    # my_table will just contain the data from column "a"
    ...

class dagster_snowflake_pyspark.SnowflakePySparkTypeHandler[source]

Plugin for the Snowflake I/O Manager that can store and load PySpark DataFrames as Snowflake tables.

Examples

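from collections.abc import Sequence

import pandas as pd
from dagster import Definitions, EnvVar, asset
from dagster._core.storage.db_io_manager import DbTypeHandler  # internal module path; may change between versions
from dagster_snowflake import SnowflakeIOManager
from dagster_snowflake_pandas import SnowflakePandasTypeHandler
from dagster_snowflake_pyspark import SnowflakePySparkTypeHandler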

class MySnowflakeIOManager(SnowflakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [SnowflakePandasTypeHandler(), SnowflakePySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MySnowflakeIOManager(database="MY_DATABASE", account=EnvVar("SNOWFLAKE_ACCOUNT"), warehouse="my_warehouse", ...)
    }
)

Legacy

dagster_snowflake_pyspark.snowflake_pyspark_io_manager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Name of the database to use.

account (dagster.StringSource):

Your Snowflake account name. For more details, see the Snowflake documentation.

user (dagster.StringSource):

User login name.

schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

Default Value: None

password (Union[dagster.StringSource, None], optional):

User password.

Default Value: None

warehouse (Union[dagster.StringSource, None], optional):

Name of the warehouse to use.

Default Value: None

role (Union[dagster.StringSource, None], optional):

Name of the role to use.

Default Value: None

private_key (Union[dagster.StringSource, None], optional):

Raw private key to use. See the Snowflake documentation for details. To avoid issues with newlines in the keys, you can base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat rsa_key.p8 | base64

Default Value: None

private_key_path (Union[dagster.StringSource, None], optional):

Path to the private key. See the Snowflake documentation for details.

Default Value: None

private_key_password (Union[dagster.StringSource, None], optional):

The password of the private key. See the Snowflake documentation for details. Required for both private_key and private_key_path if the private key is encrypted. For unencrypted keys, this config can be omitted or set to None.

Default Value: None

store_timestamps_as_strings (Union[dagster.BoolSource, None], optional):

If using Pandas DataFrames, whether to convert time data to strings. If True, time data will be converted to strings when storing the DataFrame and converted back to time data when loading the DataFrame. If False, time data without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False.

Default Value: False

authenticator (Union[dagster.StringSource, None], optional):

Optional parameter to specify the authentication mechanism to use.

Default Value: None

additional_snowflake_connection_args (Union[dict, None], optional):

Additional keyword arguments to pass to the snowflake.connector.connect function. For a full list of available arguments, see the Snowflake documentation. This config will be ignored if using the sqlalchemy connector.

Default Value: None

An I/O manager definition that reads inputs from and writes PySpark DataFrames to Snowflake. When using the snowflake_pyspark_io_manager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

from dagster_snowflake_pyspark import snowflake_pyspark_io_manager
from pyspark.sql import DataFrame
from dagster import Definitions, asset

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": snowflake_pyspark_io_manager.configured({
            "database": "my_database",
            "warehouse": "my_warehouse", # required for snowflake_pyspark_io_manager
            "account" : {"env": "SNOWFLAKE_ACCOUNT"},
            "password": {"env": "SNOWFLAKE_PASSWORD"},
            ...
        })
    }
)

Note that the warehouse configuration value is required when using the snowflake_pyspark_io_manager.

You can set a default schema to store the assets using the schema configuration value of the Snowflake I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": snowflake_pyspark_io_manager.configured(
        {"database": "my_database", "schema": "my_schema", ...} # will be used as the schema
    )}
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in snowflake
)
def my_table() -> DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in snowflake
)
def my_other_table() -> DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> DataFrame:
    ...

If none of these is provided, the schema will default to “public”.
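
The schema rules described above can be summarized in a short sketch. The function resolve_schema and its parameter names are hypothetical and only mirror the order stated in the text (metadata, then key_prefix, then the I/O manager's schema config, then “public”). It is not the library's code. In particular, how a key_prefix and a configured schema interact when both are set is an assumption here and may vary between versions, so verify it against the installed release.

```python
from typing import Optional, Sequence


def resolve_schema(
    metadata_schema: Optional[str] = None,
    key_prefix: Sequence[str] = (),
    config_schema: Optional[str] = None,
) -> str:
    """Pick a schema name following the order described in the text.

    Hypothetical helper for illustration only; this is not the library's code.
    """
    if metadata_schema:
        # a "schema" entry in asset or output metadata takes precedence
        return metadata_schema
    if key_prefix:
        # assumption: the last key_prefix component names the schema
        return key_prefix[-1]
    if config_schema:
        # default schema set on the I/O manager
        return config_schema
    # nothing was specified anywhere
    return "public"
```

For example, resolve_schema("from_metadata", ["from_prefix"]) returns "from_metadata", and resolve_schema() returns "public".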

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: DataFrame) -> DataFrame:
    # my_table will just contain the data from column "a"
    ...
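
Conceptually, restricting an input to specific columns is comparable to selecting only those columns from the table. The sketch below illustrates that idea with a hypothetical build_select helper. The SQL that the I/O manager actually issues is not shown in this document and may differ.

```python
from typing import Optional, Sequence


def build_select(
    table: str,
    schema: str = "public",
    columns: Optional[Sequence[str]] = None,
) -> str:
    """Build a SELECT statement limited to the requested columns.

    Hypothetical helper; the SQL the I/O manager actually issues may differ.
    """
    # fall back to all columns when no "columns" metadata is given
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {schema}.{table}"


print(build_select("my_table", "my_schema", ["a"]))
# prints: SELECT a FROM my_schema.my_table
```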