""" web/routers/auth.py ------------------- OAuth endpoints for Adobe Sign and DocuSign. Adobe Sign uses the same redirect URI as the CLI (https://localhost:8080/callback). Since nothing runs on that port, the browser lands on a failed page. The user copies the URL and submits it via POST /api/auth/adobe/exchange — identical to the CLI flow. DocuSign uses a standard redirect callback handled directly by this server. Tokens are stored in a signed session cookie. """ from urllib.parse import urlparse, parse_qs import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse, RedirectResponse from pydantic import BaseModel from web.config import settings from web.session import get_session, save_session router = APIRouter() # Adobe Sign registers https://localhost:8080/callback — same as the CLI script. _ADOBE_REDIRECT_URI = "https://localhost:8080/callback" _ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2" _ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token" # --------------------------------------------------------------------------- # Status # --------------------------------------------------------------------------- @router.get("/status") def auth_status(request: Request): """Returns which platforms the current session is connected to.""" session = get_session(request) return { "adobe": bool(session.get("adobe_access_token")), "docusign": bool(session.get("docusign_access_token")), } # --------------------------------------------------------------------------- # Adobe Sign — manual paste flow (matches CLI behaviour) # --------------------------------------------------------------------------- @router.get("/adobe/url") def adobe_auth_url(): """ Return the Adobe Sign authorization URL for the frontend to open in a new tab. The user authorizes, lands on a failed page (nothing runs on :8080), copies the URL, and submits it to POST /api/auth/adobe/exchange. """ params = ( f"?response_type=code" f"&client_id={settings.adobe_client_id}" f"&redirect_uri={_ADOBE_REDIRECT_URI}" f"&scope=library_read:self+library_write:self+user_read:self" ) return {"url": _ADOBE_AUTH_URL + params} class AdobeExchangeRequest(BaseModel): redirect_url: str @router.post("/adobe/exchange") async def adobe_exchange(body: AdobeExchangeRequest, request: Request): """ Accept the full redirect URL that the user copied after Adobe Sign authorization (e.g. https://localhost:8080/callback?code=...&api_access_point=...). Extract the code and exchange it for tokens. """ parsed = urlparse(body.redirect_url) params = parse_qs(parsed.query) if "error" in params: error = params.get("error_description", params.get("error", ["unknown"]))[0] return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400) code_list = params.get("code") if not code_list: return JSONResponse({"error": "No code found in the URL. Did you paste the correct redirect URL?"}, status_code=400) code = code_list[0] async with httpx.AsyncClient() as client: resp = await client.post( _ADOBE_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": settings.adobe_client_id, "client_secret": settings.adobe_client_secret, "redirect_uri": _ADOBE_REDIRECT_URI, "code": code, }, ) if not resp.is_success: return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502) token_data = resp.json() if "error" in token_data: return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400) session = get_session(request) session["adobe_access_token"] = token_data.get("access_token") session["adobe_refresh_token"] = token_data.get("refresh_token") response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/adobe/connect") def adobe_connect_env(request: Request): """ Load Adobe Sign credentials directly from .env (ADOBE_ACCESS_TOKEN / ADOBE_REFRESH_TOKEN). Refreshes the token if needed. No browser login required when a valid refresh token already exists from a previous CLI auth session. """ import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from adobe_api import _refresh_access_token token = os.getenv("ADOBE_ACCESS_TOKEN") refresh_token = os.getenv("ADOBE_REFRESH_TOKEN") if not token and not refresh_token: return JSONResponse( {"error": "No Adobe Sign credentials found in .env. Run src/adobe_auth.py first."}, status_code=400, ) # Always refresh to ensure the token is fresh (access tokens expire in ~1h) if refresh_token: try: token = _refresh_access_token() except RuntimeError as e: return JSONResponse({"error": str(e)}, status_code=500) session = get_session(request) session["adobe_access_token"] = token session["adobe_refresh_token"] = refresh_token response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/adobe/disconnect") def adobe_disconnect(request: Request): session = get_session(request) session.pop("adobe_access_token", None) session.pop("adobe_refresh_token", None) response = JSONResponse({"disconnected": "adobe"}) save_session(response, session) return response # --------------------------------------------------------------------------- # DocuSign — JWT grant (.env) or OAuth redirect # --------------------------------------------------------------------------- @router.get("/docusign/connect") def docusign_connect(request: Request): """ Obtain a DocuSign access token via JWT grant using the credentials already in .env (DOCUSIGN_CLIENT_ID, DOCUSIGN_USER_ID, DOCUSIGN_PRIVATE_KEY_PATH). No browser sign-in needed — consent was already granted via the CLI setup. """ import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from docusign_auth import get_access_token try: token = get_access_token() except RuntimeError as e: return JSONResponse({"error": str(e)}, status_code=500) session = get_session(request) session["docusign_access_token"] = token response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/docusign/start") def docusign_start(): """Redirect to DocuSign OAuth (alternative to JWT grant).""" import base64 as _b64 params = ( f"?response_type=code" f"&scope=signature" f"&client_id={settings.docusign_client_id}" f"&redirect_uri={settings.docusign_redirect_uri}" ) return RedirectResponse(f"https://{settings.docusign_auth_server}/oauth/auth" + params) @router.get("/docusign/callback") async def docusign_callback(request: Request, code: str = ""): """Handle DocuSign OAuth redirect callback.""" import base64 if not code: return JSONResponse({"error": "missing code"}, status_code=400) credentials = base64.b64encode( f"{settings.docusign_client_id}:{settings.docusign_client_secret}".encode() ).decode() async with httpx.AsyncClient() as client: resp = await client.post( f"https://{settings.docusign_auth_server}/oauth/token", headers={"Authorization": f"Basic {credentials}"}, data={ "grant_type": "authorization_code", "code": code, "redirect_uri": settings.docusign_redirect_uri, }, ) if not resp.is_success: return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502) token_data = resp.json() session = get_session(request) session["docusign_access_token"] = token_data.get("access_token") session["docusign_refresh_token"] = token_data.get("refresh_token") response = RedirectResponse("/") save_session(response, session) return response @router.get("/docusign/disconnect") def docusign_disconnect(request: Request): session = get_session(request) session.pop("docusign_access_token", None) session.pop("docusign_refresh_token", None) response = JSONResponse({"disconnected": "docusign"}) save_session(response, session) return response