This commit is contained in:
2026-06-21 05:31:53 -04:00
parent 736635e35f
commit ef1f397ed6

View File

@@ -13,7 +13,7 @@ nothing is shared across addons except the token and this process.
Endpoints: Endpoints:
POST /cry01/signal — register a capacity signal (unchanged) POST /cry01/signal — register a capacity signal (unchanged)
GET /cry01/signals — list signals for an association (unchanged) GET /cry01/signals — list signals for an association (unchanged)
POST /vs01/record — store a Vital Signs record (overwrite-by-Placekey) POST /vs01/record — append a Vital Signs record (append-only)
POST /dsc01/record — store a DSC Categories record (overwrite-by-Placekey) POST /dsc01/record — store a DSC Categories record (overwrite-by-Placekey)
POST /scn01/record — append a Scenario attestation record (append-only) POST /scn01/record — append a Scenario attestation record (append-only)
GET /health — health check GET /health — health check
@@ -26,10 +26,21 @@ Configuration via environment variables (set in systemd unit or shell):
default: /srv/civic-orchestrator/data default: /srv/civic-orchestrator/data
Storage layout under DIAGNOSTIC_DATA_DIR: Storage layout under DIAGNOSTIC_DATA_DIR:
vs01/{association_slug}/{participant_id}.json — one file per participant, overwritten vs01/{association_slug}/{participant_id}.json — one array per participant, appended
dsc01/{association_slug}/{participant_id}.json — one file per participant, overwritten dsc01/{association_slug}/{participant_id}.json — one file per participant, overwritten
scn01/{association_slug}/records.json — one array per association, appended scn01/{association_slug}/records.json — one array per association, appended
vs01 is append-only, not overwrite-by-Placekey, as of this version. Reason:
the homeowner perspective is raw, mutable, participant-submitted input — not
the certified record. A sequence of submissions over time, including sparse
or hostile ones (e.g. a new occupant overwriting a detailed record with
"I don't know"), is itself part of the diagnostic picture and must not be
silently destroyed. The Stage 2 JSON-to-MariaDB parser is responsible for
deriving occupancy epochs and any other interpretation from this raw
sequence — the orchestrator does not infer or collapse history. dsc01
remains on the overwrite handler for now; the same question applies to it
and will be addressed in its own pass.
See docs/orchestrator/receiver-spec.md for the full envelope contract. See docs/orchestrator/receiver-spec.md for the full envelope contract.
""" """
@@ -73,7 +84,7 @@ log = logging.getLogger("civic-orchestrator")
# App # App
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.2.0") app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.3.0")
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Shared helpers # Shared helpers
@@ -130,6 +141,18 @@ def validate_common_header(header: dict, expected_addon: str) -> list[str]:
missing.append(f"addon (expected '{expected_addon}')") missing.append(f"addon (expected '{expected_addon}')")
return missing return missing
def _log_failed_write(addon: str, record: dict) -> None:
"""Append a record that failed to write to its normal location into a separate failure log.
This is a last-resort safety net — it must never itself raise."""
try:
failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl"
failure_log.parent.mkdir(parents=True, exist_ok=True)
with failure_log.open("a") as f:
f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n")
except Exception as e:
log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# cry01 — capacity signals (unchanged from prior version) # cry01 — capacity signals (unchanged from prior version)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -238,14 +261,92 @@ async def list_signals(
return JSONResponse(content={"count": len(signals), "signals": signals}) return JSONResponse(content={"count": len(signals), "signals": signals})
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# vs01 / dsc01 — overwrite-by-Placekey # vs01 — append-only
# Shared implementation: both record types have identical storage rules, # The homeowner perspective is raw, mutable, participant-submitted input,
# differing only in the addon name and the path segment under DIAGNOSTIC_DATA_DIR. # not the certified record. Every submission is preserved — nothing is
# See docs/orchestrator/receiver-spec.md for the contract. # ever overwritten or destroyed by the orchestrator. See module docstring
# above for the reasoning. Modeled on scn01's append pattern, but without
# the snapshot prerequisite (that requirement is specific to Scenarios).
# ---------------------------------------------------------------------------
@app.post("/vs01/record")
async def receive_vs01_record(request: Request) -> JSONResponse:
"""
Accept a Vital Signs record from vs01_spool.php.
Appends to the participant's records array. Never overwrites or truncates.
See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json.
"""
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body.")
header = body.get("_header", {})
payload = body.get("_payload", {})
token_hash = header.get("node_token_hash", "")
if not verify_token(token_hash):
log.warning("Rejected vs01 record — invalid node token hash.")
raise HTTPException(status_code=403, detail="Invalid node token.")
missing = validate_common_header(header, "vs01")
if missing:
raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}")
fields = payload.get("fields")
if not fields:
raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.")
association_slug = safe_path_component(header["association_slug"], "association_slug")
participant_id = safe_path_component(header["participant_id"], "participant_id")
records_path = DIAGNOSTIC_DATA_DIR / "vs01" / association_slug / f"{participant_id}.json"
record: dict[str, Any] = {
"_received_at": int(time.time()),
"_header": header,
"_payload": payload,
}
try:
records = load_json_or_default(records_path, [])
if not isinstance(records, list):
# The file exists but is not an array — this is a corruption state,
# most likely a leftover file from the prior overwrite-by-Placekey
# behavior (a single object, not an array). Refuse to silently
# coerce it; fail loudly instead.
log.critical("vs01 records file is not a JSON array: %s", records_path)
_log_failed_write("vs01", record)
raise HTTPException(status_code=500, detail="Records storage is in an unexpected state. This submission was not lost — contact the operator.")
records.append(record)
atomic_write_json(records_path, records)
record_index = len(records) - 1
except OSError as e:
log.error("Failed to write vs01 record for %s/%s: %s", association_slug, participant_id, e)
_log_failed_write("vs01", record)
raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.")
log.info(
"vs01 record appended: assoc=%s participant=%s index=%d",
association_slug, participant_id[:12] + "...", record_index,
)
return JSONResponse(status_code=200, content={
"status": "ok",
"addon": "vs01",
"participant_id": participant_id,
"record_index": record_index,
"stored_at": header["submitted_at"],
})
# ---------------------------------------------------------------------------
# dsc01 — overwrite-by-Placekey (unchanged)
# See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json.
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse: async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse:
"""Shared handler for vs01 and dsc01 — validates and overwrites the participant's record.""" """Handler for dsc01 — validates and overwrites the participant's record."""
try: try:
body = await request.json() body = await request.json()
except Exception: except Exception:
@@ -282,8 +383,6 @@ async def _receive_overwrite_record(request: Request, addon: str) -> JSONRespons
atomic_write_json(record_path, record) atomic_write_json(record_path, record)
except OSError as e: except OSError as e:
log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e) log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e)
# Log the full envelope to a failure log before returning 500 —
# a failed write must never silently lose the submission.
_log_failed_write(addon, record) _log_failed_write(addon, record)
raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.")
@@ -300,28 +399,6 @@ async def _receive_overwrite_record(request: Request, addon: str) -> JSONRespons
}) })
def _log_failed_write(addon: str, record: dict) -> None:
"""Append a record that failed to write to its normal location into a separate failure log.
This is a last-resort safety net — it must never itself raise."""
try:
failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl"
failure_log.parent.mkdir(parents=True, exist_ok=True)
with failure_log.open("a") as f:
f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n")
except Exception as e:
log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e)
@app.post("/vs01/record")
async def receive_vs01_record(request: Request) -> JSONResponse:
"""
Accept a Vital Signs record from vs01_spool.php.
Overwrites the participant's existing record in full.
See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json.
"""
return await _receive_overwrite_record(request, "vs01")
@app.post("/dsc01/record") @app.post("/dsc01/record")
async def receive_dsc01_record(request: Request) -> JSONResponse: async def receive_dsc01_record(request: Request) -> JSONResponse:
""" """