dbt Reference (dagster-dbt Integration)
Overview
dbt (data build tool) manages the computed layer in bioloupe-data: the 5 mart tables that power bioloupe-aero's read-heavy queries. Instead of running REFRESH MATERIALIZED VIEW via raw SQL, we define each model as a .sql file with {{ source() }} and {{ ref() }} references, and let dagster-dbt translate the dbt DAG into Dagster assets automatically.
Why dbt instead of raw matview refresh:
- SQL is version-controlled and testable (schema tests, row-count tests, uniqueness)
- {{ source() }} references create an explicit contract between Python-managed fact tables and SQL models
- dagster-dbt maps each dbt model to a Dagster asset with correct dependencies, so no manual wiring is needed
- materialized='table' with indexes replaces REFRESH MATERIALIZED VIEW CONCURRENTLY (same result, simpler ops)
- Incremental materializations are available when full-table rebuilds become too slow
Project Structure
    bioloupe-data/
      dbt/
        dbt_project.yml
        profiles.yml
        models/
          sources/
            bioloupe_sources.yml   # maps fact tables to dbt sources with meta.dagster.asset_key
          marts/
            mv_development_programs.sql
            mv_drug_trials.sql
            mv_efficacy_evidence.sql
            mv_competitive_landscape.sql
            mv_drug_pipeline.sql
      pipelines/
        project.py                 # DbtProject instance
        assets/
          dbt_assets.py            # @dbt_assets decorator

dbt_project.yml
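    name: bioloupe
    version: "1.0.0"
    config-version: 2

    profile: bioloupe

    model-paths: ["models"]
    test-paths: ["tests"]
    target-path: "target"
    clean-targets: ["target", "dbt_packages"]

    models:
      bioloupe:
        marts:
          +materialized: table
          +schema: public

All mart models default to materialized: table, so each run rebuilds them in full. The result is equivalent to REFRESH MATERIALIZED VIEW but simpler to operate: no unique index is needed just to allow a concurrent refresh.

Note that dbt's default generate_schema_name macro appends a custom schema to the profile's target schema, so +schema: public combined with schema: public in profiles.yml builds into public_public. Drop the +schema override or customize the macro if the marts must land in public.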
profiles.yml
    bioloupe:
      target: prod
      outputs:
        prod:
          type: postgres
          host: "{{ env_var('BIOLOUPE_DB_HOST') }}"
          port: "{{ env_var('BIOLOUPE_DB_PORT', '5432') | int }}"
          user: "{{ env_var('BIOLOUPE_DB_USER') }}"
          password: "{{ env_var('BIOLOUPE_DB_PASSWORD') }}"
          dbname: "{{ env_var('BIOLOUPE_DB_NAME') }}"
          schema: public
          threads: 4

Uses env_var() so credentials never appear in source. dbt resolves these from the environment of the process that invokes it, so they must be set wherever Dagster runs the dbt CLI.
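Because env_var() without a default fails at parse time when the variable is missing, it can help to check the environment before dbt is launched. The following is a minimal sketch, not part of the project: the helper name missing_db_env_vars is hypothetical, and treating an empty value as missing is an assumption made here for convenience. The variable names are the ones used in profiles.yml.

```python
import os

# Variables referenced without a default in profiles.yml.
# BIOLOUPE_DB_PORT is omitted because it falls back to '5432'.
REQUIRED_VARS = (
    "BIOLOUPE_DB_HOST",
    "BIOLOUPE_DB_USER",
    "BIOLOUPE_DB_PASSWORD",
    "BIOLOUPE_DB_NAME",
)


def missing_db_env_vars(environ=None):
    """Return required variables that are unset or empty (hypothetical helper)."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Demo with an explicit mapping instead of the real process environment.
demo_env = {"BIOLOUPE_DB_HOST": "localhost", "BIOLOUPE_DB_USER": "etl"}
print(missing_db_env_vars(demo_env))
```

Calling missing_db_env_vars() with no argument inspects os.environ, which is what a startup check in the Dagster code location would likely want.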
@dbt_assets Decorator Pattern
dagster-dbt provides @dbt_assets, which reads the dbt manifest and creates one Dagster asset per dbt model. The DbtProject class points to the dbt project directory and compiles the manifest.
pipelines/project.py
    from pathlib import Path

    from dagster_dbt import DbtProject

    BIOLOUPE_DBT_PROJECT = DbtProject(
        project_dir=Path(__file__).parent.parent / "dbt",
    )

    # Under `dagster dev`, this compiles the manifest when the module loads.
    # Outside dev (CI, production) it does nothing, so the manifest must be
    # built ahead of time as part of the build step.
    BIOLOUPE_DBT_PROJECT.prepare_if_dev()

pipelines/assets/dbt_assets.py
    import dagster as dg
    from dagster_dbt import DbtCliResource, dbt_assets

    from pipelines.project import BIOLOUPE_DBT_PROJECT


    @dbt_assets(
        manifest=BIOLOUPE_DBT_PROJECT.manifest_path,
        # dagster_dbt_translator=...,  # optional: customize asset keys
    )
    def bioloupe_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
        # `dbt build` runs models and their tests in DAG order.
        yield from dbt.cli(["build"], context=context).stream()

Each dbt model becomes a Dagster asset. Dependencies on {{ source() }} are resolved via meta.dagster.asset_key in the sources YAML, which links dbt sources to existing Dagster assets (Python-managed fact tables).
Registration in definitions.py
    import dagster as dg
    from dagster_dbt import DbtCliResource

    from pipelines.project import BIOLOUPE_DBT_PROJECT
    from pipelines.assets.dbt_assets import bioloupe_dbt_assets

    # EXTERNAL_ASSETS is assumed to be defined or imported elsewhere in the project.

    defs = dg.Definitions(
        assets=[
            *EXTERNAL_ASSETS,
            bioloupe_dbt_assets,
            # ... other Python assets ...
        ],
        resources={
            "dbt": DbtCliResource(project_dir=BIOLOUPE_DBT_PROJECT),
        },
    )

sources.yml with meta.dagster.asset_key
The meta.dagster.asset_key field tells dagster-dbt which existing Dagster asset corresponds to each dbt source table. This is how Python-managed assets (direct imports, entity resolution) connect to dbt models.

    version: 2

    sources:
      - name: bioloupe
        schema: public
        tables:
          # ── Core entities (external AssetSpecs) ──
          - name: drugs
            meta:
              dagster:
                asset_key: ["drugs"]
          - name: diseases
            meta:
              dagster:
                asset_key: ["diseases"]
          - name: organisations
            meta:
              dagster:
                asset_key: ["organisations"]
          - name: targets
            meta:
              dagster:
                asset_key: ["targets"]
          - name: technologies
            meta:
              dagster:
                asset_key: ["technologies"]

          # ── Fact tables (Dagster-managed assets) ──
          - name: drug_approvals
            meta:
              dagster:
                asset_key: ["drug_approvals"]
          - name: indications
            meta:
              dagster:
                asset_key: ["indications"]
          - name: clinical_trials
            meta:
              dagster:
                asset_key: ["clinical_trials"]
          - name: trial_arm_interventions
            meta:
              dagster:
                asset_key: ["trial_arm_interventions"]
          - name: trial_arms
            meta:
              dagster:
                asset_key: ["trial_arms"]
          - name: interventions
            meta:
              dagster:
                asset_key: ["interventions"]
          - name: trial_diseases
            meta:
              dagster:
                asset_key: ["trial_diseases"]
          - name: trial_arm_results
            meta:
              dagster:
                asset_key: ["trial_arm_results"]
          - name: publications
            meta:
              dagster:
                asset_key: ["publications"]
          - name: publication_outcomes
            meta:
              dagster:
                asset_key: ["publication_outcomes"]
          - name: drug_ownerships
            meta:
              dagster:
                asset_key: ["drug_ownerships"]
          - name: drug_target_actions
            meta:
              dagster:
                asset_key: ["drug_target_actions"]

          # ── Mart tables (dbt-managed, used by ref() not source()) ──
          # mv_development_programs is referenced by mv_competitive_landscape
          # and mv_drug_pipeline via {{ ref() }}, not {{ source() }}.

Key rule: Tables managed outside dbt use {{ source('bioloupe', 'table_name') }}. Tables managed by dbt use {{ ref('model_name') }}.
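To check which asset keys the sources actually declare, one option is to read them back from the compiled manifest. The sketch below is illustrative only: source_asset_keys is a hypothetical helper, and EXAMPLE_MANIFEST is a hand-written fragment that assumes source entries in target/manifest.json carry source_name, name and meta fields.

```python
def source_asset_keys(manifest):
    """Map 'source_name.table_name' to its declared meta.dagster.asset_key.

    Hypothetical helper: sources without the meta entry are skipped.
    """
    keys = {}
    for source in manifest.get("sources", {}).values():
        asset_key = source.get("meta", {}).get("dagster", {}).get("asset_key")
        if asset_key is not None:
            keys[f"{source['source_name']}.{source['name']}"] = list(asset_key)
    return keys


# Hand-written fragment shaped like the "sources" section of a dbt manifest.
EXAMPLE_MANIFEST = {
    "sources": {
        "source.bioloupe.bioloupe.drugs": {
            "source_name": "bioloupe",
            "name": "drugs",
            "meta": {"dagster": {"asset_key": ["drugs"]}},
        },
        "source.bioloupe.bioloupe.trial_arms": {
            "source_name": "bioloupe",
            "name": "trial_arms",
            "meta": {"dagster": {"asset_key": ["trial_arms"]}},
        },
        "source.bioloupe.bioloupe.unmapped": {
            "source_name": "bioloupe",
            "name": "unmapped",
            "meta": {},  # no asset key declared, so it is not reported
        },
    }
}

print(source_asset_keys(EXAMPLE_MANIFEST))
```

Against a real project, the manifest dict would come from json.load() on target/manifest.json after dbt has parsed the project.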
Model Materializations
Table (default for marts)

    -- dbt/models/marts/mv_drug_trials.sql
    {{ config(
        materialized='table',
        indexes=[
          {'columns': ['drug_id'], 'type': 'btree'},
          {'columns': ['clinical_trial_id'], 'type': 'btree'},
        ],
        unique_key=['drug_id', 'clinical_trial_id', 'role'],
    ) }}

    SELECT DISTINCT
        i.drug_id,
        ct.id AS clinical_trial_id,
        CASE
            WHEN tai.is_investigational = true THEN 'investigational'
            WHEN ta.arm_type = 'sham_comparator' THEN 'sham_comparator'
            WHEN ta.arm_type = 'active_comparator' THEN 'active_comparator'
            WHEN ta.arm_type = 'placebo_comparator' THEN 'placebo_comparator'
            WHEN ta.arm_type = 'experimental' AND tai.is_investigational = false THEN 'combination_partner'
            ELSE 'other'
        END AS role
    FROM {{ source('bioloupe', 'trial_arm_interventions') }} tai
    JOIN {{ source('bioloupe', 'interventions') }} i ON i.id = tai.intervention_id
    JOIN {{ source('bioloupe', 'trial_arms') }} ta ON ta.id = tai.trial_arm_id
    JOIN {{ source('bioloupe', 'clinical_trials') }} ct ON ct.id = ta.clinical_trial_id
    WHERE i.drug_id IS NOT NULL

The indexes config creates the listed indexes after the table is built. unique_key only drives merge logic in incremental models; on a table materialization it is informational.
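The order of the WHEN branches in the role expression matters: an investigational intervention is labelled investigational regardless of arm type. The following Python mirror of that CASE is a sketch for reasoning about precedence and NULL handling, not code from the project; classify_role is a hypothetical name, and None stands in for SQL NULL.

```python
def classify_role(is_investigational, arm_type):
    """Mirror of the role CASE expression in mv_drug_trials (illustrative only)."""
    # First branch wins: investigational overrides any arm type.
    if is_investigational is True:
        return "investigational"
    # Comparator arm types map to a role of the same name.
    if arm_type in ("sham_comparator", "active_comparator", "placebo_comparator"):
        return arm_type
    # A NULL flag (None) does not satisfy "= false", so it falls through to 'other'.
    if arm_type == "experimental" and is_investigational is False:
        return "combination_partner"
    return "other"


for flag, arm in [
    (True, "placebo_comparator"),
    (False, "experimental"),
    (None, "experimental"),
    (False, "no_intervention"),
]:
    print(flag, arm, "->", classify_role(flag, arm))
```

The six possible return values match the accepted_values list used in the schema test for the role column.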
Incremental (for large tables)
When a table gets too large for a full rebuild:

    {{ config(
        materialized='incremental',
        unique_key=['id', 'source_type'],
        incremental_strategy='merge',
    ) }}

    SELECT ...
    FROM {{ source('bioloupe', 'trial_arm_results') }} tar
    {% if is_incremental() %}
    WHERE tar.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}

Start with table materialization. Switch to incremental only when full rebuilds exceed an acceptable duration. The merge strategy relies on PostgreSQL's MERGE statement, which is available from PostgreSQL 15; on older servers, delete+insert is the usual alternative.
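The incremental pattern combines a high-water-mark filter with an upsert on the unique key. The sketch below simulates that behaviour on in-memory rows to make the semantics concrete; incremental_merge is a hypothetical function, and treating an empty target as a first run is a simplification (in dbt, is_incremental() depends on whether the target table already exists).

```python
from datetime import datetime


def incremental_merge(target, source, unique_key=("id", "source_type")):
    """Simulate the updated_at filter plus merge on unique_key (illustrative)."""
    if target:
        # Equivalent of: WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
        high_water = max(row["updated_at"] for row in target)
        new_rows = [row for row in source if row["updated_at"] > high_water]
    else:
        # First run: no filter is applied, everything is loaded.
        new_rows = list(source)

    merged = {tuple(row[k] for k in unique_key): row for row in target}
    for row in new_rows:
        # Matching keys are overwritten, new keys are appended.
        merged[tuple(row[k] for k in unique_key)] = row
    return list(merged.values())


d1, d2, d3 = datetime(2024, 1, 1), datetime(2024, 1, 2), datetime(2024, 1, 3)
target = [
    {"id": 1, "source_type": "trial", "updated_at": d1, "value": "a"},
    {"id": 2, "source_type": "trial", "updated_at": d2, "value": "b"},
]
source = [
    {"id": 1, "source_type": "trial", "updated_at": d3, "value": "a2"},
    {"id": 2, "source_type": "trial", "updated_at": d2, "value": "b-unchanged"},
    {"id": 3, "source_type": "trial", "updated_at": d3, "value": "c"},
]
print([row["value"] for row in incremental_merge(target, source)])
```

One consequence worth noticing: a source row whose updated_at equals the current maximum is skipped by the strict comparison, so late-arriving rows with the same timestamp are not picked up.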
{{ source() }} vs {{ ref() }}
| Function | When to use | Example |
|---|---|---|
| {{ source('bioloupe', 'drugs') }} | Referencing tables managed outside dbt (Python assets, MikroORM) | Fact tables, entity tables |
| {{ ref('mv_development_programs') }} | Referencing other dbt models | mv_competitive_landscape depends on mv_development_programs |
dbt builds a DAG from these references. dagster-dbt reads the DAG and creates matching Dagster asset dependencies. source() references link to external assets via meta.dagster.asset_key. ref() references link to other dbt model assets.
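The build order that falls out of these references can be illustrated with a topological sort. This is a sketch using Python's standard graphlib module, not how dbt or dagster-dbt is implemented; the DEPENDENCIES mapping lists only a subset of the upstream tables named in this document, and build_order is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on.
# Source tables appear only as dependencies; mart models appear as keys.
DEPENDENCIES = {
    "mv_development_programs": {"drug_approvals", "clinical_trials", "drugs"},
    "mv_competitive_landscape": {"mv_development_programs", "drugs", "technologies"},
    "mv_drug_pipeline": {"mv_development_programs", "drugs", "technologies"},
}


def build_order(dependencies):
    """Return nodes in an order where every dependency precedes its dependents."""
    return list(TopologicalSorter(dependencies).static_order())


order = build_order(DEPENDENCIES)
print(order)
```

Whatever order is printed, mv_development_programs always comes before the two marts that ref() it, which is the guarantee Dagster relies on when it schedules the corresponding assets.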
Example — mv_competitive_landscape uses both:
    -- Uses ref() for the upstream dbt model
    FROM {{ ref('mv_development_programs') }} dp
    -- Uses source() for Python-managed tables
    JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
    LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id

Testing
dbt tests validate data quality after each model builds. Define tests in the model's YAML or as standalone .sql files.
Schema tests (in YAML)
    version: 2

    models:
      - name: mv_development_programs
        description: "Highest development phase per drug x disease pair"
        columns:
          - name: drug_id
            tests:
              - not_null
          - name: disease_id
            tests:
              - not_null
          - name: highest_phase_rank
            tests:
              - not_null
        tests:
          - unique:
              column_name: "drug_id || '-' || disease_id"

      - name: mv_drug_trials
        columns:
          - name: drug_id
            tests:
              - not_null
          - name: clinical_trial_id
            tests:
              - not_null
          - name: role
            tests:
              - not_null
              - accepted_values:
                  values: ['investigational', 'combination_partner', 'active_comparator', 'placebo_comparator', 'sham_comparator', 'other']

Custom singular tests
    -- dbt/tests/assert_mv_dev_programs_no_orphans.sql
    -- Every drug_id in mv_development_programs should exist in drugs
    SELECT dp.drug_id
    FROM {{ ref('mv_development_programs') }} dp
    LEFT JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
    WHERE d.id IS NULL

A singular test fails if it returns any rows.
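The "any returned row is a failure" rule can be tried out locally without a warehouse. The sketch below runs the same orphan query against an in-memory SQLite database; the table contents are made up for illustration, the Jinja references are replaced by plain table names, and failing_rows is a hypothetical helper.

```python
import sqlite3

# Same logic as the singular test, with ref()/source() replaced by table names.
ORPHAN_CHECK = """
SELECT dp.drug_id
FROM mv_development_programs dp
LEFT JOIN drugs d ON d.id = dp.drug_id
WHERE d.id IS NULL
"""


def failing_rows(conn):
    """Return the rows the singular test would report; an empty list means pass."""
    return conn.execute(ORPHAN_CHECK).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE drugs (id INTEGER PRIMARY KEY);
    CREATE TABLE mv_development_programs (drug_id INTEGER, disease_id INTEGER);
    INSERT INTO drugs VALUES (1), (2);
    INSERT INTO mv_development_programs VALUES (1, 10), (2, 20);
    """
)
print(failing_rows(conn))  # no orphans yet, so the test would pass

# Add a program row whose drug_id has no match in drugs.
conn.execute("INSERT INTO mv_development_programs VALUES (99, 30)")
print(failing_rows(conn))  # the orphan is returned, so the test would fail
```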
Running tests
    # Run all tests
    dbt test

    # Test a specific model
    dbt test --select mv_development_programs

    # Via Dagster (tests run as part of `dbt build`)
    # The @dbt_assets function runs `dbt build`, which includes tests

dagster-dbt surfaces test results as Dagster asset checks. Failed tests appear in the Dagster UI as failed checks on the corresponding asset.
Full Model Examples
mv_development_programs.sql

    {{ config(
        materialized='table',
        indexes=[
          {'columns': ['drug_id', 'disease_id'], 'unique': true, 'type': 'btree'},
          {'columns': ['highest_phase_rank'], 'type': 'btree'},
          {'columns': ['disease_id'], 'type': 'btree'},
          {'columns': ['drug_id'], 'type': 'btree'},
        ],
    ) }}

    WITH phase_sources AS (
        SELECT
            da.drug_id,
            d.name AS drug_name,
            ind.disease_id,
            dis.name AS disease_name,
            'approved' AS highest_phase,
            100 AS highest_phase_rank,
            'approval' AS phase_source,
            NULL::text[] AS nct_ids,
            da.approval_date,
            NULL::timestamptz AS earliest_completion_date
        FROM {{ source('bioloupe', 'drug_approvals') }} da
        JOIN {{ source('bioloupe', 'indications') }} ind ON ind.drug_approval_id = da.id
        JOIN {{ source('bioloupe', 'diseases') }} dis ON dis.id = ind.disease_id
        JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = da.drug_id
        WHERE da.approval_status = 'approved'

        UNION ALL

        SELECT
            i.drug_id,
            d.name AS drug_name,
            td.disease_id,
            dis.name AS disease_name,
            ct.phase AS highest_phase,
            CASE ct.phase
                WHEN 'phase4' THEN 4
                WHEN 'phase3' THEN 3
                WHEN 'phase2_3' THEN 3
                WHEN 'phase2' THEN 2
                WHEN 'phase1_2' THEN 2
                WHEN 'phase1' THEN 1
                WHEN 'early_phase1' THEN 1
                ELSE 0
            END AS highest_phase_rank,
            'clinical_trial' AS phase_source,
            ARRAY[ct.nct_id] AS nct_ids,
            NULL::timestamptz AS approval_date,
            ct.completion_date AS earliest_completion_date
        FROM {{ source('bioloupe', 'trial_arm_interventions') }} tai
        JOIN {{ source('bioloupe', 'interventions') }} i ON i.id = tai.intervention_id
        JOIN {{ source('bioloupe', 'trial_arms') }} ta ON ta.id = tai.trial_arm_id
        JOIN {{ source('bioloupe', 'clinical_trials') }} ct ON ct.id = ta.clinical_trial_id
        JOIN {{ source('bioloupe', 'trial_diseases') }} td ON td.clinical_trial_id = ct.id
        JOIN {{ source('bioloupe', 'diseases') }} dis ON dis.id = td.disease_id
        JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = i.drug_id
        WHERE i.drug_id IS NOT NULL
          AND ct.overall_status NOT IN ('withdrawn')
    ),
    aggregated AS (
        SELECT
            drug_id,
            drug_name,
            disease_id,
            disease_name,
            highest_phase,
            highest_phase_rank,
            phase_source,
            CASE
                WHEN phase_source = 'clinical_trial'
                THEN array_agg(DISTINCT unnested_nct ORDER BY unnested_nct)
                ELSE NULL::text[]
            END AS nct_ids,
            MIN(approval_date) AS approval_date,
            MIN(earliest_completion_date) AS earliest_completion_date
        FROM phase_sources,
             LATERAL unnest(COALESCE(nct_ids, ARRAY[NULL::text])) AS unnested_nct
        GROUP BY drug_id, drug_name, disease_id, disease_name,
                 highest_phase, highest_phase_rank, phase_source
    ),
    cleaned AS (
        SELECT
            drug_id,
            drug_name,
            disease_id,
            disease_name,
            highest_phase,
            highest_phase_rank,
            phase_source,
            array_remove(nct_ids, NULL) AS nct_ids,
            approval_date,
            earliest_completion_date
        FROM aggregated
    )
    SELECT DISTINCT ON (drug_id, disease_id)
        drug_id,
        drug_name,
        disease_id,
        disease_name,
        highest_phase,
        highest_phase_rank,
        phase_source,
        nct_ids,
        approval_date,
        earliest_completion_date
    FROM cleaned
    ORDER BY drug_id, disease_id, highest_phase_rank DESC, phase_source ASC

mv_competitive_landscape.sql (uses ref + source)
    {{ config(
        materialized='table',
        indexes=[
          {'columns': ['drug_id', 'disease_id'], 'unique': true, 'type': 'btree'},
          {'columns': ['disease_id'], 'type': 'btree'},
          {'columns': ['highest_phase_rank'], 'type': 'btree'},
          {'columns': ['technology_name'], 'type': 'btree'},
        ],
    ) }}

    SELECT
        dp.drug_id,
        dp.drug_name,
        dp.disease_id,
        dp.disease_name,
        dp.highest_phase,
        dp.highest_phase_rank,
        dp.phase_source,
        d.drug_type,
        t.name AS technology_name,
        t.drug_class,
        owners.current_owner_names
    FROM {{ ref('mv_development_programs') }} dp
    JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
    LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id
    LEFT JOIN LATERAL (
        SELECT array_agg(DISTINCT o.name ORDER BY o.name) AS current_owner_names
        FROM {{ source('bioloupe', 'drug_ownerships') }} dow
        JOIN {{ source('bioloupe', 'organisations') }} o ON o.id = dow.organisation_id
        WHERE dow.drug_id = d.id AND dow.is_current = true
    ) owners ON true

mv_drug_pipeline.sql (uses ref + source)
    {{ config(
        materialized='table',
        indexes=[
          {'columns': ['drug_id'], 'unique': true, 'type': 'btree'},
          {'columns': ['drug_status'], 'type': 'btree'},
          {'columns': ['highest_phase_rank_overall'], 'type': 'btree'},
          {'columns': ['technology_name'], 'type': 'btree'},
        ],
    ) }}

    SELECT
        d.id AS drug_id,
        d.bloupe_id,
        d.name AS drug_name,
        d.drug_status,
        d.drug_type,
        d.first_in_class,
        EXISTS(SELECT 1 FROM {{ source('bioloupe', 'drugs') }} c WHERE c.parent_id = d.id) AS is_combination,
        t.name AS technology_name,
        t.drug_class,
        primary_tgt.target_name AS primary_target_name,
        all_tgts.target_names AS target_names,
        owners.owner_names AS current_owner_names,
        phase_agg.highest_phase_overall,
        phase_agg.highest_phase_rank_overall
    FROM {{ source('bioloupe', 'drugs') }} d
    LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id
    LEFT JOIN LATERAL (
        SELECT tgt.name AS target_name
        FROM {{ source('bioloupe', 'drug_target_actions') }} dta
        JOIN {{ source('bioloupe', 'targets') }} tgt ON tgt.id = dta.target_id
        WHERE dta.drug_id = d.id
        ORDER BY dta.is_primary DESC, tgt.name ASC
        LIMIT 1
    ) primary_tgt ON true
    LEFT JOIN LATERAL (
        SELECT array_agg(DISTINCT tgt.name ORDER BY tgt.name) AS target_names
        FROM {{ source('bioloupe', 'drug_target_actions') }} dta
        JOIN {{ source('bioloupe', 'targets') }} tgt ON tgt.id = dta.target_id
        WHERE dta.drug_id = d.id
    ) all_tgts ON true
    LEFT JOIN LATERAL (
        SELECT array_agg(DISTINCT o.name ORDER BY o.name) AS owner_names
        FROM {{ source('bioloupe', 'drug_ownerships') }} dow
        JOIN {{ source('bioloupe', 'organisations') }} o ON o.id = dow.organisation_id
        WHERE dow.drug_id = d.id AND dow.is_current = true
    ) owners ON true
    LEFT JOIN LATERAL (
        SELECT
            MAX(dp.highest_phase_rank) AS highest_phase_rank_overall,
            (array_agg(dp.highest_phase ORDER BY dp.highest_phase_rank DESC))[1] AS highest_phase_overall
        FROM {{ ref('mv_development_programs') }} dp
        WHERE dp.drug_id = d.id
    ) phase_agg ON true