
"""
web/routers/migrate.py
----------------------
Migration trigger and history endpoints.
POST /api/migrate — run the pipeline for one or more Adobe template IDs
POST /api/migrate/batch — batch migration with async progress tracking
GET /api/migrate/batch/{id} — poll batch job status
GET /api/migrate/history — return past migration records
"""
import asyncio
import json
import os
import sys
import tempfile
import uuid
from datetime import datetime, timezone
from typing import Dict, List, Optional

import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from web.config import settings
from web.session import get_session
# Make the project's src/ tree importable (used by the validation/mapping
# services imported lazily in _run_validation).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))

router = APIRouter()

# JSON file to which completed migration records are appended.
_HISTORY_FILE = os.path.join(
    os.path.dirname(__file__), "..", "..", "migration-output", ".history.json"
)

# In-memory batch job store (keyed by job_id)
_batch_jobs: Dict[str, dict] = {}
class MigrationOptions(BaseModel):
    # Per-run behavior flags for the migration pipeline.
    dry_run: bool = False  # validate + compose only; never upload to DocuSign
    overwrite_if_exists: bool = False  # update an existing same-name DocuSign template
    include_documents: bool = True  # when False, documentBase64 payloads are stripped before upload
class MigrateRequest(BaseModel):
    """Request body for POST /api/migrate and POST /api/migrate/batch."""

    # Primary API (blueprint-aligned)
    source_template_ids: Optional[List[str]] = None
    target_folder: Optional[str] = None
    # default_factory gives each request its own MigrationOptions instance;
    # a plain `= MigrationOptions()` default is shared across models in
    # pydantic v1 (mutable-default footgun).
    options: MigrationOptions = Field(default_factory=MigrationOptions)
    # Legacy field kept for backward compatibility
    adobe_template_ids: Optional[List[str]] = None

    def resolved_ids(self) -> List[str]:
        """Return the requested template IDs, preferring the primary field
        over the legacy one; empty list when neither was supplied."""
        return self.source_template_ids or self.adobe_template_ids or []
def _load_history() -> list:
if not os.path.exists(_HISTORY_FILE):
return []
with open(_HISTORY_FILE) as f:
return json.load(f)
def _save_history(records: list) -> None:
os.makedirs(os.path.dirname(_HISTORY_FILE), exist_ok=True)
with open(_HISTORY_FILE, "w") as f:
json.dump(records, f, indent=2)
def _load_compose():
    """Dynamically load compose_template from src/compose_docusign_template.py.

    Returns:
        The compose_template callable from the loaded module.

    Raises:
        ImportError: if a module spec cannot be built (e.g. the file is
            missing) — instead of the opaque AttributeError the old code
            produced by dereferencing a None spec.
    """
    import importlib.util

    src_path = os.path.join(
        os.path.dirname(__file__), "..", "..", "src", "compose_docusign_template.py"
    )
    spec = importlib.util.spec_from_file_location("compose_docusign_template", src_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load compose module from {src_path}")
    compose_mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(compose_mod)
    return compose_mod.compose_template
async def _download_adobe_template(template_id: str, access_token: str, output_dir: str) -> bool:
    """Fetch one Adobe Sign library template (metadata, form fields, document
    list, and the first document's PDF) and write everything into output_dir.

    Returns True on success; False only when the metadata request fails —
    the other requests degrade to empty defaults.
    """
    auth = {"Authorization": f"Bearer {access_token}"}
    lib_url = f"{settings.adobe_sign_base_url}/libraryDocuments/{template_id}"

    async with httpx.AsyncClient() as client:
        resp = await client.get(lib_url, headers=auth)
        if not resp.is_success:
            return False
        metadata = resp.json()

        resp = await client.get(f"{lib_url}/formFields", headers=auth)
        form_fields = resp.json() if resp.is_success else {"fields": []}

        resp = await client.get(f"{lib_url}/documents", headers=auth)
        documents = resp.json() if resp.is_success else {"documents": []}

        # Only the first document's binary content is downloaded.
        doc_list = documents.get("documents", [])
        pdf_bytes = b""
        if doc_list:
            first_id = doc_list[0].get("id")
            resp = await client.get(f"{lib_url}/documents/{first_id}", headers=auth)
            if resp.is_success:
                pdf_bytes = resp.content

    os.makedirs(output_dir, exist_ok=True)
    for fname, payload in (
        ("metadata.json", metadata),
        ("form_fields.json", form_fields),
        ("documents.json", documents),
    ):
        with open(os.path.join(output_dir, fname), "w") as f:
            json.dump(payload, f, indent=2)

    if pdf_bytes:
        pdf_name = doc_list[0].get("name", "document.pdf") if doc_list else "document.pdf"
        if not pdf_name.endswith(".pdf"):
            pdf_name += ".pdf"
        with open(os.path.join(output_dir, pdf_name), "wb") as f:
            f.write(pdf_bytes)
    return True
def _run_validation(download_dir: str) -> dict:
"""Run validation service on downloaded template, return summary."""
try:
from src.services.mapping_service import adobe_folder_to_normalized
from src.services.validation_service import validate_template
norm, _ = adobe_folder_to_normalized(download_dir)
result = validate_template(norm)
return {
"blockers": result.blockers,
"warnings": result.warnings,
"has_blockers": result.has_blockers(),
}
except Exception as exc:
return {"blockers": [], "warnings": [f"Validation skipped: {exc}"], "has_blockers": False}
async def _migrate_one(
    adobe_id: str,
    adobe_access_token: str,
    docusign_access_token: str,
    options: MigrationOptions,
) -> dict:
    """Run the full pipeline (download → validate → compose → upload) for one
    Adobe template and return a history record.

    Every exit path returns the same 10-key record schema; the original code
    hand-copied that dict eight times, so the builder is centralized in the
    record() closure to prevent schema drift.
    """
    timestamp = datetime.now(timezone.utc).isoformat()
    template_name: Optional[str] = None
    warnings: List[str] = []  # validation warnings, filled in after step 2

    def record(
        status: str,
        *,
        action: Optional[str] = None,
        ds_id: Optional[str] = None,
        error: Optional[str] = None,
        blockers: Optional[List[str]] = None,
        extra_warnings: Optional[List[str]] = None,
        dry_run: bool = False,
    ) -> dict:
        # Single place that builds the history record; reads template_name
        # and warnings from the enclosing scope at call time.
        return {
            "timestamp": timestamp,
            "adobe_template_id": adobe_id,
            "adobe_template_name": template_name,
            "docusign_template_id": ds_id,
            "action": action,
            "status": status,
            "error": error,
            "warnings": warnings + (extra_warnings or []),
            "blockers": blockers or [],
            "dry_run": dry_run,
        }

    with tempfile.TemporaryDirectory() as tmpdir:
        download_dir = os.path.join(tmpdir, "download")
        # 1. Download the Adobe template bundle.
        if not await _download_adobe_template(adobe_id, adobe_access_token, download_dir):
            return record("failed", error="Adobe Sign download failed", dry_run=options.dry_run)
        with open(os.path.join(download_dir, "metadata.json")) as f:
            metadata = json.load(f)
        template_name = metadata.get("name", adobe_id)

        # 2. Validate — blockers stop the migration entirely.
        validation = _run_validation(download_dir)
        warnings = validation["warnings"]
        if validation["has_blockers"]:
            return record(
                "blocked",
                action="blocked",
                error=f"Validation blockers: {'; '.join(validation['blockers'])}",
                blockers=validation["blockers"],
                dry_run=options.dry_run,
            )

        # 3. Compose the DocuSign template JSON.
        composed_file = os.path.join(tmpdir, "docusign-template.json")
        try:
            compose_fn = _load_compose()
            compose_fn(download_dir, composed_file)
        except Exception as exc:
            return record("failed", error=f"Compose failed: {exc}", dry_run=options.dry_run)
        if not os.path.exists(composed_file):
            return record(
                "failed", error="Compose produced no output file", dry_run=options.dry_run
            )

        # 4. Dry run — stop here, do not upload.
        if options.dry_run:
            return record("dry_run", action="dry_run", dry_run=True)

        # 5. Upload (upsert) to DocuSign.
        with open(composed_file) as f:
            template_json = json.load(f)
        if not options.include_documents:
            # Caller wants structure only; drop the inline PDF payloads.
            for doc in template_json.get("documents", []):
                doc.pop("documentBase64", None)
        ds_headers = {
            "Authorization": f"Bearer {docusign_access_token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        list_url = f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates"
        async with httpx.AsyncClient() as client:
            # Duplicate detection: look for an existing template with the exact
            # same name, preferring the most recently modified match.
            list_resp = await client.get(
                list_url, headers=ds_headers, params={"search_text": template_name, "count": 100}
            )
            existing_id = None
            if list_resp.is_success:
                payload = list_resp.json()
                candidates = payload.get("envelopeTemplates") or payload.get("templates") or []
                exact = [t for t in candidates if t.get("name") == template_name]
                if exact:
                    exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True)
                    existing_id = exact[0]["templateId"]
            # Skip if already exists and overwrite is disabled.
            if existing_id and not options.overwrite_if_exists:
                return record(
                    "skipped",
                    action="skipped",
                    ds_id=existing_id,
                    extra_warnings=[
                        "Skipped: template already exists (overwrite_if_exists=false)"
                    ],
                )
            if existing_id:
                up_resp = await client.put(
                    f"{list_url}/{existing_id}", headers=ds_headers, json=template_json
                )
                action, template_id = "updated", existing_id
            else:
                up_resp = await client.post(list_url, headers=ds_headers, json=template_json)
                action = "created"
                template_id = up_resp.json().get("templateId") if up_resp.is_success else None
            if not up_resp.is_success:
                return record(
                    "failed",
                    error=f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}",
                )
            return record("success", action=action, ds_id=template_id)
@router.post("")
async def run_migration(body: MigrateRequest, request: Request):
    """Migrate one or more Adobe Sign templates to DocuSign in one request.

    Requires both provider tokens in the session; runs all templates
    concurrently and appends the results to the persistent history file.
    """
    session = get_session(request)
    adobe_token = session.get("adobe_access_token")
    if not adobe_token:
        return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
    ds_token = session.get("docusign_access_token")
    if not ds_token:
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)

    template_ids = body.resolved_ids()
    if not template_ids:
        return JSONResponse({"error": "no template IDs provided"}, status_code=400)

    # One concurrent pipeline run per template; each yields a history record.
    records = list(
        await asyncio.gather(
            *(_migrate_one(tid, adobe_token, ds_token, body.options) for tid in template_ids)
        )
    )

    history = _load_history()
    history.extend(records)
    _save_history(history)
    return {"results": records}
@router.get("/history")
def migration_history():
    """Return all past migration records."""
    records = _load_history()
    return {"history": records}
# ---------------------------------------------------------------------------
# Batch migration
# ---------------------------------------------------------------------------
async def _run_batch_job(
    job_id: str,
    ids: List[str],
    adobe_token: str,
    ds_token: str,
    options: MigrationOptions,
) -> None:
    """Background coroutine that processes a batch job and updates _batch_jobs.

    Fix vs. the original: any unexpected exception previously escaped the
    coroutine, leaving the job stuck in "running" forever and dropping the
    results already gathered. Per-item errors are now captured as "failed"
    records, completed work is always persisted to history, and an abort
    marks the job "failed" so pollers see a terminal state.
    """
    job = _batch_jobs[job_id]
    job["status"] = "running"
    results: List[dict] = []
    try:
        for i, adobe_id in enumerate(ids):
            job["progress"] = {"completed": i, "total": len(ids), "current_id": adobe_id}
            try:
                result = await _migrate_one(adobe_id, adobe_token, ds_token, options)
                # Retry once on transient upload failures (network errors —
                # not validation blockers, which are deterministic).
                if result["status"] == "failed" and "upload failed" in (result.get("error") or ""):
                    result = await _migrate_one(adobe_id, adobe_token, ds_token, options)
                    if result["status"] != "failed":
                        result["retried"] = True
            except Exception as exc:
                # Record the crash for this template and keep going.
                result = {
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "adobe_template_id": adobe_id,
                    "adobe_template_name": None,
                    "docusign_template_id": None,
                    "action": None,
                    "status": "failed",
                    "error": f"Unexpected error: {exc}",
                    "warnings": [],
                    "blockers": [],
                    "dry_run": options.dry_run,
                }
            results.append(result)
            job["results"] = results
    except Exception as exc:
        # Anything outside the per-item guard (e.g. progress bookkeeping):
        # surface a terminal state instead of a forever-"running" job.
        job["status"] = "failed"
        job["error"] = f"Batch job aborted: {exc}"
        return
    finally:
        # Persist whatever completed, even when the job aborts.
        history = _load_history()
        history.extend(results)
        _save_history(history)

    success = sum(1 for r in results if r["status"] == "success")
    failed = sum(1 for r in results if r["status"] in ("failed", "blocked"))
    skipped = sum(1 for r in results if r["status"] == "skipped")
    dry_runs = sum(1 for r in results if r["status"] == "dry_run")
    job["status"] = "completed"
    job["progress"] = {"completed": len(ids), "total": len(ids), "current_id": None}
    job["summary"] = {
        "total": len(ids),
        "success": success,
        "failed": failed,
        "skipped": skipped,
        "dry_run": dry_runs,
    }
# Strong references to in-flight batch tasks. The event loop keeps only a
# weak reference to tasks, so a task whose create_task() result is discarded
# can be garbage-collected mid-run (documented asyncio pitfall).
_background_tasks: set = set()


@router.post("/batch")
async def run_batch_migration(body: MigrateRequest, request: Request):
    """
    Start an async batch migration job. Returns a job_id immediately.
    Poll GET /api/migrate/batch/{job_id} for status.
    """
    session = get_session(request)
    if not session.get("adobe_access_token"):
        return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
    if not session.get("docusign_access_token"):
        return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
    ids = body.resolved_ids()
    if not ids:
        return JSONResponse({"error": "no template IDs provided"}, status_code=400)

    job_id = str(uuid.uuid4())
    _batch_jobs[job_id] = {
        "job_id": job_id,
        "status": "queued",
        "total": len(ids),
        "results": [],
        "progress": {"completed": 0, "total": len(ids), "current_id": None},
        "summary": None,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    task = asyncio.create_task(
        _run_batch_job(
            job_id,
            ids,
            session["adobe_access_token"],
            session["docusign_access_token"],
            body.options,
        )
    )
    # Hold a strong reference until completion, then drop it automatically.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return {"job_id": job_id, "total": len(ids), "status": "queued"}
@router.get("/batch/{job_id}")
def get_batch_status(job_id: str):
    """Poll the status of a batch migration job."""
    try:
        return _batch_jobs[job_id]
    except KeyError:
        return JSONResponse({"error": "batch job not found"}, status_code=404)