
Dbt

dbt (data build tool) manages the computed layer in bioloupe-data — the 5 mart tables that power bioloupe-aero’s read-heavy queries. Instead of REFRESH MATERIALIZED VIEW via raw SQL, we define each model as a .sql file with {{ source() }} and {{ ref() }} references, and let dagster-dbt translate the dbt DAG into Dagster assets automatically.

Why dbt instead of raw matview refresh:

  • SQL is version-controlled and testable (schema tests, row-count tests, uniqueness)
  • {{ source() }} references create an explicit, named contract between Python-managed fact tables and SQL models
  • dagster-dbt maps each dbt model to a Dagster asset with correct dependencies — no manual wiring
  • materialized='table' with indexes replaces REFRESH MATERIALIZED VIEW CONCURRENTLY (same result, simpler ops)
  • Incremental materializations available when full-table rebuilds become too slow
bioloupe-data/
  dbt/
    dbt_project.yml
    profiles.yml
    models/
      sources/
        bioloupe_sources.yml  # maps fact tables to dbt sources with meta.dagster.asset_key
      marts/
        mv_development_programs.sql
        mv_drug_trials.sql
        mv_efficacy_evidence.sql
        mv_competitive_landscape.sql
        mv_drug_pipeline.sql
  pipelines/
    project.py        # DbtProject instance
    assets/
      dbt_assets.py   # @dbt_assets decorator

dbt/dbt_project.yml
name: bioloupe
version: "1.0.0"
config-version: 2
profile: bioloupe
model-paths: ["models"]
test-paths: ["tests"]
target-path: "target"
clean-targets: ["target", "dbt_packages"]
models:
  bioloupe:
    marts:
      +materialized: table
      # Caution: dbt's default generate_schema_name appends a custom schema to
      # the target schema (here that yields public_public). Drop +schema or
      # override the macro if the marts must land in public.
      +schema: public

All mart models default to materialized: table, so every run rebuilds the table from the model's SELECT. Readers see the same data they would after REFRESH MATERIALIZED VIEW, with simpler operations: no unique index is needed solely to enable a concurrent refresh.

dbt/profiles.yml
bioloupe:
  target: prod
  outputs:
    prod:
      type: postgres
      host: "{{ env_var('BIOLOUPE_DB_HOST') }}"
      port: "{{ env_var('BIOLOUPE_DB_PORT', '5432') | int }}"
      user: "{{ env_var('BIOLOUPE_DB_USER') }}"
      password: "{{ env_var('BIOLOUPE_DB_PASSWORD') }}"
      dbname: "{{ env_var('BIOLOUPE_DB_NAME') }}"
      schema: public
      threads: 4

The profile uses env_var() so credentials never appear in source. dbt reads these variables from the environment of the process that invokes it, so they must be set wherever Dagster runs the dbt CLI.
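Because env_var() without a default makes dbt fail at parse time when the variable is missing, it can help to check the environment before launching a run. The following is a minimal sketch, not part of the project: the helper name missing_db_env is hypothetical, and the variable list is copied from the profile above (BIOLOUPE_DB_PORT is left out because the profile gives it a default).

```python
import os

# Variables that profiles.yml reads without a default value.
REQUIRED_DB_VARS = (
    "BIOLOUPE_DB_HOST",
    "BIOLOUPE_DB_USER",
    "BIOLOUPE_DB_PASSWORD",
    "BIOLOUPE_DB_NAME",
)


def missing_db_env(env=None):
    """Return the required variables that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_DB_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_db_env()
    if missing:
        raise SystemExit("Missing database settings: " + ", ".join(missing))
```

Run as a script, this exits with a readable message instead of leaving the failure to surface inside a dbt invocation.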

dagster-dbt provides @dbt_assets which reads the dbt manifest and creates one Dagster asset per dbt model. The DbtProject class points to the dbt project directory and compiles the manifest.

pipelines/project.py
from pathlib import Path

from dagster_dbt import DbtProject

BIOLOUPE_DBT_PROJECT = DbtProject(
    project_dir=Path(__file__).parent.parent / "dbt",
)
# Under `dagster dev`, this compiles the manifest when the code location loads.
# For CI or production builds, prepare the manifest ahead of time instead.
BIOLOUPE_DBT_PROJECT.prepare_if_dev()

pipelines/assets/dbt_assets.py
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

from pipelines.project import BIOLOUPE_DBT_PROJECT


@dbt_assets(
    manifest=BIOLOUPE_DBT_PROJECT.manifest_path,
    # dagster_dbt_translator=...,  # optional: customize asset keys
)
def bioloupe_dbt_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
    # `dbt build` runs models and their tests in DAG order.
    yield from dbt.cli(["build"], context=context).stream()

Each dbt model becomes a Dagster asset. Dependencies on {{ source() }} are resolved via meta.dagster.asset_key in the sources YAML — linking dbt sources to existing Dagster assets (Python-managed fact tables).

import dagster as dg
from dagster_dbt import DbtCliResource

from pipelines.project import BIOLOUPE_DBT_PROJECT
from pipelines.assets.dbt_assets import bioloupe_dbt_assets

# EXTERNAL_ASSETS holds the AssetSpecs for the core entity tables; import it
# from wherever those specs are declared.
defs = dg.Definitions(
    assets=[
        *EXTERNAL_ASSETS,
        bioloupe_dbt_assets,
        # ... other Python assets ...
    ],
    resources={
        "dbt": DbtCliResource(project_dir=BIOLOUPE_DBT_PROJECT),
    },
)

The meta.dagster.asset_key field tells dagster-dbt which existing Dagster asset corresponds to each dbt source table. This is how Python-managed assets (direct imports, entity resolution) connect to dbt models.
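The lookup can be pictured with a simplified model in plain Python. This is a sketch of the idea only, not dagster-dbt's implementation: the function name source_asset_key is hypothetical, and the fallback of [source_name, table_name] reflects my understanding of the library's default for sources, so treat it as an assumption.

```python
def source_asset_key(source_name, table):
    """Resolve the asset key for one entry under sources[].tables.

    `table` is the parsed YAML mapping for a single table. An explicit
    meta.dagster.asset_key wins; otherwise fall back to
    [source_name, table_name] (assumed default).
    """
    override = table.get("meta", {}).get("dagster", {}).get("asset_key")
    if override:
        return list(override)
    return [source_name, table["name"]]


# Entry shaped like the ones in bioloupe_sources.yml
drugs = {"name": "drugs", "meta": {"dagster": {"asset_key": ["drugs"]}}}
# Hypothetical entry without the meta block
untagged = {"name": "some_table"}

print(source_asset_key("bioloupe", drugs))
print(source_asset_key("bioloupe", untagged))
```

Without the override, the key would carry the source name as a prefix and would no longer match the key of the Python-managed asset, which is why every table in the sources file sets it explicitly.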

dbt/models/sources/bioloupe_sources.yml
version: 2
sources:
  - name: bioloupe
    schema: public
    tables:
      # ── Core entities (external AssetSpecs) ──
      - name: drugs
        meta:
          dagster:
            asset_key: ["drugs"]
      - name: diseases
        meta:
          dagster:
            asset_key: ["diseases"]
      - name: organisations
        meta:
          dagster:
            asset_key: ["organisations"]
      - name: targets
        meta:
          dagster:
            asset_key: ["targets"]
      - name: technologies
        meta:
          dagster:
            asset_key: ["technologies"]
      # ── Fact tables (Dagster-managed assets) ──
      - name: drug_approvals
        meta:
          dagster:
            asset_key: ["drug_approvals"]
      - name: indications
        meta:
          dagster:
            asset_key: ["indications"]
      - name: clinical_trials
        meta:
          dagster:
            asset_key: ["clinical_trials"]
      - name: trial_arm_interventions
        meta:
          dagster:
            asset_key: ["trial_arm_interventions"]
      - name: trial_arms
        meta:
          dagster:
            asset_key: ["trial_arms"]
      - name: interventions
        meta:
          dagster:
            asset_key: ["interventions"]
      - name: trial_diseases
        meta:
          dagster:
            asset_key: ["trial_diseases"]
      - name: trial_arm_results
        meta:
          dagster:
            asset_key: ["trial_arm_results"]
      - name: publications
        meta:
          dagster:
            asset_key: ["publications"]
      - name: publication_outcomes
        meta:
          dagster:
            asset_key: ["publication_outcomes"]
      - name: drug_ownerships
        meta:
          dagster:
            asset_key: ["drug_ownerships"]
      - name: drug_target_actions
        meta:
          dagster:
            asset_key: ["drug_target_actions"]
      # ── Mart tables (dbt-managed, used by ref() not source()) ──
      # mv_development_programs is referenced by mv_competitive_landscape
      # and mv_drug_pipeline via {{ ref() }}, not {{ source() }}.

Key rule: Tables managed outside dbt use {{ source('bioloupe', 'table_name') }}. Tables managed by dbt use {{ ref('model_name') }}.

-- dbt/models/marts/mv_drug_trials.sql
{{
  config(
    materialized='table',
    indexes=[
      {'columns': ['drug_id'], 'type': 'btree'},
      {'columns': ['clinical_trial_id'], 'type': 'btree'},
    ],
    unique_key=['drug_id', 'clinical_trial_id', 'role'],
  )
}}
SELECT DISTINCT
  i.drug_id,
  ct.id AS clinical_trial_id,
  CASE
    WHEN tai.is_investigational = true THEN 'investigational'
    WHEN ta.arm_type = 'sham_comparator' THEN 'sham_comparator'
    WHEN ta.arm_type = 'active_comparator' THEN 'active_comparator'
    WHEN ta.arm_type = 'placebo_comparator' THEN 'placebo_comparator'
    WHEN ta.arm_type = 'experimental'
      AND tai.is_investigational = false THEN 'combination_partner'
    ELSE 'other'
  END AS role
FROM {{ source('bioloupe', 'trial_arm_interventions') }} tai
JOIN {{ source('bioloupe', 'interventions') }} i ON i.id = tai.intervention_id
JOIN {{ source('bioloupe', 'trial_arms') }} ta ON ta.id = tai.trial_arm_id
JOIN {{ source('bioloupe', 'clinical_trials') }} ct ON ct.id = ta.clinical_trial_id
WHERE i.drug_id IS NOT NULL

The indexes config creates the listed indexes after the table is built. unique_key only affects incremental materializations, where it drives the merge logic; with materialized='table' it has no effect and serves as documentation of the intended grain.
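The CASE expression in mv_drug_trials is order-sensitive: the first matching branch wins, so an investigational intervention is labelled 'investigational' even inside a comparator arm. A small Python mirror of that logic makes the precedence easy to check. This is an illustrative sketch; classify_role is a hypothetical name, and None stands in for SQL NULL.

```python
COMPARATOR_ARMS = ("sham_comparator", "active_comparator", "placebo_comparator")


def classify_role(is_investigational, arm_type):
    """Mirror the CASE branches of mv_drug_trials in their original order."""
    if is_investigational is True:
        return "investigational"
    if arm_type in COMPARATOR_ARMS:
        return arm_type
    if arm_type == "experimental" and is_investigational is False:
        return "combination_partner"
    # NULL flags and unknown arm types fall through, as in the SQL ELSE branch.
    return "other"


print(classify_role(True, "placebo_comparator"))
print(classify_role(False, "experimental"))
print(classify_role(None, "experimental"))
```

The third call shows the NULL case: an experimental arm with an unknown is_investigational flag matches neither comparison and lands in 'other'.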

When a table gets too large for full rebuild:

{{
  config(
    materialized='incremental',
    unique_key=['id', 'source_type'],
    incremental_strategy='merge',
  )
}}
SELECT ...
FROM {{ source('bioloupe', 'trial_arm_results') }} tar
{% if is_incremental() %}
WHERE tar.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Start with table materialization. Switch to incremental only when full rebuilds exceed acceptable duration.
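To make the incremental pattern concrete, here is a plain-Python simulation of one run: filter the source by the high-water mark, then upsert by unique_key. It is a sketch of the semantics only, not how dbt executes the merge; incremental_merge is a hypothetical name, rows are modelled as dicts, and an empty target stands in for the first run.

```python
from datetime import datetime


def incremental_merge(target, source, unique_key=("id", "source_type")):
    """Simulate one incremental run over lists of row dicts."""
    if target:
        # Equivalent of: WHERE updated_at > (SELECT MAX(updated_at) FROM this)
        high_water = max(row["updated_at"] for row in target)
        batch = [row for row in source if row["updated_at"] > high_water]
    else:
        # First run: no existing rows, so take everything.
        batch = list(source)

    merged = {tuple(row[k] for k in unique_key): row for row in target}
    for row in batch:
        # Matching keys are overwritten, new keys are inserted.
        merged[tuple(row[k] for k in unique_key)] = row
    return list(merged.values())


t0, t1, t2 = datetime(2024, 1, 1), datetime(2024, 1, 2), datetime(2024, 1, 3)
target = [
    {"id": 1, "source_type": "trial", "updated_at": t0, "value": "a"},
    {"id": 2, "source_type": "trial", "updated_at": t1, "value": "b"},
]
source = [
    {"id": 1, "source_type": "trial", "updated_at": t2, "value": "a2"},
    {"id": 2, "source_type": "trial", "updated_at": t1, "value": "b"},
    {"id": 3, "source_type": "pub", "updated_at": t2, "value": "c"},
]
result = incremental_merge(target, source)
```

Note the strict comparison: a source row whose updated_at equals the current maximum is skipped, so late-arriving rows that share a timestamp with the high-water mark are missed until a full refresh.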

| Function | When to use | Example |
| --- | --- | --- |
| {{ source('bioloupe', 'drugs') }} | Referencing tables managed outside dbt (Python assets, MikroORM) | Fact tables, entity tables |
| {{ ref('mv_development_programs') }} | Referencing other dbt models | mv_competitive_landscape depends on mv_development_programs |

dbt builds a DAG from these references. dagster-dbt reads the DAG and creates matching Dagster asset dependencies. source() references link to external assets via meta.dagster.asset_key. ref() references link to other dbt model assets.
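The dependency extraction can be approximated with a few lines of Python. dbt itself renders the Jinja rather than pattern-matching, so this regex-based version is only a sketch for illustration; model_dependencies is a hypothetical helper and it handles single-quoted string arguments only.

```python
import re

SOURCE_RE = re.compile(r"\{\{\s*source\(\s*'([^']+)'\s*,\s*'([^']+)'\s*\)\s*\}\}")
REF_RE = re.compile(r"\{\{\s*ref\(\s*'([^']+)'\s*\)\s*\}\}")


def model_dependencies(sql):
    """Collect the distinct source() and ref() targets named in a model's SQL."""
    return {
        # Each source is a (source_name, table_name) pair.
        "sources": sorted(set(SOURCE_RE.findall(sql))),
        "refs": sorted(set(REF_RE.findall(sql))),
    }


sql = """
FROM {{ ref('mv_development_programs') }} dp
JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id
"""
deps = model_dependencies(sql)
print(deps)
```

Each entry in "refs" becomes an edge to another dbt model asset, and each entry in "sources" becomes an edge to whichever asset key the sources YAML assigns to that table.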

Example — mv_competitive_landscape uses both:

-- Uses ref() for the upstream dbt model
FROM {{ ref('mv_development_programs') }} dp
-- Uses source() for Python-managed tables
JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id

dbt tests validate data quality after each model builds. Define tests in the model’s YAML or as standalone .sql files.

dbt/models/marts/schema.yml
version: 2
models:
  - name: mv_development_programs
    description: "Highest development phase per drug x disease pair"
    columns:
      - name: drug_id
        tests:
          - not_null
      - name: disease_id
        tests:
          - not_null
      - name: highest_phase_rank
        tests:
          - not_null
    tests:
      - unique:
          column_name: "drug_id || '-' || disease_id"
  - name: mv_drug_trials
    columns:
      - name: drug_id
        tests:
          - not_null
      - name: clinical_trial_id
        tests:
          - not_null
      - name: role
        tests:
          - not_null
          - accepted_values:
              values: ['investigational', 'combination_partner', 'active_comparator', 'placebo_comparator', 'sham_comparator', 'other']

-- dbt/tests/assert_mv_dev_programs_no_orphans.sql
-- Every drug_id in mv_development_programs should exist in drugs
SELECT dp.drug_id
FROM {{ ref('mv_development_programs') }} dp
LEFT JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
WHERE d.id IS NULL

A singular test fails if it returns any rows.
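The "any returned row is a failure" rule can be tried out locally with an in-memory SQLite database. This is a sketch under stated assumptions: the two tables are reduced to the columns the query touches, the sample data is made up, and the helper names are hypothetical. The query is the anti-join from the singular test with the Jinja references replaced by plain table names.

```python
import sqlite3

ORPHAN_QUERY = """
SELECT dp.drug_id
FROM mv_development_programs dp
LEFT JOIN drugs d ON d.id = dp.drug_id
WHERE d.id IS NULL
"""


def make_demo_db():
    """Build a tiny database with one valid and one orphaned program row."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE drugs (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE mv_development_programs (drug_id INTEGER, disease_id INTEGER);
        INSERT INTO drugs VALUES (1, 'drug_a');
        INSERT INTO mv_development_programs VALUES (1, 10);
        INSERT INTO mv_development_programs VALUES (99, 10);
        """
    )
    return conn


def orphan_drug_ids(conn):
    """Return the drug_ids that would make the singular test fail."""
    return [row[0] for row in conn.execute(ORPHAN_QUERY)]


conn = make_demo_db()
failing = orphan_drug_ids(conn)
print("test passes" if not failing else f"test fails for drug_ids {failing}")
```

Once a matching row is added to drugs, the query returns nothing and the test passes.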

# Run all tests
dbt test
# Test a specific model
dbt test --select mv_development_programs
# Via Dagster (tests run as part of `dbt build`)
# The @dbt_assets decorator runs `dbt build` which includes tests

dagster-dbt surfaces test results as Dagster asset checks. Failed tests appear in the Dagster UI as failed checks on the corresponding asset.

-- dbt/models/marts/mv_development_programs.sql
{{
  config(
    materialized='table',
    indexes=[
      {'columns': ['drug_id', 'disease_id'], 'unique': true, 'type': 'btree'},
      {'columns': ['highest_phase_rank'], 'type': 'btree'},
      {'columns': ['disease_id'], 'type': 'btree'},
      {'columns': ['drug_id'], 'type': 'btree'},
    ],
  )
}}
WITH phase_sources AS (
  SELECT
    da.drug_id,
    d.name AS drug_name,
    ind.disease_id,
    dis.name AS disease_name,
    'approved' AS highest_phase,
    100 AS highest_phase_rank,
    'approval' AS phase_source,
    NULL::text[] AS nct_ids,
    da.approval_date,
    NULL::timestamptz AS earliest_completion_date
  FROM {{ source('bioloupe', 'drug_approvals') }} da
  JOIN {{ source('bioloupe', 'indications') }} ind ON ind.drug_approval_id = da.id
  JOIN {{ source('bioloupe', 'diseases') }} dis ON dis.id = ind.disease_id
  JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = da.drug_id
  WHERE da.approval_status = 'approved'
  UNION ALL
  SELECT
    i.drug_id,
    d.name AS drug_name,
    td.disease_id,
    dis.name AS disease_name,
    ct.phase AS highest_phase,
    CASE ct.phase
      WHEN 'phase4' THEN 4
      WHEN 'phase3' THEN 3
      WHEN 'phase2_3' THEN 3
      WHEN 'phase2' THEN 2
      WHEN 'phase1_2' THEN 2
      WHEN 'phase1' THEN 1
      WHEN 'early_phase1' THEN 1
      ELSE 0
    END AS highest_phase_rank,
    'clinical_trial' AS phase_source,
    ARRAY[ct.nct_id] AS nct_ids,
    NULL::timestamptz AS approval_date,
    ct.completion_date AS earliest_completion_date
  FROM {{ source('bioloupe', 'trial_arm_interventions') }} tai
  JOIN {{ source('bioloupe', 'interventions') }} i ON i.id = tai.intervention_id
  JOIN {{ source('bioloupe', 'trial_arms') }} ta ON ta.id = tai.trial_arm_id
  JOIN {{ source('bioloupe', 'clinical_trials') }} ct ON ct.id = ta.clinical_trial_id
  JOIN {{ source('bioloupe', 'trial_diseases') }} td ON td.clinical_trial_id = ct.id
  JOIN {{ source('bioloupe', 'diseases') }} dis ON dis.id = td.disease_id
  JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = i.drug_id
  WHERE i.drug_id IS NOT NULL
    AND ct.overall_status NOT IN ('withdrawn')
),
aggregated AS (
  SELECT
    drug_id, drug_name, disease_id, disease_name,
    highest_phase, highest_phase_rank, phase_source,
    CASE
      WHEN phase_source = 'clinical_trial'
        THEN array_agg(DISTINCT unnested_nct ORDER BY unnested_nct)
      ELSE NULL::text[]
    END AS nct_ids,
    MIN(approval_date) AS approval_date,
    MIN(earliest_completion_date) AS earliest_completion_date
  FROM phase_sources,
    LATERAL unnest(COALESCE(nct_ids, ARRAY[NULL::text])) AS unnested_nct
  GROUP BY drug_id, drug_name, disease_id, disease_name,
    highest_phase, highest_phase_rank, phase_source
),
cleaned AS (
  SELECT
    drug_id, drug_name, disease_id, disease_name,
    highest_phase, highest_phase_rank, phase_source,
    array_remove(nct_ids, NULL) AS nct_ids,
    approval_date, earliest_completion_date
  FROM aggregated
)
SELECT DISTINCT ON (drug_id, disease_id)
  drug_id, drug_name, disease_id, disease_name,
  highest_phase, highest_phase_rank, phase_source,
  nct_ids, approval_date, earliest_completion_date
FROM cleaned
ORDER BY drug_id, disease_id, highest_phase_rank DESC, phase_source ASC
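The final SELECT DISTINCT ON keeps one row per (drug_id, disease_id): the one with the highest rank, with phase_source as the tie-breaker. The Python sketch below reproduces that selection on a few made-up rows. The rank mapping is copied from the model's CASE expression; the function names and sample data are hypothetical.

```python
# Rank mapping copied from the CASE expression; approvals use rank 100.
PHASE_RANK = {
    "phase4": 4,
    "phase3": 3,
    "phase2_3": 3,
    "phase2": 2,
    "phase1_2": 2,
    "phase1": 1,
    "early_phase1": 1,
}


def phase_rank(phase):
    """Unknown or missing phases rank 0, like the ELSE branch."""
    return PHASE_RANK.get(phase, 0)


def pick_highest(rows):
    """Keep the first row per (drug_id, disease_id) after the model's ORDER BY."""
    ordered = sorted(
        rows,
        key=lambda r: (r["drug_id"], r["disease_id"], -r["highest_phase_rank"], r["phase_source"]),
    )
    best = {}
    for row in ordered:
        best.setdefault((row["drug_id"], row["disease_id"]), row)
    return list(best.values())


rows = [
    {"drug_id": 1, "disease_id": 10, "highest_phase": "phase2",
     "highest_phase_rank": phase_rank("phase2"), "phase_source": "clinical_trial"},
    {"drug_id": 1, "disease_id": 10, "highest_phase": "approved",
     "highest_phase_rank": 100, "phase_source": "approval"},
    {"drug_id": 2, "disease_id": 10, "highest_phase": "early_phase1",
     "highest_phase_rank": phase_rank("early_phase1"), "phase_source": "clinical_trial"},
]
winners = pick_highest(rows)
```

One detail worth knowing: 'phase3' and 'phase2_3' share rank 3 and the same phase_source, and the ORDER BY has no further tie-breaker, so which label survives for such a pair is not specified by the query.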

mv_competitive_landscape.sql (uses ref + source)

{{
  config(
    materialized='table',
    indexes=[
      {'columns': ['drug_id', 'disease_id'], 'unique': true, 'type': 'btree'},
      {'columns': ['disease_id'], 'type': 'btree'},
      {'columns': ['highest_phase_rank'], 'type': 'btree'},
      {'columns': ['technology_name'], 'type': 'btree'},
    ],
  )
}}
SELECT
  dp.drug_id,
  dp.drug_name,
  dp.disease_id,
  dp.disease_name,
  dp.highest_phase,
  dp.highest_phase_rank,
  dp.phase_source,
  d.drug_type,
  t.name AS technology_name,
  t.drug_class,
  owners.current_owner_names
FROM {{ ref('mv_development_programs') }} dp
JOIN {{ source('bioloupe', 'drugs') }} d ON d.id = dp.drug_id
LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id
LEFT JOIN LATERAL (
  SELECT array_agg(DISTINCT o.name ORDER BY o.name) AS current_owner_names
  FROM {{ source('bioloupe', 'drug_ownerships') }} dow
  JOIN {{ source('bioloupe', 'organisations') }} o ON o.id = dow.organisation_id
  WHERE dow.drug_id = d.id AND dow.is_current = true
) owners ON true

-- dbt/models/marts/mv_drug_pipeline.sql
{{
  config(
    materialized='table',
    indexes=[
      {'columns': ['drug_id'], 'unique': true, 'type': 'btree'},
      {'columns': ['drug_status'], 'type': 'btree'},
      {'columns': ['highest_phase_rank_overall'], 'type': 'btree'},
      {'columns': ['technology_name'], 'type': 'btree'},
    ],
  )
}}
SELECT
  d.id AS drug_id,
  d.bloupe_id,
  d.name AS drug_name,
  d.drug_status,
  d.drug_type,
  d.first_in_class,
  EXISTS(SELECT 1 FROM {{ source('bioloupe', 'drugs') }} c WHERE c.parent_id = d.id) AS is_combination,
  t.name AS technology_name,
  t.drug_class,
  primary_tgt.target_name AS primary_target_name,
  all_tgts.target_names AS target_names,
  owners.owner_names AS current_owner_names,
  phase_agg.highest_phase_overall,
  phase_agg.highest_phase_rank_overall
FROM {{ source('bioloupe', 'drugs') }} d
LEFT JOIN {{ source('bioloupe', 'technologies') }} t ON t.id = d.technology_id
LEFT JOIN LATERAL (
  SELECT tgt.name AS target_name
  FROM {{ source('bioloupe', 'drug_target_actions') }} dta
  JOIN {{ source('bioloupe', 'targets') }} tgt ON tgt.id = dta.target_id
  WHERE dta.drug_id = d.id
  ORDER BY dta.is_primary DESC, tgt.name ASC
  LIMIT 1
) primary_tgt ON true
LEFT JOIN LATERAL (
  SELECT array_agg(DISTINCT tgt.name ORDER BY tgt.name) AS target_names
  FROM {{ source('bioloupe', 'drug_target_actions') }} dta
  JOIN {{ source('bioloupe', 'targets') }} tgt ON tgt.id = dta.target_id
  WHERE dta.drug_id = d.id
) all_tgts ON true
LEFT JOIN LATERAL (
  SELECT array_agg(DISTINCT o.name ORDER BY o.name) AS owner_names
  FROM {{ source('bioloupe', 'drug_ownerships') }} dow
  JOIN {{ source('bioloupe', 'organisations') }} o ON o.id = dow.organisation_id
  WHERE dow.drug_id = d.id AND dow.is_current = true
) owners ON true
LEFT JOIN LATERAL (
  SELECT
    MAX(dp.highest_phase_rank) AS highest_phase_rank_overall,
    (array_agg(dp.highest_phase ORDER BY dp.highest_phase_rank DESC))[1] AS highest_phase_overall
  FROM {{ ref('mv_development_programs') }} dp
  WHERE dp.drug_id = d.id
) phase_agg ON true