"""
seed_extract.py — TESSERA 4.0 seed extraction

Reads the five OTIVM launch waypoint H5 hexes from tessera.db (TESSERA 3.0
SpatiaLite, Dell pipeline node) and writes them to staging_otivm.sqlite3
as draft rows, following the RFC-TESSERA-4.0-001 pipeline contract.

Run on the Dell (tessera-pipeline CT):
    /opt/tessera-pipeline/venv/bin/python3 /opt/tessera-pipeline/seed_extract.py

Prerequisites: h3 library in venv (already installed)

Outputs:
    /tmp/staging_otivm.sqlite3

After this script completes, run seed_promote.py on the same CT.
Then scp staging_otivm.sqlite3 to the OTIVM container.

VERIFIED:
- Each H5 has 49 H7 children, each H7 has 49 H9 children = 2401 H9 per H5
- 5 waypoints x 2401 = 12005 total H9 rows
- H3 int/text round-trip confirmed for all five waypoints
- Query strategy: batch by H7 (245 queries of 49 rows each) — safe for SQLite
- tessera.db path inside tessera-pipeline CT: /mnt/tessera-db/tessera.db
"""

import sqlite3
import json
import os
import sys
import traceback
from datetime import datetime, timezone

try:
    import h3
except ImportError:
    # Fail fast with an actionable message rather than a bare traceback:
    # this script is only meant to run inside the pipeline venv.
    sys.exit("ERROR: h3 library not installed. Activate venv first.")

# ---------------------------------------------------------------
# Configuration — paths correct for tessera-pipeline CT
# ---------------------------------------------------------------

TESSERA_DB = "/mnt/tessera-db/tessera.db"  # SSD via bind mount
STAGING_DB = "/tmp/staging_otivm.sqlite3"
SCHEMA_SQL = "/opt/tessera-pipeline/create_otivm_db.sql"  # copied from OTIVM repo

RUN_KEY = "tessera3-seed-2026-04-26"

# Five OTIVM launch waypoints — H5 TEXT IDs
# Verified: int round-trips clean for all five
H5_WAYPOINTS = [
    ("Ostia", "851e805bfffffff"),
    ("Capua", "851e8333fffffff"),
    ("Brundisium", "851e8ba3fffffff"),
    ("Carthago", "85386e23fffffff"),
    ("Alexandria", "853f5ba7fffffff"),
]

# Source registry IDs — must match create_otivm_db.sql insertion order
SRC_GEBCO = 1
SRC_ESA = 2
SRC_HYDRO = 3
SRC_MRDS = 4
SRC_IGME = 5
SRC_SEED = 6  # TESSERA3_SEED — used for occ (stage 06 not run)

# Confidence grade codes (see confidence_grades table in the staging schema)
CONF_INDICATED = 2
CONF_NO_DATA = 4

FIELDS_UPDATED = ["elev_cm", "terrain", "hydro", "geo_dep", "geo_flag", "occ_flag"]

H9_EXPECTED_PER_H5 = 2401  # 7^4 — verified

# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------

def now_utc() -> str:
    """Return the current UTC time as an ISO-8601 string with a Z suffix."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def create_staging_db(schema_sql_path: str) -> sqlite3.Connection:
    """Create a fresh staging database from the OTIVM schema file.

    Removes any pre-existing staging DB, applies the schema (minus any
    leading comment block), verifies the expected tables exist, and
    returns the open connection.  Exits the process on any setup failure.
    """
    if not os.path.exists(schema_sql_path):
        sys.exit(
            f"ERROR: Schema file not found: {schema_sql_path}\n"
            f"Copy it from the OTIVM repo first:\n"
            f"  lxc file push /create_otivm_db.sql "
            f"tessera-pipeline/opt/tessera-pipeline/create_otivm_db.sql"
        )

    if os.path.exists(STAGING_DB):
        os.remove(STAGING_DB)
        print(f"Removed existing {STAGING_DB}")

    with open(schema_sql_path, "r") as f:
        schema = f.read()

    # Strip any comment-only lines at top (CLAUDE CODE INSTRUCTIONS block).
    # NOTE(review): this loop was garbled in the source under review; the
    # reconstruction below skips /* ... */ block comments, which matches the
    # surviving fragments and the stated intent of dropping a leading
    # comment block — confirm against the original seed_extract.py.
    sql_lines = []
    in_comment_block = False
    for line in schema.splitlines():
        stripped = line.strip()
        if stripped.startswith("/*"):
            in_comment_block = True
        if in_comment_block:
            if "*/" in stripped:
                in_comment_block = False
            continue
        sql_lines.append(line)

    con = sqlite3.connect(STAGING_DB)
    con.execute("PRAGMA foreign_keys = ON")
    con.execute("PRAGMA journal_mode = WAL")
    con.executescript("\n".join(sql_lines))
    con.commit()
    print(f"Created staging database: {STAGING_DB}")

    # Verify tables were created
    tables = [r[0] for r in con.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    ).fetchall()]
    print(f"  Tables: {tables}")
    expected = {"confidence_grades", "h5_coverage", "lifecycle_states",
                "pipeline_runs", "source_registry", "tessera_cells"}
    missing = expected - set(tables)
    if missing:
        sys.exit(f"ERROR: Missing tables after schema creation: {missing}")

    return con


# ---------------------------------------------------------------
# Main extraction
# ---------------------------------------------------------------

def extract():
    """Copy the five waypoint H5 cells from tessera.db into the staging DB.

    For each H5 waypoint, walks its 49 H7 children and queries tessera.db
    one H7 at a time (49 rows per query), writing draft tessera_cells rows
    plus h5_coverage and pipeline_runs bookkeeping.  On any exception the
    pipeline_runs row is marked status 4 and the process exits nonzero.
    """
    print(f"[{now_utc()}] seed_extract.py starting")
    print(f"  Source:   {TESSERA_DB}")
    print(f"  Staging:  {STAGING_DB}")
    print(f"  Run key:  {RUN_KEY}")
    print(f"  Expected H9 per H5: {H9_EXPECTED_PER_H5}")

    if not os.path.exists(TESSERA_DB):
        sys.exit(f"ERROR: tessera.db not found at {TESSERA_DB}")

    db_size_gb = os.path.getsize(TESSERA_DB) / 1e9
    print(f"  tessera.db size: {db_size_gb:.1f} GB")

    # Open tessera.db read-only — this script must never mutate the
    # TESSERA 3.0 source database.
    src = sqlite3.connect(f"file:{TESSERA_DB}?mode=ro", uri=True)
    src.row_factory = sqlite3.Row

    # Verify tessera.db schema
    src_tables = [r[0] for r in src.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()]
    if "tessera_cells" not in src_tables:
        src.close()
        sys.exit(f"ERROR: tessera_cells table not found in tessera.db. Tables: {src_tables}")
    print(f"  tessera.db tables: {src_tables}")

    # Create staging database
    dst = create_staging_db(SCHEMA_SQL)

    # Convert H5 waypoints to integers (staging schema stores H3 as INTEGER)
    h5_ints = [h3.str_to_int(h5t) for _, h5t in H5_WAYPOINTS]

    source_versions = {
        "GEBCO_2025": "2025",
        "ESA_WORLDCOVER_V200": "v2.0.0",
        "HYDROSHEDS_V11": "1.1",
        "USGS_MRDS": "2022-08-23",
        "BGR_IGME5000": "2007",
        "TESSERA3_SEED": "2026-04-26",
    }

    started_at = now_utc()
    cur = dst.execute(
        """INSERT INTO pipeline_runs
           (run_key, started_at, status, h5_cells, fields_updated,
            source_versions, notes)
           VALUES (?, ?, 1, ?, ?, ?, ?)""",
        (
            RUN_KEY,
            started_at,
            json.dumps(h5_ints),
            json.dumps(FIELDS_UPDATED),
            json.dumps(source_versions),
            "Seed extraction from TESSERA 3.0 tessera.db — five OTIVM launch waypoints.",
        ),
    )
    run_id = cur.lastrowid
    dst.commit()
    print(f"  pipeline_runs row: id={run_id}, status=1 (draft)")

    total_rows = 0

    try:
        for city_name, h5_text in H5_WAYPOINTS:
            h5_int = h3.str_to_int(h5_text)
            print(f"\n  [{city_name}] H5={h5_text}")

            # Get all H7 children of this H5 (49 cells).
            # Explicit raise (not assert) so the check survives python -O
            # and is recorded by the failure handler below.
            h7_children = list(h3.cell_to_children(h5_text, 7))
            if len(h7_children) != 49:
                raise RuntimeError(f"Expected 49 H7, got {len(h7_children)}")

            # Insert h5_coverage at draft
            dst.execute(
                """INSERT INTO h5_coverage
                   (h5, status, h9_total, h9_current, last_updated, run_id, notes)
                   VALUES (?, 1, ?, 0, ?, ?, ?)""",
                (h5_int, H9_EXPECTED_PER_H5, now_utc(), run_id, city_name),
            )
            dst.commit()

            h5_rows_found = 0
            h5_rows_missing = 0
            created_at = now_utc()

            # Query tessera.db one H7 at a time (49 queries x 49 rows = 2401)
            for h7_text in h7_children:
                h7_int = h3.str_to_int(h7_text)

                # Get expected H9 children of this H7 (49 cells)
                h9_expected = list(h3.cell_to_children(h7_text, 9))
                if len(h9_expected) != 49:
                    raise RuntimeError(f"Expected 49 H9, got {len(h9_expected)}")

                placeholders = ",".join("?" * len(h9_expected))
                rows = src.execute(
                    f"""SELECT h9_cell, lat, lon,
                               elev_cm, terrain, hydro, geo_dep, geo_flag
                        FROM tessera_cells
                        WHERE h9_cell IN ({placeholders})""",
                    h9_expected,
                ).fetchall()

                h5_rows_found += len(rows)
                h5_rows_missing += (len(h9_expected) - len(rows))

                batch = []
                for row in rows:
                    h9_int = h3.str_to_int(row["h9_cell"])
                    geo_dep = row["geo_dep"]
                    geo_flag = row["geo_flag"]

                    # 255 / 0 appear to be the tessera.db "no data" sentinels
                    # for geo_dep / geo_flag respectively — TODO confirm
                    # against the TESSERA 3.0 schema docs.
                    gdep_conf = CONF_INDICATED if (geo_dep is not None and geo_dep != 255) else CONF_NO_DATA
                    gflag_conf = CONF_INDICATED if (geo_flag is not None and geo_flag != 0) else CONF_NO_DATA

                    batch.append((
                        h9_int, h7_int, h5_int,
                        row["lat"], row["lon"],
                        row["elev_cm"],
                        row["terrain"],
                        row["hydro"],
                        geo_dep,
                        geo_flag,
                        0,  # occ_flag — stage 06 not run
                        SRC_GEBCO, CONF_INDICATED,   # elev
                        SRC_ESA, CONF_INDICATED,     # terrain
                        SRC_HYDRO, CONF_INDICATED,   # hydro
                        SRC_MRDS, gdep_conf,         # geo_dep
                        SRC_IGME, gflag_conf,        # geo_flag
                        SRC_SEED, CONF_NO_DATA,      # occ
                        1,  # status = draft
                        run_id,
                        created_at,
                    ))

                if batch:
                    dst.executemany(
                        """INSERT INTO tessera_cells
                           (h9, h7, h5, lat, lon,
                            elev_cm, terrain, hydro, geo_dep, geo_flag, occ_flag,
                            elev_src, elev_conf,
                            terr_src, terr_conf,
                            hydro_src, hydro_conf,
                            gdep_src, gdep_conf,
                            gflag_src, gflag_conf,
                            occ_src, occ_conf,
                            status, run_id, created_at)
                           VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                        batch,
                    )

            dst.execute(
                """UPDATE h5_coverage
                   SET h9_current = ?, last_updated = ?
                   WHERE h5 = ?""",
                (h5_rows_found, now_utc(), h5_int),
            )
            dst.commit()
            total_rows += h5_rows_found

            status = "OK" if h5_rows_missing == 0 else f"WARNING: {h5_rows_missing} missing"
            print(f"    Found: {h5_rows_found}/{H9_EXPECTED_PER_H5} [{status}]")

        # Mark pipeline run complete
        dst.execute(
            """UPDATE pipeline_runs
               SET status = 2, completed_at = ?, row_count = ?
               WHERE id = ?""",
            (now_utc(), total_rows, run_id),
        )
        dst.commit()

        print(f"\n[{now_utc()}] Extraction complete.")
        print(f"  Total draft rows: {total_rows}")
        print(f"  Expected total:   {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5}")
        if total_rows < len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5:
            print(f"  WARNING: {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5 - total_rows} cells missing from tessera.db")
            print(f"  This is acceptable if occ_flag gaps — check per-city counts above.")
        print(f"  pipeline_runs status: 2 (complete)")
        print(f"  Next: run seed_promote.py on this CT")

    except Exception:
        # Record the failure on the run row so seed_promote.py will refuse it.
        # NOTE(review): status 4 is printed as "retired" below — confirm the
        # lifecycle_states table really uses 4 for failed/retired runs.
        dst.execute(
            """UPDATE pipeline_runs
               SET status = 4, completed_at = ?, notes = ?
               WHERE id = ?""",
            (now_utc(), f"FAILED: {traceback.format_exc()}", run_id),
        )
        dst.commit()
        print(f"\n[{now_utc()}] FAILED — pipeline_run {run_id} marked retired (4).")
        traceback.print_exc()
        sys.exit(1)

    finally:
        src.close()
        dst.close()


if __name__ == "__main__":
    extract()