Fix seed_extract.py: correct paths, query by H7, verified 2401 H9 per H5

This commit is contained in:
otivm
2026-04-26 07:20:03 +00:00
parent 89e977f167
commit 690b7d246a

333
pipeline/seed_extract.py Normal file
View File

@@ -0,0 +1,333 @@
"""
seed_extract.py — TESSERA 4.0 seed extraction
Reads the five OTIVM launch waypoint H5 hexes from tessera.db (TESSERA 3.0
SpatiaLite, Dell pipeline node) and writes them to staging_otivm.sqlite3
as draft rows, following the RFC-TESSERA-4.0-001 pipeline contract.
Run on the Dell (tessera-pipeline CT):
/opt/tessera-pipeline/venv/bin/python3 /opt/tessera-pipeline/seed_extract.py
Prerequisites: h3 library in venv (already installed)
Outputs:
/tmp/staging_otivm.sqlite3
After this script completes, run seed_promote.py on the same CT.
Then scp staging_otivm.sqlite3 to the OTIVM container.
VERIFIED:
- Each H5 has 49 H7 children, each H7 has 49 H9 children = 2401 H9 per H5
- 5 waypoints x 2401 = 12005 total H9 rows
- H3 int/text round-trip confirmed for all five waypoints
- Query strategy: batch by H7 (245 queries of 49 rows each) — safe for SQLite
- tessera.db path inside tessera-pipeline CT: /mnt/tessera-db/tessera.db
"""
import sqlite3
import json
import os
import sys
import traceback
from datetime import datetime, timezone
try:
import h3
except ImportError:
sys.exit("ERROR: h3 library not installed. Activate venv first.")
# ---------------------------------------------------------------
# Configuration — paths correct for tessera-pipeline CT
# ---------------------------------------------------------------
TESSERA_DB = "/mnt/tessera-db/tessera.db" # SSD via bind mount
STAGING_DB = "/tmp/staging_otivm.sqlite3"  # output database produced by this script
SCHEMA_SQL = "/opt/tessera-pipeline/create_otivm_db.sql" # copied from OTIVM repo
RUN_KEY = "tessera3-seed-2026-04-26"  # recorded in pipeline_runs.run_key for this extraction
# Five OTIVM launch waypoints — H5 TEXT IDs
# Verified: int round-trips clean for all five
H5_WAYPOINTS = [
    ("Ostia", "851e805bfffffff"),
    ("Capua", "851e8333fffffff"),
    ("Brundisium", "851e8ba3fffffff"),
    ("Carthago", "85386e23fffffff"),
    ("Alexandria", "853f5ba7fffffff"),
]
# Source registry IDs — must match create_otivm_db.sql insertion order
SRC_GEBCO = 1  # used as elev_src
SRC_ESA = 2  # used as terr_src
SRC_HYDRO = 3  # used as hydro_src
SRC_MRDS = 4  # used as gdep_src
SRC_IGME = 5  # used as gflag_src
SRC_SEED = 6 # TESSERA3_SEED — used for occ (stage 06 not run)
# Confidence grade codes written into the *_conf columns
CONF_INDICATED = 2
CONF_NO_DATA = 4
# Field names recorded in pipeline_runs.fields_updated (JSON)
FIELDS_UPDATED = ["elev_cm", "terrain", "hydro", "geo_dep", "geo_flag", "occ_flag"]
H9_EXPECTED_PER_H5 = 2401 # 7^4 — verified
# ---------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------
def now_utc() -> str:
    """Return the current UTC time as an ISO-8601 'Z'-suffixed string.

    Format is YYYY-MM-DDTHH:MM:SSZ (second precision), used for all
    timestamp columns written by this script.
    """
    return f"{datetime.now(timezone.utc):%Y-%m-%dT%H:%M:%SZ}"
def _strip_html_comment_blocks(schema: str) -> str:
    """Drop HTML-style comment spans (<!-- ... -->) from schema SQL text.

    The schema file copied from the OTIVM repo can carry a leading
    HTML-comment instruction block that SQLite cannot parse. Removes
    every line inside a <!-- ... --> span. BUG FIX vs. the original:
    a one-line comment ("<!-- note -->") used to flip in_comment_block
    on and never check the same line for "-->", silently discarding
    every schema line after it; a same-line close is now handled.
    """
    kept = []
    in_comment = False
    for line in schema.splitlines():
        stripped = line.strip()
        if not in_comment and stripped.startswith("<!--"):
            # Only enter the block when the comment is NOT closed on
            # this same line; either way the line itself is dropped.
            if "-->" not in stripped:
                in_comment = True
            continue
        if in_comment:
            if "-->" in stripped:
                in_comment = False
            continue
        kept.append(line)
    return "\n".join(kept)


def create_staging_db(schema_sql_path: str) -> sqlite3.Connection:
    """Create a fresh staging database at STAGING_DB from the schema file.

    Removes any pre-existing STAGING_DB, executes the schema SQL (after
    stripping HTML comment blocks), and verifies that the six expected
    tables were created. Calls sys.exit() if the schema file is missing
    or any expected table is absent after creation.

    Returns an open sqlite3.Connection to the new staging database
    (foreign keys ON, WAL journal mode).
    """
    if not os.path.exists(schema_sql_path):
        sys.exit(
            f"ERROR: Schema file not found: {schema_sql_path}\n"
            f"Copy it from the OTIVM repo first:\n"
            f" lxc file push <path>/create_otivm_db.sql "
            f"tessera-pipeline/opt/tessera-pipeline/create_otivm_db.sql"
        )
    if os.path.exists(STAGING_DB):
        os.remove(STAGING_DB)
        print(f"Removed existing {STAGING_DB}")
    with open(schema_sql_path, "r") as f:
        schema = f.read()
    # Strip any comment-only lines at top (CLAUDE CODE INSTRUCTIONS block)
    clean_sql = _strip_html_comment_blocks(schema)
    con = sqlite3.connect(STAGING_DB)
    con.execute("PRAGMA foreign_keys = ON")
    con.execute("PRAGMA journal_mode = WAL")
    con.executescript(clean_sql)
    con.commit()
    print(f"Created staging database: {STAGING_DB}")
    # Verify tables were created
    tables = [r[0] for r in con.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    ).fetchall()]
    print(f" Tables: {tables}")
    expected = {"confidence_grades", "h5_coverage", "lifecycle_states",
                "pipeline_runs", "source_registry", "tessera_cells"}
    missing = expected - set(tables)
    if missing:
        sys.exit(f"ERROR: Missing tables after schema creation: {missing}")
    return con
# ---------------------------------------------------------------
# Main extraction
# ---------------------------------------------------------------
def extract() -> None:
    """Extract the five waypoint H5 hexes from tessera.db into the staging DB.

    Flow:
      1. Open tessera.db read-only and verify the tessera_cells table exists.
      2. Create a fresh staging database from SCHEMA_SQL.
      3. Insert a pipeline_runs row with status 1 (draft) before writing cells.
      4. For each waypoint H5: insert an h5_coverage row, then copy its 2401
         H9 rows from tessera.db — queried one H7 batch of 49 cells at a time
         to keep the IN(...) list small — into tessera_cells as draft rows.
      5. On success, mark the run status 2 (complete); on any exception,
         mark it status 4 (retired) with the traceback and exit with code 1.
    """
    print(f"[{now_utc()}] seed_extract.py starting")
    print(f" Source: {TESSERA_DB}")
    print(f" Staging: {STAGING_DB}")
    print(f" Run key: {RUN_KEY}")
    print(f" Expected H9 per H5: {H9_EXPECTED_PER_H5}")
    if not os.path.exists(TESSERA_DB):
        sys.exit(f"ERROR: tessera.db not found at {TESSERA_DB}")
    db_size_gb = os.path.getsize(TESSERA_DB) / 1e9
    print(f" tessera.db size: {db_size_gb:.1f} GB")
    # Open tessera.db read-only — the source must never be modified here.
    src = sqlite3.connect(f"file:{TESSERA_DB}?mode=ro", uri=True)
    src.row_factory = sqlite3.Row
    # Verify tessera.db schema
    src_tables = [r[0] for r in src.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()]
    if "tessera_cells" not in src_tables:
        src.close()
        sys.exit(f"ERROR: tessera_cells table not found in tessera.db. Tables: {src_tables}")
    print(f" tessera.db tables: {src_tables}")
    # Create staging database (exits the process if the schema file is missing)
    dst = create_staging_db(SCHEMA_SQL)
    # Convert H5 waypoints to integers for storage in pipeline_runs.h5_cells
    h5_ints = [h3.str_to_int(h5t) for _, h5t in H5_WAYPOINTS]
    # Versions recorded for provenance in pipeline_runs.source_versions (JSON)
    source_versions = {
        "GEBCO_2025": "2025",
        "ESA_WORLDCOVER_V200": "v2.0.0",
        "HYDROSHEDS_V11": "1.1",
        "USGS_MRDS": "2022-08-23",
        "BGR_IGME5000": "2007",
        "TESSERA3_SEED": "2026-04-26",
    }
    started_at = now_utc()
    # Record the run as draft (status 1) BEFORE writing any cell rows, so a
    # failure mid-extraction still leaves an auditable pipeline_runs row.
    cur = dst.execute(
        """INSERT INTO pipeline_runs
           (run_key, started_at, status, h5_cells, fields_updated,
            source_versions, notes)
           VALUES (?, ?, 1, ?, ?, ?, ?)""",
        (
            RUN_KEY,
            started_at,
            json.dumps(h5_ints),
            json.dumps(FIELDS_UPDATED),
            json.dumps(source_versions),
            "Seed extraction from TESSERA 3.0 tessera.db — five OTIVM launch waypoints.",
        ),
    )
    run_id = cur.lastrowid
    dst.commit()
    print(f" pipeline_runs row: id={run_id}, status=1 (draft)")
    total_rows = 0
    try:
        for city_name, h5_text in H5_WAYPOINTS:
            h5_int = h3.str_to_int(h5_text)
            print(f"\n [{city_name}] H5={h5_text}")
            # Get all H7 children of this H5 (49 cells)
            h7_children = list(h3.cell_to_children(h5_text, 7))
            assert len(h7_children) == 49, f"Expected 49 H7, got {len(h7_children)}"
            # Insert h5_coverage at draft; h9_current starts at 0 and is
            # updated once this city's rows are copied.
            dst.execute(
                """INSERT INTO h5_coverage
                   (h5, status, h9_total, h9_current, last_updated, run_id, notes)
                   VALUES (?, 1, ?, 0, ?, ?, ?)""",
                (h5_int, H9_EXPECTED_PER_H5, now_utc(), run_id, city_name),
            )
            dst.commit()
            h5_rows_found = 0
            h5_rows_missing = 0
            # Single created_at per city so all of its rows share a timestamp.
            created_at = now_utc()
            # Query tessera.db one H7 at a time (49 queries x 49 rows = 2401)
            for h7_text in h7_children:
                h7_int = h3.str_to_int(h7_text)
                # Get expected H9 children of this H7 (49 cells)
                h9_expected = list(h3.cell_to_children(h7_text, 9))
                assert len(h9_expected) == 49, f"Expected 49 H9, got {len(h9_expected)}"
                placeholders = ",".join("?" * len(h9_expected))
                rows = src.execute(
                    f"""SELECT h9_cell, lat, lon,
                        elev_cm, terrain, hydro, geo_dep, geo_flag
                        FROM tessera_cells
                        WHERE h9_cell IN ({placeholders})""",
                    h9_expected,
                ).fetchall()
                h5_rows_found += len(rows)
                h5_rows_missing += (len(h9_expected) - len(rows))
                batch = []
                for row in rows:
                    h9_int = h3.str_to_int(row["h9_cell"])
                    geo_dep = row["geo_dep"]
                    geo_flag = row["geo_flag"]
                    # geo_dep == 255 and geo_flag == 0 are treated as no-data
                    # here (downgraded to CONF_NO_DATA) — presumably sentinel
                    # values in tessera.db; confirm against TESSERA 3.0 schema.
                    gdep_conf = CONF_INDICATED if (geo_dep is not None and geo_dep != 255) else CONF_NO_DATA
                    gflag_conf = CONF_INDICATED if (geo_flag is not None and geo_flag != 0) else CONF_NO_DATA
                    batch.append((
                        h9_int, h7_int, h5_int,
                        row["lat"], row["lon"],
                        row["elev_cm"],
                        row["terrain"],
                        row["hydro"],
                        geo_dep,
                        geo_flag,
                        0, # occ_flag — stage 06 not run
                        SRC_GEBCO, CONF_INDICATED, # elev
                        SRC_ESA, CONF_INDICATED, # terrain
                        SRC_HYDRO, CONF_INDICATED, # hydro
                        SRC_MRDS, gdep_conf, # geo_dep
                        SRC_IGME, gflag_conf, # geo_flag
                        SRC_SEED, CONF_NO_DATA, # occ
                        1, # status = draft
                        run_id,
                        created_at,
                    ))
                if batch:
                    dst.executemany(
                        """INSERT INTO tessera_cells
                           (h9, h7, h5, lat, lon,
                            elev_cm, terrain, hydro, geo_dep, geo_flag, occ_flag,
                            elev_src, elev_conf,
                            terr_src, terr_conf,
                            hydro_src, hydro_conf,
                            gdep_src, gdep_conf,
                            gflag_src, gflag_conf,
                            occ_src, occ_conf,
                            status, run_id, created_at)
                           VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                        batch,
                    )
            # All 49 H7 batches for this city are done — record actual count.
            dst.execute(
                """UPDATE h5_coverage
                   SET h9_current = ?, last_updated = ?
                   WHERE h5 = ?""",
                (h5_rows_found, now_utc(), h5_int),
            )
            # One commit per city keeps each waypoint's rows atomic.
            dst.commit()
            total_rows += h5_rows_found
            status = "OK" if h5_rows_missing == 0 else f"WARNING: {h5_rows_missing} missing"
            print(f" Found: {h5_rows_found}/{H9_EXPECTED_PER_H5} [{status}]")
        # Mark pipeline run complete
        dst.execute(
            """UPDATE pipeline_runs
               SET status = 2, completed_at = ?, row_count = ?
               WHERE id = ?""",
            (now_utc(), total_rows, run_id),
        )
        dst.commit()
        print(f"\n[{now_utc()}] Extraction complete.")
        print(f" Total draft rows: {total_rows}")
        print(f" Expected total: {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5}")
        if total_rows < len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5:
            print(f" WARNING: {len(H5_WAYPOINTS) * H9_EXPECTED_PER_H5 - total_rows} cells missing from tessera.db")
            print(f" This is acceptable if occ_flag gaps — check per-city counts above.")
        print(f" pipeline_runs status: 2 (complete)")
        print(f" Next: run seed_promote.py on this CT")
    except Exception:
        # Any failure retires the run (status 4) and stores the traceback in
        # notes, then re-reports it on stderr before exiting non-zero.
        dst.execute(
            """UPDATE pipeline_runs
               SET status = 4, completed_at = ?, notes = ?
               WHERE id = ?""",
            (now_utc(), f"FAILED: {traceback.format_exc()}", run_id),
        )
        dst.commit()
        print(f"\n[{now_utc()}] FAILED — pipeline_run {run_id} marked retired (4).")
        traceback.print_exc()
        sys.exit(1)
    finally:
        src.close()
        dst.close()
# Script entry point — run the full seed extraction.
if __name__ == "__main__":
    extract()