# adobe-to-docusign-migrator/web/routers/migrate.py
"""
web/routers/migrate.py
----------------------
Migration trigger and history endpoints.
POST /api/migrate — run the pipeline for one or more Adobe template IDs
GET /api/migrate/history — return past migration records
"""
import asyncio
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from typing import List, Optional
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from web.config import settings
from web.session import get_session
# Ensure src/ is on path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
router = APIRouter()

# On-disk JSON file accumulating one record per migration attempt,
# kept under migration-output/ at the repository root.
_HISTORY_FILE = os.path.join(
    os.path.dirname(__file__), "..", "..", "migration-output", ".history.json"
)
class MigrateRequest(BaseModel):
    """Request body for POST /api/migrate: Adobe Sign library template IDs to migrate."""

    adobe_template_ids: List[str]
def _load_history() -> list:
    """Return the list of past migration records, or [] when absent or unreadable.

    A corrupt, truncated, or hand-edited history file must not take down the
    /api/migrate and /api/migrate/history endpoints, so any parse or I/O
    failure degrades to an empty history instead of raising.
    """
    if not os.path.exists(_HISTORY_FILE):
        return []
    try:
        with open(_HISTORY_FILE, encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return []
    # Guard against the file containing valid JSON of the wrong shape.
    return data if isinstance(data, list) else []
def _save_history(records: list) -> None:
    """Persist the full migration history, creating the parent directory if needed."""
    history_dir = os.path.dirname(_HISTORY_FILE)
    os.makedirs(history_dir, exist_ok=True)
    with open(_HISTORY_FILE, "w") as fh:
        json.dump(records, fh, indent=2)
def _load_compose():
    """
    Dynamically load and return the compose_template function from src/.

    Isolated in its own function so tests can patch it without touching the
    file system.
    """
    import importlib.util

    module_path = os.path.join(
        os.path.dirname(__file__), "..", "..", "src", "compose_docusign_template.py"
    )
    spec = importlib.util.spec_from_file_location("compose_docusign_template", module_path)
    compose_mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(compose_mod)
    return compose_mod.compose_template
async def _download_adobe_template(template_id: str, access_token: str, output_dir: str) -> bool:
    """Download Adobe Sign template files into output_dir. Returns True on success.

    Fetches template metadata, form fields, the documents list, and the first
    document's PDF content, writing each to a file in output_dir. Only a
    failed metadata request aborts (returns False); the other requests
    degrade to empty defaults so a partially available template still migrates.
    """
    headers = {"Authorization": f"Bearer {access_token}"}
    base = settings.adobe_sign_base_url
    async with httpx.AsyncClient() as client:
        # Metadata is mandatory: without it we cannot even name the template.
        meta_resp = await client.get(f"{base}/libraryDocuments/{template_id}", headers=headers)
        if not meta_resp.is_success:
            return False
        metadata = meta_resp.json()
        # Form fields (best-effort).
        fields_resp = await client.get(
            f"{base}/libraryDocuments/{template_id}/formFields", headers=headers
        )
        form_fields = fields_resp.json() if fields_resp.is_success else {"fields": []}
        # Documents list (best-effort).
        docs_resp = await client.get(
            f"{base}/libraryDocuments/{template_id}/documents", headers=headers
        )
        documents = docs_resp.json() if docs_resp.is_success else {"documents": []}
        # Download the first PDF only.
        doc_list = documents.get("documents", [])
        pdf_bytes = b""
        if doc_list:
            doc_id = doc_list[0].get("id")
            pdf_resp = await client.get(
                f"{base}/libraryDocuments/{template_id}/documents/{doc_id}", headers=headers
            )
            if pdf_resp.is_success:
                pdf_bytes = pdf_resp.content
    os.makedirs(output_dir, exist_ok=True)
    with open(os.path.join(output_dir, "metadata.json"), "w") as f:
        json.dump(metadata, f, indent=2)
    with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
        json.dump(form_fields, f, indent=2)
    with open(os.path.join(output_dir, "documents.json"), "w") as f:
        json.dump(documents, f, indent=2)
    if pdf_bytes:
        raw_name = doc_list[0].get("name", "document.pdf") if doc_list else "document.pdf"
        # The document name comes from a remote API: strip any path components
        # so a value like "../../evil.pdf" cannot escape output_dir.
        pdf_name = os.path.basename(raw_name) or "document.pdf"
        if not pdf_name.endswith(".pdf"):
            pdf_name += ".pdf"
        with open(os.path.join(output_dir, pdf_name), "wb") as f:
            f.write(pdf_bytes)
    return True
async def _migrate_one(
    adobe_id: str,
    adobe_access_token: str,
    docusign_access_token: str,
) -> dict:
    """Run the full pipeline for one Adobe template. Returns a result record.

    Pipeline: download from Adobe Sign -> compose DocuSign template JSON ->
    upsert into DocuSign (update an existing template with the exact same
    name, else create). The record has keys: timestamp, adobe_template_id,
    adobe_template_name, docusign_template_id, action ("created"/"updated"),
    status ("success"/"failed"), and error.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    template_name = None

    def _failure(error: str) -> dict:
        # All failure records share the same shape; only the message (and the
        # template name, once known) varies.
        return {
            "timestamp": timestamp,
            "adobe_template_id": adobe_id,
            "adobe_template_name": template_name,
            "docusign_template_id": None,
            "action": None,
            "status": "failed",
            "error": error,
        }

    with tempfile.TemporaryDirectory() as tmpdir:
        download_dir = os.path.join(tmpdir, "download")
        # 1. Download the Adobe Sign template files.
        ok = await _download_adobe_template(adobe_id, adobe_access_token, download_dir)
        if not ok:
            return _failure("Adobe Sign download failed")
        # Read the template name from the downloaded metadata.
        with open(os.path.join(download_dir, "metadata.json")) as f:
            metadata = json.load(f)
        template_name = metadata.get("name", adobe_id)
        # 2. Compose DocuSign template JSON.
        composed_file = os.path.join(tmpdir, "docusign-template.json")
        try:
            compose_fn = _load_compose()
            compose_fn(download_dir, composed_file)
        except Exception as exc:
            return _failure(f"Compose failed: {exc}")
        if not os.path.exists(composed_file):
            return _failure("Compose produced no output file")
        # 3. Upload (upsert) to DocuSign using the web session token.
        with open(composed_file) as f:
            template_json = json.load(f)
        ds_headers = {
            "Authorization": f"Bearer {docusign_access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        list_url = (
            f"{settings.docusign_base_url}/v2.1/accounts/"
            f"{settings.docusign_account_id}/templates"
        )
        async with httpx.AsyncClient() as client:
            # Look for an existing template with the exact same name so
            # re-running a migration updates in place instead of duplicating.
            list_resp = await client.get(
                list_url,
                headers=ds_headers,
                params={"search_text": template_name, "count": 100},
            )
            existing_id = None
            if list_resp.is_success:
                payload = list_resp.json()
                raw = payload.get("envelopeTemplates") or payload.get("templates") or []
                exact = [t for t in raw if t.get("name") == template_name]
                if exact:
                    # Prefer the most recently modified exact-name match.
                    exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True)
                    existing_id = exact[0]["templateId"]
            if existing_id:
                up_resp = await client.put(
                    f"{list_url}/{existing_id}", headers=ds_headers, json=template_json
                )
                action = "updated"
                template_id = existing_id
            else:
                up_resp = await client.post(list_url, headers=ds_headers, json=template_json)
                action = "created"
                template_id = up_resp.json().get("templateId") if up_resp.is_success else None
        if not up_resp.is_success:
            return _failure(
                f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}"
            )
    return {
        "timestamp": timestamp,
        "adobe_template_id": adobe_id,
        "adobe_template_name": template_name,
        "docusign_template_id": template_id,
        "action": action,
        "status": "success",
        "error": None,
    }
@router.post("")
async def run_migration(body: MigrateRequest, request: Request):
"""Migrate one or more Adobe Sign templates to DocuSign."""
session = get_session(request)
if not session.get("adobe_access_token"):
return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
if not session.get("docusign_access_token"):
return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
tasks = [
_migrate_one(
aid,
session["adobe_access_token"],
session["docusign_access_token"],
)
for aid in body.adobe_template_ids
]
results = await asyncio.gather(*tasks)
# Append to history
history = _load_history()
history.extend(results)
_save_history(history)
return {"results": list(results)}
@router.get("/history")
def migration_history():
"""Return all past migration records."""
return {"history": _load_history()}