246 lines
8.4 KiB
Python
246 lines
8.4 KiB
Python
"""
|
|
web/routers/auth.py
|
|
-------------------
|
|
OAuth endpoints for Adobe Sign and DocuSign.
|
|
|
|
Adobe Sign uses the same redirect URI as the CLI (https://localhost:8080/callback).
|
|
Since nothing runs on that port, the browser lands on a failed page. The user copies
|
|
the URL and submits it via POST /api/auth/adobe/exchange — identical to the CLI flow.
|
|
|
|
DocuSign uses a standard redirect callback handled directly by this server.
|
|
|
|
Tokens are stored in a signed session cookie.
|
|
"""
|
|
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import JSONResponse, RedirectResponse
|
|
from pydantic import BaseModel
|
|
|
|
from web.config import settings
|
|
from web.session import get_session, save_session
|
|
|
|
# Router that every endpoint in this module registers itself on.
router = APIRouter()

# Adobe Sign OAuth endpoints. The redirect URI is https://localhost:8080/callback,
# the same one the CLI script uses.
_ADOBE_REDIRECT_URI = "https://localhost:8080/callback"
_ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2"
_ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/status")
def auth_status(request: Request):
    """Report, per platform, whether the current session holds an access token."""
    current = get_session(request)
    # Response key -> session key whose truthiness marks the platform as connected.
    token_keys = {
        "adobe": "adobe_access_token",
        "docusign": "docusign_access_token",
    }
    return {platform: bool(current.get(key)) for platform, key in token_keys.items()}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Adobe Sign — manual paste flow (matches CLI behaviour)
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/adobe/url")
def adobe_auth_url():
    """
    Build the Adobe Sign authorization URL for the frontend to open in a new tab.

    After authorizing, the user ends up on a failed page (nothing listens on
    :8080), copies that page's URL, and submits it to
    POST /api/auth/adobe/exchange.

    Returns:
        A dict with a single "url" key holding the full authorization URL.
    """
    # NOTE(review): the values are concatenated as-is, without percent-encoding.
    pieces = [
        "?response_type=code",
        f"&client_id={settings.adobe_client_id}",
        f"&redirect_uri={_ADOBE_REDIRECT_URI}",
        "&scope=library_read:self+library_write:self+user_read:self",
    ]
    return {"url": _ADOBE_AUTH_URL + "".join(pieces)}
|
|
|
|
|
|
class AdobeExchangeRequest(BaseModel):
    """Request body accepted by the POST /adobe/exchange endpoint."""

    # Full redirect URL the user copied from the browser after authorizing.
    redirect_url: str
|
|
|
|
|
|
@router.post("/adobe/exchange")
async def adobe_exchange(body: AdobeExchangeRequest, request: Request):
    """
    Exchange a pasted Adobe Sign redirect URL for OAuth tokens.

    Accepts the full redirect URL that the user copied after Adobe Sign
    authorization (e.g. https://localhost:8080/callback?code=...&api_access_point=...),
    extracts the ``code`` query parameter and posts it to the Adobe Sign token
    endpoint. On success the access and refresh tokens are written to the
    session and the session is saved onto the response.

    Args:
        body: Request payload carrying the pasted ``redirect_url``.
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` on success; otherwise a JSON error response:
        400 when the pasted URL is unusable or Adobe Sign reported an error,
        502 when the token endpoint cannot be reached or answers with an
        unusable response.
    """
    # The URL is user-supplied; urlparse raises ValueError on malformed input
    # (for example an unbalanced IPv6 bracket), which must not become a 500.
    try:
        params = parse_qs(urlparse(body.redirect_url).query)
    except ValueError:
        return JSONResponse(
            {"error": "No code found in the URL. Did you paste the correct redirect URL?"},
            status_code=400,
        )

    if "error" in params:
        error = params.get("error_description", params.get("error", ["unknown"]))[0]
        return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400)

    code_list = params.get("code")
    if not code_list:
        return JSONResponse(
            {"error": "No code found in the URL. Did you paste the correct redirect URL?"},
            status_code=400,
        )
    code = code_list[0]

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                _ADOBE_TOKEN_URL,
                data={
                    "grant_type": "authorization_code",
                    "client_id": settings.adobe_client_id,
                    "client_secret": settings.adobe_client_secret,
                    "redirect_uri": _ADOBE_REDIRECT_URI,
                    "code": code,
                },
            )
    except httpx.RequestError as exc:
        # Connection / timeout / DNS failures: report as an upstream failure.
        return JSONResponse({"error": "Token exchange failed", "detail": str(exc)}, status_code=502)

    if not resp.is_success:
        return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502)

    try:
        token_data = resp.json()
    except ValueError:
        token_data = None
    if not isinstance(token_data, dict):
        # A success status with a body that is not a JSON object is unusable.
        return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502)

    if "error" in token_data:
        return JSONResponse(
            {"error": token_data.get("error_description", token_data["error"])},
            status_code=400,
        )

    access_token = token_data.get("access_token")
    if not access_token:
        # Never report "connected" while storing an empty token in the session.
        return JSONResponse(
            {"error": "Token exchange failed", "detail": "No access token in response"},
            status_code=502,
        )

    session = get_session(request)
    session["adobe_access_token"] = access_token
    session["adobe_refresh_token"] = token_data.get("refresh_token")

    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
|
|
|
|
|
|
@router.get("/adobe/connect")
def adobe_connect_env(request: Request):
    """
    Connect Adobe Sign using credentials already present in the environment.

    Reads ADOBE_ACCESS_TOKEN / ADOBE_REFRESH_TOKEN via ``os.getenv``. Whenever
    a refresh token is present the access token is refreshed through
    ``adobe_api._refresh_access_token`` before being stored, so no browser
    login is required when a refresh token exists from a previous CLI auth
    session.

    Args:
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` with the session saved on the response, a 400
        JSON error when neither variable is set, or a 500 JSON error when the
        refresh raises ``RuntimeError``.
    """
    import os
    import sys

    # Make the CLI sources importable. Guard the insert so repeated requests
    # do not keep prepending the same entry to sys.path.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from adobe_api import _refresh_access_token

    token = os.getenv("ADOBE_ACCESS_TOKEN")
    refresh_token = os.getenv("ADOBE_REFRESH_TOKEN")

    if not token and not refresh_token:
        return JSONResponse(
            {"error": "No Adobe Sign credentials found in .env. Run src/adobe_auth.py first."},
            status_code=400,
        )

    # Always refresh to ensure the token is fresh (access tokens expire in ~1h)
    if refresh_token:
        try:
            token = _refresh_access_token()
        except RuntimeError as e:
            return JSONResponse({"error": str(e)}, status_code=500)

    session = get_session(request)
    session["adobe_access_token"] = token
    # NOTE(review): this stores the refresh token read before the refresh call;
    # confirm whether _refresh_access_token can rotate it.
    session["adobe_refresh_token"] = refresh_token

    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
|
|
|
|
|
|
@router.get("/adobe/disconnect")
def adobe_disconnect(request: Request):
    """Remove the Adobe Sign tokens from the session and save the result."""
    current = get_session(request)
    for key in ("adobe_access_token", "adobe_refresh_token"):
        # pop with a default: a missing key is not an error.
        current.pop(key, None)

    reply = JSONResponse({"disconnected": "adobe"})
    save_session(reply, current)
    return reply
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# DocuSign — Auth Code Grant + refresh token
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/docusign/connect")
def docusign_connect(request: Request):
    """
    Obtain a DocuSign access token via ``docusign_auth.get_access_token``.

    If that call raises a ``RuntimeError`` whose message mentions
    "refresh token", the app is treated as not yet authorized and the
    authorization URL is returned so the frontend can start the browser flow.

    Args:
        request: Incoming request, used to load the current session.

    Returns:
        ``{"connected": True}`` with the token stored in the session; a 200
        JSON payload with ``authorization_required`` and ``authorization_url``
        when authorization is needed; or a 500 JSON error for any other
        ``RuntimeError``.
    """
    import os
    import sys

    # Make the CLI sources importable. Guard the insert so repeated requests
    # do not keep prepending the same entry to sys.path.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from docusign_auth import build_authorization_url, get_access_token

    try:
        token = get_access_token()
    except RuntimeError as e:
        if "refresh token" in str(e).lower():
            return JSONResponse(
                {
                    "connected": False,
                    "authorization_required": True,
                    "authorization_url": build_authorization_url(),
                },
                status_code=200,
            )
        return JSONResponse({"error": str(e)}, status_code=500)

    session = get_session(request)
    session["docusign_access_token"] = token

    response = JSONResponse({"connected": True})
    save_session(response, session)
    return response
|
|
|
|
|
|
@router.get("/docusign/start")
def docusign_start():
    """
    Redirect to the DocuSign OAuth authorization screen.

    Returns:
        A redirect response pointing at the URL produced by
        ``docusign_auth.build_authorization_url``.
    """
    import os
    import sys

    # Make the CLI sources importable. Guard the insert so repeated requests
    # do not keep prepending the same entry to sys.path.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from docusign_auth import build_authorization_url

    return RedirectResponse(build_authorization_url())
|
|
|
|
|
|
@router.get("/docusign/callback")
async def docusign_callback(request: Request, code: str = ""):
    """
    Handle the DocuSign OAuth redirect callback.

    Exchanges the authorization ``code`` through
    ``docusign_auth.save_code_token_exchange``, stores the resulting access
    token in the session and redirects to "/".

    Args:
        request: Incoming request, used to load the current session.
        code: Authorization code from the query string; empty when absent.

    Returns:
        A redirect to "/" with the session saved on it; a 400 JSON error when
        ``code`` is missing; or a 502 JSON error when the exchange raises.
    """
    import os
    import sys

    # Make the CLI sources importable. Guard the insert so repeated requests
    # do not keep prepending the same entry to sys.path.
    src_dir = os.path.join(os.path.dirname(__file__), "..", "..", "src")
    if src_dir not in sys.path:
        sys.path.insert(0, src_dir)
    from docusign_auth import save_code_token_exchange

    if not code:
        return JSONResponse({"error": "missing code"}, status_code=400)

    session = get_session(request)
    try:
        session["docusign_access_token"] = save_code_token_exchange(code)
    except Exception as e:
        # Boundary handler: any failure in the exchange is reported as an
        # upstream error rather than crashing the callback.
        return JSONResponse({"error": "token exchange failed", "detail": str(e)}, status_code=502)

    # NOTE(review): presumably save_code_token_exchange updates
    # DOCUSIGN_REFRESH_TOKEN in the environment — confirm against docusign_auth.
    session["docusign_refresh_token"] = os.getenv("DOCUSIGN_REFRESH_TOKEN")

    response = RedirectResponse("/")
    save_session(response, session)
    return response
|
|
|
|
|
|
@router.get("/docusign/disconnect")
def docusign_disconnect(request: Request):
    """Remove the DocuSign tokens from the session and save the result."""
    current = get_session(request)
    for key in ("docusign_access_token", "docusign_refresh_token"):
        # pop with a default: a missing key is not an error.
        current.pop(key, None)

    reply = JSONResponse({"disconnected": "docusign"})
    save_session(reply, current)
    return reply
|