""" seed_extract.py — TESSERA 4.0 seed extraction Reads the five OTIVM launch waypoint H5 hexes from tessera.db (TESSERA 3.0 SpatiaLite, Dell pipeline node) and writes them to staging_otivm.sqlite3 as draft rows, following the RFC-TESSERA-4.0-001 pipeline contract. Run on the Dell (tessera-pipeline CT): /opt/tessera-pipeline/venv/bin/python3 /opt/tessera-pipeline/seed_extract.py Prerequisites: h3 library in venv (already installed) Outputs: /tmp/staging_otivm.sqlite3 After this script completes, run seed_promote.py on the same CT. Then scp staging_otivm.sqlite3 to the OTIVM container. VERIFIED: - Each H5 has 49 H7 children, each H7 has 49 H9 children = 2401 H9 per H5 - 5 waypoints x 2401 = 12005 total H9 rows - H3 int/text round-trip confirmed for all five waypoints - Query strategy: batch by H7 (245 queries of 49 rows each) — safe for SQLite - tessera.db path inside tessera-pipeline CT: /mnt/tessera-db/tessera.db """ import sqlite3 import json import os import sys import traceback from datetime import datetime, timezone try: import h3 except ImportError: sys.exit("ERROR: h3 library not installed. Activate venv first.") # --------------------------------------------------------------- # Configuration — paths correct for tessera-pipeline CT # --------------------------------------------------------------- TESSERA_DB = "/mnt/tessera-db/tessera.db" # SSD via bind mount STAGING_DB = "/tmp/staging_otivm.sqlite3" SCHEMA_SQL = "/opt/tessera-pipeline/create_otivm_db.sql" # copied from OTIVM repo RUN_KEY = "tessera3-seed-2026-04-26" # Five OTIVM launch waypoints — H5 TEXT IDs # Verified: int round-trips clean for all five H5_WAYPOINTS = [ ("Ostia", "851e805bfffffff"), ("Capua", "851e8333fffffff"), ("Brundisium", "851e8ba3fffffff"), ("Carthago", "85386e23fffffff"), ("Alexandria", "853f5ba7fffffff"), ] # Source registry IDs — must match create_otivm_db.sql insertion order SRC_GEBCO = 1 SRC_ESA = 2 SRC_HYDRO = 3 SRC_MRDS = 4 SRC_IGME = 5 SRC_SEED = 6 # TESSERA3_SEED — used for occ (stage 06 not run) CONF_INDICATED = 2 CONF_NO_DATA = 4 FIELDS_UPDATED = ["elev_cm", "terrain", "hydro", "geo_dep", "geo_flag", "occ_flag"] H9_EXPECTED_PER_H5 = 2401 # 7^4 — verified # --------------------------------------------------------------- # Helpers # --------------------------------------------------------------- def now_utc() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def create_staging_db(schema_sql_path: str) -> sqlite3.Connection: if not os.path.exists(schema_sql_path): sys.exit( f"ERROR: Schema file not found: {schema_sql_path}\n" f"Copy it from the OTIVM repo first:\n" f" lxc file push /create_otivm_db.sql " f"tessera-pipeline/opt/tessera-pipeline/create_otivm_db.sql" ) if os.path.exists(STAGING_DB): os.remove(STAGING_DB) print(f"Removed existing {STAGING_DB}") with open(schema_sql_path, "r") as f: schema = f.read() # Strip any comment-only lines at top (CLAUDE CODE INSTRUCTIONS block) sql_lines = [] in_comment_block = False for line in schema.splitlines(): stripped = line.strip() if stripped.startswith("" in stripped: in_comment_block = False continue sql_lines.append(line) con = sqlite3.connect(STAGING_DB) con.execute("PRAGMA foreign_keys = ON") con.execute("PRAGMA journal_mode = WAL") con.executescript("\n".join(sql_lines)) con.commit() print(f"Created staging database: {STAGING_DB}") # Verify tables were created tables = [r[0] for r in con.execute( "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ).fetchall()] print(f" Tables: {tables}") expected = {"confidence_grades", "h5_coverage", "lifecycle_states", "pipeline_runs", "source_registry", "tessera_cells"} missing = expected - set(tables) if missing: sys.exit(f"ERROR: Missing tables after schema creation: {missing}") return con # --------------------------------------------------------------- # Main extraction # --------------------------------------------------------------- def extract(): print(f"[{now_utc()}] seed_extract.py starting") print(f" Source: {TESSERA_DB}") print(f" Staging: {STAGING_DB}") print(f" Run key: {RUN_KEY}") print(f" Expected H9 per H5: {H9_EXPECTED_PER_H5}") if not os.path.exists(TESSERA_DB): sys.exit(f"ERROR: tessera.db not found at {TESSERA_DB}") db_size_gb = os.path.getsize(TESSERA_DB) / 1e9 print(f" tessera.db size: {db_size_gb:.1f} GB") # Open tessera.db read-only src = sqlite3.connect(f"file:{TESSERA_DB}?mode=ro", uri=True) src.row_factory = sqlite3.Row # Verify tessera.db schema src_tables = [r[0] for r in src.execute( "SELECT name FROM sqlite_master WHERE type='table'" ).fetchall()] if "tessera_cells" not in src_tables: src.close() sys.exit(f"ERROR: tessera_cells table not found in tessera.db. Tables: {src_tables}") print(f" tessera.db tables: {src_tables}") # Create staging database dst = create_staging_db(SCHEMA_SQL) # Convert H5 waypoints to integers h5_ints = [h3.str_to_int(h5t) for _, h5t in H5_WAYPOINTS] source_versions = { "GEBCO_2025": "2025", "ESA_WORLDCOVER_V200": "v2.0.0", "HYDROSHEDS_V11": "1.1", "USGS_MRDS": "2022-08-23", "BGR_IGME5000": "2007", "TESSERA3_SEED": "2026-04-26", } started_at = now_utc() cur = dst.execute( """INSERT INTO pipeline_runs (run_key, started_at, status, h5_cells, fields_updated, source_versions, notes) VALUES (?, ?, 1, ?, ?, ?, ?)""", ( RUN_KEY, started_at, json.dumps(h5_ints), json.dumps(FIELDS_UPDATED), json.dumps(source_versions), "Seed extraction from TESSERA 3.0 tessera.db — five OTIVM launch waypoints.", ), ) run_id = cur.lastrowid dst.commit() print(f" pipeline_runs row: id={run_id}, status=1 (draft)") total_rows = 0 try: for city_name, h5_text in H5_WAYPOINTS: h5_int = h3.str_to_int(h5_text) print(f"\n [{city_name}] H5={h5_text}") # Get all H7 children of this H5 (49 cells) h7_children = list(h3.cell_to_children(h5_text, 7)) assert len(h7_children) == 49, f"Expected 49 H7, got {len(h7_children)}" # Insert h5_coverage at draft dst.execute( """INSERT INTO h5_coverage (h5, status, h9_total, h9_current, last_updated, run_id, notes) VALUES (?, 1, ?, 0, ?, ?, ?)""", (h5_int, H9_EXPECTED_PER_H5, now_utc(), run_id, city_name), ) dst.commit() h5_rows_found = 0 h5_rows_missing = 0 created_at = now_utc() # Query tessera.db one H7 at a time (49 queries x 49 rows = 2401) for h7_text in h7_children: h7_int = h3.str_to_int(h7_text) # Get expected H9 children of this H7 (49 cells) h9_expected = list(h3.cell_to_children(h7_text, 9)) assert len(h9_expected) == 49, f"Expected 49 H9, got {len(h9_expected)}" placeholders = ",".join("?" * len(h9_expected)) rows = src.execute( f"""SELECT h9_cell, lat, lon, elev_cm, terrain, hydro, geo_dep, geo_flag FROM tessera_cells WHERE h9_cell IN ({placeholders})""", h9_expected, ).fetchall() h5_rows_found += len(rows) h5_rows_missing += (len(h9_expected) - len(rows)) batch = [] for row in rows: h9_int = h3.str_to_int(row["h9_cell"]) geo_dep = row["geo_dep"] geo_flag = row["geo_flag"] gdep_conf = CONF_INDICATED if (geo_dep is not None and geo_dep != 255) else CONF_NO_DATA gflag_conf = CONF_INDICATED if (geo_flag is not None and geo_flag != 0) else CONF_NO_DATA batch.append(( h9_int, h7_int, h5_int, row["lat"], row["lon"], row["elev_cm"], row["terrain"], row["hydro"], geo_dep, geo_flag, 0, # occ_flag — stage 06 not run SRC_GEBCO, CONF_INDICATED, # elev SRC_ESA, CONF_INDICATED, # terrain SRC_HYDRO, CONF_INDICATED, # hydro SRC_MRDS, gdep_conf, # geo_dep SRC_IGME, gflag_conf, # geo_flag SRC_SEED, CONF_NO_DATA, # occ 1, # status = draft run_id, created_at, )) if batch: dst.executemany( """INSERT INTO tessera_cells (h9, h7, h5, lat, lon, elev_cm, terrain, hydro, geo_dep, geo_flag, occ_flag, elev_src, elev_conf, terr_src, terr_conf, hydro_src, hydro_conf, gdep_src, gdep_conf, gflag_src, gflag_conf, occ_src, occ_conf, status, run_id, created_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", batch, ) dst.execute( """UPDATE h5_coverage SET h9_current = ?, last_updated = ? WHERE h5 = ?""", (h5_rows_found, now_utc(), h5_int), ) dst.commit() total_rows += h5_rows_found status = "OK" if h5_rows_missing == 0 else f"WARNING: {h5_rows_missing} missing" print(f" Found: {h5_rows_found}/{H9_EXPECTED_PER_H5} [{status}]") # Mark pipeline run complete dst.execute( """UPDATE pipeline_runs SET status = 2, completed_at = ?, row_count = ? WHERE id = ?""", (now_utc(), total_rows, run_id), ) dst.commit() print(f"\n[{now_utc()}] Extraction complete.") print(f" Total draft rows: {total_rows}") print(f" Expected total: {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5}") if total_rows < len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5: print(f" WARNING: {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5 - total_rows} cells missing from tessera.db") print(f" This is acceptable if occ_flag gaps — check per-city counts above.") print(f" pipeline_runs status: 2 (complete)") print(f" Next: run seed_promote.py on this CT") except Exception: dst.execute( """UPDATE pipeline_runs SET status = 4, completed_at = ?, notes = ? WHERE id = ?""", (now_utc(), f"FAILED: {traceback.format_exc()}", run_id), ) dst.commit() print(f"\n[{now_utc()}] FAILED — pipeline_run {run_id} marked retired (4).") traceback.print_exc() sys.exit(1) finally: src.close() dst.close() if __name__ == "__main__": extract()