diff --git a/orchestrator/app/main.py b/orchestrator/app/main.py index 10a3afb..c8cf56e 100644 --- a/orchestrator/app/main.py +++ b/orchestrator/app/main.py @@ -13,7 +13,7 @@ nothing is shared across addons except the token and this process. Endpoints: POST /cry01/signal — register a capacity signal (unchanged) GET /cry01/signals — list signals for an association (unchanged) - POST /vs01/record — store a Vital Signs record (overwrite-by-Placekey) + POST /vs01/record — append a Vital Signs record (append-only) POST /dsc01/record — store a DSC Categories record (overwrite-by-Placekey) POST /scn01/record — append a Scenario attestation record (append-only) GET /health — health check @@ -26,10 +26,21 @@ Configuration via environment variables (set in systemd unit or shell): default: /srv/civic-orchestrator/data Storage layout under DIAGNOSTIC_DATA_DIR: - vs01/{association_slug}/{participant_id}.json — one file per participant, overwritten + vs01/{association_slug}/{participant_id}.json — one array per participant, appended dsc01/{association_slug}/{participant_id}.json — one file per participant, overwritten scn01/{association_slug}/records.json — one array per association, appended +vs01 is append-only, not overwrite-by-Placekey, as of this version. Reason: +the homeowner perspective is raw, mutable, participant-submitted input — not +the certified record. A sequence of submissions over time, including sparse +or hostile ones (e.g. a new occupant overwriting a detailed record with +"I don't know"), is itself part of the diagnostic picture and must not be +silently destroyed. The Stage 2 JSON-to-MariaDB parser is responsible for +deriving occupancy epochs and any other interpretation from this raw +sequence — the orchestrator does not infer or collapse history. dsc01 +remains on the overwrite handler for now; the same question applies to it +and will be addressed in its own pass. + See docs/orchestrator/receiver-spec.md for the full envelope contract. """ @@ -73,7 +84,7 @@ log = logging.getLogger("civic-orchestrator") # App # --------------------------------------------------------------------------- -app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.2.0") +app = FastAPI(title="Civic Orchestrator — Diagnostic Record Receiver", version="0.3.0") # --------------------------------------------------------------------------- # Shared helpers @@ -130,6 +141,18 @@ def validate_common_header(header: dict, expected_addon: str) -> list[str]: missing.append(f"addon (expected '{expected_addon}')") return missing + +def _log_failed_write(addon: str, record: dict) -> None: + """Append a record that failed to write to its normal location into a separate failure log. + This is a last-resort safety net — it must never itself raise.""" + try: + failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl" + failure_log.parent.mkdir(parents=True, exist_ok=True) + with failure_log.open("a") as f: + f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n") + except Exception as e: + log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e) + # --------------------------------------------------------------------------- # cry01 — capacity signals (unchanged from prior version) # --------------------------------------------------------------------------- @@ -238,14 +261,92 @@ async def list_signals( return JSONResponse(content={"count": len(signals), "signals": signals}) # --------------------------------------------------------------------------- -# vs01 / dsc01 — overwrite-by-Placekey -# Shared implementation: both record types have identical storage rules, -# differing only in the addon name and the path segment under DIAGNOSTIC_DATA_DIR. -# See docs/orchestrator/receiver-spec.md for the contract. +# vs01 — append-only +# The homeowner perspective is raw, mutable, participant-submitted input, +# not the certified record. Every submission is preserved — nothing is +# ever overwritten or destroyed by the orchestrator. See module docstring +# above for the reasoning. Modeled on scn01's append pattern, but without +# the snapshot prerequisite (that requirement is specific to Scenarios). +# --------------------------------------------------------------------------- + +@app.post("/vs01/record") +async def receive_vs01_record(request: Request) -> JSONResponse: + """ + Accept a Vital Signs record from vs01_spool.php. + Appends to the participant's records array. Never overwrites or truncates. + See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json. + """ + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON body.") + + header = body.get("_header", {}) + payload = body.get("_payload", {}) + + token_hash = header.get("node_token_hash", "") + if not verify_token(token_hash): + log.warning("Rejected vs01 record — invalid node token hash.") + raise HTTPException(status_code=403, detail="Invalid node token.") + + missing = validate_common_header(header, "vs01") + if missing: + raise HTTPException(status_code=400, detail=f"Missing or invalid header fields: {missing}") + + fields = payload.get("fields") + if not fields: + raise HTTPException(status_code=400, detail="payload.fields is required and must be non-empty.") + + association_slug = safe_path_component(header["association_slug"], "association_slug") + participant_id = safe_path_component(header["participant_id"], "participant_id") + + records_path = DIAGNOSTIC_DATA_DIR / "vs01" / association_slug / f"{participant_id}.json" + + record: dict[str, Any] = { + "_received_at": int(time.time()), + "_header": header, + "_payload": payload, + } + + try: + records = load_json_or_default(records_path, []) + if not isinstance(records, list): + # The file exists but is not an array — this is a corruption state, + # most likely a leftover file from the prior overwrite-by-Placekey + # behavior (a single object, not an array). Refuse to silently + # coerce it; fail loudly instead. + log.critical("vs01 records file is not a JSON array: %s", records_path) + _log_failed_write("vs01", record) + raise HTTPException(status_code=500, detail="Records storage is in an unexpected state. This submission was not lost — contact the operator.") + + records.append(record) + atomic_write_json(records_path, records) + record_index = len(records) - 1 + except OSError as e: + log.error("Failed to write vs01 record for %s/%s: %s", association_slug, participant_id, e) + _log_failed_write("vs01", record) + raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") + + log.info( + "vs01 record appended: assoc=%s participant=%s index=%d", + association_slug, participant_id[:12] + "...", record_index, + ) + + return JSONResponse(status_code=200, content={ + "status": "ok", + "addon": "vs01", + "participant_id": participant_id, + "record_index": record_index, + "stored_at": header["submitted_at"], + }) + +# --------------------------------------------------------------------------- +# dsc01 — overwrite-by-Placekey (unchanged) +# See docs/orchestrator/receiver-spec.md and contracts/dsc01/record-v1.json. # --------------------------------------------------------------------------- async def _receive_overwrite_record(request: Request, addon: str) -> JSONResponse: - """Shared handler for vs01 and dsc01 — validates and overwrites the participant's record.""" + """Handler for dsc01 — validates and overwrites the participant's record.""" try: body = await request.json() except Exception: @@ -282,8 +383,6 @@ async def _receive_overwrite_record(request: Request, addon: str) -> JSONRespons atomic_write_json(record_path, record) except OSError as e: log.error("Failed to write %s record for %s/%s: %s", addon, association_slug, participant_id, e) - # Log the full envelope to a failure log before returning 500 — - # a failed write must never silently lose the submission. _log_failed_write(addon, record) raise HTTPException(status_code=500, detail="Storage write failed. This submission was not lost — contact the operator.") @@ -300,28 +399,6 @@ async def _receive_overwrite_record(request: Request, addon: str) -> JSONRespons }) -def _log_failed_write(addon: str, record: dict) -> None: - """Append a record that failed to write to its normal location into a separate failure log. - This is a last-resort safety net — it must never itself raise.""" - try: - failure_log = DIAGNOSTIC_DATA_DIR / "_failed_writes.jsonl" - failure_log.parent.mkdir(parents=True, exist_ok=True) - with failure_log.open("a") as f: - f.write(json.dumps({"addon": addon, "record": record, "logged_at": int(time.time())}) + "\n") - except Exception as e: - log.critical("FAILURE LOG ITSELF FAILED — record may be lost. addon=%s error=%s", addon, e) - - -@app.post("/vs01/record") -async def receive_vs01_record(request: Request) -> JSONResponse: - """ - Accept a Vital Signs record from vs01_spool.php. - Overwrites the participant's existing record in full. - See docs/orchestrator/receiver-spec.md and contracts/vs01/record-v1.json. - """ - return await _receive_overwrite_record(request, "vs01") - - @app.post("/dsc01/record") async def receive_dsc01_record(request: Request) -> JSONResponse: """