Add seed_promote.py: promote draft rows to current in staging_otivm.sqlite3
This commit is contained in:
166
pipeline/seed_promote.py
Normal file
166
pipeline/seed_promote.py
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
"""
|
||||||
|
seed_promote.py — TESSERA 4.0 seed promotion
|
||||||
|
Promotes draft rows in staging_otivm.sqlite3 to current, following the
|
||||||
|
RFC-TESSERA-4.0-001 pipeline contract.
|
||||||
|
|
||||||
|
Run on the Dell (tessera-pipeline CT) after seed_extract.py completes:
|
||||||
|
/opt/tessera-pipeline/venv/bin/python3 /opt/tessera-pipeline/seed_promote.py
|
||||||
|
|
||||||
|
What this script does:
|
||||||
|
1. Verifies all five H5 hexes have exactly 2401 draft rows each
|
||||||
|
2. Promotes tessera_cells.status 1→2 for all rows in the seed run
|
||||||
|
3. Updates h5_coverage.status 1→2 for all five H5 hexes
|
||||||
|
4. Reports final state — ready for copy to otivm.sqlite3
|
||||||
|
|
||||||
|
After this script completes:
|
||||||
|
- Verify output shown here
|
||||||
|
- scp /tmp/staging_otivm.sqlite3 to OTIVM container as data/otivm.sqlite3
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
STAGING_DB = "/tmp/staging_otivm.sqlite3"
|
||||||
|
RUN_KEY = "tessera3-seed-2026-04-26"
|
||||||
|
H9_EXPECTED = 2401
|
||||||
|
H5_WAYPOINTS = [
|
||||||
|
("Ostia", "851e805bfffffff"),
|
||||||
|
("Capua", "851e8333fffffff"),
|
||||||
|
("Brundisium", "851e8ba3fffffff"),
|
||||||
|
("Carthago", "85386e23fffffff"),
|
||||||
|
("Alexandria", "853f5ba7fffffff"),
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
import h3
|
||||||
|
except ImportError:
|
||||||
|
sys.exit("ERROR: h3 library not installed. Activate venv first.")
|
||||||
|
|
||||||
|
|
||||||
|
def now_utc() -> str:
|
||||||
|
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
|
||||||
|
|
||||||
|
def promote():
|
||||||
|
print(f"[{now_utc()}] seed_promote.py starting")
|
||||||
|
print(f" Staging: {STAGING_DB}")
|
||||||
|
|
||||||
|
import os
|
||||||
|
if not os.path.exists(STAGING_DB):
|
||||||
|
sys.exit(f"ERROR: {STAGING_DB} not found. Run seed_extract.py first.")
|
||||||
|
|
||||||
|
con = sqlite3.connect(STAGING_DB)
|
||||||
|
con.execute("PRAGMA foreign_keys = ON")
|
||||||
|
|
||||||
|
# Confirm pipeline run exists and is complete (status=2)
|
||||||
|
run = con.execute(
|
||||||
|
"SELECT id, status, row_count FROM pipeline_runs WHERE run_key = ?",
|
||||||
|
(RUN_KEY,)
|
||||||
|
).fetchone()
|
||||||
|
|
||||||
|
if not run:
|
||||||
|
con.close()
|
||||||
|
sys.exit(f"ERROR: pipeline run '{RUN_KEY}' not found in staging db.")
|
||||||
|
|
||||||
|
run_id, run_status, row_count = run
|
||||||
|
if run_status != 2:
|
||||||
|
con.close()
|
||||||
|
sys.exit(
|
||||||
|
f"ERROR: pipeline run status is {run_status}, expected 2 (complete).\n"
|
||||||
|
f"Re-run seed_extract.py to completion before promoting."
|
||||||
|
)
|
||||||
|
|
||||||
|
print(f" pipeline_run id={run_id}, status=2 (complete), row_count={row_count}")
|
||||||
|
|
||||||
|
# Verify per-H5 draft counts before promoting anything
|
||||||
|
print(f"\n Pre-promotion verification:")
|
||||||
|
all_ok = True
|
||||||
|
h5_ints = []
|
||||||
|
for city, h5_text in H5_WAYPOINTS:
|
||||||
|
h5_int = h3.str_to_int(h5_text)
|
||||||
|
h5_ints.append(h5_int)
|
||||||
|
|
||||||
|
draft_count = con.execute(
|
||||||
|
"SELECT COUNT(*) FROM tessera_cells WHERE h5 = ? AND status = 1 AND run_id = ?",
|
||||||
|
(h5_int, run_id)
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
current_count = con.execute(
|
||||||
|
"SELECT COUNT(*) FROM tessera_cells WHERE h5 = ? AND status = 2",
|
||||||
|
(h5_int,)
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
cov = con.execute(
|
||||||
|
"SELECT status, h9_total, h9_current FROM h5_coverage WHERE h5 = ?",
|
||||||
|
(h5_int,)
|
||||||
|
).fetchone()
|
||||||
|
|
||||||
|
status_str = "OK" if draft_count == H9_EXPECTED else f"ERROR: expected {H9_EXPECTED}"
|
||||||
|
print(f" {city}: draft={draft_count} current={current_count} "
|
||||||
|
f"h5_cov_status={cov[0] if cov else 'MISSING'} [{status_str}]")
|
||||||
|
|
||||||
|
if draft_count != H9_EXPECTED:
|
||||||
|
all_ok = False
|
||||||
|
|
||||||
|
if not all_ok:
|
||||||
|
con.close()
|
||||||
|
sys.exit("ERROR: Pre-promotion verification failed. Do not promote.")
|
||||||
|
|
||||||
|
print(f"\n All five H5 hexes verified. Promoting...")
|
||||||
|
|
||||||
|
# Promote all draft rows from this run to current
|
||||||
|
promoted = con.execute(
|
||||||
|
"UPDATE tessera_cells SET status = 2 WHERE run_id = ? AND status = 1",
|
||||||
|
(run_id,)
|
||||||
|
).rowcount
|
||||||
|
print(f" tessera_cells promoted: {promoted} rows (status 1→2)")
|
||||||
|
|
||||||
|
# Update h5_coverage for all five H5 hexes
|
||||||
|
updated_cov = 0
|
||||||
|
for h5_int in h5_ints:
|
||||||
|
con.execute(
|
||||||
|
"""UPDATE h5_coverage
|
||||||
|
SET status = 2, h9_current = ?, last_updated = ?
|
||||||
|
WHERE h5 = ?""",
|
||||||
|
(H9_EXPECTED, now_utc(), h5_int)
|
||||||
|
)
|
||||||
|
updated_cov += 1
|
||||||
|
|
||||||
|
con.commit()
|
||||||
|
print(f" h5_coverage updated: {updated_cov} rows (status 1→2)")
|
||||||
|
|
||||||
|
# Final verification
|
||||||
|
print(f"\n Post-promotion verification:")
|
||||||
|
grand_total = 0
|
||||||
|
all_complete = True
|
||||||
|
for city, h5_text in H5_WAYPOINTS:
|
||||||
|
h5_int = h3.str_to_int(h5_text)
|
||||||
|
|
||||||
|
current_count = con.execute(
|
||||||
|
"SELECT COUNT(*) FROM tessera_cells WHERE h5 = ? AND status = 2",
|
||||||
|
(h5_int,)
|
||||||
|
).fetchone()[0]
|
||||||
|
|
||||||
|
cov = con.execute(
|
||||||
|
"SELECT status, h9_total, h9_current FROM h5_coverage WHERE h5 = ?",
|
||||||
|
(h5_int,)
|
||||||
|
).fetchone()
|
||||||
|
|
||||||
|
complete = (current_count == H9_EXPECTED and cov and cov[0] == 2)
|
||||||
|
status_str = "COMPLETE" if complete else "ERROR"
|
||||||
|
print(f" {city}: current={current_count} h5_coverage_status={cov[0] if cov else 'MISSING'} [{status_str}]")
|
||||||
|
grand_total += current_count
|
||||||
|
if not complete:
|
||||||
|
all_complete = False
|
||||||
|
|
||||||
|
con.close()
|
||||||
|
|
||||||
|
print(f"\n Total current rows: {grand_total}")
|
||||||
|
|
||||||
|
if all_complete:
|
||||||
|
print(f"\n[{now_utc()}] Promotion complete. staging_otivm.sqlite3 is ready.")
|
||||||
|
print(f"\nNext steps:")
|
||||||
|
print(f" 1. scp the staging db to the OTIVM container:")
|
||||||
|
print(f" lxc file pull tessera-pipeline/tmp/staging_otivm.sqlite3 /tmp/staging_otivm.sqlite3")
|
||||||
|
print(f" scp -i /home/sandor/.ssh/tessera-transfer /tmp/staging_otivm.sqlite3 \\") print(f" sandor@10.0.0.23:/tmp/staging_otivm.sqlite3") print(f" 2. On OTIVM container, move to production:") print(f" mv /tmp/staging_otivm.sqlite3 /home/otivm/OTIVM/data/otivm.sqlite3") print(f" chown otivm:otivm /home/otivm/OTIVM/data/otivm.sqlite3") print(f" 3. Verify on OTIVM container:") print(f" sqlite3 /home/otivm/OTIVM/data/otivm.sqlite3 \\") print(f" 'SELECT h5, h9_current, status FROM h5_coverage;'") else: print(f"\n[{now_utc()}] ERROR: Post-promotion verification failed.") sys.exit(1)if __name__ == "__main__": promote()
|
||||||
Reference in New Issue
Block a user