adobe-to-docusign-migrator/web/routers/auth.py

251 lines
8.6 KiB
Python

"""
web/routers/auth.py
-------------------
OAuth endpoints for Adobe Sign and DocuSign.
Adobe Sign uses the same redirect URI as the CLI (https://localhost:8080/callback).
Since nothing runs on that port, the browser lands on a failed page. The user copies
the URL and submits it via POST /api/auth/adobe/exchange — identical to the CLI flow.
DocuSign uses a standard redirect callback handled directly by this server.
Tokens are stored in a signed session cookie.
"""
from urllib.parse import urlparse, parse_qs
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel
from web.config import settings
from web.session import get_session, save_session
# Router collecting every auth endpoint in this module. Routes are declared
# without a prefix; the module docstring refers to them under /api/auth, so the
# prefix is presumably applied where this router is mounted — TODO confirm.
router = APIRouter()
# Adobe Sign registers https://localhost:8080/callback — same as the CLI script.
# Nothing listens on that port, so the user pastes the resulting URL back to
# POST /adobe/exchange instead of being redirected to this server.
_ADOBE_REDIRECT_URI = "https://localhost:8080/callback"
# NOTE(review): both Adobe endpoints are pinned to the "eu2" host names, so
# accounts hosted elsewhere would presumably need different URLs — confirm.
_ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2"
_ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token"
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
@router.get("/status")
def auth_status(request: Request):
    """Report, per platform, whether the session currently holds an access token."""
    current = get_session(request)
    # Response key -> session key whose truthiness marks the platform as connected.
    token_keys = {
        "adobe": "adobe_access_token",
        "docusign": "docusign_access_token",
    }
    return {platform: bool(current.get(key)) for platform, key in token_keys.items()}
# ---------------------------------------------------------------------------
# Adobe Sign — manual paste flow (matches CLI behaviour)
# ---------------------------------------------------------------------------
@router.get("/adobe/url")
def adobe_auth_url():
    """
    Return the Adobe Sign authorization URL for the frontend to open in a new tab.

    The user authorizes, lands on a failed page (nothing runs on :8080), copies
    the URL, and submits it to POST /api/auth/adobe/exchange.

    Returns:
        A dict ``{"url": <authorization URL>}``.
    """
    from urllib.parse import urlencode

    # Build the query with urlencode so reserved characters in the configured
    # client id (e.g. "&", "#", "=") cannot corrupt the URL. ":" and "/" are
    # legal inside a query component and are kept literal, so the output is
    # unchanged for ordinary values; spaces in the scope are emitted as "+".
    query = urlencode(
        {
            "response_type": "code",
            "client_id": settings.adobe_client_id,
            "redirect_uri": _ADOBE_REDIRECT_URI,
            "scope": "library_read:self library_write:self user_read:self",
        },
        safe=":/",
    )
    return {"url": f"{_ADOBE_AUTH_URL}?{query}"}
class AdobeExchangeRequest(BaseModel):
    """Request body for POST /adobe/exchange."""

    # Full URL the browser ended up on after Adobe Sign authorization,
    # including its query string (the handler reads ``code`` / ``error`` from it).
    redirect_url: str
@router.post("/adobe/exchange")
async def adobe_exchange(body: AdobeExchangeRequest, request: Request):
    """
    Exchange a pasted Adobe Sign redirect URL for tokens and store them in the session.

    Accepts the full redirect URL that the user copied after Adobe Sign
    authorization (e.g. https://localhost:8080/callback?code=...&api_access_point=...),
    extracts the ``code`` query parameter and posts it to the Adobe token endpoint.

    Args:
        body: Request payload carrying ``redirect_url``.
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` with the updated session saved on the response
        on success. Otherwise a JSON error response:
        400 when the pasted URL carries an ``error`` parameter, has no ``code``,
        or the token payload itself contains an ``error``;
        502 when the token endpoint is unreachable, answers with a non-success
        status, returns something other than a JSON object, or omits the
        access token.
    """
    params = parse_qs(urlparse(body.redirect_url).query)

    if "error" in params:
        error = params.get("error_description", params.get("error", ["unknown"]))[0]
        return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400)

    code_list = params.get("code")
    if not code_list:
        return JSONResponse(
            {"error": "No code found in the URL. Did you paste the correct redirect URL?"},
            status_code=400,
        )
    code = code_list[0]

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                _ADOBE_TOKEN_URL,
                data={
                    "grant_type": "authorization_code",
                    "client_id": settings.adobe_client_id,
                    "client_secret": settings.adobe_client_secret,
                    "redirect_uri": _ADOBE_REDIRECT_URI,
                    "code": code,
                },
            )
    except httpx.HTTPError as exc:
        # Connection failures and timeouts would otherwise escape as an unhandled 500.
        return JSONResponse({"error": "Token exchange failed", "detail": str(exc)}, status_code=502)

    if not resp.is_success:
        return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502)

    try:
        token_data = resp.json()
    except ValueError:
        # A 2xx answer whose body is not JSON (e.g. a proxy's HTML page).
        token_data = None
    if not isinstance(token_data, dict):
        return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502)

    if "error" in token_data:
        return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400)

    access_token = token_data.get("access_token")
    if not access_token:
        # Never report "connected" while storing an empty token in the session.
        return JSONResponse(
            {"error": "Token exchange failed", "detail": "Response did not contain an access token"},
            status_code=502,
        )

    session = get_session(request)
    session["adobe_access_token"] = access_token
    session["adobe_refresh_token"] = token_data.get("refresh_token")
    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
@router.get("/adobe/connect")
def adobe_connect_env(request: Request):
    """
    Connect the session to Adobe Sign using credentials from the environment.

    Reads ADOBE_ACCESS_TOKEN / ADOBE_REFRESH_TOKEN from the process environment.
    Whenever a refresh token is present the access token is refreshed
    unconditionally via ``adobe_api._refresh_access_token``; otherwise the
    access token from the environment is used as-is. No browser login is
    required when a refresh token already exists from a previous CLI auth
    session.

    Args:
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` with the updated session saved on the response,
        a 400 JSON error when neither variable is set, or a 500 JSON error when
        the refresh helper raises ``RuntimeError``.
    """
    import os
    import sys

    # Make the CLI sources importable. Only insert the entry once: inserting on
    # every request would grow sys.path without bound in a long-running server.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from adobe_api import _refresh_access_token

    token = os.getenv("ADOBE_ACCESS_TOKEN")
    refresh_token = os.getenv("ADOBE_REFRESH_TOKEN")
    if not token and not refresh_token:
        return JSONResponse(
            {"error": "No Adobe Sign credentials found in .env. Run src/adobe_auth.py first."},
            status_code=400,
        )

    # Always refresh to ensure the token is fresh (access tokens expire in ~1h)
    if refresh_token:
        try:
            token = _refresh_access_token()
        except RuntimeError as e:
            return JSONResponse({"error": str(e)}, status_code=500)

    session = get_session(request)
    session["adobe_access_token"] = token
    # NOTE(review): this is the refresh token read before the refresh call; if
    # the helper rotates refresh tokens the stored value may be stale — confirm.
    session["adobe_refresh_token"] = refresh_token
    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
@router.get("/adobe/disconnect")
def adobe_disconnect(request: Request):
    """Drop the Adobe Sign tokens from the session and persist the change."""
    current = get_session(request)
    # Missing keys are fine: pop with a default never raises.
    for key in ("adobe_access_token", "adobe_refresh_token"):
        current.pop(key, None)
    reply = JSONResponse({"disconnected": "adobe"})
    save_session(reply, current)
    return reply
# ---------------------------------------------------------------------------
# DocuSign — JWT grant (.env) or OAuth redirect
# ---------------------------------------------------------------------------
@router.get("/docusign/connect")
def docusign_connect(request: Request):
    """
    Connect the session to DocuSign using the JWT grant.

    Obtains an access token through ``docusign_auth.get_access_token`` using the
    credentials already in .env (DOCUSIGN_CLIENT_ID, DOCUSIGN_USER_ID,
    DOCUSIGN_PRIVATE_KEY_PATH). No browser sign-in needed — consent was already
    granted via the CLI setup.

    Args:
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` with the updated session saved on the response,
        or a 500 JSON error when the helper raises ``RuntimeError``.
    """
    import os
    import sys

    # Make the CLI sources importable. Only insert the entry once: inserting on
    # every request would grow sys.path without bound in a long-running server.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from docusign_auth import get_access_token

    try:
        token = get_access_token()
    except RuntimeError as e:
        return JSONResponse({"error": str(e)}, status_code=500)

    session = get_session(request)
    session["docusign_access_token"] = token
    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
@router.get("/docusign/start")
def docusign_start():
    """
    Redirect to DocuSign OAuth (alternative to JWT grant).

    Returns:
        A redirect to ``https://<docusign_auth_server>/oauth/auth`` carrying the
        authorization-code request parameters.
    """
    from urllib.parse import urlencode

    # Build the query with urlencode so reserved characters in the configured
    # values (e.g. "&", "#" or "?" inside the redirect URI) cannot corrupt the
    # URL. ":" and "/" are legal inside a query component and are kept literal,
    # so the output is unchanged for ordinary values.
    query = urlencode(
        {
            "response_type": "code",
            "scope": "signature",
            "client_id": settings.docusign_client_id,
            "redirect_uri": settings.docusign_redirect_uri,
        },
        safe=":/",
    )
    return RedirectResponse(f"https://{settings.docusign_auth_server}/oauth/auth?{query}")
@router.get("/docusign/callback")
async def docusign_callback(request: Request, code: str = ""):
    """
    Handle DocuSign OAuth redirect callback.

    Exchanges the authorization ``code`` for tokens at the DocuSign token
    endpoint (HTTP Basic auth with the client id and secret), stores them in
    the session and redirects to ``/``.

    Args:
        request: Incoming request, used to load the session and query string.
        code: Authorization code from the query string; empty when absent.

    Returns:
        A redirect to ``/`` with the updated session saved on the response.
        Otherwise a JSON error response: 400 when no code was supplied, or 502
        when the token endpoint is unreachable, answers with a non-success
        status, returns something other than a JSON object, or omits the
        access token.
    """
    import base64

    if not code:
        # When the provider redirects back with an error instead of a code,
        # surface that error rather than a generic "missing code".
        oauth_error = request.query_params.get("error")
        if oauth_error:
            return JSONResponse({"error": f"DocuSign returned error: {oauth_error}"}, status_code=400)
        return JSONResponse({"error": "missing code"}, status_code=400)

    credentials = base64.b64encode(
        f"{settings.docusign_client_id}:{settings.docusign_client_secret}".encode()
    ).decode()

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"https://{settings.docusign_auth_server}/oauth/token",
                headers={"Authorization": f"Basic {credentials}"},
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": settings.docusign_redirect_uri,
                },
            )
    except httpx.HTTPError as exc:
        # Connection failures and timeouts would otherwise escape as an unhandled 500.
        return JSONResponse({"error": "token exchange failed", "detail": str(exc)}, status_code=502)

    if not resp.is_success:
        return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502)

    try:
        token_data = resp.json()
    except ValueError:
        # A 2xx answer whose body is not JSON (e.g. a proxy's HTML page).
        token_data = None
    if not isinstance(token_data, dict):
        return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502)

    access_token = token_data.get("access_token")
    if not access_token:
        # Never mark the session as connected with an empty token.
        return JSONResponse(
            {"error": "token exchange failed", "detail": "response did not contain an access token"},
            status_code=502,
        )

    session = get_session(request)
    session["docusign_access_token"] = access_token
    session["docusign_refresh_token"] = token_data.get("refresh_token")
    response = RedirectResponse("/")
    save_session(response, session)
    return response
@router.get("/docusign/disconnect")
def docusign_disconnect(request: Request):
    """Drop the DocuSign tokens from the session and persist the change."""
    current = get_session(request)
    # Missing keys are fine: pop with a default never raises.
    for key in ("docusign_access_token", "docusign_refresh_token"):
        current.pop(key, None)
    reply = JSONResponse({"disconnected": "docusign"})
    save_session(reply, current)
    return reply