From 96aad19b16ca250359bd3a0b4de90146ad3bf1e6 Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Tue, 16 Jun 2026 21:39:38 -0400 Subject: [PATCH 1/2] add build DB disk usage report script --- admin/ops/clean_build_artifacts.py | 20 +- admin/ops/disk_usage_report.py | 316 +++++++++++++++++++++++++++++ dcpy/connectors/edm/connectors.py | 21 ++ 3 files changed, 342 insertions(+), 15 deletions(-) create mode 100644 admin/ops/disk_usage_report.py diff --git a/admin/ops/clean_build_artifacts.py b/admin/ops/clean_build_artifacts.py index 52498a833b..a9bc1f6638 100644 --- a/admin/ops/clean_build_artifacts.py +++ b/admin/ops/clean_build_artifacts.py @@ -3,6 +3,7 @@ import requests import typer +from dcpy.connectors.edm import connectors from dcpy.lifecycle.builds import metadata from dcpy.utils import postgres from dcpy.utils.git import github @@ -10,22 +11,11 @@ BUILD_REPO = "data-engineering" -BUILD_DBS = [ - "db-cbbr", - "db-cdbg", - "db-ceqr", - "db-checkbook", - "db-colp", - "db-cpdb", - "db-devdb", - "db-facilities", - "db-green-fast-track", - # "db-cscl", we need to preserve schemas while this data product is in development - "db-pluto", - "db-template", - "db-ztl", - "kpdb", +# Databases whose build schemas we never auto-drop (e.g. active development). +PRESERVED_DBS = [ + "db-cscl", # we need to preserve schemas while this data product is in development ] +BUILD_DBS = [db for db in connectors.BUILD_DBS if db not in PRESERVED_DBS] PROTECTED_BUILD_NAMES = ["nightly_qa"] diff --git a/admin/ops/disk_usage_report.py b/admin/ops/disk_usage_report.py new file mode 100644 index 0000000000..433325c119 --- /dev/null +++ b/admin/ops/disk_usage_report.py @@ -0,0 +1,316 @@ +""" +Disk usage diagnostic across all build databases. + +Surfaces: + - Database sizes (cluster-level) + - Inactive replication slots holding WAL + - Per-database: schema sizes, top tables, bloated tables, unused indexes + +Usage: + python3 admin/ops/disk_usage_report.py [--db DB_NAME] [--output PATH] + +With --db, report only the named database (useful for drilling in). The report is +both printed and saved to a file (default: .lifecycle/logs/disk_usage_.txt). +""" + +import os +import sys +from datetime import datetime +from pathlib import Path + +import pandas as pd +import typer +from sqlalchemy import create_engine, text + +from dcpy.connectors.edm.connectors import BUILD_DBS +from dcpy.utils import postgres + +IGNORED_SCHEMAS = frozenset( + postgres.PROTECTED_POSTGRES_SCHEMAS + + ["pg_toast", "tiger", "tiger_data", "topology"] +) + + +class _Tee: + """Writes to both stdout and a file simultaneously.""" + + def __init__(self, path: Path): + self._file = path.open("w") + self._stdout = sys.stdout + + def write(self, text: str): + self._stdout.write(text) + self._file.write(text) + + def flush(self): + self._stdout.flush() + self._file.flush() + + def close(self): + self._file.close() + + +pd.set_option("display.max_rows", 100) +pd.set_option("display.max_colwidth", 60) +pd.set_option("display.width", 120) + +app = typer.Typer(add_completion=False) + + +def _engine(database: str): + server = os.environ["BUILD_ENGINE_SERVER"] + uri = postgres.generate_engine_uri(server, database) + return create_engine(uri, isolation_level="AUTOCOMMIT") + + +def _query(engine, sql: str) -> pd.DataFrame: + with engine.connect() as conn: + return pd.read_sql(text(sql), conn) + + +def _section(title: str): + print(f"\n{'=' * 60}") + print(f" {title}") + print("=" * 60) + + +def _print(df: pd.DataFrame, indent: int = 2): + prefix = " " * indent + for line in df.to_string(index=False).splitlines(): + print(prefix + line) + + +# --------------------------------------------------------------------------- +# Cluster-level queries (can run from any database) +# --------------------------------------------------------------------------- + + +def report_database_sizes(engine): + _section("Database sizes (cluster)") + df = _query( + engine, + """ + SELECT + datname AS database, + pg_size_pretty(pg_database_size(datname)) AS size, + pg_database_size(datname) AS size_bytes + FROM pg_database + WHERE datname NOT IN ('template0', 'template1', 'postgres', 'defaultdb', 'rdsadmin') + ORDER BY pg_database_size(datname) DESC + """, + ) + df = df.drop(columns=["size_bytes"]) + _print(df) + + +def report_replication_slots(engine): + _section("Replication slots (WAL retention risk)") + try: + df = _query( + engine, + """ + SELECT + slot_name, + slot_type, + active, + pg_size_pretty( + pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) + ) AS wal_retained + FROM pg_replication_slots + """, + ) + except Exception: + # pg_wal_lsn_diff may be restricted on managed databases; fall back + try: + df = _query( + engine, + """ + SELECT slot_name, slot_type, active + FROM pg_replication_slots + """, + ) + except Exception as e: + print(f" (skipped — insufficient privileges: {e})") + return + if df.empty: + print(" (none)") + else: + _print(df) + inactive = df[~df["active"]] + if not inactive.empty: + print( + "\n !! Inactive slots above are holding WAL — consider dropping them:" + ) + for name in inactive["slot_name"]: + print(f" SELECT pg_drop_replication_slot('{name}');") + + +# --------------------------------------------------------------------------- +# Per-database queries +# --------------------------------------------------------------------------- + + +def report_schema_sizes(engine, database: str): + _section(f"[{database}] Schema sizes") + df = _query( + engine, + """ + SELECT + n.nspname AS schema, + pg_size_pretty(sum(pg_total_relation_size(c.oid))::bigint) AS total_size, + sum(pg_total_relation_size(c.oid)) AS total_bytes, + count(*) AS relations + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r', 'm') + GROUP BY n.nspname + ORDER BY sum(pg_total_relation_size(c.oid)) DESC + """, + ) + df = df.drop(columns=["total_bytes"]) + _print(df) + + +def report_top_tables(engine, database: str, limit: int = 20): + _section(f"[{database}] Top {limit} tables by total size") + df = _query( + engine, + f""" + SELECT + schemaname AS schema, + tablename AS table, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS total, + pg_size_pretty(pg_relation_size(schemaname||'.'||tablename)) AS heap, + pg_size_pretty(pg_indexes_size(schemaname||'.'||tablename)) AS indexes, + pg_size_pretty( + pg_total_relation_size(schemaname||'.'||tablename) + - pg_relation_size(schemaname||'.'||tablename) + - pg_indexes_size(schemaname||'.'||tablename) + ) AS toast + FROM pg_tables + WHERE schemaname NOT IN ({", ".join(repr(s) for s in IGNORED_SCHEMAS)}) + ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC + LIMIT {limit} + """, + ) + _print(df) + + +def report_bloat(engine, database: str): + _section(f"[{database}] Table bloat (dead tuples > 10%)") + df = _query( + engine, + """ + SELECT + schemaname AS schema, + relname AS table, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||relname)) AS total_size, + n_live_tup AS live, + n_dead_tup AS dead, + round( + n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 1 + ) AS dead_pct, + coalesce(last_vacuum::date::text, last_autovacuum::date::text, 'never') AS last_vacuum + FROM pg_stat_user_tables + WHERE n_dead_tup > 1000 + AND round( + n_dead_tup::numeric / NULLIF(n_live_tup + n_dead_tup, 0) * 100, 1 + ) > 10 + ORDER BY n_dead_tup DESC + LIMIT 20 + """, + ) + if df.empty: + print(" (no bloated tables found)") + else: + _print(df) + print("\n To reclaim space (locks table; run during downtime):") + print(" VACUUM FULL ANALYZE .;") + + +def report_unused_indexes(engine, database: str, min_size_mb: int = 1): + _section(f"[{database}] Unused indexes >= {min_size_mb} MB (idx_scan = 0)") + df = _query( + engine, + f""" + SELECT + schemaname AS schema, + relname AS table, + indexrelname AS index, + pg_size_pretty(pg_relation_size(indexrelid)) AS index_size, + pg_relation_size(indexrelid) AS index_bytes + FROM pg_stat_user_indexes + WHERE idx_scan = 0 + AND pg_relation_size(indexrelid) >= {min_size_mb * 1024 * 1024} + ORDER BY pg_relation_size(indexrelid) DESC + LIMIT 30 + """, + ) + if df.empty: + print(f" (no unused indexes >= {min_size_mb} MB)") + else: + total_mb = df["index_bytes"].sum() / (1024 * 1024) + df = df.drop(columns=["index_bytes"]) + _print(df) + print(f"\n Total reclaimable from unused indexes: {total_mb:.0f} MB") + print(" To drop (verify first that the index is truly unused):") + print(" DROP INDEX CONCURRENTLY .;") + + +# --------------------------------------------------------------------------- +# Entrypoint +# --------------------------------------------------------------------------- + + +@app.command() +def main( + db: str = typer.Option(None, "--db", help="Limit report to a single database"), + output: Path = typer.Option( + None, + "--output", + "-o", + help="Write report to this file (default: disk_usage_.txt)", + ), +): + """Print disk usage diagnostics across build databases.""" + out_path = ( + output + or Path(".lifecycle/logs") + / f"disk_usage_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" + ) + out_path.parent.mkdir(parents=True, exist_ok=True) + + databases = [db] if db else BUILD_DBS + + original_stdout = sys.stdout + tee = _Tee(out_path) + sys.stdout = tee + try: + # Use the first listed db for cluster-level queries + cluster_engine = _engine(databases[0]) + report_database_sizes(cluster_engine) + report_replication_slots(cluster_engine) + cluster_engine.dispose() + + for database in databases: + print(f"\n{'#' * 60}") + print(f"# {database}") + print(f"{'#' * 60}") + try: + engine = _engine(database) + report_schema_sizes(engine, database) + report_top_tables(engine, database) + report_bloat(engine, database) + report_unused_indexes(engine, database) + engine.dispose() + except Exception as e: + print(f" ERROR connecting to {database}: {e}") + finally: + sys.stdout = original_stdout + tee.close() + + print(f"\nReport saved to {out_path}") + + +if __name__ == "__main__": + app() diff --git a/dcpy/connectors/edm/connectors.py b/dcpy/connectors/edm/connectors.py index 60e4d047ec..67512f5970 100644 --- a/dcpy/connectors/edm/connectors.py +++ b/dcpy/connectors/edm/connectors.py @@ -67,6 +67,27 @@ } +# All Postgres build/transform databases in the edm-data cluster (one per product). +# Source of truth for admin tooling that operates across the whole cluster, e.g. +# stale-schema cleanup and disk-usage reporting. +BUILD_DBS = [ + "db-cbbr", + "db-cdbg", + "db-ceqr", + "db-checkbook", + "db-colp", + "db-cpdb", + "db-cscl", + "db-devdb", + "db-facilities", + "db-green-fast-track", + "db-pluto", + "db-template", + "db-ztl", + "kpdb", +] + + def _map_product_name(product: str) -> str: """Map recipe product name to S3 product name if needed.""" return PRODUCT_NAME_MAPPING.get(product, product) From d786cb0125664a7a95a571239c5a5ec987210945 Mon Sep 17 00:00:00 2001 From: Damon McCullough Date: Sun, 14 Jun 2026 23:16:44 -0400 Subject: [PATCH 2/2] migrate developments build to ingested dob_bis datasets --- products/developments/README.md | 4 +- products/developments/models/sources.yml | 4 +- products/developments/recipe.yml | 4 +- products/developments/sql/_bis.sql | 91 +++++++++++++++++++++++- products/developments/sql/_status_q.sql | 20 ++++-- 5 files changed, 111 insertions(+), 12 deletions(-) diff --git a/products/developments/README.md b/products/developments/README.md index 944d1f608e..bc8cc09294 100644 --- a/products/developments/README.md +++ b/products/developments/README.md @@ -118,8 +118,8 @@ env: - [ ] `hpd_hny_units_by_building` - [ ] `hny_geocode_results` -- [ ] `dob_permitissuance` -- [ ] `dob_permitissuance` +- [ ] `dob_bis_permits` +- [ ] `dob_bis_applications` #### Other DOB data diff --git a/products/developments/models/sources.yml b/products/developments/models/sources.yml index 31c503608d..91eec01883 100644 --- a/products/developments/models/sources.yml +++ b/products/developments/models/sources.yml @@ -29,6 +29,6 @@ sources: - name: doitt_buildingfootprints_historical - name: dcp_censusdata - name: dcp_censusdata_blocks - - name: dob_permitissuance - - name: dob_jobapplications + - name: dob_bis_permits + - name: dob_bis_applications - name: dob_jobapplications_parkingspaces diff --git a/products/developments/recipe.yml b/products/developments/recipe.yml index 49362cb63c..28c176be5f 100644 --- a/products/developments/recipe.yml +++ b/products/developments/recipe.yml @@ -53,8 +53,8 @@ inputs: version: "2020" - name: dcp_censusdata_blocks version: "2020" - - name: dob_permitissuance - - name: dob_jobapplications + - name: dob_bis_permits + - name: dob_bis_applications - name: dob_jobapplications_parkingspaces version: 20240603 - id: db-developments diff --git a/products/developments/sql/_bis.sql b/products/developments/sql/_bis.sql index c757903415..07291b0041 100644 --- a/products/developments/sql/_bis.sql +++ b/products/developments/sql/_bis.sql @@ -3,7 +3,8 @@ DESCRIPTION: Initial field mapping and prelimilary data cleaning for BIS job applications data INPUTS: - dob_jobapplications + dob_bis_applications + dob_jobapplications_parkingspaces OUTPUTS: _INIT_BIS_devdb ( @@ -66,7 +67,93 @@ OUTPUTS: DROP TABLE IF EXISTS _init_bis_devdb; -WITH applications AS (SELECT * FROM dob_jobapplications), +-- dob_bis_applications is now ingested raw (column names cleaned to snake_case), +-- so the filtering, gid de-duplication and field naming that the data-library +-- template used to do is reproduced here. +WITH applications_raw AS ( + SELECT + "job_#" AS jobnumber, + "doc_#" AS jobdocnumber, + job_type AS jobtype, + job_description AS jobdescription, + existing_occupancy AS existingoccupancy, + proposed_occupancy AS proposedoccupancy, + "existingno._of_stories" AS existingnumstories, + "proposed_no._of_stories" AS proposednumstories, + existing_zoning_sqft AS existingzoningsqft, + proposed_zoning_sqft AS proposedzoningsqft, + existing_dwelling_units AS existingdwellingunits, + proposed_dwelling_units AS proposeddwellingunits, + job_status_descrp AS jobstatusdesc, + latest_action_date AS latestactiondate, + sprinkler, + pre__filing_date AS prefilingdate, + fully_paid AS fullypaid, + approved, + fully_permitted AS fullypermitted, + signoff_date AS signoffdate, + zoning_dist1 AS zoningdist1, + zoning_dist2 AS zoningdist2, + zoning_dist3 AS zoningdist3, + special_district_1 AS specialdistrict1, + special_district_2 AS specialdistrict2, + landmarked, + city_owned AS cityowned, + owner_type AS ownertype, + non_profit AS nonprofit, + owner_s_first_name AS ownerfirstname, + owner_s_last_name AS ownerlastname, + owner_s_business_name AS ownerbusinessname, + owner_shouse_street_name AS ownerhousestreetname, + zip, + -- source dataset no longer provides an owner phone column + NULL::text AS ownerphone, + existing_height AS existingheight, + proposed_height AS proposedheight, + total_construction_floor_area AS totalconstructionfloorarea, + horizontal_enlrgmt AS horizontalenlrgmt, + vertical_enlrgmt AS verticalenlrgmt, + enlargement_sq_footage AS enlargementsqfootage, + initial_cost AS initialcost, + loft_board AS loftboard, + little_e AS littlee, + curb_cut AS curbcut, + cluster, + "house_#" AS housenumber, + street_name AS streetname, + "bin_#" AS bin, + borough, + block, + lot, + special_action_status AS specialactionstatus, + latitude, + longitude, + building_class AS buildingclass, + other_description AS otherdesc, + dobrundate + FROM dob_bis_applications + WHERE + "doc_#" = '01' + AND ( + job_type LIKE '%A1%' + OR job_type LIKE '%A2%' + OR job_type LIKE '%DM%' + OR job_type LIKE '%NB%' + ) +), + +applications AS ( + SELECT + *, + row_number() OVER ( + PARTITION BY jobnumber + ORDER BY + substr(dobrundate, 7, 4) + || substr(dobrundate, 1, 2) + || substr(dobrundate, 4, 2) DESC + ) AS gid + FROM applications_raw +), parking_spaces AS (SELECT * FROM dob_jobapplications_parkingspaces), diff --git a/products/developments/sql/_status_q.sql b/products/developments/sql/_status_q.sql index a2adeb6eab..ddc454023c 100644 --- a/products/developments/sql/_status_q.sql +++ b/products/developments/sql/_status_q.sql @@ -7,9 +7,11 @@ INPUTS: job_number ) - dob_permitissuance ( - jobnum, - issuancedate + dob_bis_permits ( + job_number, + job_doc_number, + job_type, + issuance_date ) OUTPUTS: @@ -23,11 +25,21 @@ OUTPUTS: */ DROP TABLE IF EXISTS status_q_devdb; WITH +-- dob_bis_permits is now ingested raw; alias the cleaned column names back to +-- the names this script expects +permits AS ( + SELECT + job_number AS jobnum, + job_doc_number AS jobdocnum, + job_type AS jobtype, + issuance_date AS issuancedate + FROM dob_bis_permits +), status_q_create AS ( SELECT jobnum AS job_number, min(issuancedate::date) AS date_permittd - FROM dob_permitissuance + FROM permits WHERE jobdocnum = '01' AND jobtype ~* 'A1|DM|NB|A2'