165 lines
5.6 KiB
Python
165 lines
5.6 KiB
Python
"""
|
|
web/routers/auth.py
|
|
-------------------
|
|
OAuth endpoints for Adobe Sign and DocuSign.
|
|
|
|
Adobe Sign uses the same redirect URI as the CLI (https://localhost:8080/callback).
|
|
Since nothing runs on that port, the browser lands on a failed page. The user copies
|
|
the URL and submits it via POST /api/auth/adobe/exchange — identical to the CLI flow.
|
|
|
|
DocuSign uses a standard redirect callback handled directly by this server.
|
|
|
|
Tokens are stored in a signed session cookie.
|
|
"""
|
|
|
|
from urllib.parse import urlparse, parse_qs
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import JSONResponse, RedirectResponse
|
|
from pydantic import BaseModel
|
|
|
|
from web.config import settings
|
|
from web.session import get_session, save_session
|
|
|
|
router = APIRouter()
|
|
|
|
# Adobe Sign registers https://localhost:8080/callback — same as the CLI script.
|
|
_ADOBE_REDIRECT_URI = "https://localhost:8080/callback"
|
|
_ADOBE_AUTH_URL = "https://secure.eu2.adobesign.com/public/oauth/v2"
|
|
_ADOBE_TOKEN_URL = "https://api.eu2.adobesign.com/oauth/v2/token"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/status")
|
|
def auth_status(request: Request):
|
|
"""Returns which platforms the current session is connected to."""
|
|
session = get_session(request)
|
|
return {
|
|
"adobe": bool(session.get("adobe_access_token")),
|
|
"docusign": bool(session.get("docusign_access_token")),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Adobe Sign — manual paste flow (matches CLI behaviour)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/adobe/url")
|
|
def adobe_auth_url():
|
|
"""
|
|
Return the Adobe Sign authorization URL for the frontend to open in a new tab.
|
|
The user authorizes, lands on a failed page (nothing runs on :8080), copies
|
|
the URL, and submits it to POST /api/auth/adobe/exchange.
|
|
"""
|
|
params = (
|
|
f"?response_type=code"
|
|
f"&client_id={settings.adobe_client_id}"
|
|
f"&redirect_uri={_ADOBE_REDIRECT_URI}"
|
|
f"&scope=library_read:self+library_write:self+user_read:self"
|
|
)
|
|
return {"url": _ADOBE_AUTH_URL + params}
|
|
|
|
|
|
class AdobeExchangeRequest(BaseModel):
|
|
redirect_url: str
|
|
|
|
|
|
@router.post("/adobe/exchange")
|
|
async def adobe_exchange(body: AdobeExchangeRequest, request: Request):
|
|
"""
|
|
Accept the full redirect URL that the user copied after Adobe Sign authorization
|
|
(e.g. https://localhost:8080/callback?code=...&api_access_point=...).
|
|
Extract the code and exchange it for tokens.
|
|
"""
|
|
parsed = urlparse(body.redirect_url)
|
|
params = parse_qs(parsed.query)
|
|
|
|
if "error" in params:
|
|
error = params.get("error_description", params.get("error", ["unknown"]))[0]
|
|
return JSONResponse({"error": f"Adobe Sign returned error: {error}"}, status_code=400)
|
|
|
|
code_list = params.get("code")
|
|
if not code_list:
|
|
return JSONResponse({"error": "No code found in the URL. Did you paste the correct redirect URL?"}, status_code=400)
|
|
|
|
code = code_list[0]
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
_ADOBE_TOKEN_URL,
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"client_id": settings.adobe_client_id,
|
|
"client_secret": settings.adobe_client_secret,
|
|
"redirect_uri": _ADOBE_REDIRECT_URI,
|
|
"code": code,
|
|
},
|
|
)
|
|
|
|
if not resp.is_success:
|
|
return JSONResponse({"error": "Token exchange failed", "detail": resp.text}, status_code=502)
|
|
|
|
token_data = resp.json()
|
|
if "error" in token_data:
|
|
return JSONResponse({"error": token_data.get("error_description", token_data["error"])}, status_code=400)
|
|
|
|
session = get_session(request)
|
|
session["adobe_access_token"] = token_data.get("access_token")
|
|
session["adobe_refresh_token"] = token_data.get("refresh_token")
|
|
|
|
response = JSONResponse({"connected": True})
|
|
save_session(response, session)
|
|
return response
|
|
|
|
|
|
@router.get("/adobe/disconnect")
|
|
def adobe_disconnect(request: Request):
|
|
session = get_session(request)
|
|
session.pop("adobe_access_token", None)
|
|
session.pop("adobe_refresh_token", None)
|
|
response = JSONResponse({"disconnected": "adobe"})
|
|
save_session(response, session)
|
|
return response
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DocuSign — JWT grant using existing .env credentials
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.get("/docusign/connect")
|
|
def docusign_connect(request: Request):
|
|
"""
|
|
Obtain a DocuSign access token via JWT grant using the credentials already
|
|
in .env (DOCUSIGN_CLIENT_ID, DOCUSIGN_USER_ID, DOCUSIGN_PRIVATE_KEY_PATH).
|
|
No browser sign-in needed — consent was already granted via the CLI setup.
|
|
"""
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
|
|
from docusign_auth import get_access_token
|
|
|
|
try:
|
|
token = get_access_token()
|
|
except RuntimeError as e:
|
|
return JSONResponse({"error": str(e)}, status_code=500)
|
|
|
|
session = get_session(request)
|
|
session["docusign_access_token"] = token
|
|
|
|
response = JSONResponse({"connected": True})
|
|
save_session(response, session)
|
|
return response
|
|
|
|
|
|
@router.get("/docusign/disconnect")
|
|
def docusign_disconnect(request: Request):
|
|
session = get_session(request)
|
|
session.pop("docusign_access_token", None)
|
|
response = JSONResponse({"disconnected": "docusign"})
|
|
save_session(response, session)
|
|
return response
|