6
def _now_iso() -> str:
return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
def _stable_id(prefix: str, seed: str) -> str:
h = hashlib.sha256(seed.encode("utf-8")).hexdigest()(:10)
return f"{prefix}_{h}"
class MockEHR:
def __init__(self):
self.orders_queue: List(SurgeryOrder) = ()
self.patient_docs: Dict(str, List(ClinicalDocument)) = {}
def seed_data(self, n_orders: int = 5):
random.seed(7)
def make_patient(i: int) -> Patient:
pid = f"PT{i:04d}"
plan = random.choice(list(InsurancePlan))
return Patient(
patient_id=pid,
name=f"Patient {i}",
dob="1980-01-01",
member_id=f"M{i:08d}",
plan=plan,
)
def docs_for_order(patient: Patient, surgery: SurgeryType) -> List(ClinicalDocument):
base = (
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "H&P"),
doc_type=DocType.H_AND_P,
created_at=_now_iso(),
content="H&P: Relevant history, exam findings, and surgical indication.",
source="EHR",
),
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "NOTE"),
doc_type=DocType.CLINICAL_NOTE,
created_at=_now_iso(),
content="Clinical note: Symptoms, conservative management attempted, clinician assessment.",
source="EHR",
),
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "MEDS"),
doc_type=DocType.MED_LIST,
created_at=_now_iso(),
content="Medication list: Current meds, allergies, contraindications.",
source="EHR",
),
)
maybe = ()
if surgery in (SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC):
maybe.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "LABS"),
doc_type=DocType.LABS,
created_at=_now_iso(),
content="Labs: CBC/CMP within last 30 days.",
source="LabSystem",
)
)
if surgery in (SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY):
maybe.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "IMG"),
doc_type=DocType.IMAGING,
created_at=_now_iso(),
content="Imaging: MRI/X-ray report supporting diagnosis and severity.",
source="Radiology",
)
)
final = base + (d for d in maybe if random.random() > 0.35)
if random.random() > 0.6:
final.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "PRIOR_TX"),
doc_type=DocType.PRIOR_TX,
created_at=_now_iso(),
content="Prior treatments: PT, meds, injections tried over 6+ weeks.",
source="EHR",
)
)
if random.random() > 0.5:
final.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient.patient_id + "CONSENT"),
doc_type=DocType.CONSENT,
created_at=_now_iso(),
content="Consent: Signed procedure consent and risk disclosure.",
source="EHR",
)
)
return final
for i in range(1, n_orders + 1):
patient = make_patient(i)
surgery = random.choice(list(SurgeryType))
order = SurgeryOrder(
order_id=_stable_id("ORD", patient.patient_id + surgery.value),
patient=patient,
surgery_type=surgery,
scheduled_date=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(),
ordering_provider_npi=str(random.randint(1000000000, 1999999999)),
diagnosis_codes=("M17.11", "M54.5") if surgery != SurgeryType.CATARACT else ("H25.9"),
created_at=_now_iso(),
)
self.orders_queue.append(order)
self.patient_docs(patient.patient_id) = docs_for_order(patient, surgery)
def poll_new_surgery_orders(self, max_n: int = 1) -> List(SurgeryOrder):
pulled = self.orders_queue(:max_n)
self.orders_queue = self.orders_queue(max_n:)
return pulled
def get_patient_documents(self, patient_id: str) -> List(ClinicalDocument):
return list(self.patient_docs.get(patient_id, ()))
def fetch_additional_docs(self, patient_id: str, needed: List(DocType)) -> List(ClinicalDocument):
generated = ()
for dt in needed:
generated.append(
ClinicalDocument(
doc_id=_stable_id("DOC", patient_id + dt.value + str(time.time())),
doc_type=dt,
created_at=_now_iso(),
content=f"Auto-collected document for {dt.value}: extracted and formatted per payer policy.",
source="AutoCollector",
)
)
self.patient_docs.setdefault(patient_id, ()).extend(generated)
return generated
class MockPayerPortal:
def __init__(self):
self.db: Dict(str, Dict(str, Any)) = {}
random.seed(11)
def required_docs_policy(self, plan: InsurancePlan, surgery: SurgeryType) -> List(DocType):
base = (DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST)
if surgery in (SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY):
base += (DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX)
if surgery == SurgeryType.BARIATRIC:
base += (DocType.LABS, DocType.PRIOR_TX)
if plan in (InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA):
base += (DocType.CONSENT)
return sorted(list(set(base)), key=lambda x: x.value)
def submit(self, pa: PriorAuthRequest) -> PayerResponse:
payer_ref = _stable_id("PAYREF", pa.request_id + _now_iso())
docs_present = {d.doc_type for d in pa.docs_attached}
required = self.required_docs_policy(pa.order.patient.plan, pa.order.surgery_type)
missing = (d for d in required if d not in docs_present)
self.db(payer_ref) = {
"status": AuthStatus.SUBMITTED,
"order_id": pa.order.order_id,
"plan": pa.order.patient.plan,
"surgery": pa.order.surgery_type,
"missing": missing,
"polls": 0,
"submitted_at": _now_iso(),
"denial_reason": None,
}
msg = "Submission received. Case queued for review."
if missing:
msg += " Initial validation indicates incomplete documentation."
return PayerResponse(status=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg)
def check_status(self, payer_ref: str) -> PayerResponse:
if payer_ref not in self.db:
return PayerResponse(
status=AuthStatus.DENIED,
payer_ref=payer_ref,
message="Case not found (possible payer system error).",
denial_reason=DenialReason.OTHER,
confidence=0.4,
)
case = self.db(payer_ref)
case("polls") += 1
if case("status") == AuthStatus.SUBMITTED and case("polls") >= 1:
case("status") = AuthStatus.IN_REVIEW
if case("status") == AuthStatus.IN_REVIEW and case("polls") >= 3:
if case("missing"):
case("status") = AuthStatus.DENIED
case("denial_reason") = DenialReason.MISSING_DOCS
else:
roll = random.random()
if roll < 0.10:
case("status") = AuthStatus.DENIED
case("denial_reason") = DenialReason.CODING_ISSUE
elif roll < 0.18:
case("status") = AuthStatus.DENIED
case("denial_reason") = DenialReason.MEDICAL_NECESSITY
else:
case("status") = AuthStatus.APPROVED
if case("status") == AuthStatus.DENIED:
dr = case("denial_reason") or DenialReason.OTHER
missing = case("missing") if dr == DenialReason.MISSING_DOCS else ()
conf = 0.9 if dr != DenialReason.OTHER else 0.55
return PayerResponse(
status=AuthStatus.DENIED,
payer_ref=payer_ref,
message=f"Denied. Reason={dr.value}.",
denial_reason=dr,
missing_docs=missing,
confidence=conf,
)
if case("status") == AuthStatus.APPROVED:
return PayerResponse(
status=AuthStatus.APPROVED,
payer_ref=payer_ref,
message="Approved. Authorization issued.",
confidence=0.95,
)
return PayerResponse(
status=case("status"),
payer_ref=payer_ref,
message=f"Status={case('status').value}. Polls={case('polls')}.",
confidence=0.9,
)
def file_appeal(self, payer_ref: str, appeal_text: str, attached_docs: List(ClinicalDocument)) -> PayerResponse:
if payer_ref not in self.db:
return PayerResponse(
status=AuthStatus.DENIED,
payer_ref=payer_ref,
message="Appeal failed: case not found.",
denial_reason=DenialReason.OTHER,
confidence=0.4,
)
case = self.db(payer_ref)
docs_present = {d.doc_type for d in attached_docs}
still_missing = (d for d in case("missing") if d not in docs_present)
case("missing") = still_missing
case("status") = AuthStatus.APPEALED
case("polls") = 0
msg = "Appeal submitted and queued for review."
if still_missing:
msg += f" Warning: still missing {', '.join((d.value for d in still_missing))}."
return PayerResponse(status=AuthStatus.APPEALED, payer_ref=payer_ref, message=msg, confidence=0.9)
