if.bus: envelope emission (request/result)
Path: scripts/build_blog_medium_import_pipeline.py (lines 171-204)
*,
producer_id: str,
producer_version: str,
event_name: str,
topic: str,
partition_key: str,
subject_kind: str,
subject_id: str,
payload: dict,
causation_id: str | None = None,
correlation_id: str | None = None,
) -> dict:
payload_bytes = canonical_json_bytes(payload)
env: dict = {
"schema_id": "if.bus.envelope",
"schema_version": "1.0.0",
"event_id": new_event_id(),
"emitted_utc": now_utc_iso(),
"producer": {"kind": "if.api.adapter", "id": producer_id, "version": producer_version},
"event": {"name": event_name},
"subject": {"kind": subject_kind, "id": subject_id},
"routing": {"topic": topic, "partition_key": partition_key, "priority": 5, "ttl_seconds": 86400},
"payload_media_type": "application/json",
"payload": payload,
"payload_sha256": sha256_hex(payload_bytes),
"extensions": {},
}
if causation_id or correlation_id:
corr = {"correlation_id": correlation_id or (causation_id or env["event_id"])}
if causation_id:
corr["causation_id"] = causation_id
env["correlation"] = corr
return env