"""
Tests for Phase 10: migration options (dryRun, overwriteIfExists, includeDocuments).
"""

import functools
import json
import os
from unittest.mock import patch

import httpx
import pytest
import respx
from fastapi.testclient import TestClient

import web.routers.migrate as migrate_module
from web.app import app
from web.session import _serializer, _COOKIE_NAME

# Shared in-process client used by every test in this module.
# raise_server_exceptions=True asks the test client to re-raise unhandled
# application errors inside the test instead of returning them as a response.
client = TestClient(app, raise_server_exceptions=True)

# Base URLs and account id that the ``patch_settings`` fixture injects into
# the application settings; the respx routes below are built from them.
ADOBE_BASE = "https://api.eu2.adobesign.com/api/rest/v6"
DS_BASE = "https://demo.docusign.net/restapi"
DS_ACCOUNT = "test-account-id"

# Template name written by the fake download/compose helpers and returned by
# the mocked DocuSign template listing in the "already exists" tests.
TEMPLATE_NAME = "Options Test Template"

# Source template id sent in ``source_template_ids``.
ADOBE_ID = "opt-adobe-001"

# Template ids returned by the mocked DocuSign listing (existing) and by the
# mocked create call (new).
DS_EXISTING_ID = "ds-existing-opt-001"
DS_NEW_ID = "ds-new-opt-001"


def _full_session():
    """Return a serialized session holding both access tokens.

    The payload carries an ``adobe_access_token`` and a
    ``docusign_access_token`` and is encoded with the session module's
    ``_serializer``; the tests send the result as the session cookie value.
    """
    session_payload = {
        "adobe_access_token": "adobe-tok",
        "docusign_access_token": "ds-tok",
    }
    return _serializer.dumps(session_payload)


@pytest.fixture(autouse=True)
def patch_settings(monkeypatch):
    """Point the application settings at the test account and base URLs.

    Runs automatically for every test; ``monkeypatch`` restores the original
    attribute values when the test finishes.
    """
    import web.config as cfg

    overrides = {
        "docusign_account_id": DS_ACCOUNT,
        "docusign_base_url": DS_BASE,
        "adobe_sign_base_url": ADOBE_BASE,
    }
    for attr_name, value in overrides.items():
        monkeypatch.setattr(cfg.settings, attr_name, value)


@pytest.fixture(autouse=True)
def temp_history(tmp_path, monkeypatch):
    """Redirect the migrate module's ``_HISTORY_FILE`` into pytest's tmp dir.

    Runs automatically for every test so each one gets its own history path.

    Returns:
        The replacement path, as a string.
    """
    history_path = str(tmp_path / ".history.json")
    monkeypatch.setattr(migrate_module, "_HISTORY_FILE", history_path)
    return history_path


def _async_wrap(sync_fn):
    """Return a coroutine function that calls *sync_fn* and returns its result.

    Lets a plain synchronous fake stand in for a function that the code
    under test awaits. Positional and keyword arguments are forwarded
    unchanged.
    """
    # functools.wraps keeps the fake's __name__/__doc__ on the wrapper so
    # tracebacks and mock reprs name the fake rather than "wrapper".
    @functools.wraps(sync_fn)
    async def wrapper(*args, **kwargs):
        return sync_fn(*args, **kwargs)

    return wrapper


def _mock_download(template_id, access_token, output_dir):
    """Fake template download: write minimal JSON fixtures into *output_dir*.

    Creates *output_dir* if it does not exist and writes ``metadata.json``,
    ``form_fields.json`` and ``documents.json``. *access_token* is accepted
    but not used.

    Returns:
        Always ``True``.
    """
    fixtures = {
        "metadata.json": {"name": TEMPLATE_NAME, "id": template_id},
        "form_fields.json": {"fields": []},
        "documents.json": {"documents": []},
    }
    os.makedirs(output_dir, exist_ok=True)
    for filename, payload in fixtures.items():
        # Explicit encoding so the written bytes do not depend on the locale.
        with open(os.path.join(output_dir, filename), "w", encoding="utf-8") as f:
            json.dump(payload, f)
    return True


def _mock_compose(template_dir: str, output_path: str) -> None:
    """Fake compose step: write a minimal template JSON to *output_path*.

    *template_dir* is accepted but not read.
    """
    composed = {"name": TEMPLATE_NAME, "description": "mocked"}
    # Explicit encoding so the written bytes do not depend on the locale.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(composed, f)


def _mock_validation_ok(download_dir):
    """Fake validation result reporting no blockers and no warnings.

    *download_dir* is ignored. A new dict is built on every call, so callers
    may mutate the result without affecting later calls.
    """
    return {"blockers": [], "warnings": [], "has_blockers": False}


class TestDryRun:
    """Checks for the ``dry_run`` option of ``POST /api/migrate``."""

    @respx.mock
    def test_dry_run_does_not_upload(self):
        """dry_run=True: compose succeeds but no POST/PUT to DocuSign."""
        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
        ):
            resp = client.post(
                "/api/migrate",
                json={
                    "source_template_ids": [ADOBE_ID],
                    "options": {"dry_run": True},
                },
                cookies={_COOKIE_NAME: _full_session()},
            )

        assert resp.status_code == 200
        results = resp.json()["results"]
        assert results[0]["status"] == "dry_run"
        assert results[0]["action"] == "dry_run"
        assert results[0]["docusign_template_id"] is None
        assert results[0]["dry_run"] is True

        # Back the docstring's claim explicitly: nothing recorded by respx
        # during the request may be an upload (POST) or an update (PUT).
        uploads = [call for call in respx.calls if call.request.method in {"POST", "PUT"}]
        assert not uploads

    @respx.mock
    def test_dry_run_false_does_upload(self):
        """dry_run=False (default): upload proceeds."""
        # respx.mock checks on exit that every registered route was called
        # (its default assert_all_called setting), so both the listing GET
        # and the create POST must actually be hit for this test to pass.
        respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
            return_value=httpx.Response(200, json={"envelopeTemplates": []})
        )
        respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
            return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
        )
        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
        ):
            resp = client.post(
                "/api/migrate",
                json={"source_template_ids": [ADOBE_ID], "options": {"dry_run": False}},
                cookies={_COOKIE_NAME: _full_session()},
            )

        assert resp.status_code == 200
        assert resp.json()["results"][0]["status"] == "success"


class TestOverwriteIfExists:
    """Checks for the ``overwrite_if_exists`` option of ``POST /api/migrate``."""

    @staticmethod
    def _mock_existing_template():
        """Register a DocuSign listing that already contains TEMPLATE_NAME.

        Must be called inside an active ``respx.mock`` scope. Returns the
        registered route.
        """
        return respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
            return_value=httpx.Response(200, json={
                "envelopeTemplates": [
                    {"templateId": DS_EXISTING_ID, "name": TEMPLATE_NAME, "lastModified": "2026-04-10T00:00:00Z"}
                ]
            })
        )

    @respx.mock
    def test_skip_when_overwrite_false(self):
        """overwrite_if_exists=False + existing template → skipped."""
        self._mock_existing_template()
        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
        ):
            resp = client.post(
                "/api/migrate",
                json={"source_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": False}},
                cookies={_COOKIE_NAME: _full_session()},
            )

        # Check the status first so a failure is reported as such rather than
        # as a KeyError while reading the body.
        assert resp.status_code == 200
        results = resp.json()["results"]
        assert results[0]["status"] == "skipped"
        assert results[0]["docusign_template_id"] == DS_EXISTING_ID

    @respx.mock
    def test_overwrite_when_true(self):
        """overwrite_if_exists=True + existing template → PUT update."""
        self._mock_existing_template()
        respx.put(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates/{DS_EXISTING_ID}").mock(
            return_value=httpx.Response(200, json={})
        )
        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
        ):
            resp = client.post(
                "/api/migrate",
                json={"source_template_ids": [ADOBE_ID], "options": {"overwrite_if_exists": True}},
                cookies={_COOKIE_NAME: _full_session()},
            )

        assert resp.status_code == 200
        assert resp.json()["results"][0]["action"] == "updated"


class TestSourceTemplateIds:
    """Checks for the ``source_template_ids`` request field."""

    @respx.mock
    def test_source_template_ids_field(self):
        """source_template_ids (new field) works correctly."""
        respx.get(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
            return_value=httpx.Response(200, json={"envelopeTemplates": []})
        )
        respx.post(f"{DS_BASE}/v2.1/accounts/{DS_ACCOUNT}/templates").mock(
            return_value=httpx.Response(201, json={"templateId": DS_NEW_ID})
        )
        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_load_compose", return_value=_mock_compose),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_ok),
        ):
            resp = client.post(
                "/api/migrate",
                json={"source_template_ids": [ADOBE_ID]},
                cookies={_COOKIE_NAME: _full_session()},
            )

        assert resp.status_code == 200
        assert resp.json()["results"][0]["status"] == "success"

    def test_no_ids_returns_400(self):
        """An empty request body (no template ids) is rejected with HTTP 400."""
        resp = client.post(
            "/api/migrate",
            json={},
            cookies={_COOKIE_NAME: _full_session()},
        )
        assert resp.status_code == 400


class TestValidationBlocking:
    """Checks that validation blockers stop a template from being migrated."""

    # respx.mock is applied even though no routes are registered: it keeps
    # this test off the real network, so an accidental outbound request
    # fails inside the mock instead of reaching DocuSign.
    @respx.mock
    def test_blocked_template_not_uploaded(self):
        """Template with validation blockers → status=blocked, no upload."""
        def _mock_validation_blocked(download_dir):
            return {
                "blockers": ["No documents attached"],
                "warnings": [],
                "has_blockers": True,
            }

        with (
            patch.object(migrate_module, "_download_adobe_template", new=_async_wrap(_mock_download)),
            patch.object(migrate_module, "_run_validation", side_effect=_mock_validation_blocked),
        ):
            resp = client.post(
                "/api/migrate",
                json={"source_template_ids": [ADOBE_ID]},
                cookies={_COOKIE_NAME: _full_session()},
            )

        assert resp.status_code == 200
        result = resp.json()["results"][0]
        assert result["status"] == "blocked"
        assert "No documents" in result["error"]

        # Back the "no upload" half of the docstring: nothing recorded by
        # respx may be a create (POST) or an update (PUT).
        uploads = [call for call in respx.calls if call.request.method in {"POST", "PUT"}]
        assert not uploads