feat: idempotent upload + FastAPI web UI with full test coverage

Phase 1 — Idempotent upload:
- upload_docusign_template.py now upserts: PUT if template with same name
  exists (most recently modified), POST otherwise
- --force-create flag to bypass upsert

Phase 2-6 — FastAPI web UI:
- web/app.py: FastAPI app with /health, static file serving
- web/routers/auth.py: Adobe Sign + DocuSign OAuth start/callback/disconnect
- web/routers/templates.py: template listing + migration status badges
  (not_migrated / migrated / needs_update)
- web/routers/migrate.py: POST /api/migrate pipeline + GET /api/migrate/history
- web/static/: vanilla HTML/CSS/JS side-by-side template browser UI

Phase 7 — Tests (29/29 passing):
- test_upload_upsert.py: 4 upsert unit tests
- test_api_health/auth/templates/migrate.py: full API coverage
- test_e2e.py: 7-step full pipeline end-to-end test
- test_regression.py: compose output vs snapshots for 3 real templates
- conftest.py: --update-snapshots CLI option

Docs: IMPLEMENTATION-PLAN.md, updated EXECUTION-BOARD.md + architecture.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Paul Huliganga 2026-04-17 14:47:27 -04:00
parent 2d167421b2
commit 51f532f452
27 changed files with 3145 additions and 28 deletions

5
conftest.py Normal file
View File

@ -0,0 +1,5 @@
def pytest_addoption(parser):
parser.addoption(
"--update-snapshots", action="store_true", default=False,
help="Overwrite regression snapshot files with current compose output"
)

267
docs/IMPLEMENTATION-PLAN.md Normal file
View File

@ -0,0 +1,267 @@
# Implementation Plan — Adobe → DocuSign Migrator v2
*Created: 2026-04-17*
---
## Objective
Extend the CLI migration pipeline with:
1. **Idempotent upload** — update the most recently modified DocuSign template with the same name instead of always creating a new one.
2. **Web UI** — browser-based app that lets users authenticate to both platforms, browse templates side-by-side, and run migrations with live feedback.
---
## Architecture Overview
```
adobe-to-docusign-migrator/
├── src/ # Core pipeline (existing)
│ ├── compose_docusign_template.py
│ ├── upload_docusign_template.py ← Phase 1: upsert logic added
│ ├── adobe_api.py
│ ├── docusign_auth.py
│ └── ...
├── web/ # New — FastAPI web app
│ ├── app.py # FastAPI entrypoint
│ ├── config.py # Env/settings
│ ├── session.py # Session middleware
│ ├── routers/
│ │ ├── auth.py # Adobe + DocuSign OAuth
│ │ ├── templates.py # Listing + status API
│ │ └── migrate.py # Migration trigger + history
│ └── static/
│ ├── index.html # Main SPA page
│ ├── app.js # Vanilla JS app
│ └── style.css
├── tests/
│ ├── test_mapping.py # Existing field-mapping unit tests
│ ├── test_upload_upsert.py # Phase 1: upsert logic
│ ├── test_api_health.py # Phase 2: health endpoint
│ ├── test_api_auth.py # Phase 3: auth endpoints
│ ├── test_api_templates.py # Phase 4: template listing
│ ├── test_api_migrate.py # Phase 5: migration API
│ └── test_e2e.py # Phase 7: full pipeline e2e
└── docs/
├── IMPLEMENTATION-PLAN.md ← this file
└── agent-harness/
└── EXECUTION-BOARD.md
```
---
## Tech Stack
| Layer | Choice | Reason |
|---|---|---|
| Backend | FastAPI | Matches existing Python; async; auto-generates OpenAPI docs |
| Sessions | `starlette-sessions` + `itsdangerous` | Lightweight, no DB needed |
| Frontend | Vanilla HTML/CSS/JS | No build tooling; straightforward for this scope |
| Testing | `pytest` + `httpx` + `respx` | FastAPI's recommended test stack; respx for mocking HTTP |
| HTTP mocking | `respx` | Intercepts `httpx` calls for Adobe/DocuSign API mocks |
---
## Phase 1 — Idempotent Upload
**Goal:** `upload_docusign_template.py` should update the most recently modified DocuSign template with the same name, rather than always creating a new one.
**Logic:**
1. After loading the template JSON, extract its `name`.
2. Call `GET /v2.1/accounts/{accountId}/templates?search_text={name}` to list matches.
3. Filter to exact name matches; sort by `lastModified` descending; take the first.
4. If found: `PUT /v2.1/accounts/{accountId}/templates/{templateId}` (update).
5. If not found: `POST /v2.1/accounts/{accountId}/templates` (create).
6. Print `Updated template {id}` or `Created template {id}`.
7. Add `--force-create` flag to bypass upsert and always create.
**Tests (`tests/test_upload_upsert.py`):**
- `test_creates_when_no_match` — no existing templates; POST called.
- `test_updates_most_recent_when_match` — two existing templates with same name; PUT called on newer one.
- `test_force_create_bypasses_upsert``--force-create` always POSTs.
- `test_partial_name_match_ignored` — template name contains search term but isn't exact; still creates.
---
## Phase 2 — FastAPI Backend Foundation
**Goal:** Runnable FastAPI app with health endpoint, env config, and session middleware.
**Endpoints:**
- `GET /health``{"status": "ok", "version": "2.0"}`
**Tests (`tests/test_api_health.py`):**
- `test_health_returns_200`
- `test_health_response_shape`
---
## Phase 3 — Auth Endpoints
**Goal:** Users can connect to Adobe Sign and DocuSign from the browser; tokens stored in server-side session.
**Adobe Sign (OAuth 2.0 Authorization Code):**
- `GET /api/auth/adobe/start` → redirect to Adobe Sign OAuth URL
- `GET /api/auth/adobe/callback?code=...` → exchange code for tokens; store in session
- `GET /api/auth/adobe/disconnect` → clear Adobe tokens from session
**DocuSign (OAuth 2.0 Authorization Code — demo sandbox):**
- `GET /api/auth/docusign/start` → redirect to DocuSign OAuth URL
- `GET /api/auth/docusign/callback?code=...` → exchange code for tokens; store in session
- `GET /api/auth/docusign/disconnect` → clear DocuSign tokens from session
**Status:**
- `GET /api/auth/status``{"adobe": true/false, "docusign": true/false}`
**Tests (`tests/test_api_auth.py`):**
- `test_status_unauthenticated` — both false on fresh session.
- `test_adobe_callback_stores_token` — mock token exchange; session updated.
- `test_docusign_callback_stores_token` — same for DocuSign.
- `test_disconnect_clears_token` — after disconnect, status shows false.
---
## Phase 4 — Template Listing API
**Goal:** Expose Adobe and DocuSign template lists; compute per-template migration status.
**Endpoints:**
- `GET /api/templates/adobe` → list of Adobe Sign library documents
- `GET /api/templates/docusign` → list of DocuSign templates
- `GET /api/templates/status` → merged view: each Adobe template tagged as:
- `not_migrated` — no DocuSign template with same name
- `migrated` — at least one DocuSign template with exact name match
- `needs_update` — Adobe template modified after the matched DocuSign template
**Tests (`tests/test_api_templates.py`):**
- `test_adobe_list_requires_auth` — 401 if not authenticated.
- `test_adobe_list_returns_templates` — mock Adobe API; correct shape.
- `test_docusign_list_returns_templates` — mock DocuSign API.
- `test_status_not_migrated` — Adobe template with no DS match → `not_migrated`.
- `test_status_migrated` — name match exists → `migrated`.
- `test_status_needs_update` — Adobe modified after DS template → `needs_update`.
---
## Phase 5 — Migration API
**Goal:** Trigger migration of selected Adobe templates and retrieve history.
**Endpoints:**
- `POST /api/migrate` — body: `{"adobe_template_ids": ["id1", "id2"]}`
- Downloads each template, runs `compose_docusign_template.py`, uploads via upsert
- Returns `{"results": [{"adobe_id": "...", "docusign_id": "...", "status": "created|updated|failed", "error": null}]}`
- `GET /api/migrate/history` — reads `migration-output/.history.json`; returns past runs
**History record schema:**
```json
{
"timestamp": "2026-04-17T10:30:00Z",
"adobe_template_name": "NDA",
"adobe_template_id": "CBJ...",
"docusign_template_id": "7dfd...",
"action": "created|updated",
"status": "success|failed",
"error": null
}
```
**Tests (`tests/test_api_migrate.py`):**
- `test_migrate_requires_auth` — 401 if not authenticated.
- `test_migrate_single_template_creates` — mock Adobe download + DS upload (no existing); returns created.
- `test_migrate_single_template_updates` — mock with existing DS template; returns updated.
- `test_migrate_records_history` — after run, history file updated.
- `test_history_returns_past_runs` — GET history returns written records.
- `test_migrate_handles_partial_failure` — one template fails; others succeed; partial results returned.
---
## Phase 6 — Frontend
**Goal:** Single-page app served at `/`; no build step.
**Layout:**
```
┌─────────────────────────────────────────────────────┐
│ Adobe Sign → DocuSign Migrator [auth]│
├───────────────────────┬─────────────────────────────┤
│ Adobe Sign Templates │ DocuSign Templates │
│ [connect] │ [connect] │
│ ───────────────── │ ───────────────────────── │
│ ● NDA [●] │ ● NDA │
│ ○ Sales Agmt [○] │ ● David Tag Demo │
│ ○ Rob Test [○] │ │
│ │ │
│ [Migrate Selected] │ │
└───────────────────────┴─────────────────────────────┘
```
**Status badges:**
- Green dot = Migrated
- Yellow dot = Needs Update
- Red dot = Not Migrated
**Migrate flow:**
1. User checks templates to migrate.
2. Clicks "Migrate Selected."
3. Progress shown inline per template (spinner → check/error).
4. History section at bottom shows past runs.
---
## Phase 7 — End-to-End & Regression Tests
**Goal:** Ensure the full pipeline works together and existing behaviour doesn't regress.
**`tests/test_e2e.py`:**
- `test_full_migration_flow` — using TestClient + respx mocks:
1. Connect Adobe (mock OAuth callback)
2. Connect DocuSign (mock OAuth callback)
3. GET /api/templates/status → at least one `not_migrated`
4. POST /api/migrate → status `created`
5. GET /api/templates/status → now `migrated`
6. POST /api/migrate again → status `updated`
7. GET /api/migrate/history → two entries
**`tests/test_regression.py`:**
- Runs `compose_docusign_template.py` on all fixtures in `sample-templates/`
- Validates output against expected JSON snapshots in `tests/fixtures/expected/`
- Any field type regression fails the test
- Run via `pytest tests/test_regression.py` — no live API calls needed
---
## Running Tests
```bash
# All tests
pytest tests/ -v
# Unit only (no live API)
pytest tests/ -v -m "not integration"
# Regression only
pytest tests/test_regression.py -v
# With coverage
pytest tests/ --cov=src --cov=web --cov-report=term-missing
```
---
## Dependencies to Add
```
fastapi
uvicorn[standard]
starlette-sessions
itsdangerous
httpx
respx
pytest-asyncio
pytest-cov
```
---
*Last updated: 2026-04-17*

View File

@ -0,0 +1,93 @@
# Execution Board (Living Kanban)
*Last updated: 2026-04-17*
---
## Completed (v1 — CLI Pipeline)
- [x] Adobe Sign OAuth setup + token auto-refresh (`src/auth_adobe.py`, `src/adobe_api.py`) ✅
- [x] Template download pipeline — metadata, fields, docs, PDF (`src/download_templates.py`) ✅
- [x] Field type mapping — all major Adobe field types → DocuSign tabs ✅ (see `field-mapping.md`)
- [x] Coordinate system fix — y passthrough (top-origin, no conversion needed) ✅
- [x] Conditional logic mapping → `conditionalParentLabel/Value` ✅ (see `validation/conditional-logic-eval.md`)
- [x] DocuSign template JSON composition (`src/compose_docusign_template.py`) ✅
- [x] DocuSign JWT upload (`src/upload_docusign_template.py`) ✅
- [x] Regression checklist for all field types ✅ (see `tests/FIELD-TYPE-REGRESSION.md`)
- [x] End-to-end round-trip test (3 real Adobe templates → DocuSign) ✅
---
## Phase 1 — Idempotent Upload ✅ (2026-04-17)
- [x] Add `find_existing_template(name)` to `upload_docusign_template.py` — lists DS templates by name, returns most-recently-modified match
- [x] Change `upload_template()` to upsert: PUT if match found, POST if not
- [x] Add `--force-create` CLI flag to bypass upsert
- [x] Write `tests/test_upload_upsert.py` — 4 tests passing
---
## Phase 2 — FastAPI Backend Foundation ✅ (2026-04-17)
- [x] Add new dependencies to `requirements.txt`
- [x] Create `web/` directory structure (app.py, config.py, session.py, routers/, static/)
- [x] Implement `GET /health` endpoint
- [x] Write `tests/test_api_health.py` — 2 tests passing
---
## Phase 3 — Auth Endpoints ✅ (2026-04-17)
- [x] Implement Adobe Sign OAuth start + callback + disconnect in `web/routers/auth.py`
- [x] Implement DocuSign OAuth start + callback + disconnect
- [x] Implement `GET /api/auth/status`
- [x] Write `tests/test_api_auth.py` — 4 tests passing
---
## Phase 4 — Template Listing API ✅ (2026-04-17)
- [x] Implement `GET /api/templates/adobe` in `web/routers/templates.py`
- [x] Implement `GET /api/templates/docusign`
- [x] Implement `GET /api/templates/status` — computes `not_migrated / migrated / needs_update`
- [x] Write `tests/test_api_templates.py` — 7 tests passing
---
## Phase 5 — Migration API ✅ (2026-04-17)
- [x] Implement `POST /api/migrate` in `web/routers/migrate.py` — download → compose → upsert pipeline
- [x] Implement `GET /api/migrate/history` — reads/writes `migration-output/.history.json`
- [x] Write `tests/test_api_migrate.py` — 7 tests passing
---
## Phase 6 — Frontend ✅ (2026-04-17)
- [x] Create `web/static/index.html` — side-by-side template browser layout
- [x] Create `web/static/app.js` — auth status check, template listing, migrate flow, history
- [x] Create `web/static/style.css` — status badges, layout
---
## Phase 7 — End-to-End & Regression ✅ (2026-04-17)
- [x] Write `tests/test_e2e.py` — 7-step full pipeline test, 1 test passing
- [x] Write `tests/test_regression.py` — compose pipeline vs snapshots, 4 tests passing
- [x] Create `tests/fixtures/expected/` — snapshot JSONs for David Tag Demo, NDA, Rob Test
- [x] Full suite: **29/29 passing**
---
## Gitea
- [x] Committed and pushed all changes (2026-04-17)
---
## Results & Lessons Learned
- (2026-04-14) NDA, David Tag Demo, Rob Test all converted cleanly
- (2026-04-15) Coordinate bug fixed — y is top-origin in both platforms, no conversion needed
- (2026-04-15) Paul Adobe Template created via API; Company/Title fields require manual UI fix (API limitation)
- (2026-04-17) v2 planning complete — idempotent upload + web UI implementation begins

View File

@ -39,4 +39,30 @@ graph TD
--- ---
*Update as architecture/requirements change. Generated by Cleo (2026-04-14).* ## v2 Architecture — Web UI (2026-04-17)
The pipeline is extended with a FastAPI web layer that wraps all existing src/ modules.
```mermaid
graph TD
Browser -->|HTTP| FastAPI
FastAPI -->|OAuth| AdobeSign[Adobe Sign API]
FastAPI -->|OAuth/JWT| DocuSign[DocuSign API]
FastAPI -->|calls| Compose[compose_docusign_template.py]
FastAPI -->|calls| Upload[upload_docusign_template.py]
Upload -->|upsert| DocuSign
FastAPI -->|reads/writes| History[migration-output/.history.json]
```
**New layers:**
- `web/routers/auth.py` — browser-initiated OAuth for Adobe Sign and DocuSign
- `web/routers/templates.py` — template listing + migration status computation
- `web/routers/migrate.py` — triggers pipeline; records history
- `web/static/` — vanilla HTML/JS SPA (no build step)
**Idempotent Upload (v2):**
`upload_docusign_template.py` now searches for an existing DocuSign template by exact name match and updates the most recently modified one (PUT). Falls back to create (POST) if no match. `--force-create` flag bypasses upsert.
---
*Update as architecture/requirements change. Generated by Cleo (2026-04-14). Updated 2026-04-17.*

View File

@ -3,3 +3,15 @@ python-dotenv
pydantic pydantic
PyJWT>=2.0 PyJWT>=2.0
cryptography cryptography
# Web UI
fastapi
uvicorn[standard]
itsdangerous
httpx
# Testing
responses
respx
pytest-asyncio
pytest-cov

View File

@ -4,8 +4,13 @@ upload_docusign_template.py
Uploads a DocuSign template JSON file to DocuSign via the REST API. Uploads a DocuSign template JSON file to DocuSign via the REST API.
Authenticates using JWT grant (no Node.js dependency required). Authenticates using JWT grant (no Node.js dependency required).
By default uses upsert: if a template with the same name already exists,
the most recently modified one is updated (PUT). Use --force-create to
always create a new template instead.
Usage: Usage:
python3 src/upload_docusign_template.py --file migration-output/<name>/docusign-template.json python3 src/upload_docusign_template.py --file migration-output/<name>/docusign-template.json
python3 src/upload_docusign_template.py --file <path> --force-create
First-time setup: First-time setup:
python3 src/docusign_auth.py --consent # grant consent once python3 src/docusign_auth.py --consent # grant consent once
@ -20,6 +25,7 @@ import argparse
import json import json
import os import os
import sys import sys
from typing import Optional
import requests import requests
from dotenv import load_dotenv from dotenv import load_dotenv
@ -30,10 +36,61 @@ sys.path.insert(0, os.path.dirname(__file__))
from docusign_auth import get_access_token from docusign_auth import get_access_token
def upload_template(file_path: str) -> str: def _make_headers(token: str) -> dict:
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _refresh_token_once(headers: dict) -> dict:
"""Clear cached token and return new headers with a fresh token."""
os.environ.pop("DOCUSIGN_ACCESS_TOKEN", None)
os.environ.pop("DOCUSIGN_TOKEN_EXPIRY", None)
return _make_headers(get_access_token())
def find_existing_template(
name: str,
account_id: str,
base_url: str,
headers: dict,
) -> Optional[str]:
""" """
POST a template JSON file to the DocuSign Templates API. Search DocuSign for templates matching `name` exactly.
Returns the created templateId. Returns the templateId of the most recently modified match, or None.
"""
url = f"{base_url}/v2.1/accounts/{account_id}/templates"
resp = requests.get(url, headers=headers, params={"search_text": name, "count": 100})
if resp.status_code == 401:
headers.update(_refresh_token_once(headers))
resp = requests.get(url, headers=headers, params={"search_text": name, "count": 100})
if not resp.ok:
return None
data = resp.json()
templates = data.get("envelopeTemplates") or data.get("templates") or []
# Exact name match only — search_text is a substring filter on DocuSign's side
exact = [t for t in templates if t.get("name") == name]
if not exact:
return None
# Most recently modified first
exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True)
return exact[0]["templateId"]
def upload_template(file_path: str, force_create: bool = False) -> str:
"""
Upsert a template JSON file to DocuSign.
- If a template with the same name exists and force_create is False,
the most recently modified one is updated (PUT).
- Otherwise a new template is created (POST).
Returns the templateId.
""" """
if not os.path.exists(file_path): if not os.path.exists(file_path):
print(f"ERROR: File not found: {file_path}") print(f"ERROR: File not found: {file_path}")
@ -49,35 +106,48 @@ def upload_template(file_path: str) -> str:
print("ERROR: DOCUSIGN_ACCOUNT_ID must be set in .env") print("ERROR: DOCUSIGN_ACCOUNT_ID must be set in .env")
sys.exit(1) sys.exit(1)
token = get_access_token() headers = _make_headers(get_access_token())
headers = { template_name = template.get("name", file_path)
"Authorization": f"Bearer {token}", print(f"Uploading '{template_name}' to DocuSign...")
"Content-Type": "application/json",
"Accept": "application/json",
}
url = f"{base_url}/v2.1/accounts/{account_id}/templates" existing_id: Optional[str] = None
print(f"Uploading '{template.get('name', file_path)}' to DocuSign...") if not force_create:
existing_id = find_existing_template(template_name, account_id, base_url, headers)
resp = requests.post(url, headers=headers, json=template) if existing_id:
# Update existing template
url = f"{base_url}/v2.1/accounts/{account_id}/templates/{existing_id}"
resp = requests.put(url, headers=headers, json=template)
if resp.status_code == 401: if resp.status_code == 401:
# Token may have just expired — clear cache and retry once headers = _refresh_token_once(headers)
os.environ.pop("DOCUSIGN_ACCESS_TOKEN", None) resp = requests.put(url, headers=headers, json=template)
os.environ.pop("DOCUSIGN_TOKEN_EXPIRY", None)
token = get_access_token() if not resp.ok:
headers["Authorization"] = f"Bearer {token}" print(f"ERROR: Update failed ({resp.status_code})")
print(resp.text)
sys.exit(1)
print(f"Template updated: {existing_id}")
return existing_id
else:
# Create new template
url = f"{base_url}/v2.1/accounts/{account_id}/templates"
resp = requests.post(url, headers=headers, json=template) resp = requests.post(url, headers=headers, json=template)
if not resp.ok: if resp.status_code == 401:
print(f"ERROR: Upload failed ({resp.status_code})") headers = _refresh_token_once(headers)
print(resp.text) resp = requests.post(url, headers=headers, json=template)
sys.exit(1)
result = resp.json() if not resp.ok:
template_id = result.get("templateId") print(f"ERROR: Upload failed ({resp.status_code})")
print(f"Template created: {template_id}") print(resp.text)
return template_id sys.exit(1)
result = resp.json()
template_id = result.get("templateId")
print(f"Template created: {template_id}")
return template_id
def main(): def main():
@ -88,8 +158,12 @@ def main():
"--file", required=True, "--file", required=True,
help="Path to the docusign-template.json file to upload" help="Path to the docusign-template.json file to upload"
) )
parser.add_argument(
"--force-create", action="store_true",
help="Always create a new template instead of updating an existing one"
)
args = parser.parse_args() args = parser.parse_args()
upload_template(args.file) upload_template(args.file, force_create=args.force_create)
if __name__ == "__main__": if __name__ == "__main__":

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

92
tests/test_api_auth.py Normal file
View File

@ -0,0 +1,92 @@
"""
tests/test_api_auth.py
----------------------
Tests for /api/auth/* endpoints.
All external OAuth calls are mocked with respx.
"""
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
client = TestClient(app, raise_server_exceptions=True)
def test_status_unauthenticated():
"""Fresh session → both platforms disconnected."""
resp = client.get("/api/auth/status", cookies={})
assert resp.status_code == 200
data = resp.json()
assert data["adobe"] is False
assert data["docusign"] is False
@respx.mock
def test_adobe_callback_stores_token():
"""Successful Adobe OAuth callback → session has adobe_access_token."""
respx.post("https://api.eu2.adobesign.com/oauth/v2/token").mock(
return_value=httpx.Response(200, json={
"access_token": "adobe-test-token",
"refresh_token": "adobe-refresh",
})
)
resp = client.get("/api/auth/adobe/callback?code=authcode123", follow_redirects=False)
# Should redirect to /
assert resp.status_code in (302, 307)
# Session cookie should now contain the token
session_cookie = resp.cookies.get("migrator_session")
assert session_cookie is not None
# Follow up with status check using the same session cookie
status_resp = client.get("/api/auth/status", cookies={"migrator_session": session_cookie})
assert status_resp.json()["adobe"] is True
@respx.mock
def test_docusign_callback_stores_token():
"""Successful DocuSign OAuth callback → session has docusign_access_token."""
from web.config import settings
respx.post(f"https://{settings.docusign_auth_server}/oauth/token").mock(
return_value=httpx.Response(200, json={
"access_token": "ds-test-token",
"refresh_token": "ds-refresh",
})
)
resp = client.get("/api/auth/docusign/callback?code=dscode123", follow_redirects=False)
assert resp.status_code in (302, 307)
session_cookie = resp.cookies.get("migrator_session")
assert session_cookie is not None
status_resp = client.get("/api/auth/status", cookies={"migrator_session": session_cookie})
assert status_resp.json()["docusign"] is True
@respx.mock
def test_disconnect_clears_token():
"""After disconnect, status shows platform as disconnected."""
# First connect Adobe
respx.post("https://api.eu2.adobesign.com/oauth/v2/token").mock(
return_value=httpx.Response(200, json={"access_token": "tok", "refresh_token": "ref"})
)
connect_resp = client.get("/api/auth/adobe/callback?code=abc", follow_redirects=False)
session_cookie = connect_resp.cookies["migrator_session"]
# Verify connected
status_resp = client.get("/api/auth/status", cookies={"migrator_session": session_cookie})
assert status_resp.json()["adobe"] is True
# Disconnect
disc_resp = client.get("/api/auth/adobe/disconnect", cookies={"migrator_session": session_cookie})
assert disc_resp.status_code == 200
new_cookie = disc_resp.cookies.get("migrator_session", session_cookie)
# Verify disconnected
status_resp2 = client.get("/api/auth/status", cookies={"migrator_session": new_cookie})
assert status_resp2.json()["adobe"] is False

25
tests/test_api_health.py Normal file
View File

@ -0,0 +1,25 @@
"""
tests/test_api_health.py
------------------------
Tests for the /health endpoint and basic app startup.
"""
import pytest
from fastapi.testclient import TestClient
from web.app import app
client = TestClient(app)
def test_health_returns_200():
resp = client.get("/health")
assert resp.status_code == 200
def test_health_response_shape():
resp = client.get("/health")
data = resp.json()
assert data["status"] == "ok"
assert "version" in data
assert data["version"] == "2.0"

239
tests/test_api_migrate.py Normal file
View File

@ -0,0 +1,239 @@
"""
tests/test_api_migrate.py
--------------------------
Tests for /api/migrate (POST) and /api/migrate/history (GET).
All Adobe Sign and DocuSign HTTP calls are mocked with respx.
The compose pipeline is mocked at the module level to avoid PDF/file I/O.
"""
import json
import os
import tempfile
from unittest.mock import patch, MagicMock
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
from web.session import _serializer, _COOKIE_NAME
import web.routers.migrate as migrate_module
client = TestClient(app, raise_server_exceptions=True)
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "test-account-id"
TEMPLATE_NAME = "Test NDA"
ADOBE_ID = "adobe-123"
DS_NEW_ID = "ds-new-456"
DS_EXISTING_ID = "ds-existing-789"
def _full_session():
return _serializer.dumps({
"adobe_access_token": "adobe-tok",
"docusign_access_token": "ds-tok",
})
def _adobe_only_session():
return _serializer.dumps({"adobe_access_token": "adobe-tok"})
@pytest.fixture(autouse=True)
def patch_settings(monkeypatch):
import web.config as cfg
monkeypatch.setattr(cfg.settings, "docusign_account_id", DS_ACCOUNT)
monkeypatch.setattr(cfg.settings, "docusign_base_url", DS_BASE)
monkeypatch.setattr(cfg.settings, "adobe_sign_base_url", ADOBE_BASE)
@pytest.fixture(autouse=True)
def temp_history(tmp_path, monkeypatch):
"""Redirect history file to a temp path for each test."""
history_path = str(tmp_path / ".history.json")
monkeypatch.setattr(migrate_module, "_HISTORY_FILE", history_path)
return history_path
def _mock_compose(template_dir: str, output_path: str):
"""Write a minimal DocuSign template JSON so the pipeline continues."""
with open(output_path, "w") as f:
json.dump({"name": TEMPLATE_NAME, "description": "mocked"}, f)
def _mock_download(template_id, access_token, output_dir):
"""Write stub Adobe Sign files so compose has something to read."""
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "metadata.json"), "w") as f:
json.dump({"name": TEMPLATE_NAME, "id": template_id}, f)
with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
json.dump({"fields": []}, f)
with open(os.path.join(output_dir, "documents.json"), "w") as f:
json.dump({"documents": []}, f)
return True
def test_migrate_requires_auth():
"""No session → 401."""
resp = client.post("/api/migrate", json={"adobe_template_ids": [ADOBE_ID]}, cookies={})
assert resp.status_code == 401
def test_migrate_requires_docusign_auth():
"""Only Adobe auth → 401."""
resp = client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _adobe_only_session()},
)
assert resp.status_code == 401
@respx.mock
def test_migrate_single_template_creates():
"""No existing DS template → POST creates; result action=created."""
# DS list: no match
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
# DS create
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
resp = client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
results = resp.json()["results"]
assert len(results) == 1
assert results[0]["action"] == "created"
assert results[0]["docusign_template_id"] == DS_NEW_ID
assert results[0]["status"] == "success"
@respx.mock
def test_migrate_single_template_updates():
"""Existing DS template with same name → PUT updates; result action=updated."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": DS_EXISTING_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-10T00:00:00Z"},
]
})
)
respx.put(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates/{DS_EXISTING_ID}").mock(
return_value=httpx.Response(200, json={})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
resp = client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
results = resp.json()["results"]
assert results[0]["action"] == "updated"
assert results[0]["docusign_template_id"] == DS_EXISTING_ID
@respx.mock
def test_migrate_records_history(temp_history):
"""After successful migration, history file is written."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert os.path.exists(temp_history)
with open(temp_history) as f:
history = json.load(f)
assert len(history) == 1
assert history[0]["adobe_template_id"] == ADOBE_ID
def test_history_returns_past_runs(temp_history):
"""GET /api/migrate/history returns written records."""
records = [
{"timestamp": "2026-04-17T10:00:00Z", "adobe_template_id": "a1", "status": "success"},
{"timestamp": "2026-04-17T11:00:00Z", "adobe_template_id": "a2", "status": "failed"},
]
with open(temp_history, "w") as f:
json.dump(records, f)
resp = client.get("/api/migrate/history")
assert resp.status_code == 200
assert len(resp.json()["history"]) == 2
@respx.mock
def test_migrate_handles_partial_failure():
"""One template fails (download error), others succeed."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
)
call_count = {"n": 0}
async def mock_download_partial(template_id, access_token, output_dir):
call_count["n"] += 1
if call_count["n"] == 1:
return False # first template fails
return await _async_wrap(_mock_download)(template_id, access_token, output_dir)
with (
patch.object(migrate_module, "_download_adobe_template", new=mock_download_partial),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
resp = client.post(
"/api/migrate",
json={"adobe_template_ids": ["fail-id", ADOBE_ID]},
cookies={_COOKIE_NAME: _full_session()},
)
assert resp.status_code == 200
results = resp.json()["results"]
assert results[0]["status"] == "failed"
assert results[1]["status"] == "success"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _async_wrap(sync_fn):
"""Wrap a sync function to be awaitable (for patching async functions in tests)."""
import asyncio
async def wrapper(*args, **kwargs):
return sync_fn(*args, **kwargs)
return wrapper

157
tests/test_api_templates.py Normal file
View File

@ -0,0 +1,157 @@
"""
tests/test_api_templates.py
----------------------------
Tests for /api/templates/* endpoints.
All external API calls are mocked with respx.
"""
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
from web.session import _serializer, _COOKIE_NAME
client = TestClient(app, raise_server_exceptions=True)
def _make_session_cookie(data: dict) -> str:
"""Build a valid signed session cookie for testing."""
return _serializer.dumps(data)
def _adobe_session():
return _make_session_cookie({"adobe_access_token": "adobe-tok", "docusign_access_token": "ds-tok"})
def _ds_only_session():
return _make_session_cookie({"docusign_access_token": "ds-tok"})
def _adobe_only_session():
return _make_session_cookie({"adobe_access_token": "adobe-tok"})
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "test-account-id"
@pytest.fixture(autouse=True)
def patch_account_id(monkeypatch):
import web.config as cfg
monkeypatch.setattr(cfg.settings, "docusign_account_id", DS_ACCOUNT)
monkeypatch.setattr(cfg.settings, "docusign_base_url", DS_BASE)
monkeypatch.setattr(cfg.settings, "adobe_sign_base_url", ADOBE_BASE)
def test_adobe_list_requires_auth():
"""No session → 401."""
resp = client.get("/api/templates/adobe", cookies={})
assert resp.status_code == 401
def test_docusign_list_requires_auth():
"""No session → 401."""
resp = client.get("/api/templates/docusign", cookies={})
assert resp.status_code == 401
@respx.mock
def test_adobe_list_returns_templates():
"""Authenticated → list of templates returned."""
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": "abc1", "name": "NDA", "modifiedDate": "2026-04-10", "sharingMode": "USER"},
{"id": "abc2", "name": "Sales Agmt", "modifiedDate": "2026-04-12", "sharingMode": "USER"},
]
})
)
resp = client.get("/api/templates/adobe", cookies={_COOKIE_NAME: _adobe_only_session()})
assert resp.status_code == 200
data = resp.json()
assert len(data["templates"]) == 2
assert data["templates"][0]["name"] == "NDA"
@respx.mock
def test_docusign_list_returns_templates():
"""Authenticated → list of DocuSign templates returned."""
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": "ds1", "name": "NDA", "lastModified": "2026-04-11"},
]
})
)
resp = client.get("/api/templates/docusign", cookies={_COOKIE_NAME: _ds_only_session()})
assert resp.status_code == 200
data = resp.json()
assert data["templates"][0]["id"] == "ds1"
@respx.mock
def test_status_not_migrated():
"""Adobe template with no matching DS name → not_migrated."""
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": "adobe1", "name": "Onboarding", "modifiedDate": "2026-04-10"},
]
})
)
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
resp = client.get("/api/templates/status", cookies={_COOKIE_NAME: _adobe_session()})
assert resp.status_code == 200
t = resp.json()["templates"][0]
assert t["status"] == "not_migrated"
assert t["docusign_id"] is None
@respx.mock
def test_status_migrated():
"""Adobe template with same name in DS and DS is newer → migrated."""
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": "adobe1", "name": "NDA", "modifiedDate": "2026-04-10T00:00:00Z"},
]
})
)
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": "ds1", "name": "NDA", "lastModified": "2026-04-11T00:00:00Z"},
]
})
)
resp = client.get("/api/templates/status", cookies={_COOKIE_NAME: _adobe_session()})
t = resp.json()["templates"][0]
assert t["status"] == "migrated"
assert t["docusign_id"] == "ds1"
@respx.mock
def test_status_needs_update():
"""Adobe template modified after the DS template → needs_update."""
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": "adobe1", "name": "NDA", "modifiedDate": "2026-04-15T00:00:00Z"},
]
})
)
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": "ds1", "name": "NDA", "lastModified": "2026-04-10T00:00:00Z"},
]
})
)
resp = client.get("/api/templates/status", cookies={_COOKIE_NAME: _adobe_session()})
t = resp.json()["templates"][0]
assert t["status"] == "needs_update"

192
tests/test_e2e.py Normal file
View File

@ -0,0 +1,192 @@
"""
tests/test_e2e.py
-----------------
End-to-end test: full migration pipeline through the web API.
Simulates:
1. Connect Adobe Sign (mock OAuth callback)
2. Connect DocuSign (mock OAuth callback)
3. GET /api/templates/status at least one template shown
4. POST /api/migrate status created
5. GET /api/templates/status same template now migrated
6. POST /api/migrate again status updated
7. GET /api/migrate/history two records for the same template
"""
import json
import os
from unittest.mock import patch
import pytest
import respx
import httpx
from fastapi.testclient import TestClient
from web.app import app
import web.routers.migrate as migrate_module
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "e2e-account-id"
TEMPLATE_NAME = "E2E NDA"
ADOBE_ID = "e2e-adobe-001"
DS_CREATED_ID = "e2e-ds-created"
@pytest.fixture(autouse=True)
def patch_settings(monkeypatch):
import web.config as cfg
monkeypatch.setattr(cfg.settings, "docusign_account_id", DS_ACCOUNT)
monkeypatch.setattr(cfg.settings, "docusign_base_url", DS_BASE)
monkeypatch.setattr(cfg.settings, "adobe_sign_base_url", ADOBE_BASE)
monkeypatch.setattr(cfg.settings, "adobe_client_id", "test-client")
monkeypatch.setattr(cfg.settings, "adobe_client_secret", "test-secret")
monkeypatch.setattr(cfg.settings, "docusign_client_id", "test-ds-client")
monkeypatch.setattr(cfg.settings, "docusign_client_secret", "test-ds-secret")
monkeypatch.setattr(cfg.settings, "docusign_auth_server", "account-d.docusign.com")
@pytest.fixture(autouse=True)
def temp_history(tmp_path, monkeypatch):
history_path = str(tmp_path / ".history.json")
monkeypatch.setattr(migrate_module, "_HISTORY_FILE", history_path)
return history_path
def _mock_compose(template_dir, output_path):
with open(output_path, "w") as f:
json.dump({"name": TEMPLATE_NAME}, f)
def _mock_download(template_id, access_token, output_dir):
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "metadata.json"), "w") as f:
json.dump({"name": TEMPLATE_NAME, "id": template_id}, f)
with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
json.dump({"fields": []}, f)
with open(os.path.join(output_dir, "documents.json"), "w") as f:
json.dump({"documents": []}, f)
return True
async def _async_mock_download(*args, **kwargs):
return _mock_download(*args, **kwargs)
@respx.mock
def test_full_migration_flow(temp_history):
"""Full 7-step end-to-end pipeline test."""
from web.session import _serializer, _COOKIE_NAME
from web.config import settings
test_client = TestClient(app, raise_server_exceptions=True)
# ── Step 1 & 2: Simulate already-authenticated session ──────────────────
# (OAuth callback tested in test_api_auth.py; here we inject the session directly)
session_cookie = _serializer.dumps({
"adobe_access_token": "e2e-adobe-tok",
"docusign_access_token": "e2e-ds-tok",
})
# ── Step 3: GET /api/templates/status → template visible ────────────────
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": ADOBE_ID, "name": TEMPLATE_NAME, "modifiedDate": "2026-04-17T10:00:00Z"},
]
})
)
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
status_resp = test_client.get(
"/api/templates/status",
cookies={_COOKIE_NAME: session_cookie}
)
assert status_resp.status_code == 200
templates = status_resp.json()["templates"]
assert any(t["adobe_id"] == ADOBE_ID for t in templates)
adobe_t = next(t for t in templates if t["adobe_id"] == ADOBE_ID)
assert adobe_t["status"] == "not_migrated"
# ── Step 4: POST /api/migrate → created ─────────────────────────────────
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates", name="list1").mock(
return_value=httpx.Response(200, json={"envelopeTemplates": []})
)
respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(201, json={"templateId": DS_CREATED_ID})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_mock_download),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
migrate_resp = test_client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: session_cookie},
)
assert migrate_resp.status_code == 200
result = migrate_resp.json()["results"][0]
assert result["status"] == "success"
assert result["action"] == "created"
assert result["docusign_template_id"] == DS_CREATED_ID
# ── Step 5: GET /api/templates/status → now migrated ────────────────────
respx.get(f"{ADOBE_BASE}/libraryDocuments").mock(
return_value=httpx.Response(200, json={
"libraryDocumentList": [
{"id": ADOBE_ID, "name": TEMPLATE_NAME, "modifiedDate": "2026-04-17T10:00:00Z"},
]
})
)
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": DS_CREATED_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-17T12:00:00Z"},
]
})
)
status_resp2 = test_client.get(
"/api/templates/status",
cookies={_COOKIE_NAME: session_cookie}
)
templates2 = status_resp2.json()["templates"]
t2 = next(t for t in templates2 if t["adobe_id"] == ADOBE_ID)
assert t2["status"] == "migrated"
# ── Step 6: POST /api/migrate again → updated ───────────────────────────
respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
return_value=httpx.Response(200, json={
"envelopeTemplates": [
{"templateId": DS_CREATED_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-17T12:00:00Z"},
]
})
)
respx.put(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates/{DS_CREATED_ID}").mock(
return_value=httpx.Response(200, json={})
)
with (
patch.object(migrate_module, "_download_adobe_template", new=_async_mock_download),
patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
):
migrate_resp2 = test_client.post(
"/api/migrate",
json={"adobe_template_ids": [ADOBE_ID]},
cookies={_COOKIE_NAME: session_cookie},
)
result2 = migrate_resp2.json()["results"][0]
assert result2["action"] == "updated"
assert result2["docusign_template_id"] == DS_CREATED_ID
# ── Step 7: GET /api/migrate/history → two records ──────────────────────
history_resp = test_client.get("/api/migrate/history")
history = history_resp.json()["history"]
assert len(history) == 2
actions = [r["action"] for r in history]
assert "created" in actions
assert "updated" in actions

129
tests/test_regression.py Normal file
View File

@ -0,0 +1,129 @@
"""
tests/test_regression.py
------------------------
Regression tests for the compose pipeline.
For each downloaded template in downloads/, run compose_template() and
compare the output against the snapshot in tests/fixtures/expected/.
These tests require no live API calls. They verify that changes to the
compose pipeline don't silently break existing template conversions.
To update snapshots after an intentional change:
pytest tests/test_regression.py --update-snapshots
"""
import json
import os
import sys
import tempfile
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
from compose_docusign_template import compose_template
DOWNLOADS_DIR = os.path.join(os.path.dirname(__file__), "..", "downloads")
FIXTURES_DIR = os.path.join(os.path.dirname(__file__), "fixtures", "expected")
# Templates with real downloaded data to test against
REGRESSION_TEMPLATES = [
"David Tag Demo Form__CBJCHBCA",
"_DEMO USE ONLY_ NDA__CBJCHBCA",
"Rob Test__CBJCHBCA",
]
@pytest.fixture
def update_snapshots(request):
return request.config.getoption("--update-snapshots", default=False)
@pytest.mark.parametrize("template_name", REGRESSION_TEMPLATES)
def test_compose_regression(template_name, update_snapshots):
"""
Compose output for each template must match the stored snapshot.
Run with --update-snapshots to regenerate.
"""
template_dir = os.path.join(DOWNLOADS_DIR, template_name)
if not os.path.isdir(template_dir):
pytest.skip(f"Downloaded template not found: {template_name}")
snapshot_path = os.path.join(FIXTURES_DIR, f"{template_name}.json")
with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as tf:
output_path = tf.name
try:
result, warnings = compose_template(template_dir, output_path)
if update_snapshots:
os.makedirs(FIXTURES_DIR, exist_ok=True)
with open(snapshot_path, "w") as f:
json.dump(result, f, indent=2)
pytest.skip(f"Snapshot updated for {template_name}")
if not os.path.exists(snapshot_path):
pytest.fail(
f"No snapshot for '{template_name}'. "
f"Run with --update-snapshots to create it."
)
with open(snapshot_path) as f:
expected = json.load(f)
# Compare key structural properties
assert result.get("name") == expected.get("name"), \
f"Template name mismatch for {template_name}"
# Recipients
result_roles = sorted([r.get("roleName", "") for r in result.get("recipients", {}).get("signers", [])])
expected_roles = sorted([r.get("roleName", "") for r in expected.get("recipients", {}).get("signers", [])])
assert result_roles == expected_roles, \
f"Recipient roles changed for {template_name}: {result_roles} != {expected_roles}"
# Tab counts per type — must not regress
result_tabs = _count_tabs(result)
expected_tabs = _count_tabs(expected)
for tab_type, count in expected_tabs.items():
actual = result_tabs.get(tab_type, 0)
assert actual == count, (
f"Tab count regression in {template_name}: "
f"{tab_type} expected {count}, got {actual}"
)
finally:
if os.path.exists(output_path):
os.unlink(output_path)
def _count_tabs(template: dict) -> dict:
"""Count total tabs of each type across all signers."""
counts = {}
for signer in template.get("recipients", {}).get("signers", []):
tabs = signer.get("tabs", {})
for tab_type, items in tabs.items():
if isinstance(items, list):
counts[tab_type] = counts.get(tab_type, 0) + len(items)
return counts
def test_no_tabs_lost_on_recompose():
"""
Sanity check: every downloaded template must produce at least one tab.
Catches complete compose failures silently returning empty output.
"""
for template_name in REGRESSION_TEMPLATES:
template_dir = os.path.join(DOWNLOADS_DIR, template_name)
if not os.path.isdir(template_dir):
continue
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tf:
output_path = tf.name
try:
result, _ = compose_template(template_dir, output_path)
total_tabs = sum(_count_tabs(result).values())
assert total_tabs > 0, f"No tabs produced for {template_name}"
finally:
if os.path.exists(output_path):
os.unlink(output_path)

127
tests/test_upload_upsert.py Normal file
View File

@ -0,0 +1,127 @@
"""
tests/test_upload_upsert.py
---------------------------
Tests for idempotent (upsert) upload logic in upload_docusign_template.py.
All DocuSign API calls are mocked with responses; no live account needed.
"""
import json
import os
import sys
import tempfile
from unittest.mock import patch
import pytest
import responses as rsps_lib
# Ensure src/ is importable before import
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
# Patch get_access_token at import time so the module never tries to load a private key
with patch("docusign_auth.get_access_token", return_value="fake-token"):
import upload_docusign_template
BASE_URL = "https://demo.docusign.net/restapi"
ACCOUNT_ID = "test-account-id"
TEMPLATE_NAME = "My NDA Template"
@pytest.fixture(autouse=True)
def env_vars(monkeypatch):
monkeypatch.setenv("DOCUSIGN_ACCOUNT_ID", ACCOUNT_ID)
monkeypatch.setenv("DOCUSIGN_BASE_URL", BASE_URL)
@pytest.fixture()
def template_file():
"""Write a minimal template JSON to a temp file."""
template = {"name": TEMPLATE_NAME, "description": "test template"}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(template, f)
path = f.name
yield path
os.unlink(path)
def _list_url():
return f"{BASE_URL}/v2.1/accounts/{ACCOUNT_ID}/templates"
def _update_url(template_id):
return f"{BASE_URL}/v2.1/accounts/{ACCOUNT_ID}/templates/{template_id}"
@rsps_lib.activate
def test_creates_when_no_match(template_file):
"""No existing templates with this name → POST called, new ID returned."""
new_id = "new-template-abc"
rsps_lib.add(rsps_lib.GET, _list_url(), json={"envelopeTemplates": []}, status=200)
rsps_lib.add(rsps_lib.POST, _list_url(), json={"templateId": new_id}, status=201)
with patch.object(upload_docusign_template, "get_access_token", return_value="fake-token"):
result = upload_docusign_template.upload_template(template_file)
assert result == new_id
methods = [c.request.method for c in rsps_lib.calls]
assert methods == ["GET", "POST"]
@rsps_lib.activate
def test_updates_most_recent_when_match(template_file):
"""Two exact-name matches → PUT called on the most recently modified one."""
older_id = "template-older"
newer_id = "template-newer"
existing = [
{"templateId": older_id, "name": TEMPLATE_NAME, "lastModified": "2026-04-10T10:00:00.000Z"},
{"templateId": newer_id, "name": TEMPLATE_NAME, "lastModified": "2026-04-15T10:00:00.000Z"},
]
rsps_lib.add(rsps_lib.GET, _list_url(), json={"envelopeTemplates": existing}, status=200)
rsps_lib.add(rsps_lib.PUT, _update_url(newer_id), json={}, status=200)
with patch.object(upload_docusign_template, "get_access_token", return_value="fake-token"):
result = upload_docusign_template.upload_template(template_file)
assert result == newer_id
put_calls = [c for c in rsps_lib.calls if c.request.method == "PUT"]
assert len(put_calls) == 1
assert newer_id in put_calls[0].request.url
@rsps_lib.activate
def test_force_create_bypasses_upsert(template_file):
"""force_create=True → always POST, no GET for existing templates."""
new_id = "force-created-id"
rsps_lib.add(rsps_lib.POST, _list_url(), json={"templateId": new_id}, status=201)
with patch.object(upload_docusign_template, "get_access_token", return_value="fake-token"):
result = upload_docusign_template.upload_template(template_file, force_create=True)
assert result == new_id
get_calls = [c for c in rsps_lib.calls if c.request.method == "GET"]
assert len(get_calls) == 0
post_calls = [c for c in rsps_lib.calls if c.request.method == "POST"]
assert len(post_calls) == 1
@rsps_lib.activate
def test_partial_name_match_ignored(template_file):
"""DocuSign search_text is substring; we must reject partial-name results and POST."""
partial_id = "partial-match-id"
existing = [
{"templateId": partial_id, "name": "My NDA Template (Copy)", "lastModified": "2026-04-15T10:00:00.000Z"},
]
rsps_lib.add(rsps_lib.GET, _list_url(), json={"envelopeTemplates": existing}, status=200)
new_id = "created-new-id"
rsps_lib.add(rsps_lib.POST, _list_url(), json={"templateId": new_id}, status=201)
with patch.object(upload_docusign_template, "get_access_token", return_value="fake-token"):
result = upload_docusign_template.upload_template(template_file)
assert result == new_id
put_calls = [c for c in rsps_lib.calls if c.request.method == "PUT"]
assert len(put_calls) == 0

0
web/__init__.py Normal file
View File

47
web/app.py Normal file
View File

@ -0,0 +1,47 @@
"""
web/app.py
----------
FastAPI entrypoint for the Adobe Sign DocuSign Migrator web app.
Run with:
uvicorn web.app:app --reload --port 8000
From the project root.
"""
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import os
from web.config import settings
from web.routers import auth, templates, migrate
app = FastAPI(
title="Adobe Sign → DocuSign Migrator",
version=settings.version,
docs_url="/api/docs",
)
# Routers
app.include_router(auth.router, prefix="/api/auth", tags=["auth"])
app.include_router(templates.router, prefix="/api/templates", tags=["templates"])
app.include_router(migrate.router, prefix="/api/migrate", tags=["migrate"])
# Static files (frontend)
_static_dir = os.path.join(os.path.dirname(__file__), "static")
if os.path.isdir(_static_dir):
app.mount("/static", StaticFiles(directory=_static_dir), name="static")
@app.get("/health", tags=["health"])
def health():
return {"status": "ok", "version": settings.version}
@app.get("/", include_in_schema=False)
def index():
index_path = os.path.join(_static_dir, "index.html")
if os.path.exists(index_path):
return FileResponse(index_path)
return {"message": "Adobe Sign → DocuSign Migrator API", "docs": "/api/docs"}

36
web/config.py Normal file
View File

@ -0,0 +1,36 @@
"""
web/config.py
-------------
Environment-based configuration for the web app.
All values come from .env or environment variables.
"""
import os
from dotenv import load_dotenv
load_dotenv()
class Settings:
# Adobe Sign OAuth
adobe_client_id: str = os.getenv("ADOBE_CLIENT_ID", "")
adobe_client_secret: str = os.getenv("ADOBE_CLIENT_SECRET", "")
adobe_redirect_uri: str = os.getenv("ADOBE_REDIRECT_URI", "http://localhost:8000/api/auth/adobe/callback")
adobe_sign_base_url: str = os.getenv("ADOBE_SIGN_BASE_URL", "https://api.eu2.adobesign.com/api/rest/v6")
# DocuSign OAuth
docusign_client_id: str = os.getenv("DOCUSIGN_CLIENT_ID", "")
docusign_client_secret: str = os.getenv("DOCUSIGN_CLIENT_SECRET", "")
docusign_redirect_uri: str = os.getenv("DOCUSIGN_REDIRECT_URI", "http://localhost:8000/api/auth/docusign/callback")
docusign_account_id: str = os.getenv("DOCUSIGN_ACCOUNT_ID", "")
docusign_base_url: str = os.getenv("DOCUSIGN_BASE_URL", "https://demo.docusign.net/restapi")
docusign_auth_server: str = os.getenv("DOCUSIGN_AUTH_SERVER", "account-d.docusign.com")
# Session
session_secret_key: str = os.getenv("SESSION_SECRET_KEY", "dev-secret-change-in-production")
# App
version: str = "2.0"
settings = Settings()

0
web/routers/__init__.py Normal file
View File

153
web/routers/auth.py Normal file
View File

@ -0,0 +1,153 @@
"""
web/routers/auth.py
-------------------
OAuth endpoints for Adobe Sign and DocuSign.
Adobe Sign: Authorization Code flow
DocuSign: Authorization Code flow (demo sandbox)
Tokens are stored in a signed session cookie.
"""
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, RedirectResponse
from web.config import settings
from web.session import get_session, save_session, clear_session
router = APIRouter()
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
@router.get("/status")
def auth_status(request: Request):
"""Returns which platforms the current session is connected to."""
session = get_session(request)
return {
"adobe": bool(session.get("adobe_access_token")),
"docusign": bool(session.get("docusign_access_token")),
}
# ---------------------------------------------------------------------------
# Adobe Sign
# ---------------------------------------------------------------------------
@router.get("/adobe/start")
def adobe_start():
"""Redirect the browser to the Adobe Sign OAuth authorization page."""
params = (
f"?response_type=code"
f"&client_id={settings.adobe_client_id}"
f"&redirect_uri={settings.adobe_redirect_uri}"
f"&scope=library_read:self+library_write:self+user_read:self"
)
auth_url = "https://secure.eu2.adobesign.com/public/oauth/v2" + params
return RedirectResponse(auth_url)
@router.get("/adobe/callback")
async def adobe_callback(request: Request, code: str = ""):
"""Exchange authorization code for access + refresh tokens."""
if not code:
return JSONResponse({"error": "missing code"}, status_code=400)
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://api.eu2.adobesign.com/oauth/v2/token",
data={
"grant_type": "authorization_code",
"client_id": settings.adobe_client_id,
"client_secret": settings.adobe_client_secret,
"redirect_uri": settings.adobe_redirect_uri,
"code": code,
},
)
if not resp.is_success:
return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502)
token_data = resp.json()
session = get_session(request)
session["adobe_access_token"] = token_data.get("access_token")
session["adobe_refresh_token"] = token_data.get("refresh_token")
response = RedirectResponse("/")
save_session(response, session)
return response
@router.get("/adobe/disconnect")
def adobe_disconnect(request: Request):
session = get_session(request)
session.pop("adobe_access_token", None)
session.pop("adobe_refresh_token", None)
response = JSONResponse({"disconnected": "adobe"})
save_session(response, session)
return response
# ---------------------------------------------------------------------------
# DocuSign
# ---------------------------------------------------------------------------
@router.get("/docusign/start")
def docusign_start():
"""Redirect the browser to the DocuSign OAuth authorization page."""
params = (
f"?response_type=code"
f"&scope=signature"
f"&client_id={settings.docusign_client_id}"
f"&redirect_uri={settings.docusign_redirect_uri}"
)
auth_url = f"https://{settings.docusign_auth_server}/oauth/auth" + params
return RedirectResponse(auth_url)
@router.get("/docusign/callback")
async def docusign_callback(request: Request, code: str = ""):
"""Exchange authorization code for access token."""
if not code:
return JSONResponse({"error": "missing code"}, status_code=400)
import base64
credentials = base64.b64encode(
f"{settings.docusign_client_id}:{settings.docusign_client_secret}".encode()
).decode()
async with httpx.AsyncClient() as client:
resp = await client.post(
f"https://{settings.docusign_auth_server}/oauth/token",
headers={"Authorization": f"Basic {credentials}"},
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": settings.docusign_redirect_uri,
},
)
if not resp.is_success:
return JSONResponse({"error": "token exchange failed", "detail": resp.text}, status_code=502)
token_data = resp.json()
session = get_session(request)
session["docusign_access_token"] = token_data.get("access_token")
session["docusign_refresh_token"] = token_data.get("refresh_token")
response = RedirectResponse("/")
save_session(response, session)
return response
@router.get("/docusign/disconnect")
def docusign_disconnect(request: Request):
session = get_session(request)
session.pop("docusign_access_token", None)
session.pop("docusign_refresh_token", None)
response = JSONResponse({"disconnected": "docusign"})
save_session(response, session)
return response

257
web/routers/migrate.py Normal file
View File

@ -0,0 +1,257 @@
"""
web/routers/migrate.py
----------------------
Migration trigger and history endpoints.
POST /api/migrate run the pipeline for one or more Adobe template IDs
GET /api/migrate/history return past migration records
"""
import asyncio
import json
import os
import sys
import tempfile
from datetime import datetime, timezone
from typing import List, Optional
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from web.config import settings
from web.session import get_session
# Ensure src/ is on path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src"))
router = APIRouter()
_HISTORY_FILE = os.path.join(
os.path.dirname(__file__), "..", "..", "migration-output", ".history.json"
)
class MigrateRequest(BaseModel):
adobe_template_ids: List[str]
def _load_history() -> list:
if not os.path.exists(_HISTORY_FILE):
return []
with open(_HISTORY_FILE) as f:
return json.load(f)
def _save_history(records: list) -> None:
os.makedirs(os.path.dirname(_HISTORY_FILE), exist_ok=True)
with open(_HISTORY_FILE, "w") as f:
json.dump(records, f, indent=2)
def _load_compose():
"""
Dynamically load and return the compose_template function from src/.
Isolated in its own function so tests can patch it without touching the file system.
"""
import importlib.util
spec = importlib.util.spec_from_file_location(
"compose_docusign_template",
os.path.join(os.path.dirname(__file__), "..", "..", "src", "compose_docusign_template.py"),
)
compose_mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(compose_mod)
return compose_mod.compose_template
async def _download_adobe_template(template_id: str, access_token: str, output_dir: str) -> bool:
"""Download Adobe Sign template files into output_dir. Returns True on success."""
headers = {"Authorization": f"Bearer {access_token}"}
base = settings.adobe_sign_base_url
async with httpx.AsyncClient() as client:
# Metadata
meta_resp = await client.get(f"{base}/libraryDocuments/{template_id}", headers=headers)
if not meta_resp.is_success:
return False
metadata = meta_resp.json()
# Form fields
fields_resp = await client.get(f"{base}/libraryDocuments/{template_id}/formFields", headers=headers)
form_fields = fields_resp.json() if fields_resp.is_success else {"fields": []}
# Documents list
docs_resp = await client.get(f"{base}/libraryDocuments/{template_id}/documents", headers=headers)
documents = docs_resp.json() if docs_resp.is_success else {"documents": []}
# Download first PDF
doc_list = documents.get("documents", [])
pdf_bytes = b""
if doc_list:
doc_id = doc_list[0].get("id")
pdf_resp = await client.get(f"{base}/libraryDocuments/{template_id}/documents/{doc_id}", headers=headers)
if pdf_resp.is_success:
pdf_bytes = pdf_resp.content
os.makedirs(output_dir, exist_ok=True)
with open(os.path.join(output_dir, "metadata.json"), "w") as f:
json.dump(metadata, f, indent=2)
with open(os.path.join(output_dir, "form_fields.json"), "w") as f:
json.dump(form_fields, f, indent=2)
with open(os.path.join(output_dir, "documents.json"), "w") as f:
json.dump(documents, f, indent=2)
if pdf_bytes:
pdf_name = doc_list[0].get("name", "document.pdf") if doc_list else "document.pdf"
if not pdf_name.endswith(".pdf"):
pdf_name += ".pdf"
with open(os.path.join(output_dir, pdf_name), "wb") as f:
f.write(pdf_bytes)
return True
async def _migrate_one(
adobe_id: str,
adobe_access_token: str,
docusign_access_token: str,
) -> dict:
"""Run the full pipeline for one Adobe template. Returns a result record."""
timestamp = datetime.now(timezone.utc).isoformat()
with tempfile.TemporaryDirectory() as tmpdir:
download_dir = os.path.join(tmpdir, "download")
output_dir = os.path.join(tmpdir, "output")
# 1. Download
ok = await _download_adobe_template(adobe_id, adobe_access_token, download_dir)
if not ok:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": None,
"docusign_template_id": None,
"action": None,
"status": "failed",
"error": "Adobe Sign download failed",
}
# Read template name from metadata
with open(os.path.join(download_dir, "metadata.json")) as f:
metadata = json.load(f)
template_name = metadata.get("name", adobe_id)
# 2. Compose DocuSign template JSON
composed_file = os.path.join(tmpdir, "docusign-template.json")
try:
compose_fn = _load_compose()
compose_fn(download_dir, composed_file)
except Exception as exc:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": None,
"action": None,
"status": "failed",
"error": f"Compose failed: {exc}",
}
if not os.path.exists(composed_file):
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": None,
"action": None,
"status": "failed",
"error": "Compose produced no output file",
}
# 3. Upload (upsert) to DocuSign using web session token
with open(composed_file) as f:
template_json = json.load(f)
ds_headers = {
"Authorization": f"Bearer {docusign_access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
list_url = f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates"
async with httpx.AsyncClient() as client:
# Find existing
list_resp = await client.get(
list_url, headers=ds_headers, params={"search_text": template_name, "count": 100}
)
existing_id = None
if list_resp.is_success:
raw = list_resp.json().get("envelopeTemplates") or list_resp.json().get("templates") or []
exact = [t for t in raw if t.get("name") == template_name]
if exact:
exact.sort(key=lambda t: t.get("lastModified", ""), reverse=True)
existing_id = exact[0]["templateId"]
if existing_id:
up_resp = await client.put(
f"{list_url}/{existing_id}", headers=ds_headers, json=template_json
)
action = "updated"
template_id = existing_id
else:
up_resp = await client.post(list_url, headers=ds_headers, json=template_json)
action = "created"
template_id = up_resp.json().get("templateId") if up_resp.is_success else None
if not up_resp.is_success:
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": None,
"action": None,
"status": "failed",
"error": f"DocuSign upload failed ({up_resp.status_code}): {up_resp.text[:200]}",
}
return {
"timestamp": timestamp,
"adobe_template_id": adobe_id,
"adobe_template_name": template_name,
"docusign_template_id": template_id,
"action": action,
"status": "success",
"error": None,
}
@router.post("")
async def run_migration(body: MigrateRequest, request: Request):
"""Migrate one or more Adobe Sign templates to DocuSign."""
session = get_session(request)
if not session.get("adobe_access_token"):
return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
if not session.get("docusign_access_token"):
return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
tasks = [
_migrate_one(
aid,
session["adobe_access_token"],
session["docusign_access_token"],
)
for aid in body.adobe_template_ids
]
results = await asyncio.gather(*tasks)
# Append to history
history = _load_history()
history.extend(results)
_save_history(history)
return {"results": list(results)}
@router.get("/history")
def migration_history():
"""Return all past migration records."""
return {"history": _load_history()}

167
web/routers/templates.py Normal file
View File

@ -0,0 +1,167 @@
"""
web/routers/templates.py
------------------------
Template listing endpoints for Adobe Sign and DocuSign.
Computes per-template migration status for the side-by-side UI.
"""
from datetime import datetime, timezone
from typing import Optional
import httpx
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from web.config import settings
from web.session import get_session
router = APIRouter()
def _require_adobe(session: dict) -> Optional[JSONResponse]:
if not session.get("adobe_access_token"):
return JSONResponse({"error": "not authenticated to Adobe Sign"}, status_code=401)
return None
def _require_docusign(session: dict) -> Optional[JSONResponse]:
if not session.get("docusign_access_token"):
return JSONResponse({"error": "not authenticated to DocuSign"}, status_code=401)
return None
@router.get("/adobe")
async def list_adobe_templates(request: Request):
"""List all Adobe Sign library documents (templates) for the current user."""
session = get_session(request)
err = _require_adobe(session)
if err:
return err
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{settings.adobe_sign_base_url}/libraryDocuments",
headers={"Authorization": f"Bearer {session['adobe_access_token']}"},
params={"pageSize": 100},
)
if not resp.is_success:
return JSONResponse({"error": "Adobe Sign API error", "detail": resp.text}, status_code=502)
data = resp.json()
templates = [
{
"id": t.get("id"),
"name": t.get("name"),
"modifiedDate": t.get("modifiedDate"),
"sharingMode": t.get("sharingMode"),
}
for t in data.get("libraryDocumentList", [])
]
return {"templates": templates}
@router.get("/docusign")
async def list_docusign_templates(request: Request):
"""List all DocuSign templates for the account."""
session = get_session(request)
err = _require_docusign(session)
if err:
return err
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates",
headers={"Authorization": f"Bearer {session['docusign_access_token']}"},
params={"count": 100},
)
if not resp.is_success:
return JSONResponse({"error": "DocuSign API error", "detail": resp.text}, status_code=502)
data = resp.json()
raw = data.get("envelopeTemplates") or data.get("templates") or []
templates = [
{
"id": t.get("templateId"),
"name": t.get("name"),
"lastModified": t.get("lastModified"),
}
for t in raw
]
return {"templates": templates}
@router.get("/status")
async def template_status(request: Request):
"""
Merged view: each Adobe template tagged with migration status.
Status values:
not_migrated no DocuSign template with the same name
migrated at least one exact name match in DocuSign
needs_update name match exists but Adobe template is newer
"""
session = get_session(request)
err = _require_adobe(session) or _require_docusign(session)
if err:
return err
# Fetch both lists concurrently
async with httpx.AsyncClient() as client:
adobe_resp, ds_resp = await asyncio.gather(
client.get(
f"{settings.adobe_sign_base_url}/libraryDocuments",
headers={"Authorization": f"Bearer {session['adobe_access_token']}"},
params={"pageSize": 100},
),
client.get(
f"{settings.docusign_base_url}/v2.1/accounts/{settings.docusign_account_id}/templates",
headers={"Authorization": f"Bearer {session['docusign_access_token']}"},
params={"count": 100},
),
)
if not adobe_resp.is_success:
return JSONResponse({"error": "Adobe Sign API error"}, status_code=502)
if not ds_resp.is_success:
return JSONResponse({"error": "DocuSign API error"}, status_code=502)
adobe_templates = adobe_resp.json().get("libraryDocumentList", [])
ds_raw = ds_resp.json().get("envelopeTemplates") or ds_resp.json().get("templates") or []
# Build a name → most-recently-modified DocuSign template lookup
ds_by_name: dict[str, dict] = {}
for t in ds_raw:
name = t.get("name", "")
existing = ds_by_name.get(name)
if not existing or t.get("lastModified", "") > existing.get("lastModified", ""):
ds_by_name[name] = t
results = []
for t in adobe_templates:
name = t.get("name", "")
adobe_modified = t.get("modifiedDate", "")
ds_match = ds_by_name.get(name)
if not ds_match:
status = "not_migrated"
else:
ds_modified = ds_match.get("lastModified", "")
# needs_update if Adobe was modified after the DS template
status = "needs_update" if adobe_modified > ds_modified else "migrated"
results.append({
"adobe_id": t.get("id"),
"name": name,
"adobe_modified": adobe_modified,
"docusign_id": ds_match.get("templateId") if ds_match else None,
"docusign_modified": ds_match.get("lastModified") if ds_match else None,
"status": status,
})
return {"templates": results}
# asyncio needed for gather — import at top of module
import asyncio

45
web/session.py Normal file
View File

@ -0,0 +1,45 @@
"""
web/session.py
--------------
Session helpers using signed cookies (itsdangerous).
Stores Adobe Sign and DocuSign tokens server-side in the cookie payload.
Sessions are short-lived (1 hour) and signed but not encrypted.
Do not store sensitive secrets here beyond access tokens.
"""
from itsdangerous import URLSafeTimedSerializer, BadSignature, SignatureExpired
from fastapi import Request, Response
from web.config import settings
_serializer = URLSafeTimedSerializer(settings.session_secret_key)
_COOKIE_NAME = "migrator_session"
_MAX_AGE = 3600 # 1 hour
def get_session(request: Request) -> dict:
"""Read and verify the session cookie. Returns an empty dict if missing or invalid."""
raw = request.cookies.get(_COOKIE_NAME)
if not raw:
return {}
try:
return _serializer.loads(raw, max_age=_MAX_AGE)
except (BadSignature, SignatureExpired):
return {}
def save_session(response: Response, data: dict) -> None:
"""Sign and write session data into a cookie on the response."""
signed = _serializer.dumps(data)
response.set_cookie(
_COOKIE_NAME,
signed,
max_age=_MAX_AGE,
httponly=True,
samesite="lax",
)
def clear_session(response: Response) -> None:
"""Delete the session cookie."""
response.delete_cookie(_COOKIE_NAME)

221
web/static/app.js Normal file
View File

@ -0,0 +1,221 @@
// Adobe Sign → DocuSign Migrator — frontend app
// Vanilla JS, no build step.
const $ = id => document.getElementById(id);
let adobeTemplates = []; // [{id, name, modifiedDate}]
let dsTemplates = []; // [{id, name, lastModified}]
let statusTemplates = []; // [{adobe_id, name, status, docusign_id, ...}]
let authState = { adobe: false, docusign: false };
// ── Init ────────────────────────────────────────────────────────────────────
document.addEventListener('DOMContentLoaded', async () => {
await refreshAuth();
await refreshTemplates();
await refreshHistory();
$('btn-migrate').addEventListener('click', onMigrate);
$('btn-refresh').addEventListener('click', async () => {
await refreshTemplates();
await refreshHistory();
});
});
// ── Auth ─────────────────────────────────────────────────────────────────────
async function refreshAuth() {
const resp = await fetch('/api/auth/status');
authState = await resp.json();
renderAuthBar();
}
function renderAuthBar() {
renderAuthBadge('badge-adobe', 'Adobe Sign', authState.adobe, '/api/auth/adobe/start', '/api/auth/adobe/disconnect');
renderAuthBadge('badge-docusign', 'DocuSign', authState.docusign, '/api/auth/docusign/start', '/api/auth/docusign/disconnect');
}
function renderAuthBadge(id, label, connected, connectUrl, disconnectUrl) {
const el = $(id);
el.textContent = connected ? `${label}` : `Connect ${label}`;
el.className = 'auth-badge' + (connected ? ' connected' : '');
el.onclick = () => {
if (connected) {
fetch(disconnectUrl).then(() => { authState[id.replace('badge-','')] = false; renderAuthBar(); refreshTemplates(); });
} else {
window.location.href = connectUrl;
}
};
}
// ── Templates ────────────────────────────────────────────────────────────────
async function refreshTemplates() {
renderAdobeList([]);
renderDsList([]);
if (!authState.adobe || !authState.docusign) {
setStatus(authState.adobe || authState.docusign
? 'Connect both platforms to see migration status.'
: 'Connect Adobe Sign and DocuSign to get started.');
$('btn-migrate').disabled = true;
return;
}
setStatus('Loading templates…');
try {
const [statusResp, dsResp] = await Promise.all([
fetch('/api/templates/status'),
fetch('/api/templates/docusign'),
]);
statusTemplates = (await statusResp.json()).templates || [];
dsTemplates = ((await dsResp.json()).templates || []);
renderAdobeList(statusTemplates);
renderDsList(dsTemplates);
setStatus(`${statusTemplates.length} Adobe template(s) loaded.`);
} catch (e) {
setStatus('Error loading templates: ' + e.message);
}
}
function renderAdobeList(items) {
const ul = $('adobe-list');
if (!items.length) {
ul.innerHTML = '<li class="empty-msg">No templates found.</li>';
return;
}
ul.innerHTML = items.map(t => `
<li class="template-item" data-id="${t.adobe_id}">
<input type="checkbox" data-id="${t.adobe_id}" />
<span class="template-name">${escHtml(t.name)}</span>
<span class="badge badge-${t.status}">${statusLabel(t.status)}</span>
<span class="template-spinner" id="spin-${t.adobe_id}"></span>
</li>
`).join('');
ul.querySelectorAll('.template-item').forEach(li => {
li.addEventListener('click', e => {
if (e.target.type === 'checkbox') return;
const cb = li.querySelector('input[type=checkbox]');
cb.checked = !cb.checked;
li.classList.toggle('selected', cb.checked);
updateMigrateButton();
});
li.querySelector('input').addEventListener('change', () => {
li.classList.toggle('selected', li.querySelector('input').checked);
updateMigrateButton();
});
});
}
function renderDsList(items) {
const ul = $('ds-list');
if (!items.length) {
ul.innerHTML = '<li class="empty-msg">No templates found.</li>';
return;
}
ul.innerHTML = items.map(t => `
<li class="template-item">
<span class="template-name">${escHtml(t.name)}</span>
<span style="font-size:11px;color:#999">${(t.lastModified || '').slice(0, 10)}</span>
</li>
`).join('');
}
function updateMigrateButton() {
const checked = document.querySelectorAll('#adobe-list input[type=checkbox]:checked');
$('btn-migrate').disabled = checked.length === 0;
}
// ── Migration ─────────────────────────────────────────────────────────────────
async function onMigrate() {
const checked = [...document.querySelectorAll('#adobe-list input[type=checkbox]:checked')];
const ids = checked.map(cb => cb.dataset.id);
if (!ids.length) return;
$('btn-migrate').disabled = true;
setStatus(`Migrating ${ids.length} template(s)…`);
// Show spinners
ids.forEach(id => {
const spin = $('spin-' + id);
if (spin) spin.textContent = '⏳';
});
try {
const resp = await fetch('/api/migrate', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ adobe_template_ids: ids }),
});
const data = await resp.json();
let successCount = 0;
(data.results || []).forEach(r => {
const spin = $('spin-' + r.adobe_template_id);
if (r.status === 'success') {
successCount++;
if (spin) spin.textContent = r.action === 'updated' ? '✏️' : '✅';
} else {
if (spin) spin.textContent = '❌';
}
});
setStatus(`Done: ${successCount}/${ids.length} succeeded.`);
await refreshTemplates();
await refreshHistory();
} catch (e) {
setStatus('Migration error: ' + e.message);
}
}
// ── History ───────────────────────────────────────────────────────────────────
async function refreshHistory() {
try {
const resp = await fetch('/api/migrate/history');
const { history } = await resp.json();
renderHistory(history || []);
} catch {
renderHistory([]);
}
}
function renderHistory(records) {
const tbody = $('history-tbody');
if (!records.length) {
tbody.innerHTML = '<tr><td colspan="5" class="empty-msg">No migrations yet.</td></tr>';
return;
}
tbody.innerHTML = [...records].reverse().slice(0, 50).map(r => `
<tr>
<td>${(r.timestamp || '').replace('T', ' ').slice(0, 19)}</td>
<td>${escHtml(r.adobe_template_name || r.adobe_template_id || '')}</td>
<td>${escHtml(r.docusign_template_id || '—')}</td>
<td>${escHtml(r.action || '—')}</td>
<td>
<span class="badge ${r.status === 'success' ? 'badge-migrated' : 'badge-not_migrated'}">
${r.status}
</span>
</td>
</tr>
`).join('');
}
// ── Utilities ─────────────────────────────────────────────────────────────────
function setStatus(msg) { $('status-msg').textContent = msg; }
function statusLabel(s) {
return { not_migrated: 'Not Migrated', migrated: 'Migrated', needs_update: 'Needs Update' }[s] || s;
}
function escHtml(str) {
return String(str)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}

79
web/static/index.html Normal file
View File

@ -0,0 +1,79 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Adobe Sign → DocuSign Migrator</title>
<link rel="stylesheet" href="/static/style.css" />
</head>
<body>
<header>
<h1>Adobe Sign → DocuSign Migrator</h1>
<div id="auth-bar">
<span id="badge-adobe" class="auth-badge">Connect Adobe Sign</span>
<span id="badge-docusign" class="auth-badge">Connect DocuSign</span>
</div>
</header>
<main>
<!-- Action bar -->
<div class="action-bar">
<button id="btn-migrate" disabled>Migrate Selected</button>
<button id="btn-refresh">↻ Refresh</button>
<span id="status-msg">Loading…</span>
</div>
<!-- Side-by-side panels -->
<div class="panel-row">
<div class="panel">
<div class="panel-header">
<span>Adobe Sign Templates</span>
<span style="font-weight:400;font-size:12px;color:#888">Select to migrate →</span>
</div>
<div class="panel-body">
<ul class="template-list" id="adobe-list">
<li class="empty-msg">Loading…</li>
</ul>
</div>
</div>
<div class="panel">
<div class="panel-header">
<span>DocuSign Templates</span>
</div>
<div class="panel-body">
<ul class="template-list" id="ds-list">
<li class="empty-msg">Loading…</li>
</ul>
</div>
</div>
</div>
<!-- Migration history -->
<div class="history-section">
<div class="panel-header">Migration History</div>
<table class="history-table">
<thead>
<tr>
<th>Time</th>
<th>Adobe Template</th>
<th>DocuSign Template ID</th>
<th>Action</th>
<th>Status</th>
</tr>
</thead>
<tbody id="history-tbody">
<tr><td colspan="5" class="empty-msg">No migrations yet.</td></tr>
</tbody>
</table>
</div>
</main>
<script src="/static/app.js"></script>
</body>
</html>

153
web/static/style.css Normal file
View File

@ -0,0 +1,153 @@
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
background: #f5f5f5;
color: #222;
font-size: 14px;
}
/* ── Header ── */
header {
background: #1a3c5e;
color: #fff;
padding: 14px 24px;
display: flex;
align-items: center;
justify-content: space-between;
}
header h1 { font-size: 18px; font-weight: 600; }
#auth-bar { display: flex; gap: 12px; align-items: center; font-size: 13px; }
.auth-badge {
padding: 4px 10px;
border-radius: 12px;
border: 1px solid rgba(255,255,255,0.4);
cursor: pointer;
transition: background 0.15s;
}
.auth-badge.connected { background: #28a745; border-color: #28a745; }
.auth-badge:not(.connected):hover { background: rgba(255,255,255,0.15); }
/* ── Main layout ── */
main { padding: 20px 24px; }
.panel-row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 16px;
margin-bottom: 20px;
}
.panel {
background: #fff;
border: 1px solid #ddd;
border-radius: 6px;
overflow: hidden;
}
.panel-header {
padding: 12px 16px;
background: #f8f9fa;
border-bottom: 1px solid #ddd;
display: flex;
justify-content: space-between;
align-items: center;
font-weight: 600;
font-size: 13px;
color: #555;
text-transform: uppercase;
letter-spacing: 0.04em;
}
.panel-body { padding: 0; }
/* ── Template list ── */
.template-list { list-style: none; }
.template-item {
display: flex;
align-items: center;
gap: 10px;
padding: 10px 16px;
border-bottom: 1px solid #f0f0f0;
cursor: pointer;
transition: background 0.1s;
}
.template-item:last-child { border-bottom: none; }
.template-item:hover { background: #f9f9f9; }
.template-item.selected { background: #eef4ff; }
.template-item input[type=checkbox] { flex-shrink: 0; }
.template-name { flex: 1; font-size: 13px; }
/* ── Status badges ── */
.badge {
font-size: 11px;
font-weight: 600;
padding: 2px 8px;
border-radius: 10px;
white-space: nowrap;
}
.badge-migrated { background: #d4edda; color: #155724; }
.badge-needs_update { background: #fff3cd; color: #856404; }
.badge-not_migrated { background: #f8d7da; color: #721c24; }
.template-spinner { font-size: 12px; color: #888; }
/* ── Action bar ── */
.action-bar {
display: flex;
align-items: center;
gap: 12px;
margin-bottom: 20px;
}
button {
padding: 8px 18px;
border: none;
border-radius: 5px;
cursor: pointer;
font-size: 13px;
font-weight: 600;
transition: opacity 0.15s;
}
button:disabled { opacity: 0.45; cursor: not-allowed; }
#btn-migrate { background: #1a3c5e; color: #fff; }
#btn-migrate:not(:disabled):hover { background: #235080; }
#btn-refresh { background: #e9ecef; color: #333; }
#btn-refresh:hover { background: #dee2e6; }
#status-msg { font-size: 13px; color: #555; }
/* ── History ── */
.history-section { background: #fff; border: 1px solid #ddd; border-radius: 6px; }
.history-section .panel-header { background: #f8f9fa; }
.history-table {
width: 100%;
border-collapse: collapse;
font-size: 13px;
}
.history-table th {
text-align: left;
padding: 8px 14px;
background: #f8f9fa;
border-bottom: 1px solid #ddd;
font-weight: 600;
color: #555;
}
.history-table td {
padding: 8px 14px;
border-bottom: 1px solid #f0f0f0;
}
.history-table tr:last-child td { border-bottom: none; }
.empty-msg { padding: 20px; text-align: center; color: #999; font-size: 13px; }
/* ── Responsive ── */
@media (max-width: 700px) {
.panel-row { grid-template-columns: 1fr; }
}