Dagster

Comprehensive reference for Dagster — the data orchestration platform used by bioloupe-data.


Dagster is a data orchestration platform built around Software-Defined Assets — the philosophy that pipelines should be defined in terms of the data artifacts they produce, not the tasks they run. Instead of writing “fetch data, then transform, then load,” you declare “this table exists, here’s how to compute it, and here are its dependencies.”

Core principles:

  • Assets over tasks — model your pipeline as a graph of data assets, not a DAG of tasks
  • Testability — assets are plain Python functions, directly invocable in tests
  • Observability — built-in UI showing asset lineage, materialization history, and metadata
  • Environment flexibility — swap resources (DB connections, API clients) between dev/staging/prod

An asset is a persistent data artifact (table, file, ML model) defined by a Python function decorated with @dg.asset.

import dagster as dg

@dg.asset
def raw_orders() -> None:
    # Fetch and persist order data
    ...

Asset with dependencies, groups, and owners

@dg.asset
def daily_sales() -> None:
    ...

@dg.asset(deps=[daily_sales], group_name="sales")
def weekly_sales() -> None:
    ...

@dg.asset(
    deps=[weekly_sales],
    owners=["bighead@hooli.com", "team:data"],
)
def weekly_sales_report(context: dg.AssetExecutionContext):
    context.log.info("Generating report")
  • deps — upstream assets this asset depends on
  • group_name — logical grouping in the UI
  • owners — who is responsible for this asset

Code versioning (code_version) prevents redundant recomputation when code hasn’t changed:

import json

@dg.asset(code_version="1")
def asset_with_version():
    with open("data/output.json", "w") as f:
        json.dump(100, f)

Hierarchical namespacing for assets:

@dg.asset(key_prefix=["staging", "sales"])
def orders():
    return [1, 2, 3]

@dg.asset(ins={"orders": dg.AssetIn(key_prefix=["staging", "sales"])})
def enriched_orders(orders):
    return orders + [4]

Access run information through the execution context:

@dg.asset
def context_asset(context: dg.AssetExecutionContext):
    context.log.info(f"Run ID: {context.run.run_id}")

Produce multiple assets from a single computation:

@dg.multi_asset(specs=[dg.AssetSpec("asset_one"), dg.AssetSpec("asset_two")])
def my_multi_asset():
    yield dg.MaterializeResult(asset_key="asset_one", metadata={"num_rows": 10})
    yield dg.MaterializeResult(asset_key="asset_two", metadata={"num_rows": 24})

Compose multiple ops into a single asset:

@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    )
)
def step_one() -> int:
    return 42

@dg.op
def step_two(num: int):
    return num ** 2

@dg.graph_asset
def complex_asset():
    return step_two(step_one())

Asset checks are data quality validations that run against assets. They do not consume Dagster+ credits.

import pandas as pd
import dagster as dg

@dg.asset
def orders():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    orders_df.to_csv("orders.csv")

@dg.asset_check(asset=orders)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(passed=bool(num_null_order_ids == 0))

Blocking checks (prevent downstream materialization on failure)

@dg.asset_check(asset=orders, blocking=True)
def orders_id_has_no_nulls():
    orders_df = pd.read_csv("orders.csv")
    num_null_order_ids = orders_df["order_id"].isna().sum()
    return dg.AssetCheckResult(passed=bool(num_null_order_ids == 0))
Run several checks in one computation with @dg.multi_asset_check:

from collections.abc import Iterable

@dg.multi_asset_check(
    specs=[
        dg.AssetCheckSpec(name="orders_id_has_no_nulls", asset="orders"),
        dg.AssetCheckSpec(name="items_id_has_no_nulls", asset="orders"),
    ]
)
def orders_check() -> Iterable[dg.AssetCheckResult]:
    orders_df = pd.read_csv("orders.csv")

    num_null_order_ids = orders_df["order_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="orders_id_has_no_nulls",
        passed=bool(num_null_order_ids == 0),
        asset_key="orders",
    )

    num_null_item_ids = orders_df["item_id"].isna().sum()
    yield dg.AssetCheckResult(
        check_name="items_id_has_no_nulls",
        passed=bool(num_null_item_ids == 0),
        asset_key="orders",
    )
Run checks on their own cadence by defining separate jobs and schedules:

asset_job = dg.define_asset_job(
    "asset_job",
    selection=dg.AssetSelection.assets(orders).without_checks(),
)
check_job = dg.define_asset_job(
    "check_job",
    selection=dg.AssetSelection.checks_for_assets(orders),
)

asset_schedule = dg.ScheduleDefinition(job=asset_job, cron_schedule="0 0 * * *")
check_schedule = dg.ScheduleDefinition(job=check_job, cron_schedule="0 6 * * *")

Resources provide access to external systems (databases, APIs, cloud services). They are defined by subclassing ConfigurableResource and injected into assets via function parameters.

import dagster as dg
import requests
from requests import Response
from typing import Any

class MyConnectionResource(dg.ConfigurableResource):
    username: str

    def request(self, endpoint: str) -> Response:
        return requests.get(
            f"https://my-api.com/{endpoint}",
            headers={"user-agent": "dagster"},
        )

@dg.asset
def data_from_service(my_conn: MyConnectionResource) -> dict[str, Any]:
    return my_conn.request("/fetch_data").json()

defs = dg.Definitions(
    assets=[data_from_service],
    resources={
        "my_conn": MyConnectionResource(username="admin"),
    },
)

Use EnvVar to read secrets at runtime (visible in UI as env var references, not plaintext):

defs = dg.Definitions(
    resources={
        "my_conn": MyConnectionResource(username=dg.EnvVar("API_USERNAME")),
    },
)
Key resource APIs:

  • ConfigurableResource — base class for defining resources
  • EnvVar — read config from environment variables
  • ResourceParam — annotate a parameter as a resource
  • build_init_resource_context() — create context for testing resources
  • build_resources() — initialize resources outside execution

Dagster offers five automation methods: Schedules, Declarative Automation, Sensors, Asset Sensors, and GraphQL triggers.

Run assets on a cron schedule:

import dagster as dg

@dg.asset
def customer_data(): ...

@dg.asset
def sales_report(): ...

daily_schedule = dg.ScheduleDefinition(
    name="daily_refresh",
    cron_schedule="0 0 * * *",  # midnight daily
    target=[customer_data, sales_report],
)

defs = dg.Definitions(schedules=[daily_schedule])

With timezone:

daily_schedule = dg.ScheduleDefinition(
    job=daily_refresh_job,
    cron_schedule="0 0 * * *",
    execution_timezone="America/New_York",
)

Auto-schedule from partitioned job:

daily_partition = dg.DailyPartitionsDefinition(start_date="2024-05-20")

@dg.asset(partitions_def=daily_partition)
def daily_asset(): ...

partitioned_asset_job = dg.define_asset_job(
    "partitioned_job", selection=[daily_asset]
)

asset_partitioned_schedule = dg.build_schedule_from_partitioned_job(
    partitioned_asset_job,
)

Declarative Automation (AutomationCondition)


Set conditions directly on assets. Requires enabling default_automation_condition_sensor in the UI.

on_cron — execute on schedule after upstream updates:

@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.on_cron("@hourly"),
)
def hourly_asset() -> None: ...

eager — update whenever any upstream dependency changes:

@dg.asset(
    deps=["upstream"],
    automation_condition=dg.AutomationCondition.eager(),
)
def eager_asset() -> None: ...

on_missing — materialize once all upstream partitions exist:

@dg.asset(
    deps=[upstream],
    automation_condition=dg.AutomationCondition.on_missing(),
    partitions_def=dg.DailyPartitionsDefinition("2025-01-01"),
)
def downstream() -> None: ...

Sensors are event-driven triggers that poll for conditions:

@dg.sensor(job=my_job)
def my_sensor(context: dg.SensorEvaluationContext):
    # Check for new files, API events, etc.
    if new_data_available():
        return dg.RunRequest(run_key="unique_key")

Sensors with dynamic partitions:

@dg.sensor(job=regional_sales_job)
def all_regions_sensor(context: dg.SensorEvaluationContext):
    all_regions = ["us", "eu", "jp", "ca", "uk", "au"]
    return dg.SensorResult(
        run_requests=[dg.RunRequest(partition_key=region) for region in all_regions],
        dynamic_partitions_requests=[region_partitions.build_add_request(all_regions)],
    )

Partitions split assets into independently materializable chunks — essential for incremental processing.

Recommendation: Keep partition count under 100,000 per asset for UI performance.
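
To put that guideline in perspective, a rough back-of-the-envelope check (plain arithmetic, not a Dagster API):

```python
# How long until a time-partitioned asset hits the 100,000-key guideline?
DAILY_PER_YEAR = 365
HOURLY_PER_YEAR = 365 * 24  # 8,760

years_of_daily = 100_000 / DAILY_PER_YEAR     # ~274 years — effectively never
years_of_hourly = 100_000 / HOURLY_PER_YEAR   # ~11.4 years — reachable in practice
```

Daily partitions essentially never reach the limit; hourly partitions can within a long-lived project, and multi-dimensional partitions (below) multiply key counts.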

import os
import pandas as pd
import dagster as dg

daily_partitions = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily_partitions)
def daily_sales_data(context: dg.AssetExecutionContext) -> None:
    date = context.partition_key
    df = pd.DataFrame({
        "date": [date] * 10,
        "sales": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000],
    })
    os.makedirs("data/daily_sales", exist_ok=True)
    df.to_csv(f"data/daily_sales/sales_{date}.csv", index=False)

Other time-based definitions: WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, HourlyPartitionsDefinition.

region_partitions = dg.StaticPartitionsDefinition(["us", "eu", "jp"])

@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
    ...

Partitions created at runtime (e.g., new regions discovered via API):

region_partitions = dg.DynamicPartitionsDefinition(name="regions")

@dg.asset(partitions_def=region_partitions)
def regional_sales_data(context: dg.AssetExecutionContext) -> None:
    region = context.partition_key
    ...
Combine dimensions with MultiPartitionsDefinition:

two_dimensional = dg.MultiPartitionsDefinition({
    "date": dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    "region": dg.StaticPartitionsDefinition(["us", "eu", "jp"]),
})

@dg.asset(partitions_def=two_dimensional)
def daily_regional_sales(context: dg.AssetExecutionContext) -> None:
    keys = context.partition_key.keys_by_dimension
    date = keys["date"]
    region = keys["region"]
    ...
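
Note that multi-partition key counts are the product of the dimension sizes, so the 100,000-key recommendation is reached faster. For the date × region example:

```python
# Keys in a two-dimensional partition set = product of dimension sizes.
days_per_year = 365
regions = 3  # us, eu, jp

keys_per_year = days_per_year * regions  # 1095 date/region pairs per year
```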
Exclude specific dates (e.g. market holidays) with a custom TimeWindowPartitionsDefinition:

from datetime import datetime

market_holidays = [
    datetime.strptime(d, "%Y-%m-%d")
    for d in ["2024-01-01", "2024-07-04", "2024-12-25"]
]

market_calendar = dg.TimeWindowPartitionsDefinition(
    start=datetime(2024, 1, 1),
    cron_schedule="0 0 * * 1-5",  # weekdays only
    fmt="%Y-%m-%d",
    exclusions=market_holidays,
)
A schedule can target a specific partition, e.g. the previous day's:

import datetime

@dg.schedule(job=daily_sales_job, cron_schedule="0 1 * * *")
def daily_sales_schedule(context):
    previous_day = context.scheduled_execution_time.date() - datetime.timedelta(days=1)
    date = previous_day.strftime("%Y-%m-%d")
    return dg.RunRequest(run_key=date, partition_key=date)

I/O managers separate data processing logic from storage logic. Assets return data; the I/O manager decides where and how to persist it.

Use an I/O manager when:

  • Assets are stored in consistent locations with predictable paths
  • You want to swap storage backends between environments (DuckDB local, Postgres prod)
  • Upstream data fits in memory

Skip I/O managers for:

  • SQL-based table creation (just run the SQL directly in the asset)
  • Assets exceeding memory
  • Pipelines managing I/O independently

import pandas as pd
import dagster as dg

@dg.asset
def raw_sales_data() -> pd.DataFrame:
    return pd.read_csv("https://example.com/sales.csv")

@dg.asset
def clean_sales_data(raw_sales_data: pd.DataFrame) -> pd.DataFrame:
    return raw_sales_data.fillna({"amount": 0.0})

@dg.asset
def sales_summary(clean_sales_data: pd.DataFrame) -> pd.DataFrame:
    return clean_sales_data.groupby(["owner"])["amount"].sum().reset_index()

Configure storage via the I/O manager resource:

from dagster_duckdb_pandas import DuckDBPandasIOManager

defs = dg.Definitions(
    assets=[raw_sales_data, clean_sales_data, sales_summary],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="sales.duckdb",
            schema="public",
        )
    },
)

Built-in I/O managers exist for: S3, Azure ADLS2, GCS, BigQuery, Snowflake, DuckDB, and Postgres — supporting Pandas, PySpark, and Polars DataFrames.

Dagster captures two types of logs:

  • Structured event logs — enriched with metadata, filterable in UI
  • Raw compute logs — stdout/stderr streams

@dg.asset
def my_asset(context: dg.AssetExecutionContext):
    context.log.info("Processing started")
    context.log.warning("Missing optional field")
    context.log.error("Failed to connect")

In run config (YAML):

loggers:
  console:
    config:
      log_level: ERROR

Supports standard Python levels: DEBUG, INFO, WARNING, ERROR, CRITICAL.


Dependencies create a directed acyclic graph visible in the Dagster UI:

@dg.asset
def raw_data(): ...

@dg.asset(deps=[raw_data])
def cleaned_data(): ...

@dg.asset(deps=[cleaned_data])
def analytics(): ...

For data passed between assets (not just ordering), use function parameters:

@dg.asset
def raw_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@dg.asset
def cleaned_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data.dropna()

Use partitions to process only new data:

daily = dg.DailyPartitionsDefinition(start_date="2024-01-01")

@dg.asset(partitions_def=daily)
def incremental_table(context: dg.AssetExecutionContext):
    date = context.partition_key
    # Only process this date's data
    new_rows = fetch_data_for_date(date)
    upsert_to_database(new_rows)
Handle flaky external services with a retry policy at the asset level:

@dg.asset(
    retry_policy=dg.RetryPolicy(
        max_retries=3,
        delay=1,
        backoff=dg.Backoff.EXPONENTIAL,
    )
)
def flaky_api_asset():
    return call_unreliable_api()

Or at the op level within a graph asset:

@dg.op(
    retry_policy=dg.RetryPolicy(
        max_retries=5,
        delay=0.2,
        backoff=dg.Backoff.EXPONENTIAL,
        jitter=dg.Jitter.PLUS_MINUS,
    )
)
def unreliable_step() -> int:
    ...
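
As a sketch of what such a policy produces — assuming the base delay doubles on each successive retry (Dagster's EXPONENTIAL backoff; the exact attempt indexing may differ) and ignoring jitter, which randomizes each wait to avoid synchronized retries:

```python
# Hypothetical wait schedule for delay=0.2, max_retries=5 with exponential
# backoff: the base delay is doubled for each successive retry (jitter ignored).
base_delay, max_retries = 0.2, 5
delays = [base_delay * 2**attempt for attempt in range(max_retries)]
# delays == [0.2, 0.4, 0.8, 1.6, 3.2] seconds before retries 1 through 5
```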

Track data produced outside Dagster without managing its computation:

raw_clickstream = dg.AssetSpec("raw_clickstream")  # produced by an external system

@dg.asset(deps=[raw_clickstream])
def processed_clickstream():
    # Dagster doesn't materialize raw_clickstream, but knows it's a dependency
    ...

Install: pip install dagster-postgres (or uv add dagster-postgres)

The dagster-postgres library provides:

  • PostgreSQL-backed storage for Dagster’s own metadata (run storage, event log, schedule storage)
  • I/O managers for reading/writing asset data to Postgres tables

Latest version: 1.12.19

For bioloupe-data, Postgres serves as both the orchestration backend and the target for asset materialization.
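
For the orchestration-backend side, a dagster.yaml sketch in the style of the dagster-postgres docs (hostname and database name here are placeholders; verify field names against the version you install):

```yaml
# dagster.yaml — back run, event-log, and schedule storage with Postgres
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname: my-rds-host.example.com   # placeholder
      db_name: dagster
      port: 5432
```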

Install: pip install dagster-openai (or uv add dagster-openai)

from dagster_openai import OpenAIResource
from dagster import EnvVar

# Resource definition
openai = OpenAIResource(api_key=EnvVar("OPENAI_API_KEY"))

Using in an asset:

from dagster_openai import OpenAIResource
from dagster import AssetExecutionContext, Definitions, EnvVar, asset

@asset(compute_kind="OpenAI")
def openai_asset(context: AssetExecutionContext, openai: OpenAIResource):
    with openai.get_client(context) as client:
        client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Say this is a test."}],
        )

defs = Definitions(
    assets=[openai_asset],
    resources={
        "openai": OpenAIResource(api_key=EnvVar("OPENAI_API_KEY")),
    },
)

Key notes:

  • Usage metadata (tokens, costs) automatically logged in asset metadata
  • Dagster+ users see LLM usage in the Insights dashboard
  • Use openai.get_client(context) as a context manager for automatic tracking

4.3 Dagster Pipes (TypeScript / External Processes)


Dagster Pipes invokes code outside of Python (TypeScript, Rust, shell scripts) while retaining Dagster’s scheduling, reporting, and observability.

import shutil
import dagster as dg

@dg.asset
def wrapper_asset(
    context: dg.AssetExecutionContext,
    pipes_subprocess_client: dg.PipesSubprocessClient,
):
    return pipes_subprocess_client.run(
        command=[shutil.which("node"), "scripts/process.ts"],
        context=context,
    ).get_results()

defs = dg.Definitions(
    assets=[wrapper_asset],
    resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

For Python external processes, use dagster-pipes:

from dagster_pipes import PipesContext, open_dagster_pipes

def main():
    with open_dagster_pipes():
        context = PipesContext.get()
        context.log.info("Processing data")
        context.report_asset_materialization(
            metadata={"total_orders": 2}
        )

if __name__ == "__main__":
    main()

For non-Python (TypeScript/Node), the external process can write structured JSON to stdout/stderr following the Pipes protocol, or use environment variables set by the PipesSubprocessClient to communicate results.


Dagster assets are plain Python functions — call them directly in tests.

import dagster as dg

@dg.asset
def loaded_file() -> str:
    with open("path.txt") as file:
        return file.read()

def test_loaded_file():
    assert loaded_file() == "contents"

Pass upstream values as arguments:

@dg.asset
def processed_file(loaded_file: str) -> str:
    return loaded_file.strip()

def test_processed_file():
    assert processed_file(" contents ") == "contents"
Test configurable assets by constructing the config object directly:

class FilepathConfig(dg.Config):
    path: str

@dg.asset
def loaded_file(config: FilepathConfig) -> str:
    with open(config.path) as file:
        return file.read()

def test_loaded_file():
    assert loaded_file(FilepathConfig(path="path1.txt")) == "contents1"
Mock resources with unittest.mock:

from unittest import mock

@dg.asset
def loaded_file(file_manager: S3FileManager) -> str:
    return file_manager.read_data(S3FileHandle("bucket", "path.txt"))

def test_loaded_file():
    mocked = mock.Mock(spec=S3FileManager)
    mocked.read_data.return_value = "contents"
    assert loaded_file(mocked) == "contents"
Test partitioned assets with build_asset_context:

@dg.asset(partitions_def=dg.DailyPartitionsDefinition("2024-01-01"))
def daily_data(context: dg.AssetExecutionContext) -> str:
    return f"data for {context.partition_key}"

def test_daily_data():
    context = dg.build_asset_context(partition_key="2024-08-16")
    assert daily_data(context) == "data for 2024-08-16"
Run the test suite:

pytest tests/

Start local development:

dagster dev

Starts the Dagster webserver and daemon locally. The UI is available at http://localhost:3000.

Dagster+ is the managed orchestration platform. It comes in two deployment flavors:

  • Serverless — fully managed, code runs in Dagster’s environment
  • Hybrid — you maintain execution infrastructure, Dagster manages the control plane (UI, metadata, scheduling)

Features beyond OSS:

  • Insights — cost optimization, performance analytics
  • Alerts — Slack, PagerDuty, email for failures and SLA violations
  • RBAC — role-based access control, audit logging
  • Asset Catalog — search, lineage, column-level tracking
  • Branch Deployments — staging environments per git branch
  • Compliance — SOC 2 Type II, HIPAA, GDPR

With open-source Dagster, you deploy it yourself via Docker, Kubernetes, or bare metal, and you manage the webserver, daemon, and database backend.


Feature          Solo       Starter      Pro
Price            $10/mo     $100/mo      Contact sales
Credits/mo       7,500      30,000       Unlimited
Users            1          Up to 3      Unlimited
Code locations   1          5            Unlimited
Deployments      1          1            Unlimited
Free trial       30 days    30 days

All plans include: asset-based orchestration, branch deployments, event-based automations, alerts, partitions, backfills, asset quality/freshness checks, metadata and lineage.

Pro-only: column-level lineage, catalog focus-mode, custom metrics, cost tracking (BigQuery/Snowflake), team management, audit logs, SAML.

Compute pricing: $0.005/compute-minute (Serverless, Solo/Starter). Pro uses flat-fee pricing.

Overage: $0.03/credit beyond plan allotment.
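
To make the Solo numbers concrete, a hypothetical monthly bill under the figures above (the usage numbers are invented for illustration):

```python
# Hypothetical Solo-plan bill: base fee + credit overage + serverless compute.
BASE_FEE = 10.00            # Solo plan, $/month
INCLUDED_CREDITS = 7_500
OVERAGE_PER_CREDIT = 0.03   # $/credit beyond the allotment
PER_COMPUTE_MINUTE = 0.005  # Serverless compute, $/minute

def monthly_cost(credits_used: int, compute_minutes: float) -> float:
    overage = max(0, credits_used - INCLUDED_CREDITS) * OVERAGE_PER_CREDIT
    return BASE_FEE + overage + compute_minutes * PER_COMPUTE_MINUTE

# e.g. 9,000 credits and 2,000 compute-minutes:
# 10 + 1,500 * 0.03 + 2,000 * 0.005 = $65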

Solo at $10/mo is the right starting tier for bioloupe-data — 1 user, 1 code location, 7,500 credits/month.


Asset design:

  • One table = one asset. If a function produces a single table/file, it should be one @asset.
  • Use @multi_asset only when a single computation genuinely produces multiple artifacts.
  • Keep assets focused — a 200-line asset doing fetch + clean + transform + load should be split.

Resources and configuration:

  • Define resources as ConfigurableResource subclasses with typed fields.
  • Use EnvVar for secrets — never hardcode credentials.
  • Create environment-specific Definitions: dev resources point to local/test databases, prod resources point to RDS.

Reliability:

  • Use retry_policy on assets hitting flaky external services.
  • Use blocking=True on asset checks to prevent bad data from propagating downstream.
  • Log context with context.log.info() / context.log.error() for debuggability.

Assets vs. ops:

  • Use assets (default) — for any persistent data artifact: tables, files, models, embeddings.
  • Use ops — only for side-effect-only work (sending emails, triggering webhooks) that doesn’t produce a data artifact, or when composing steps within a @graph_asset.
  • Rule of thumb: if the result should appear in the asset graph, use @asset. If it’s purely operational, use @op.

Partitioning:

  • Use DailyPartitionsDefinition for data that arrives daily (most common).
  • Use StaticPartitionsDefinition for fixed categories (regions, data sources).
  • Use DynamicPartitionsDefinition when categories are discovered at runtime.
  • Keep partitions under 100,000 per asset.
  • Combine with build_schedule_from_partitioned_job for automatic scheduling.
Suggested layout:

bioloupe-data/
  dagster/
    __init__.py
    assets/
      __init__.py
      ingestion.py       # @asset: raw data from APIs/files
      enrichment.py      # @asset: LLM enrichment, geocoding
      analytics.py       # @asset: aggregations, summaries
    resources/
      __init__.py
      postgres.py        # ConfigurableResource for RDS
      openai.py          # OpenAI resource wrapper
    checks/
      __init__.py
      data_quality.py    # @asset_check definitions
    definitions.py       # Definitions(...) entry point
  tests/
    test_assets.py
    test_resources.py