""" web/routers/auth.py ------------------- OAuth endpoints for Adobe Sign and DocuSign. Both providers now support standard browser redirect callbacks handled directly by this server. Tokens are stored in a server-side session keyed by a signed browser cookie. """ import secrets from urllib.parse import urlparse, parse_qs import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse, RedirectResponse from pydantic import BaseModel from web.audit import is_admin_session, log_event from web.config import settings from web.docusign_context import ( DocusignContextError, account_picker_required, clear_selected_account, current_account, fetch_userinfo, merge_userinfo, select_account, ) from web.session import get_session, save_session, session_public_view router = APIRouter() _ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2" _ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token" def _adobe_redirect_uri() -> str: return settings.adobe_redirect_uri def _sanitize_return_to(value: str | None) -> str: if value and value.startswith("/#/"): return value if value and value.startswith("#/"): return f"/{value}" return "/#/templates" def _build_adobe_authorization_url(state: str) -> str: return ( f"{_ADOBE_AUTH_URL}" f"?response_type=code" f"&client_id={settings.adobe_client_id}" f"&redirect_uri={_adobe_redirect_uri()}" f"&scope=library_read:self+library_write:self+user_read:self" f"&state={state}" ) async def _refresh_adobe_session_token(refresh_token: str) -> dict: async with httpx.AsyncClient() as client: resp = await client.post( _ADOBE_TOKEN_URL, data={ "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": settings.adobe_client_id, "client_secret": settings.adobe_client_secret, }, ) if not resp.is_success: raise RuntimeError(f"Adobe Sign token refresh failed ({resp.status_code})") token_data = resp.json() if "error" in token_data: raise RuntimeError(token_data.get("error_description", 
token_data["error"])) return token_data async def _fetch_adobe_profile(access_token: str) -> dict: """ Best-effort Adobe Sign profile lookup used only for nicer UI labels. This should never block a successful connection if Adobe returns sparse data. """ try: async with httpx.AsyncClient() as client: resp = await client.get( f"{settings.adobe_sign_base_url}/users/me", headers={"Authorization": f"Bearer {access_token}"}, ) except Exception: return {} if not resp.is_success: return {} data = resp.json() if resp.content else {} if not isinstance(data, dict): return {} company = data.get("company") or {} if not isinstance(company, dict): company = {"name": str(company)} if company else {} account_name = ( company.get("name") or data.get("companyName") or data.get("accountName") or data.get("name") ) account_id = ( company.get("id") or data.get("companyId") or data.get("accountId") ) return { "adobe_user_name": data.get("name"), "adobe_user_email": data.get("email"), "adobe_account_name": account_name, "adobe_account_id": account_id, } async def _merge_adobe_profile(session: dict, access_token: str) -> dict: profile = await _fetch_adobe_profile(access_token) if not profile: return session updated = dict(session) for key, value in profile.items(): if value: updated[key] = value return updated # --------------------------------------------------------------------------- # Status # --------------------------------------------------------------------------- @router.get("/status") def auth_status(request: Request): """Returns which platforms the current session is connected to.""" session = get_session(request) docusign_account = None try: docusign_account = current_account(session) if session.get("docusign_access_token") else None except DocusignContextError: docusign_account = None return { **session_public_view(session), "adobe_label": session.get("adobe_account_name") or session.get("adobe_user_name") or "Adobe Sign", "docusign_label": session.get("docusign_user_name") or 
"Docusign", "adobe_account_name": session.get("adobe_account_name"), "adobe_account_id": session.get("adobe_account_id"), "docusign_account_id": (docusign_account or {}).get("account_id"), "docusign_account_name": (docusign_account or {}).get("account_name"), "base_url": (docusign_account or {}).get("base_url", settings.docusign_base_url), "docusign_account_selection_required": account_picker_required(session), "is_admin": is_admin_session(session), } # --------------------------------------------------------------------------- # Adobe Sign — OAuth Authorization Code Grant # --------------------------------------------------------------------------- @router.get("/adobe/url") def adobe_auth_url(request: Request, return_to: str | None = None): session = get_session(request) state = secrets.token_urlsafe(24) session["adobe_oauth_state"] = state session["adobe_auth_mode"] = "authorization_pending" session["adobe_return_to"] = _sanitize_return_to(return_to) response = JSONResponse({"url": _build_adobe_authorization_url(state)}) save_session(response, session) return response class AdobeExchangeRequest(BaseModel): redirect_url: str class DocusignAccountSelectRequest(BaseModel): account_id: str @router.post("/adobe/exchange") async def adobe_exchange(body: AdobeExchangeRequest, request: Request): """ Accept the full redirect URL that the user copied after Adobe Sign authorization (e.g. https://localhost:8080/callback?code=...&api_access_point=...). Extract the code and exchange it for tokens. """ parsed = urlparse(body.redirect_url) params = parse_qs(parsed.query) if "error" in params: error = params.get("error_description", params.get("error", ["unknown"]))[0] return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400) code_list = params.get("code") if not code_list: return JSONResponse({"error": "No code found in the URL. 
Did you paste the correct redirect URL?"}, status_code=400) code = code_list[0] async with httpx.AsyncClient() as client: resp = await client.post( _ADOBE_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": settings.adobe_client_id, "client_secret": settings.adobe_client_secret, "redirect_uri": _adobe_redirect_uri(), "code": code, }, ) if not resp.is_success: return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502) token_data = resp.json() if "error" in token_data: return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400) session = get_session(request) session["adobe_access_token"] = token_data.get("access_token") session["adobe_refresh_token"] = token_data.get("refresh_token") session["adobe_auth_mode"] = "session_oauth" session = await _merge_adobe_profile(session, session["adobe_access_token"]) log_event( request, session, "adobe_connected", {"auth_mode": "session_oauth", "source": "manual_exchange"}, ) response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/adobe/connect") async def adobe_connect(request: Request, force_oauth: bool = False, return_to: str | None = None): """ Obtain an Adobe Sign access token for this browser session. If session/env tokens are unavailable or force_oauth=true, return an authorization URL so the frontend can start a normal OAuth flow. 
""" session = get_session(request) token = session.get("adobe_access_token") refresh_token = session.get("adobe_refresh_token") if not force_oauth and not token and not refresh_token: import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from adobe_api import _refresh_access_token env_token = os.getenv("ADOBE_ACCESS_TOKEN") env_refresh_token = os.getenv("ADOBE_REFRESH_TOKEN") if env_token or env_refresh_token: token = env_token refresh_token = env_refresh_token if refresh_token: try: token = _refresh_access_token() except RuntimeError as e: return JSONResponse({"error": str(e)}, status_code=500) session["adobe_access_token"] = token session["adobe_refresh_token"] = refresh_token session["adobe_auth_mode"] = "shared_env" session = await _merge_adobe_profile(session, token) log_event( request, session, "adobe_connected", {"auth_mode": "shared_env", "source": "server_env"}, ) response = JSONResponse({"connected": True}) save_session(response, session) return response if not force_oauth and not token and refresh_token: try: token_data = await _refresh_adobe_session_token(refresh_token) except RuntimeError as e: return JSONResponse({"error": str(e)}, status_code=500) token = token_data.get("access_token") session["adobe_access_token"] = token session["adobe_refresh_token"] = token_data.get("refresh_token", refresh_token) session["adobe_auth_mode"] = "session_oauth" session = await _merge_adobe_profile(session, token) log_event( request, session, "adobe_connected", {"auth_mode": "session_oauth", "source": "session_refresh"}, ) response = JSONResponse({"connected": True}) save_session(response, session) return response if not force_oauth and token: response = JSONResponse({"connected": True}) save_session(response, session) return response state = secrets.token_urlsafe(24) session["adobe_oauth_state"] = state session["adobe_auth_mode"] = "authorization_pending" session["adobe_return_to"] = _sanitize_return_to(return_to) 
authorization_url = _build_adobe_authorization_url(state) log_event( request, session, "adobe_authorization_requested", {"auth_mode": "authorization_pending"}, ) response = JSONResponse( { "connected": False, "authorization_required": True, "authorization_url": authorization_url, } ) save_session(response, session) return response @router.get("/adobe/callback") async def adobe_callback(request: Request, code: str = "", state: str = ""): """Handle Adobe Sign OAuth redirect callback.""" if not code: return JSONResponse({"error": "missing code"}, status_code=400) session = get_session(request) expected_state = session.get("adobe_oauth_state") if not expected_state or state != expected_state: return JSONResponse({"error": "invalid oauth state"}, status_code=400) async with httpx.AsyncClient() as client: resp = await client.post( _ADOBE_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": settings.adobe_client_id, "client_secret": settings.adobe_client_secret, "redirect_uri": _adobe_redirect_uri(), "code": code, }, ) if not resp.is_success: return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502) token_data = resp.json() if "error" in token_data: return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400) session["adobe_access_token"] = token_data.get("access_token") session["adobe_refresh_token"] = token_data.get("refresh_token") session["adobe_auth_mode"] = "session_oauth" session.pop("adobe_oauth_state", None) session = await _merge_adobe_profile(session, session["adobe_access_token"]) log_event( request, session, "adobe_connected", {"auth_mode": "session_oauth", "source": "browser_callback"}, ) response = RedirectResponse(session.pop("adobe_return_to", "#/templates")) save_session(response, session) return response @router.get("/adobe/disconnect") def adobe_disconnect(request: Request): session = get_session(request) previous_account_name = 
session.get("adobe_account_name") session.pop("adobe_access_token", None) session.pop("adobe_refresh_token", None) session.pop("adobe_user_name", None) session.pop("adobe_user_email", None) session.pop("adobe_account_name", None) session.pop("adobe_account_id", None) session.pop("adobe_oauth_state", None) session["adobe_auth_mode"] = "disconnected" log_event( request, session, "adobe_disconnected", {"previous_account_name": previous_account_name}, ) response = JSONResponse({"disconnected": "adobe"}) save_session(response, session) return response # --------------------------------------------------------------------------- # DocuSign — Auth Code Grant + refresh token # --------------------------------------------------------------------------- @router.get("/docusign/connect") async def docusign_connect(request: Request, return_to: str | None = None): """ Obtain a DocuSign access token from the current browser session. If the session has not been authorized yet, return an authorization URL so the frontend can start an isolated OAuth flow for this tester. 
""" import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from docusign_auth import ( build_authorization_url, refresh_access_token, session_from_token_data, session_has_valid_access_token, ) session = get_session(request) try: if session_has_valid_access_token(session): token = session["docusign_access_token"] elif session.get("docusign_refresh_token"): token_data = refresh_access_token(session["docusign_refresh_token"]) session = session_from_token_data(token_data, session) token = session["docusign_access_token"] else: raise RuntimeError("No DocuSign refresh token found for this browser session.") except RuntimeError as e: if "refresh token" in str(e).lower(): state = secrets.token_urlsafe(24) session["docusign_oauth_state"] = state session["docusign_auth_mode"] = "authorization_pending" session["docusign_return_to"] = _sanitize_return_to(return_to) authorization_url = build_authorization_url(state=state) log_event( request, session, "docusign_authorization_requested", {"auth_mode": "authorization_pending"}, ) response = JSONResponse( { "connected": False, "authorization_required": True, "authorization_url": authorization_url, }, status_code=200, ) save_session(response, session) return response return JSONResponse({"error": str(e)}, status_code=500) if not session.get("docusign_accounts"): try: session = merge_userinfo(session, await fetch_userinfo(token)) except DocusignContextError as e: return JSONResponse({"error": str(e)}, status_code=e.status_code) session["docusign_access_token"] = token session["docusign_auth_mode"] = "session_oauth" log_event( request, session, "docusign_connected", { "auth_mode": "session_oauth", "accounts_count": session.get("docusign_accounts_count", 0), "selection_required": account_picker_required(session), }, ) response = JSONResponse({ "connected": True, "account_selection_required": account_picker_required(session), }) save_session(response, session) return response 
def _ensure_docusign_auth_importable() -> None:
    """Put the ``src`` directory on ``sys.path`` once so ``docusign_auth`` imports.

    The entry is added only when absent, so handling a request no longer
    prepends a duplicate path on every call.
    """
    import os
    import sys

    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)


@router.get("/docusign/start")
def docusign_start(request: Request):
    """Redirect to the DocuSign OAuth authorization screen for this browser session."""
    _ensure_docusign_auth_importable()
    from docusign_auth import build_authorization_url

    session = get_session(request)
    state = secrets.token_urlsafe(24)
    session["docusign_oauth_state"] = state
    session["docusign_auth_mode"] = "authorization_pending"
    session["docusign_return_to"] = "/#/templates"
    authorization_url = build_authorization_url(state=state)

    log_event(
        request,
        session,
        "docusign_authorization_started",
        {"auth_mode": "authorization_pending"},
    )
    response = RedirectResponse(authorization_url)
    save_session(response, session)
    return response


@router.get("/docusign/callback")
async def docusign_callback(
    request: Request,
    code: str = "",
    state: str = "",
    error: str = "",
    error_description: str = "",
):
    """Handle DocuSign OAuth redirect callback.

    Validates ``state`` against the session, exchanges ``code`` for tokens,
    loads the user's account list, and redirects to the sanitized return
    target. ``error`` / ``error_description`` are optional query parameters
    that are reported back as a 400 instead of a generic "missing code".
    """
    _ensure_docusign_auth_importable()
    from docusign_auth import exchange_code_for_token, session_from_token_data

    if error:
        return JSONResponse(
            {"error": f"DocuSign returned error: {error_description or error}"},
            status_code=400,
        )
    if not code:
        return JSONResponse({"error": "missing code"}, status_code=400)

    session = get_session(request)
    expected_state = session.get("docusign_oauth_state")
    # Constant-time comparison on bytes: compare_digest raises TypeError for
    # non-ASCII str input, which a crafted query string could trigger.
    state_ok = bool(expected_state) and bool(state) and secrets.compare_digest(
        str(expected_state).encode("utf-8", "surrogatepass"),
        state.encode("utf-8", "surrogatepass"),
    )
    if not state_ok:
        return JSONResponse({"error": "invalid oauth state"}, status_code=400)

    try:
        token_data = exchange_code_for_token(code)
        session = session_from_token_data(token_data, session)
        session = merge_userinfo(session, await fetch_userinfo(session["docusign_access_token"]))
    except Exception as e:
        # Deliberately broad: the helper modules' exception types are not
        # visible here, and any failure means the exchange did not complete.
        return JSONResponse({"error": "token exchange failed", "detail": str(e)}, status_code=502)

    session.pop("docusign_oauth_state", None)
    session["docusign_auth_mode"] = "session_oauth"
    log_event(
        request,
        session,
        "docusign_connected",
        {
            "auth_mode": "session_oauth",
            "accounts_count": session.get("docusign_accounts_count", 0),
            "selection_required": account_picker_required(session),
        },
    )

    # Re-sanitize so a missing target falls back to the absolute "/#/templates".
    # A relative "#/..." Location would resolve against the callback URL and
    # send the browser straight back to this endpoint.
    response = RedirectResponse(_sanitize_return_to(session.pop("docusign_return_to", None)))
    save_session(response, session)
    return response


@router.get("/docusign/accounts")
def docusign_accounts(request: Request):
    """List the DocuSign accounts stored in the session and the current selection."""
    session = get_session(request)
    if not session.get("docusign_access_token"):
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)

    return {
        "accounts": session.get("docusign_accounts", []),
        "selected_account_id": session.get("docusign_selected_account_id"),
        "selection_required": account_picker_required(session),
    }


@router.post("/docusign/account-select")
def docusign_account_select(body: DocusignAccountSelectRequest, request: Request):
    """Select the DocuSign account to use for this session.

    Responds 401 when the session has no DocuSign token, or with the status
    and code carried by ``DocusignContextError`` when the selection is
    rejected.
    """
    session = get_session(request)
    if not session.get("docusign_access_token"):
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)

    try:
        session = select_account(session, body.account_id)
    except DocusignContextError as e:
        return JSONResponse({"error": str(e), "code": e.code}, status_code=e.status_code)

    selection = {
        "selected_account_id": session.get("docusign_selected_account_id"),
        "selected_account_name": session.get("docusign_selected_account_name"),
    }
    log_event(request, session, "docusign_account_selected", dict(selection))
    response = JSONResponse(selection)
    save_session(response, session)
    return response


@router.get("/docusign/disconnect")
def docusign_disconnect(request: Request):
    """Remove all DocuSign credentials, profile and account data from the session."""
    session = get_session(request)
    previous_account_name = session.get("docusign_selected_account_name")

    for key in (
        "docusign_access_token",
        "docusign_refresh_token",
        "docusign_token_expiry",
        "docusign_oauth_state",
        "docusign_return_to",  # drop any half-finished flow's redirect target too
        "docusign_user_name",
        "docusign_user_email",
        "docusign_accounts",
        "docusign_accounts_count",
    ):
        session.pop(key, None)
    clear_selected_account(session)
    session["docusign_auth_mode"] = "disconnected"

    log_event(
        request,
        session,
        "docusign_disconnected",
        {"previous_account_name": previous_account_name},
    )
    response = JSONResponse({"disconnected": "docusign"})
    save_session(response, session)
    return response