# Dagster Reference

Comprehensive reference for Dagster — the data orchestration platform used by bioloupe-data.
## 1. Overview

Dagster is a data orchestration platform built around Software-Defined Assets — the philosophy that pipelines should be defined in terms of the data artifacts they produce, not the tasks they run. Instead of writing "fetch data, then transform, then load," you declare "this table exists, here's how to compute it, and here are its dependencies."
Core principles:
- Assets over tasks — model your pipeline as a graph of data assets, not a DAG of tasks
- Testability — assets are plain Python functions, directly invocable in tests
- Observability — built-in UI showing asset lineage, materialization history, and metadata
- Environment flexibility — swap resources (DB connections, API clients) between dev/staging/prod
## 2. Core Concepts

### 2.1 Assets

An asset is a persistent data artifact (table, file, ML model) defined by a Python function decorated with `@dg.asset`.

#### Basic asset

```python
import dagster as dg

@dg.asset
def raw_orders() -> None:
    # Fetch and persist order data
    ...
```

#### Asset with dependencies, groups, and owners
```python
@dg.asset
def daily_sales() -> None: ...

@dg.asset(deps=[daily_sales], group_name="sales")
def weekly_sales() -> None: ...

@dg.asset(
    deps=[weekly_sales],
    owners=["bighead@hooli.com", "team:data"],
)
def weekly_sales_report(context: dg.AssetExecutionContext):
    context.log.info("Generating report")
```

- `deps` — upstream assets this asset depends on
- `group_name` — logical grouping in the UI
- `owners` — who is responsible for this asset
#### Code versioning

Prevents redundant computation when code hasn't changed:

```python
import json

@dg.asset(code_version="1")
def asset_with_version():
    with open("data/output.json", "w") as f:
        json.dump(100, f)
```

#### Multi-part keys (`key_prefix`)
Hierarchical namespacing for assets:

```python
@dg.asset(key_prefix=["staging", "sales"])
def orders():
    return [1, 2, 3]

@dg.asset(ins={"orders": dg.AssetIn(key_prefix=["staging", "sales"])})
def enriched_orders(orders):
    return orders + [4]
```

#### Context access
```python
@dg.asset
def context_asset(context: dg.AssetExecutionContext):
    context.log.info(f"Run ID: {context.run.run_id}")
```

#### Multi-assets

Produce multiple assets from a single computation:
```python
@dg.multi_asset(specs=[dg.AssetSpec("asset_one"), dg.AssetSpec("asset_two")])
def my_multi_asset():
    yield dg.MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})
    yield dg.MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})
```

#### Graph-backed assets (with retry policies)

Compose multiple ops into a single asset:
```python
@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    ),
)
def step_one() -> int:
    return 42

@dg.op
def step_two(num: int):
    return num ** 2

@dg.graph_asset
def complex_asset():
    return step_two(step_one())
```

### 2.2 Asset Checks

Data quality validations that run against assets. They do not consume Dagster+ credits.
#### Single check

```python
import pandas as pd
import dagster as dg

@dg.asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(passed=bool(num_null_order_ids == 0))
```

#### Blocking checks (prevent downstream materialization on failure)
```python
@dg.asset_check(asset=orders, blocking=True)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(passed=bool(num_null_order_ids == 0))
```

#### Multi-asset checks

```python
from collections.abc import Iterable

@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ],
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
    orders_df = pd.read_csv("orders.csv")

    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="orders_id_has_no_nulls",
        passed=bool(num_null_order_ids == 0),
        asset_key="orders",
    )

    num_null_item_ids = orders_df["item_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="items_id_has_no_nulls",
        passed=bool(num_null_item_ids == 0),
        asset_key="orders",
    )
```

#### Separate schedules for assets and checks
```python
asset_job = dg.define_asset_job(
    "asset_job",
    selection=dg.AssetSelection.assets(orders).without_checks(),
)
check_job = dg.define_asset_job(
    "check_job",
    selection=dg.AssetSelection.checks_for_assets(orders),
)

asset_schedule = dg.ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")
check_schedule = dg.ScheduleDefinition(job=check_job, cron_schedule="0 6 * * *")
```

### 2.3 Resources

Resources provide access to external systems (databases, APIs, cloud services). They are defined by subclassing `ConfigurableResource` and injected into assets via function parameters.
#### Defining a resource

```python
import dagster as dg
import requests
from requests import Response

class MyConnectionResource(dg.ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )
```

#### Using a resource in an asset
```python
from typing import Any

@dg.asset
def data_from_service(my_conn: MyConnectionResource) -> dict[str, Any]:
    return my_conn.request("/fetch_data").json()
```

#### Registering resources in Definitions
```python
defs = dg.Definitions(
    assets=[data_from_service],
    resources={
        "my_conn": MyConnectionResource(username="admin"),
    },
)
```

#### Environment variables

Use `EnvVar` to read secrets at runtime (shown in the UI as env var references, not plaintext):

```python
defs = dg.Definitions(
    resources={
        "my_conn": MyConnectionResource(username=dg.EnvVar("API_USERNAME")),
    },
)
```

#### Key APIs
| API | Purpose |
|---|---|
| `ConfigurableResource` | Base class for defining resources |
| `EnvVar` | Read config from environment variables |
| `ResourceParam` | Annotate a parameter as a resource |
| `build_init_resource_context()` | Create context for testing resources |
| `build_resources()` | Initialize resources outside execution |
### 2.4 Automation

Dagster offers five automation methods: Schedules, Declarative Automation, Sensors, Asset Sensors, and GraphQL triggers.
#### Schedules

Run assets on a cron schedule:
```python
import dagster as dg

@dg.asset
def customer_data(): ...

@dg.asset
def sales_report(): ...

daily_schedule = dg.ScheduleDefinition(
    name="daily_refresh",
    cron_schedule="0 0 * * *",  # midnight daily
    target=[customer_data, sales_report],
)

defs = dg.Definitions(schedules=[daily_schedule])
```

With timezone:

```python
daily_schedule = dg.ScheduleDefinition(
    job=daily_refresh_job,
    cron_schedule="0 0 * * *",
    execution_timezone="America/New_York",
)
```

Auto-schedule from a partitioned job:
```python
daily_partition = dg.DailyPartitionsDefinition(start_date="2024-05-20")

@dg.asset(partitions_def=daily_partition)
def daily_asset(): ...

partitioned_asset_job = dg.define_asset_job(
    "partitioned_job",
    selection=[daily_asset],
)

asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
    partitioned_asset_job,
)
```

#### Declarative Automation (AutomationCondition)

Set conditions directly on assets. Requires enabling the `default_automation_condition_sensor` in the UI.
`on_cron` — execute on schedule after upstream updates:

```python
@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.on_cron("@hourly"),
)
def hourly_asset() -> None: ...
```

`eager` — update whenever any upstream dependency changes:

```python
@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.eager(),
)
def eager_asset() -> None: ...
```

`on_missing` — materialize once all upstream partitions exist:

```python
@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.on_missing(),
    partitions_def=dg.DailyPartitionsDefinition("2025-01-01"),
)
def downstream() -> None: ...
```

#### Sensors

Event-driven triggers that poll for conditions:
```python
@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # Check for new files, API events, etc.
    if new_data_available():
        return dg.RunRequest(run_key="unique_key")
    return dg.SkipReason("no new data")
```

Sensors with dynamic partitions:
```python
@dg.sensor(job=regional_sales_job)
def all_regions_sensor(context: dg.SensorEvaluationContext):
    all_regions = ["us", "eu", "jp", "ca", "uk", "au"]
    return dg.SensorResult(
        run_requests=[dg.RunRequest(partition_key=region) for region in all_regions],
        dynamic_partitions_requests=[region_partitions.build_add_request(all_regions)],
    )
```

### 2.5 Partitions

Partitions split assets into independently materializable chunks — essential for incremental processing.
Recommendation: Keep partition count under 100,000 per asset for UI performance.
#### Time-based partitions

```python
import os
import pandas as pd

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    df = pd.DataFrame({
        "date": [date] * 10,
        "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
    })
    os.makedirs("data/daily_sales", exist_ok=True)
    df.to_csv(f"data/daily_sales/sales_{date}.csv", index=False)
```

Other time-based definitions: `WeeklyPartitionsDefinition`, `MonthlyPartitionsDefinition`, `HourlyPartitionsDefinition`.
#### Static partitions

```python
region_partitions = dg.StaticPartitionsDefinition(["us", "eu", "jp"])

@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
    ...
```

#### Dynamic partitions
Partitions created at runtime (e.g., new regions discovered via API):

```python
region_partitions = dg.DynamicPartitionsDefinition(name="regions")

@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
    ...
```

#### Multi-dimensional partitions
```python
two_dimensional = dg.MultiPartitionsDefinition({
    "date": dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    "region": dg.StaticPartitionsDefinition(["us", "eu", "jp"]),
})

@dg.asset(partitions_def=two_dimensional)
def daily_regional_sales(context: dg.AssetExecutionContext) -> None:
    keys = context.partition_key.keys_by_dimension
    date = keys["date"]
    region = keys["region"]
    ...
```

#### Custom calendar (exclude holidays)
```python
from datetime import datetime

market_holidays = [
    datetime.strptime(d, "%Y-%m-%d")
    for d in ["2024-01-01", "2024-07-04", "2024-12-25"]
]

market_calendar = dg.TimeWindowPartitionsDefinition(
    start=datetime(2024, 1, 1),
    cron_schedule="0 0 * * 1-5",  # weekdays only
    fmt="%Y-%m-%d",
    exclusions=market_holidays,
)
```

#### Scheduling partitioned jobs
```python
import datetime

@dg.schedule(job=daily_sales_job, cron_schedule="0 1 * * *")
def daily_sales_schedule(context):
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(run_key=date, partition_key=date)
```

### 2.6 I/O Managers

I/O managers separate data-processing logic from storage logic. Assets return data; the I/O manager decides where and how to persist it.
#### When to use

- Assets stored in consistent locations with predictable paths
- You want to swap storage backends between environments (DuckDB local, Postgres prod)
- Upstream data fits in memory
#### When NOT to use

- SQL-based table creation (just run the SQL directly in the asset)
- Assets exceeding memory
- Pipelines managing I/O independently
#### Basic pattern

```python
import pandas as pd
import dagster as dg

@dg.asset
def raw_sales_data() -> pd.DataFrame:
    return pd.read_csv("https://example.com/sales.csv")

@dg.asset
def clean_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    return raw_sales_data.fillna({"amount": 0.0})

@dg.asset
def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:
    return clean_sales_data.groupby(["owner"])["amount"].sum().reset_index()
```

Configure storage via the I/O manager resource:
```python
from dagster_duckdb_pandas import DuckDBPandasIOManager

defs = dg.Definitions(
    assets=[raw_sales_data, clean_sales_data, sales_summary],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="sales.duckdb",
            schema="public",
        ),
    },
)
```

Built-in I/O managers exist for S3, Azure ADLS2, GCS, BigQuery, Snowflake, and DuckDB — with support for Pandas, PySpark, and Polars DataFrames.
### 2.7 Logging

Dagster captures two types of logs:
- Structured event logs — enriched with metadata, filterable in UI
- Raw compute logs — stdout/stderr streams
#### Using context.log

```python
@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Processing started")
    context.log.warning("Missing optional field")
    context.log.error("Failed to connect")
```

#### Configuring log levels
In run config (YAML):

```yaml
loggers:
  console:
    config:
      log_level: ERROR
```

Standard Python levels are supported: DEBUG, INFO, WARNING, ERROR, CRITICAL.
## 3. Key Patterns

### 3.1 Asset Dependencies and Lineage

Dependencies create a directed acyclic graph visible in the Dagster UI:
```python
@dg.asset
def raw_data(): ...

@dg.asset(deps=[raw_data])
def cleaned_data(): ...

@dg.asset(deps=[cleaned_data])
def analytics(): ...
```

For data passed between assets (not just ordering), use function parameters:
```python
@dg.asset
def raw_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@dg.asset
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()
```

### 3.2 Incremental Materialization

Use partitions to process only new data:
```python
daily = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily)
def incremental_table(context: dg.AssetExecutionContext):
    date = context.partition_key
    # Only process this date's data
    new_rows = fetch_data_for_date(date)
    upsert_to_database(new_rows)
```

### 3.3 Retry Policies

```python
@dg.asset(
    retry_policy=dg.RetryPolicy(
        max_retries=3,
        delay=1,
        backoff=dg.Backoff.EXPONENTIAL,
    ),
)
def flaky_api_asset():
    return call_unreliable_api()
```

Or at the op level within a graph asset:
```python
@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    ),
)
def unreliable_step() -> int: ...
```

### 3.4 External Assets

Track data produced outside Dagster without managing its computation:
```python
raw_clickstream = dg.AssetSpec("raw_clickstream")  # produced by an external system

@dg.asset(deps=[raw_clickstream])
def processed_clickstream():
    # Dagster doesn't materialize raw_clickstream, but knows it's a dependency
    ...
```

## 4. Integrations

### 4.1 PostgreSQL

Install: `pip install dagster-postgres` (or `uv add dagster-postgres`)
The dagster-postgres library provides PostgreSQL-backed storage for Dagster's own metadata (run storage, event log storage, and schedule storage). Reading and writing asset data to Postgres tables is handled separately, e.g. with a database client resource or an I/O manager.
Latest version: 1.12.19
For bioloupe-data, Postgres is primarily used as both the orchestration backend and the target for asset materialization.
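As a sketch of the metadata-storage side, the Postgres backend is configured in `dagster.yaml` at the instance level; the environment variable names below are illustrative:

```yaml
# dagster.yaml — instance-level storage configuration
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432
```

With this in place, runs, events, and schedules survive restarts of the webserver and daemon.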
### 4.2 OpenAI / LLM

Install: `pip install dagster-openai` (or `uv add dagster-openai`)

```python
from dagster_openai import OpenAIResource
from dagster import EnvVar

# Resource definition
openai = OpenAIResource(api_key=EnvVar("OPENAI_API_KEY"))
```

Using in an asset:
```python
from dagster_openai import OpenAIResource
from dagster import AssetExecutionContext, Definitions, EnvVar, asset

@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test."}],
        )

defs = Definitions(
    assets=[openai_asset],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)
```

Key notes:
- Usage metadata (tokens, costs) is automatically logged in asset metadata
- Dagster+ users see LLM usage in the Insights dashboard
- Use `openai.get_client(context)` as a context manager for automatic tracking
### 4.3 Dagster Pipes (TypeScript / External Processes)

Dagster Pipes invokes code outside of Python (TypeScript, Rust, shell scripts) while retaining Dagster's scheduling, reporting, and observability.
#### Dagster side (Python)

```python
import shutil
import dagster as dg

@dg.asset
def wrapper_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    return pipes_subprocess_client.run(
        command=[shutil.which("node"), "scripts/process.ts"],
        context=context,
    ).get_results()

defs = dg.Definitions(
    assets=[wrapper_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
```

#### External side (reports back to Dagster)
For Python external processes, use `dagster-pipes`:

```python
from dagster_pipes import PipesContext, open_dagster_pipes

def main():
    with open_dagster_pipes():
        context = PipesContext.get()
        context.log.info("Processing data")
        context.report_asset_materialization(
            metadata={"total_orders": 2},
        )

if __name__ == "__main__":
    main()
```

For non-Python processes (TypeScript/Node), the external process can write structured JSON to stdout/stderr following the Pipes protocol, or use environment variables set by the `PipesSubprocessClient` to communicate results.
## 5. Testing

Dagster assets are plain Python functions — call them directly in tests.
### Direct invocation (simplest)

```python
import dagster as dg

@dg.asset
def loaded_file() -> str:
    with open("path.txt") as file:
        return file.read()

def test_loaded_file():
    assert loaded_file() == "contents"
```

### Testing with dependencies
Pass upstream values as arguments:

```python
@dg.asset
def processed_file(loaded_file: str) -> str:
    return loaded_file.strip()

def test_processed_file():
    assert processed_file(" contents ") == "contents"
```

### Testing with config
```python
class FilepathConfig(dg.Config):
    path: str

@dg.asset
def loaded_file(config: FilepathConfig) -> str:
    with open(config.path) as file:
        return file.read()

def test_loaded_file():
    assert loaded_file(FilepathConfig(path="path1.txt")) == "contents1"
```

### Mocking resources
```python
from unittest import mock

from dagster_aws.s3 import S3FileHandle, S3FileManager

@dg.asset
def loaded_file(file_manager: S3FileManager) -> str:
    return file_manager.read_data(S3FileHandle("bucket", "path.txt"))

def test_loaded_file():
    mocked = mock.Mock(spec=S3FileManager)
    mocked.read_data.return_value = "contents"
    assert loaded_file(mocked) == "contents"
```

### Testing with context (partitioned assets)
```python
@dg.asset(partitions_def=dg.DailyPartitionsDefinition("2024-01-01"))
def daily_data(context: dg.AssetExecutionContext) -> str:
    return f"data for {context.partition_key}"

def test_daily_data():
    context = dg.build_asset_context(partition_key="2024-08-16")
    assert daily_data(context) == "data for 2024-08-16"
```

### Running tests

```shell
pytest tests/
```

## 6. Deployment
### 6.1 Local Development

```shell
dagster dev
```

Starts the Dagster webserver and daemon locally. The UI is available at http://localhost:3000.
### 6.2 Dagster+ (Cloud)

Managed orchestration platform. Two deployment flavors:
- Serverless — fully managed, code runs in Dagster’s environment
- Hybrid — you maintain execution infrastructure, Dagster manages the control plane (UI, metadata, scheduling)
Features beyond OSS:
- Insights — cost optimization, performance analytics
- Alerts — Slack, PagerDuty, email for failures and SLA violations
- RBAC — role-based access control, audit logging
- Asset Catalog — search, lineage, column-level tracking
- Branch Deployments — staging environments per git branch
- Compliance — SOC 2 Type II, HIPAA, GDPR
### 6.3 Self-Hosted (OSS)

Deploy Dagster yourself via Docker, Kubernetes, or bare metal. You manage the webserver, daemon, and database backend.
## 7. Pricing (Dagster+)

| Feature | Solo | Starter | Pro |
|---|---|---|---|
| Price | $10/mo | $100/mo | Contact sales |
| Credits/mo | 7,500 | 30,000 | Unlimited |
| Users | 1 | Up to 3 | Unlimited |
| Code locations | 1 | 5 | Unlimited |
| Deployments | 1 | 1 | Unlimited |
| Free trial | 30 days | 30 days | — |
All plans include: asset-based orchestration, branch deployments, event-based automations, alerts, partitions, backfills, asset quality/freshness checks, metadata and lineage.
Pro-only: column-level lineage, catalog focus-mode, custom metrics, cost tracking (BigQuery/Snowflake), team management, audit logs, SAML.
Compute pricing: $0.005/compute-minute (Serverless, Solo/Starter). Pro uses flat-fee pricing.
Overage: $0.03/credit beyond plan allotment.
Solo at $10/mo is the right starting tier for bioloupe-data — 1 user, 1 code location, 7,500 credits/month.
## 8. Best Practices

### Asset granularity

- One table = one asset. If a function produces a single table/file, it should be one `@asset`.
- Use `@multi_asset` only when a single computation genuinely produces multiple artifacts.
- Keep assets focused — a 200-line asset doing fetch + clean + transform + load should be split.
### Resource patterns

- Define resources as `ConfigurableResource` subclasses with typed fields.
- Use `EnvVar` for secrets — never hardcode credentials.
- Create environment-specific `Definitions`: dev resources point to local/test databases, prod resources point to RDS.
### Error handling

- Use `retry_policy` on assets hitting flaky external services.
- Use `blocking=True` on asset checks to prevent bad data from propagating downstream.
- Log context with `context.log.info()` / `context.log.error()` for debuggability.
### When to use ops vs assets

- Use assets (default) for any persistent data artifact: tables, files, models, embeddings.
- Use ops only for side-effect-only work (sending emails, triggering webhooks) that doesn't produce a data artifact, or when composing steps within a `@graph_asset`.
- Rule of thumb: if the result should appear in the asset graph, use `@asset`. If it's purely operational, use `@op`.
### Partitions strategy

- Use `DailyPartitionsDefinition` for data that arrives daily (most common).
- Use `StaticPartitionsDefinition` for fixed categories (regions, data sources).
- Use `DynamicPartitionsDefinition` when categories are discovered at runtime.
- Keep partitions under 100,000 per asset.
- Combine with `build_schedule_from_partitioned_job` for automatic scheduling.
### Project structure

```text
bioloupe-data/
  dagster/
    __init__.py
    assets/
      __init__.py
      ingestion.py      # @asset: raw data from APIs/files
      enrichment.py     # @asset: LLM enrichment, geocoding
      analytics.py      # @asset: aggregations, summaries
    resources/
      __init__.py
      postgres.py       # ConfigurableResource for RDS
      openai.py         # OpenAI resource wrapper
    checks/
      __init__.py
      data_quality.py   # @asset_check definitions
    definitions.py      # Definitions(...) entry point
  tests/
    test_assets.py
    test_resources.py
```