""" web/routers/auth.py ------------------- OAuth endpoints for Adobe Sign and DocuSign. Adobe Sign uses the same redirect URI as the CLI (https://localhost:8080/callback). Since nothing runs on that port, the browser lands on a failed page. The user copies the URL and submits it via POST /api/auth/adobe/exchange — identical to the CLI flow. DocuSign uses a standard redirect callback handled directly by this server. Tokens are stored in a signed session cookie. """ from urllib.parse import urlparse, parse_qs import httpx from fastapi import APIRouter, Request from fastapi.responses import JSONResponse, RedirectResponse from pydantic import BaseModel from web.config import settings from web.session import get_session, save_session router = APIRouter() # Adobe Sign registers https://localhost:8080/callback — same as the CLI script. _ADOBE_REDIRECT_URI = "https://localhost:8080/callback" _ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2" _ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token" # --------------------------------------------------------------------------- # Status # --------------------------------------------------------------------------- @router.get("/status") def auth_status(request: Request): """Returns which platforms the current session is connected to.""" session = get_session(request) return { "adobe": bool(session.get("adobe_access_token")), "docusign": bool(session.get("docusign_access_token")), } # --------------------------------------------------------------------------- # Adobe Sign — manual paste flow (matches CLI behaviour) # --------------------------------------------------------------------------- @router.get("/adobe/url") def adobe_auth_url(): """ Return the Adobe Sign authorization URL for the frontend to open in a new tab. The user authorizes, lands on a failed page (nothing runs on :8080), copies the URL, and submits it to POST /api/auth/adobe/exchange. """ params = ( f"?response_type=code" f"&client_id={settings.adobe_client_id}" f"&redirect_uri={_ADOBE_REDIRECT_URI}" f"&scope=library_read:self+library_write:self+user_read:self" ) return {"url": _ADOBE_AUTH_URL + params} class AdobeExchangeRequest(BaseModel): redirect_url: str @router.post("/adobe/exchange") async def adobe_exchange(body: AdobeExchangeRequest, request: Request): """ Accept the full redirect URL that the user copied after Adobe Sign authorization (e.g. https://localhost:8080/callback?code=...&api_access_point=...). Extract the code and exchange it for tokens. """ parsed = urlparse(body.redirect_url) params = parse_qs(parsed.query) if "error" in params: error = params.get("error_description", params.get("error", ["unknown"]))[0] return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400) code_list = params.get("code") if not code_list: return JSONResponse({"error": "No code found in the URL. Did you paste the correct redirect URL?"}, status_code=400) code = code_list[0] async with httpx.AsyncClient() as client: resp = await client.post( _ADOBE_TOKEN_URL, data={ "grant_type": "authorization_code", "client_id": settings.adobe_client_id, "client_secret": settings.adobe_client_secret, "redirect_uri": _ADOBE_REDIRECT_URI, "code": code, }, ) if not resp.is_success: return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502) token_data = resp.json() if "error" in token_data: return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400) session = get_session(request) session["adobe_access_token"] = token_data.get("access_token") session["adobe_refresh_token"] = token_data.get("refresh_token") response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/adobe/connect") def adobe_connect_env(request: Request): """ Load Adobe Sign credentials directly from .env (ADOBE_ACCESS_TOKEN / ADOBE_REFRESH_TOKEN). Refreshes the token if needed. No browser login required when a valid refresh token already exists from a previous CLI auth session. """ import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from adobe_api import _refresh_access_token token = os.getenv("ADOBE_ACCESS_TOKEN") refresh_token = os.getenv("ADOBE_REFRESH_TOKEN") if not token and not refresh_token: return JSONResponse( {"error": "No Adobe Sign credentials found in .env. Run src/adobe_auth.py first."}, status_code=400, ) # Always refresh to ensure the token is fresh (access tokens expire in ~1h) if refresh_token: try: token = _refresh_access_token() except RuntimeError as e: return JSONResponse({"error": str(e)}, status_code=500) session = get_session(request) session["adobe_access_token"] = token session["adobe_refresh_token"] = refresh_token response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/adobe/disconnect") def adobe_disconnect(request: Request): session = get_session(request) session.pop("adobe_access_token", None) session.pop("adobe_refresh_token", None) response = JSONResponse({"disconnected": "adobe"}) save_session(response, session) return response # --------------------------------------------------------------------------- # DocuSign — Auth Code Grant + refresh token # --------------------------------------------------------------------------- @router.get("/docusign/connect") def docusign_connect(request: Request): """ Obtain a DocuSign access token from cached OAuth credentials in .env. If the app has not been authorized yet, return the authorization URL so the frontend can start the browser flow. """ import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from docusign_auth import build_authorization_url, get_access_token try: token = get_access_token() except RuntimeError as e: if "refresh token" in str(e).lower(): return JSONResponse( { "connected": False, "authorization_required": True, "authorization_url": build_authorization_url(), }, status_code=200, ) return JSONResponse({"error": str(e)}, status_code=500) session = get_session(request) session["docusign_access_token"] = token response = JSONResponse({"connected": True}) save_session(response, session) return response @router.get("/docusign/start") def docusign_start(): """Redirect to the DocuSign OAuth authorization screen.""" import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from docusign_auth import build_authorization_url return RedirectResponse(build_authorization_url()) @router.get("/docusign/callback") async def docusign_callback(request: Request, code: str = ""): """Handle DocuSign OAuth redirect callback.""" import sys import os sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) from docusign_auth import save_code_token_exchange if not code: return JSONResponse({"error": "missing code"}, status_code=400) session = get_session(request) try: session["docusign_access_token"] = save_code_token_exchange(code) except Exception as e: return JSONResponse({"error": "token exchange failed", "detail": str(e)}, status_code=502) session["docusign_refresh_token"] = os.getenv("DOCUSIGN_REFRESH_TOKEN") response = RedirectResponse("/") save_session(response, session) return response @router.get("/docusign/disconnect") def docusign_disconnect(request: Request): session = get_session(request) session.pop("docusign_access_token", None) session.pop("docusign_refresh_token", None) response = JSONResponse({"disconnected": "docusign"}) save_session(response, session) return response