Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.
Default Value: None
Default location for jobs / datasets / tables.
Default Value: None
GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64
Default Value: None
Resource for interacting with Google BigQuery.
Examples
from dagster import Definitions, asset
from dagster_gcp import BigQueryResource

@asset
def my_table(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        client.query("SELECT * FROM my_dataset.my_table")

defs = Definitions(
    assets=[my_table],
    resources={
        "bigquery": BigQueryResource(project="my-project")
    }
)
The GCP project to use.
Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.
Default Value: None
The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.
Default Value: None
GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64
Default Value: None
When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.
Default Value: None
When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).
Default Value: None
Base class for an I/O manager definition that reads inputs from and writes outputs to BigQuery.
Examples
from typing import Sequence

import pandas as pd
from dagster import Definitions, EnvVar, asset
# DbTypeHandler is imported from a private dagster module here
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import BigQueryIOManager

class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [BigQueryPandasTypeHandler()]

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)
You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
    }
)
On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...
For ops, the dataset can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
If none of these is provided, the dataset will default to “public”.
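The precedence rules described above can be summarized in a small sketch. The helper `resolve_dataset` is hypothetical and is not part of dagster_gcp; it only mirrors the order stated in this section. The relative order of key_prefix and the configured default is an assumption based on the sentence "This dataset will be used if no other dataset is specified directly on an asset or op", and the actual I/O manager may resolve these differently.

```python
from typing import Mapping, Optional, Sequence


def resolve_dataset(
    metadata: Optional[Mapping[str, str]] = None,
    key_prefix: Optional[Sequence[str]] = None,
    default_dataset: Optional[str] = None,
) -> str:
    """Hypothetical helper: pick a dataset following the order described in the text."""
    # 1. A "schema" entry in metadata takes precedence over key_prefix.
    if metadata and metadata.get("schema"):
        return metadata["schema"]
    # 2. Otherwise, the last prefix before the asset name is used (assumed order).
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise, fall back to the dataset configured on the I/O manager.
    if default_dataset:
        return default_dataset
    # 4. If nothing is provided, the text says the dataset defaults to "public".
    return "public"


print(resolve_dataset(metadata={"schema": "my_dataset"}, key_prefix=["my_prefix"]))
print(resolve_dataset(key_prefix=["my_dataset"]))
print(resolve_dataset(default_dataset="fallback"))
print(resolve_dataset())
```

Treat this as a reading aid only; when the exact order matters, check the I/O manager's behavior directly.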
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
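As an alternative to the shell command, the encoding step can be sketched in Python. This is a minimal illustration using only the standard library; the key contents below are a made-up placeholder, not a real service account key.

```python
import base64
import json

# Placeholder standing in for the contents of a service account key file.
# A real key holds a multi-line private key, which is why newlines are an issue.
fake_key = json.dumps(
    {
        "type": "service_account",
        "private_key": "-----BEGIN PRIVATE KEY-----\nabc\n-----END PRIVATE KEY-----\n",
    }
)

# Similar in spirit to `cat $GOOGLE_APPLICATION_CREDENTIALS | base64`.
encoded = base64.b64encode(fake_key.encode("utf-8")).decode("ascii")

# The encoded value contains no newlines, so it can be passed as a single config value.
assert "\n" not in encoded

# Decoding restores the original JSON, newlines in the private key included.
decoded = json.loads(base64.b64decode(encoded))
print(decoded["type"])
```

The resulting string is what would be supplied as the gcp_credentials value, typically through an environment variable rather than hard-coded in source.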
BigQuery Create Dataset.
This op encapsulates creating a BigQuery dataset.
Expects a BQ client to be provisioned in resources as context.resources.bigquery.
BigQuery Delete Dataset.
This op encapsulates deleting a BigQuery dataset.
Expects a BQ client to be provisioned in resources as context.resources.bigquery.
Get the last updated timestamps of a list of BigQuery tables.
Note that this only works on BigQuery tables, and not views.
client (bigquery.Client) – The BigQuery client.
dataset_id (str) – The BigQuery dataset ID.
table_ids (Sequence[str]) – The table IDs to get the last updated timestamp for.
A mapping of table IDs to their last updated timestamps (UTC).
Mapping[str, datetime]
Project name
Default Value: None
Resource for interacting with Google Cloud Storage.
Example
from dagster import asset
from dagster_gcp.gcs import GCSResource

@asset
def my_asset(gcs: GCSResource):
    with gcs.get_client() as client:
        # client is a google.cloud.storage.Client
        ...
GCS bucket to store files
Prefix to add to all file paths
Default Value: ‘dagster’
Persistent IO manager using GCS for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name.
Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset.
With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.
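The path layout described above can be reproduced with a few lines of standard-library Python. The helper `asset_path` is hypothetical and only illustrates the mapping from asset key components to a path; it is not how the IO manager itself is implemented.

```python
import posixpath
from typing import Sequence


def asset_path(base_dir: str, asset_key_path: Sequence[str]) -> str:
    """Hypothetical helper: join the base directory with every component of the asset key."""
    return posixpath.join(base_dir, *asset_key_path)


path = asset_path("/my/base/path", ["one", "two", "three"])
print(posixpath.dirname(path))   # parent directories come from the leading components
print(posixpath.basename(path))  # the final component is the file name
```

Because the path depends only on the asset key, a second materialization of the same asset resolves to the same path, which is why it overwrites the first.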
Example usage:
Attach this IO manager to a set of assets.
from dagster import asset, Definitions
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix",
            gcs=GCSResource(project="my-cool-project")
        ),
    }
)
Attach this IO manager to your job to make it available to your ops.
from dagster import job
from dagster_gcp.gcs import GCSPickleIOManager, GCSResource

@job(
    resource_defs={
        "io_manager": GCSPickleIOManager(
            gcs=GCSResource(project="my-cool-project"),
            gcs_bucket="my-cool-bucket",
            gcs_prefix="my-cool-prefix"
        ),
    }
)
def my_job():
    ...
Return a list of updated keys in a GCS bucket.
bucket (str) – The name of the GCS bucket.
prefix (Optional[str]) – The prefix to filter the keys by.
since_key (Optional[str]) – The key to start from. If provided, only keys updated after this key will be returned.
gcs_session (Optional[google.cloud.storage.client.Client]) – A GCS client session. If not provided, a new session will be created.
A list of keys in the bucket, sorted by update time, that are newer than the since_key.
List[str]
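The filtering and ordering described above can be illustrated without a real bucket. The function `keys_since` below is a hypothetical stand-in that works on in-memory (key, update time) pairs; it is not the implementation of get_gcs_keys. Returning every key when since_key is missing or unknown is an assumption made for this sketch.

```python
from datetime import datetime, timezone
from typing import List, Optional, Sequence, Tuple


def keys_since(
    blobs: Sequence[Tuple[str, datetime]],
    prefix: Optional[str] = None,
    since_key: Optional[str] = None,
) -> List[str]:
    """Hypothetical stand-in: filter by prefix, sort by update time, drop keys up to since_key."""
    matching = [b for b in blobs if prefix is None or b[0].startswith(prefix)]
    ordered = [key for key, _ in sorted(matching, key=lambda b: b[1])]
    # Assumption: a missing or unknown since_key means "return everything".
    if since_key is None or since_key not in ordered:
        return ordered
    return ordered[ordered.index(since_key) + 1:]


def ts(hour: int) -> datetime:
    """Build a UTC timestamp on a fixed day for the example data."""
    return datetime(2024, 1, 1, hour, tzinfo=timezone.utc)


blobs = [
    ("data/b.csv", ts(2)),
    ("data/a.csv", ts(1)),
    ("other/x.csv", ts(3)),
    ("data/c.csv", ts(4)),
]
print(keys_since(blobs, prefix="data", since_key="data/a.csv"))
```

Since the result is ordered by update time, its last element is the natural value to store as the next cursor.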
Example
from dagster import RunRequest, SkipReason, resource, sensor
from dagster_gcp.gcs.sensor import get_gcs_keys
from google.cloud import storage

@resource
def google_cloud_storage_client(context):
    # from_service_account_json is a classmethod, so call it on the class
    return storage.Client.from_service_account_json("my-service-account.json")

# my_job is assumed to be a job defined elsewhere
@sensor(job=my_job, required_resource_keys={"google_cloud_storage_client"})
def my_gcs_sensor(context):
    since_key = context.cursor or None
    new_gcs_keys = get_gcs_keys(
        "my-bucket",
        prefix="data",
        since_key=since_key,
        gcs_session=context.resources.google_cloud_storage_client
    )
    if not new_gcs_keys:
        # this function is a generator, so the SkipReason must be yielded
        yield SkipReason("No new gcs files found for bucket 'my-bucket'.")
        return
    for gcs_key in new_gcs_keys:
        yield RunRequest(run_key=gcs_key, run_config={
            "ops": {
                "gcs_files": {
                    "config": {
                        "gcs_key": gcs_key
                    }
                }
            }
        })
    last_key = new_gcs_keys[-1]
    context.update_cursor(last_key)
Project name
Default Value: None
GCS bucket to store files
Prefix to add to all file paths
Default Value: ‘dagster’
FileManager that provides abstract access to GCS.
Logs op compute function stdout and stderr to GCS.
Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:
compute_logs:
  module: dagster_gcp.gcs.compute_log_manager
  class: GCSComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    upload_interval: 30
There are more configuration examples in the instance documentation guide: https://docs.dagster.io/deployment/dagster-instance#compute-log-storage
bucket (str) – The name of the GCS bucket to which to log.
local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster._seven.get_system_temp_directory().
prefix (Optional[str]) – Prefix for the log file keys.
json_credentials_envvar (Optional[str]) – Environment variable that contains the JSON with a private key and other credentials information. If this is set, GOOGLE_APPLICATION_CREDENTIALS will be ignored. Can be used when the private key cannot be used as a file.
upload_interval (Optional[int]) – Interval in seconds to upload partial log files to GCS. By default, will only upload when the capture is complete.
show_url_only (Optional[bool]) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default False.
inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when instantiated from config.
Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset/job.
The GCP region.
Required. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.
Optional. The labels to associate with this cluster. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a cluster.
Default Value: None
Full path to a YAML file containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.
Default Value: None
Full path to a JSON file containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.
Default Value: None
Python dictionary containing cluster configuration. See https://cloud.google.com/dataproc/docs/reference/rest/v1/ClusterConfig for configuration options. Only one of cluster_config_yaml_path, cluster_config_json_path, or cluster_config_dict may be provided.
Default Value: None
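The "only one of" rule for the three cluster configuration options can be expressed as a small check. The function `check_single_cluster_config` is hypothetical and shown only to make the constraint concrete; how the resource itself enforces the rule is not covered here.

```python
from typing import Any, Mapping, Optional


def check_single_cluster_config(
    cluster_config_yaml_path: Optional[str] = None,
    cluster_config_json_path: Optional[str] = None,
    cluster_config_dict: Optional[Mapping[str, Any]] = None,
) -> Optional[str]:
    """Hypothetical check: return the name of the one option that was set, or None."""
    provided = [
        name
        for name, value in (
            ("cluster_config_yaml_path", cluster_config_yaml_path),
            ("cluster_config_json_path", cluster_config_json_path),
            ("cluster_config_dict", cluster_config_dict),
        )
        if value is not None
    ]
    if len(provided) > 1:
        raise ValueError(f"Only one cluster config option may be provided, got: {provided}")
    return provided[0] if provided else None


# A dictionary config on its own is accepted.
print(check_single_cluster_config(cluster_config_dict={"masterConfig": {"numInstances": 1}}))
```

Passing both a YAML path and a JSON path to this check raises a ValueError, which matches the restriction stated in the option descriptions.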
Resource for connecting to a Dataproc cluster.
Example
from dagster import asset
from dagster_gcp import DataprocResource

@asset
def my_asset(dataproc: DataprocResource):
    with dataproc.get_client() as client:
        # client is a dagster_gcp.DataprocClient
        ...
Optional. Maximum time in seconds to wait for the job to complete. The default is 1200 seconds (20 minutes).
Default Value: 1200
A Cloud Dataproc job resource.
Cloud Dataproc job status.
Cloud Dataproc job config.
Required. The name of the cluster where the job will be submitted.
Job scheduling options.
Optional. Maximum number of times per hour a driver may be restarted as a result of driver terminating with non-zero code before job is reported failed. A job may be reported as thrashing if driver exits with non-zero code 4 times within 10 minute window. Maximum value is 10.
A Cloud Dataproc job for running Apache Pig (https://pig.apache.org/) queries on YARN.
The HCFS URI of the script that contains the Pig queries.
A list of queries to run on a cluster.
Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of a Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: "hiveJob": { "queryList": { "queries": [ "query1", "query2", "query3;query4" ] } }
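The QueryList shape from the description above, written as a Python dictionary. This is a minimal sketch: the query strings are the placeholders from the text, and the semicolon split is a naive illustration that would not handle semicolons inside string literals.

```python
import json

# Three list entries, four queries: the last entry packs two queries into one string.
hive_job = {
    "hiveJob": {
        "queryList": {
            "queries": ["query1", "query2", "query3;query4"],
        }
    }
}

# Count the individual queries by splitting each entry on semicolons (naive split).
individual = [
    query
    for entry in hive_job["hiveJob"]["queryList"]["queries"]
    for query in entry.split(";")
    if query
]

print(json.dumps(hive_job, indent=2))
print(len(individual))
```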
Optional. HCFS URIs of jar files to add to the CLASSPATH of the Pig Client and Hadoop MapReduce (MR) tasks. Can contain Pig UDFs.
Optional. Mapping of query variable names to values (equivalent to the Pig command: name=[value]).
The runtime logging config of the job.
The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’
Optional. A mapping of property names to values, used to configure Pig. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site.xml, /etc/pig/conf/pig.properties, and classes in user code.
Optional. Whether to continue executing queries if a query fails. The default value is false. Setting to true can be useful when executing independent parallel queries.
A Cloud Dataproc job for running Apache Hive (https://hive.apache.org/) queries on YARN.
Optional. Whether to continue executing queries if a query fails. The default value is false. Setting to true can be useful when executing independent parallel queries.
The HCFS URI of the script that contains Hive queries.
A list of queries to run on a cluster.
Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of a Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: "hiveJob": { "queryList": { "queries": [ "query1", "query2", "query3;query4" ] } }
Optional. HCFS URIs of jar files to add to the CLASSPATH of the Hive server and Hadoop MapReduce (MR) tasks. Can contain Hive SerDes and UDFs.
Optional. Mapping of query variable names to values (equivalent to the Hive command: SET name="value";).
Optional. A mapping of property names and values, used to configure Hive. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site.xml, /etc/hive/conf/hive-site.xml, and classes in user code.
Optional. The labels to associate with this job. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a job.
A Cloud Dataproc job for running Apache Spark (http://spark.apache.org/) applications on YARN.
Optional. HCFS URIs of archives to be extracted in the working directory of Spark drivers and tasks. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.
The HCFS URI of the jar file that contains the main class.
Optional. HCFS URIs of jar files to add to the CLASSPATHs of the Spark driver and tasks.
The runtime logging config of the job.
The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’
Optional. A mapping of property names to values, used to configure Spark. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/spark/conf/spark-defaults.conf and classes in user code.
Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
Optional. HCFS URIs of files to be copied to the working directory of Spark drivers and distributed tasks. Useful for naively parallel tasks.
The name of the driver’s main class. The jar file that contains the class must be in the default CLASSPATH or specified in jar_file_uris.
A Cloud Dataproc job for running Apache Spark SQL (http://spark.apache.org/sql/) queries.
A list of queries to run on a cluster.
Required. The queries to execute. You do not need to terminate a query with a semicolon. Multiple queries can be specified in one string by separating each with a semicolon. Here is an example of a Cloud Dataproc API snippet that uses a QueryList to specify a HiveJob: "hiveJob": { "queryList": { "queries": [ "query1", "query2", "query3;query4" ] } }
The HCFS URI of the script that contains SQL queries.
Optional. Mapping of query variable names to values (equivalent to the Spark SQL command: SET name="value";).
Optional. HCFS URIs of jar files to be added to the Spark CLASSPATH.
The runtime logging config of the job.
The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’
Optional. A mapping of property names to values, used to configure Spark SQL’s SparkConf. Properties that conflict with values set by the Cloud Dataproc API may be overwritten.
A Cloud Dataproc job for running Apache PySpark (https://spark.apache.org/docs/0.9.0/python-programming-guide.html) applications on YARN.
Optional. HCFS URIs of jar files to add to the CLASSPATHs of the Python driver and tasks.
The runtime logging config of the job.
The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’
Optional. A mapping of property names to values, used to configure PySpark. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/spark/conf/spark-defaults.conf and classes in user code.
Optional. The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
Optional. HCFS URIs of files to be copied to the working directory of Python drivers and distributed tasks. Useful for naively parallel tasks.
Optional. HCFS file URIs of Python files to pass to the PySpark framework. Supported file types: .py, .egg, and .zip.
Required. The HCFS URI of the main Python file to use as the driver. Must be a .py file.
Optional. HCFS URIs of archives to be extracted in the working directory. Supported file types: .jar, .tar, .tar.gz, .tgz, and .zip.
Encapsulates the full scoping used to reference a job.
Required. The ID of the Google Cloud Platform project that the job belongs to.
Optional. The job ID, which must be unique within the project. The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or hyphens (-). The maximum length is 100 characters. If not specified by the caller, the job ID will be provided by the server.
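The job ID constraints above translate directly into a regular expression. The helper `is_valid_job_id` is hypothetical and only covers the character and length rules from the description; uniqueness within the project can only be checked by the server.

```python
import re

# Letters, digits, underscores, or hyphens; between 1 and 100 characters.
JOB_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,100}")


def is_valid_job_id(job_id: str) -> bool:
    """Hypothetical client-side check mirroring the constraints stated above."""
    return JOB_ID_PATTERN.fullmatch(job_id) is not None


print(is_valid_job_id("my-job_01"))
print(is_valid_job_id("job with spaces"))
```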
A Cloud Dataproc job for running Apache Hadoop MapReduce (https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html) jobs on Apache Hadoop YARN (https://hadoop.apache.org/docs/r2.7.1/hadoop-yarn/hadoop-yarn-site/YARN.html).
Optional. Jar file URIs to add to the CLASSPATHs of the Hadoop driver and tasks.
The runtime logging config of the job.
The per-package log levels for the driver. This may include “root” package name to configure rootLogger. Examples: ‘com.google = FATAL’, ‘root = INFO’, ‘org.apache = DEBUG’
Optional. A mapping of property names to values, used to configure Hadoop. Properties that conflict with values set by the Cloud Dataproc API may be overwritten. Can include properties set in /etc/hadoop/conf/*-site and classes in user code.
Optional. The arguments to pass to the driver. Do not include arguments, such as -libjars or -Dfoo=bar, that can be set as job properties, since a collision may occur that causes an incorrect job submission.
Optional. HCFS (Hadoop Compatible Filesystem) URIs of files to be copied to the working directory of Hadoop drivers and distributed tasks. Useful for naively parallel tasks.
The name of the driver’s main class. The jar file containing the class must be in the default CLASSPATH or specified in jar_file_uris.
Optional. HCFS URIs of archives to be extracted in the working directory of Hadoop drivers and tasks. Supported file types: .jar, .tar, .tar.gz, .tgz, or .zip.
The HCFS URI of the jar file containing the main class. Examples: ‘gs://foo-bucket/analytics-binaries/extract-useful-metrics-mr.jar’ ‘hdfs:/tmp/test-samples/custom-wordcount.jar’ ‘file:///home/usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar’
Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.
Whether to create a cluster or use an existing cluster.
Default Value: True
GCS bucket to store files
Prefix to add to all file paths
Default Value: ‘dagster’
(deprecated) This API will be removed in version 2.0. Please use GCSPickleIOManager instead.
Renamed to GCSPickleIOManager. See GCSPickleIOManager for documentation.
Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.
Default Value: None
Default location for jobs / datasets / tables.
Default Value: None
GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64
Default Value: None
The GCP project to use.
Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.
Default Value: None
The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.
Default Value: None
GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64
Default Value: None
When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.
Default Value: None
When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).
Default Value: None
(experimental) This API may break in future versions, even between dot releases.
Builds an I/O manager definition that reads inputs from and writes outputs to BigQuery.
type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between slices of BigQuery tables and an in-memory type - e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.
default_load_type (Type) – When an input has no type annotation, load it as this type.
IOManagerDefinition
Examples
import pandas as pd
from dagster import Definitions, asset
from dagster_bigquery_pandas import BigQueryPandasTypeHandler
from dagster_gcp import build_bigquery_io_manager

@asset(
    key_prefix=["my_prefix"],
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_second_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])

defs = Definitions(
    assets=[my_table, my_second_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"}
        })
    }
)
You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": bigquery_io_manager.configured({
            "project": {"env": "GCP_PROJECT"},
            "dataset": "my_dataset"
        })
    }
)
On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pd.DataFrame:
    ...
For ops, the dataset can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
If none of these is provided, the dataset will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...
If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the gcp_credentials configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
Project name
Default Value: None
GCS bucket to store files
Prefix to add to all file paths
Default Value: ‘dagster’
Persistent IO manager using GCS for storage.
Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for GCS and the backing bucket.
Assigns each op output to a unique filepath containing run ID, step key, and output name.
Assigns each asset to a single filesystem path, at <base_dir>/<asset_key>. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
Subsequent materializations of an asset will overwrite previous materializations of that asset.
With a base directory of /my/base/path, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called three in a directory with path /my/base/path/one/two/.
Example usage:
Attach this IO manager to a set of assets.
from dagster import Definitions, asset
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

defs = Definitions(
    assets=[asset1, asset2],
    resources={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
        ),
        "gcs": gcs_resource.configured({"project": "my-cool-project"}),
    },
)
Attach this IO manager to your job to make it available to your ops.
from dagster import job
from dagster_gcp.gcs import gcs_pickle_io_manager, gcs_resource

@job(
    resource_defs={
        "io_manager": gcs_pickle_io_manager.configured(
            {"gcs_bucket": "my-cool-bucket", "gcs_prefix": "my-cool-prefix"}
        ),
        "gcs": gcs_resource.configured({"project": "my-cool-project"}),
    },
)
def my_job():
    ...
FileManager that provides abstract access to GCS.
Implements the FileManager API.
Required. Project ID for the project which the client acts on behalf of. Will be passed when creating a dataset / job. If not passed, falls back to the default inferred from the environment.
Required. The cluster name. Cluster names within a project must be unique. Names of deleted clusters can be reused.
The cluster config.
Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.
Optional. The Compute Engine accelerator configuration for these instances. Beta Feature: This feature is still under development. It may be changed before final release.
Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.
Specifies the config of disk options for a group of VM instances.
Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.
Optional. Size in GB of the boot disk (default is 500GB).
Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).
Specifies the resources used to actively manage an instance group.
Optional. Specifies that this instance group contains preemptible instances.
Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.
Optional. The Compute Engine machine type used for cluster instances. A full URL, partial URI, or short name are valid. Examples: https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2, projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2, n1-standard-2. Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.
Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.
Optional. The Compute Engine accelerator configuration for these instances. Beta Feature: This feature is still under development. It may be changed before final release.
Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.
Specifies the config of disk options for a group of VM instances.
Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.
Optional. Size in GB of the boot disk (default is 500GB).
Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).
Specifies the resources used to actively manage an instance group.
Optional. Specifies that this instance group contains preemptible instances.
Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.
Optional. The Compute Engine machine type used for cluster instances. A full URL, partial URI, or short name is valid. Examples:
https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2
projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2
n1-standard-2
Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.
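The three accepted machine type forms above all end in the short name, which is the only form allowed with Auto Zone Placement. A minimal sketch of reducing any of them to the short name follows; the helper name `machine_type_short_name` and the project ID `my-project` are hypothetical and not part of dagster_gcp or the Dataproc API.

```python
def machine_type_short_name(machine_type: str) -> str:
    """Return the short name from a full URL, partial URI, or short name.

    Hypothetical helper: in every form listed above, the short name is the
    final path segment, so we keep only the text after the last "/".
    """
    return machine_type.rstrip("/").rsplit("/", 1)[-1]


# "my-project" is a placeholder project ID used only for illustration.
full_url = (
    "https://www.googleapis.com/compute/v1/projects/my-project"
    "/zones/us-east1-a/machineTypes/n1-standard-2"
)
partial_uri = "projects/my-project/zones/us-east1-a/machineTypes/n1-standard-2"

print(machine_type_short_name(full_url))
print(machine_type_short_name(partial_uri))
print(machine_type_short_name("n1-standard-2"))
```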
Encryption settings for the cluster.
Optional. The Cloud KMS key name to use for PD disk encryption for all instances in the cluster.
Security related configuration, including Kerberos.
Specifies Kerberos related configuration.
Optional. The Cloud Storage URI of a KMS-encrypted file containing the password to the user-provided truststore. For the self-signed certificate, this password is generated by Dataproc.
Optional. Flag to indicate whether to Kerberize the cluster.
Optional. The Cloud Storage URI of the truststore file used for SSL encryption. If not provided, Dataproc will provide a self-signed certificate.
Optional. The remote realm the Dataproc on-cluster KDC will trust, should the user enable cross realm trust.
Required. The Cloud Storage URI of a KMS-encrypted file containing the root principal password.
Required. The URI of the KMS key used to encrypt various sensitive files.
Optional. The KDC (IP or hostname) for the remote trusted realm in a cross realm trust relationship.
Optional. The Cloud Storage URI of a KMS-encrypted file containing the shared password between the on-cluster Kerberos realm and the remote trusted realm, in a cross realm trust relationship.
Optional. The lifetime of the ticket granting ticket, in hours. If not specified, or if the user specifies 0, the default value of 10 is used.
Optional. The Cloud Storage URI of the keystore file used for SSL encryption. If not provided, Dataproc will provide a self-signed certificate.
Optional. The Cloud Storage URI of a KMS-encrypted file containing the password to the user-provided key. For the self-signed certificate, this password is generated by Dataproc.
Optional. The Cloud Storage URI of a KMS-encrypted file containing the password to the user-provided keystore. For the self-signed certificate, this password is generated by Dataproc.
Optional. The admin server (IP or hostname) for the remote trusted realm in a cross realm trust relationship.
Optional. The Cloud Storage URI of a KMS-encrypted file containing the master key of the KDC database.
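Of the Kerberos settings above, only the root principal password URI and the KMS key URI are marked Required, and the ticket granting ticket lifetime falls back to 10 hours when unset or 0. The sketch below checks a plain dictionary against those two rules. It assumes the camelCase key names of the Dataproc REST API (`rootPrincipalPasswordUri`, `kmsKeyUri`, `tgtLifetimeHours`), which may not match the key names your configuration layer expects; the helper names, bucket, and key paths are hypothetical.

```python
from typing import Any, Dict, List

# Assumed key names (Dataproc REST API style); adjust to your config schema.
REQUIRED_KERBEROS_FIELDS = ("rootPrincipalPasswordUri", "kmsKeyUri")
DEFAULT_TGT_LIFETIME_HOURS = 10


def kerberos_config_problems(config: Dict[str, Any]) -> List[str]:
    """List the required Kerberos fields that are missing or empty."""
    return [
        f"missing required field: {field}"
        for field in REQUIRED_KERBEROS_FIELDS
        if not config.get(field)
    ]


def effective_tgt_lifetime_hours(config: Dict[str, Any]) -> int:
    """Return the TGT lifetime, using the default when unset or 0."""
    return config.get("tgtLifetimeHours") or DEFAULT_TGT_LIFETIME_HOURS


# Placeholder URIs for illustration only.
example = {
    "enableKerberos": True,
    "rootPrincipalPasswordUri": "gs://my-bucket/root-password.encrypted",
    "kmsKeyUri": "projects/my-project/locations/global/keyRings/r/cryptoKeys/k",
    "tgtLifetimeHours": 0,
}
print(kerberos_config_problems(example))
print(effective_tgt_lifetime_hours(example))
```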
Optional. Commands to execute on each node after config is completed. By default, executables are run on master and all worker nodes. You can test a node’s role metadata to run an executable on a master or worker node, as shown below using curl (you can also use wget):
ROLE=$(curl -H Metadata-Flavor:Google http://metadata/computeMetadata/v1/instance/attributes/dataproc-role)
if [[ "${ROLE}" == 'Master' ]]; then
  … master specific actions …
else
  … worker specific actions …
fi
Optional. A Google Cloud Storage bucket used to stage job dependencies, config files, and job driver console output. If you do not specify a staging bucket, Cloud Dataproc will determine a Cloud Storage location (US, ASIA, or EU) for your cluster’s staging bucket according to the Google Compute Engine zone where your cluster is deployed, and then create and manage this project-level, per-location bucket (see Cloud Dataproc staging bucket).
Optional. The config settings for Compute Engine resources in an instance group, such as a master or worker group.
Optional. The Compute Engine accelerator configuration for these instances. Beta Feature: This feature is still under development. It may be changed before final release.
Optional. The number of VM instances in the instance group. For master instance groups, must be set to 1.
Specifies the config of disk options for a group of VM instances.
Optional. Number of attached SSDs, from 0 to 4 (default is 0). If SSDs are not attached, the boot disk is used to store runtime logs and HDFS (https://hadoop.apache.org/docs/r1.2.1/hdfs_user_guide.html) data. If one or more SSDs are attached, this runtime bulk data is spread across them, and the boot disk contains only basic config and installed binaries.
Optional. Size in GB of the boot disk (default is 500GB).
Optional. Type of the boot disk (default is “pd-standard”). Valid values: “pd-ssd” (Persistent Disk Solid State Drive) or “pd-standard” (Persistent Disk Hard Disk Drive).
Specifies the resources used to actively manage an instance group.
Optional. Specifies that this instance group contains preemptible instances.
Optional. The Compute Engine image resource used for cluster instances. It can be specified or may be inferred from SoftwareConfig.image_version.
Optional. The Compute Engine machine type used for cluster instances. A full URL, partial URI, or short name is valid. Examples:
https://www.googleapis.com/compute/v1/projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2
projects/[project_id]/zones/us-east1-a/machineTypes/n1-standard-2
n1-standard-2
Auto Zone Exception: If you are using the Cloud Dataproc Auto Zone Placement feature, you must use the short name of the machine type resource, for example, n1-standard-2.
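The disk options described for each instance group carry documented defaults (500 GB boot disk, "pd-standard" type, 0 attached SSDs) and limits (0 to 4 SSDs, two valid disk types). The following is a rough sketch of applying those defaults and limits to a plain dictionary before submitting it. The key names `bootDiskSizeGb`, `bootDiskType`, and `numLocalSsds` are assumed from the Dataproc REST API naming, and `normalize_disk_config` is a hypothetical helper, not a dagster_gcp function.

```python
from typing import Any, Dict, Optional

VALID_BOOT_DISK_TYPES = {"pd-standard", "pd-ssd"}

# Defaults as stated in the field descriptions above.
DISK_DEFAULTS = {"bootDiskSizeGb": 500, "bootDiskType": "pd-standard", "numLocalSsds": 0}


def normalize_disk_config(disk_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Fill in documented defaults and reject out-of-range values."""
    cfg = dict(DISK_DEFAULTS)
    cfg.update(disk_config or {})
    if cfg["bootDiskType"] not in VALID_BOOT_DISK_TYPES:
        raise ValueError(f"unsupported boot disk type: {cfg['bootDiskType']!r}")
    if not 0 <= cfg["numLocalSsds"] <= 4:
        raise ValueError("numLocalSsds must be between 0 and 4")
    return cfg


print(normalize_disk_config({"bootDiskType": "pd-ssd", "numLocalSsds": 2}))
```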
Common config settings for resources of Compute Engine cluster instances, applicable to all instances in the cluster.
Optional. The Compute Engine network to be used for machine communications. Cannot be specified with subnetwork_uri. If neither network_uri nor subnetwork_uri is specified, the “default” network of the project is used, if it exists. Cannot be a “Custom Subnet Network” (see Using Subnetworks for more information). A full URL, partial URI, or short name is valid. Examples:
https://www.googleapis.com/compute/v1/projects/[project_id]/regions/global/default
projects/[project_id]/regions/global/default
default
Optional. The zone where the Compute Engine cluster will be located. On a create request, it is required in the “global” region. If omitted in a non-global Cloud Dataproc region, the service will pick a zone in the corresponding Compute Engine region. On a get request, zone will always be present. A full URL, partial URI, or short name is valid. Examples:
https://www.googleapis.com/compute/v1/projects/[project_id]/zones/[zone]
projects/[project_id]/zones/[zone]
us-central1-f
The Compute Engine metadata entries to add to all instances (see Project and instance metadata (https://cloud.google.com/compute/docs/storing-retrieving-metadata#project_and_instance_metadata)).
Optional. If true, all instances in the cluster will only have internal IP addresses. By default, clusters are not restricted to internal IP addresses, and will have ephemeral external IP addresses assigned to each instance. This internal_ip_only restriction can only be enabled for subnetwork enabled networks, and all off-cluster dependencies must be configured to be accessible without external IP addresses.
Optional. The URIs of service account scopes to be included in Compute Engine instances. The following base set of scopes is always included:
https://www.googleapis.com/auth/cloud.useraccounts.readonly
https://www.googleapis.com/auth/devstorage.read_write
https://www.googleapis.com/auth/logging.write
If no scopes are specified, the following defaults are also provided:
https://www.googleapis.com/auth/bigquery
https://www.googleapis.com/auth/bigtable.admin.table
https://www.googleapis.com/auth/bigtable.data
https://www.googleapis.com/auth/devstorage.full_control
The Compute Engine tags to add to all instances (see Tagging instances).
Optional. The service account of the instances. Defaults to the default Compute Engine service account. Custom service accounts need permissions equivalent to the following IAM roles: roles/logging.logWriter and roles/storage.objectAdmin (see https://cloud.google.com/compute/docs/access/service-accounts#custom_service_accounts for more information). Example: [account_id]@[project_id].iam.gserviceaccount.com
Optional. The Compute Engine subnetwork to be used for machine communications. Cannot be specified with network_uri. A full URL, partial URI, or short name is valid. Examples:
https://www.googleapis.com/compute/v1/projects/[project_id]/regions/us-east1/subnetworks/sub0
projects/[project_id]/regions/us-east1/subnetworks/sub0
sub0
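Two rules in the Compute Engine settings above are easy to get wrong: network_uri and subnetwork_uri are mutually exclusive, and the default scopes are added to the base set only when no scopes are specified. The sketch below illustrates both rules on plain values. The helper names are hypothetical, and the scope merging mirrors the description above rather than any confirmed service implementation.

```python
from typing import List, Optional

# Base scopes that the description says are always included.
BASE_SCOPES = [
    "https://www.googleapis.com/auth/cloud.useraccounts.readonly",
    "https://www.googleapis.com/auth/devstorage.read_write",
    "https://www.googleapis.com/auth/logging.write",
]
# Extra scopes that the description says apply only when none are specified.
DEFAULT_EXTRA_SCOPES = [
    "https://www.googleapis.com/auth/bigquery",
    "https://www.googleapis.com/auth/bigtable.admin.table",
    "https://www.googleapis.com/auth/bigtable.data",
    "https://www.googleapis.com/auth/devstorage.full_control",
]


def effective_scopes(requested: Optional[List[str]] = None) -> List[str]:
    """Combine the base scopes with requested scopes, or with the defaults."""
    scopes = list(BASE_SCOPES)
    for scope in requested or DEFAULT_EXTRA_SCOPES:
        if scope not in scopes:  # avoid listing a scope twice
            scopes.append(scope)
    return scopes


def check_network_settings(
    network_uri: Optional[str] = None, subnetwork_uri: Optional[str] = None
) -> None:
    """Raise if both network_uri and subnetwork_uri are set."""
    if network_uri and subnetwork_uri:
        raise ValueError("network_uri cannot be specified with subnetwork_uri")


check_network_settings(subnetwork_uri="sub0")
print(len(effective_scopes()))
```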
Specifies the selection and config of software inside the cluster.
Optional. The properties to set on daemon config files. Property keys are specified in prefix:property format, for example core:hadoop.tmp.dir. The following are supported prefixes and their mappings:
capacity-scheduler: capacity-scheduler.xml
core: core-site.xml
distcp: distcp-default.xml
hdfs: hdfs-site.xml
hive: hive-site.xml
mapred: mapred-site.xml
pig: pig.properties
spark: spark-defaults.conf
yarn: yarn-site.xml
For more information, see Cluster properties.
The set of optional components to activate on the cluster.
Optional. The version of software inside the cluster. It must be one of the supported Cloud Dataproc Versions, such as “1.2” (including a subminor version, such as “1.2.29”), or the “preview” version. If unspecified, it defaults to the latest Debian version.
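The prefix:property format used for daemon properties in the software config above maps each key to one config file. As an illustration, the sketch below splits a key and looks up its target file using the prefix table from the description; `split_property_key` is a hypothetical helper, not part of dagster_gcp.

```python
from typing import Tuple

# Prefix-to-file mapping copied from the properties description above.
PREFIX_TO_FILE = {
    "capacity-scheduler": "capacity-scheduler.xml",
    "core": "core-site.xml",
    "distcp": "distcp-default.xml",
    "hdfs": "hdfs-site.xml",
    "hive": "hive-site.xml",
    "mapred": "mapred-site.xml",
    "pig": "pig.properties",
    "spark": "spark-defaults.conf",
    "yarn": "yarn-site.xml",
}


def split_property_key(key: str) -> Tuple[str, str]:
    """Split "prefix:property" into (config file, property name)."""
    prefix, sep, name = key.partition(":")
    if not sep or not name:
        raise ValueError(f"expected prefix:property format, got {key!r}")
    if prefix not in PREFIX_TO_FILE:
        raise ValueError(f"unsupported prefix: {prefix!r}")
    return PREFIX_TO_FILE[prefix], name


print(split_property_key("core:hadoop.tmp.dir"))
```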
Optional. The labels to associate with this cluster. Label keys must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if present, must contain 1 to 63 characters, and must conform to RFC 1035 (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated with a cluster.
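The label limits above (at most 32 labels, keys of 1 to 63 characters, values either empty or 1 to 63 characters) can be checked before a cluster request is sent. The sketch below is one possible pre-check. The regular expression is an assumed reading of the RFC 1035 label syntax (a lowercase letter first, then letters, digits, or hyphens, ending in a letter or digit), and the service's actual validation may differ; `label_problems` is a hypothetical helper.

```python
import re
from typing import Dict, List

MAX_LABELS = 32
MAX_LENGTH = 63
# Assumed RFC 1035-style label pattern; the real service rules may differ.
_LABEL_RE = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def _is_valid(text: str) -> bool:
    return 1 <= len(text) <= MAX_LENGTH and _LABEL_RE.fullmatch(text) is not None


def label_problems(labels: Dict[str, str]) -> List[str]:
    """Return human-readable problems found in a cluster label mapping."""
    problems = []
    if len(labels) > MAX_LABELS:
        problems.append(f"too many labels: {len(labels)} > {MAX_LABELS}")
    for key, value in labels.items():
        if not _is_valid(key):
            problems.append(f"invalid label key: {key!r}")
        # Values may be empty; only non-empty values are checked.
        if value and not _is_valid(value):
            problems.append(f"invalid value for label {key!r}: {value!r}")
    return problems


print(label_problems({"env": "prod", "team": ""}))
```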