""" Helpers for DocuSign session/account context. DocuSign users can belong to multiple accounts, each with its own account_id and base_uri. We fetch that account list from /oauth/userinfo and store it in the browser session so the UI can present an account picker. """ from __future__ import annotations from typing import Any import httpx from web.config import settings class DocusignContextError(RuntimeError): """Raised when the current session is missing required DocuSign context.""" def __init__(self, message: str, *, status_code: int = 400, code: str = "docusign_context_error"): super().__init__(message) self.status_code = status_code self.code = code def userinfo_url() -> str: return f"https://{settings.docusign_auth_server}/oauth/userinfo" async def fetch_userinfo(access_token: str) -> dict[str, Any]: async with httpx.AsyncClient() as client: resp = await client.get( userinfo_url(), headers={"Authorization": f"Bearer {access_token}"}, ) if not resp.is_success: raise DocusignContextError( f"DocuSign userinfo failed ({resp.status_code})", status_code=502, code="userinfo_failed", ) return resp.json() def normalize_accounts(userinfo: dict[str, Any]) -> list[dict[str, Any]]: accounts = [] for raw in userinfo.get("accounts", []): account_id = raw.get("account_id") or raw.get("accountId") base_uri = (raw.get("base_uri") or raw.get("baseUri") or "").rstrip("/") if not account_id or not base_uri: continue accounts.append({ "account_id": account_id, "account_name": raw.get("account_name") or raw.get("accountName") or account_id, "base_uri": base_uri, "base_url": f"{base_uri}/restapi", "is_default": bool(raw.get("is_default") or raw.get("isDefault")), "organization_name": raw.get("organization_name") or raw.get("organizationName"), }) accounts.sort(key=lambda item: ((item.get("account_name") or "").lower(), item["account_id"].lower())) return accounts def merge_userinfo(session: dict[str, Any], userinfo: dict[str, Any]) -> dict[str, Any]: updated = dict(session) updated["docusign_user_name"] = userinfo.get("name") updated["docusign_user_email"] = userinfo.get("email") updated["docusign_accounts"] = normalize_accounts(userinfo) updated["docusign_accounts_count"] = len(updated["docusign_accounts"]) selected_id = updated.get("docusign_selected_account_id") if selected_id: selected = find_account(updated, selected_id) if selected: _apply_selected_account(updated, selected) else: clear_selected_account(updated) if updated["docusign_accounts_count"] == 1: _apply_selected_account(updated, updated["docusign_accounts"][0]) return updated def find_account(session: dict[str, Any], account_id: str) -> dict[str, Any] | None: for account in session.get("docusign_accounts", []): if account.get("account_id") == account_id: return account return None def _apply_selected_account(session: dict[str, Any], account: dict[str, Any]) -> None: session["docusign_selected_account_id"] = account["account_id"] session["docusign_selected_account_name"] = account.get("account_name") session["docusign_selected_base_uri"] = account.get("base_uri") session["docusign_selected_base_url"] = account.get("base_url") def clear_selected_account(session: dict[str, Any]) -> None: session.pop("docusign_selected_account_id", None) session.pop("docusign_selected_account_name", None) session.pop("docusign_selected_base_uri", None) session.pop("docusign_selected_base_url", None) def select_account(session: dict[str, Any], account_id: str) -> dict[str, Any]: account = find_account(session, account_id) if not account: raise DocusignContextError( "DocuSign account not found in this session.", status_code=404, code="account_not_found", ) updated = dict(session) _apply_selected_account(updated, account) return updated def account_picker_required(session: dict[str, Any]) -> bool: if not session.get("docusign_access_token"): return False accounts = session.get("docusign_accounts") or [] return len(accounts) > 1 and not session.get("docusign_selected_account_id") def current_account(session: dict[str, Any]) -> dict[str, Any]: accounts = session.get("docusign_accounts") or [] if accounts: selected_id = session.get("docusign_selected_account_id") if not selected_id: raise DocusignContextError( "Select a DocuSign account before continuing.", status_code=409, code="account_selection_required", ) selected = find_account(session, selected_id) if not selected: raise DocusignContextError( "Selected DocuSign account is no longer available.", status_code=409, code="account_selection_required", ) return selected # Fallback for legacy/single-account env-based behavior. if settings.docusign_account_id and settings.docusign_base_url: return { "account_id": settings.docusign_account_id, "account_name": settings.docusign_account_id, "base_url": settings.docusign_base_url.rstrip("/"), "base_uri": settings.docusign_base_url.rstrip("/").removesuffix("/restapi"), "is_default": True, } raise DocusignContextError( "No DocuSign account is configured for this session.", status_code=409, code="account_selection_required", )