adobe-to-docusign-migrator/src/docusign_auth.py

217 lines
6.4 KiB
Python

"""
docusign_auth.py
----------------
Handles DocuSign OAuth using the Authorization Code Grant.
Usage:
python3 src/docusign_auth.py --authorize # one-time browser login
python3 src/docusign_auth.py # print a fresh access token
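Expected .env keys:
DOCUSIGN_CLIENT_ID (required)
DOCUSIGN_CLIENT_SECRET (required)
DOCUSIGN_AUTH_SERVER (optional; defaults to account-d.docusign.com)
DOCUSIGN_REDIRECT_URI (optional; defaults to http://localhost:8000/api/auth/docusign/callback)
DOCUSIGN_BASE_URL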
Auto-written to .env after authorization:
DOCUSIGN_ACCESS_TOKEN
DOCUSIGN_REFRESH_TOKEN
DOCUSIGN_TOKEN_EXPIRY
"""
import argparse
import os
import sys
import time
import webbrowser
from urllib.parse import parse_qs, urlencode, urlparse
import requests
from dotenv import load_dotenv, set_key
# Load variables from a .env file into the process environment
# (python-dotenv's default lookup; no explicit path is passed).
load_dotenv()
# Path of the .env file located one directory above this module's directory.
# Token values obtained at runtime are written back to this file.
ENV_FILE = os.path.join(os.path.dirname(__file__), "..", ".env")
# Seconds subtracted from a stored expiry timestamp before comparing it with
# the current time, so a token is treated as expired slightly early.
TOKEN_EXPIRY_BUFFER = 120
# Value sent as the "scope" parameter of the authorization URL.
DOCUSIGN_SCOPE = "signature"
def _required_env(name: str) -> str:
    """Return the value of environment variable *name*.

    Raises:
        RuntimeError: If the variable is unset or set to an empty string.
    """
    setting = os.environ.get(name)
    if setting:
        return setting
    raise RuntimeError(f"{name} must be set in .env")
def _auth_server() -> str:
    """Return the DocuSign OAuth host name used to build auth/token URLs.

    Reads ``DOCUSIGN_AUTH_SERVER`` from the environment. When the variable is
    unset *or blank* the default ``account-d.docusign.com`` is returned; a
    blank entry in .env would otherwise produce a URL with an empty host
    (``https:///oauth/...``).
    """
    return os.getenv("DOCUSIGN_AUTH_SERVER") or "account-d.docusign.com"
def _redirect_uri() -> str:
    """Return the OAuth redirect URI sent with authorization and token requests.

    Reads ``DOCUSIGN_REDIRECT_URI`` from the environment. When the variable is
    unset *or blank* the local default callback URL is returned, so a blank
    entry in .env does not result in an empty ``redirect_uri`` parameter.
    """
    return (
        os.getenv("DOCUSIGN_REDIRECT_URI")
        or "http://localhost:8000/api/auth/docusign/callback"
    )
def _persist_token_data(token_data: dict) -> str:
    """Store token values in the .env file and in ``os.environ``.

    Writes the access token and its computed expiry timestamp (current time
    plus ``expires_in``, 3600 when absent); the refresh token is written only
    when *token_data* carries a non-empty one.

    Args:
        token_data: Token response mapping; must contain ``access_token``.

    Returns:
        The access token taken from *token_data*.
    """
    token = token_data["access_token"]
    new_refresh = token_data.get("refresh_token")
    expires_at = str(int(time.time()) + int(token_data.get("expires_in", 3600)))
    env_path = os.path.abspath(ENV_FILE)

    updates = {
        "DOCUSIGN_ACCESS_TOKEN": token,
        "DOCUSIGN_TOKEN_EXPIRY": expires_at,
    }
    # File first, process environment second.
    for key, value in updates.items():
        set_key(env_path, key, value)
    os.environ.update(updates)

    if new_refresh:
        set_key(env_path, "DOCUSIGN_REFRESH_TOKEN", new_refresh)
        os.environ["DOCUSIGN_REFRESH_TOKEN"] = new_refresh
    return token
def build_authorization_url(state: str | None = None) -> str:
    """Build the DocuSign authorization URL for the Authorization Code Grant.

    Args:
        state: Optional value added as the ``state`` query parameter; omitted
            from the URL when falsy.

    Returns:
        The ``https://<auth server>/oauth/auth`` URL with its query string.

    Raises:
        RuntimeError: If ``DOCUSIGN_CLIENT_ID`` is not set.
    """
    query = {
        "response_type": "code",
        "scope": DOCUSIGN_SCOPE,
        "client_id": _required_env("DOCUSIGN_CLIENT_ID"),
        "redirect_uri": _redirect_uri(),
    }
    if state:
        query["state"] = state
    endpoint = f"https://{_auth_server()}/oauth/auth"
    return f"{endpoint}?{urlencode(query)}"
def exchange_code_for_token(code: str, *, timeout: float = 30.0) -> dict:
    """Exchange an authorization code for DocuSign OAuth tokens.

    POSTs to ``https://<auth server>/oauth/token`` with the
    ``authorization_code`` grant, authenticating with the client id and
    secret.

    Args:
        code: Authorization code taken from the redirect URL.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` may block indefinitely on an
            unresponsive server.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        RuntimeError: If ``DOCUSIGN_CLIENT_ID`` or ``DOCUSIGN_CLIENT_SECRET``
            is not set.
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If no response arrives within *timeout* seconds.
    """
    client_id = _required_env("DOCUSIGN_CLIENT_ID")
    client_secret = _required_env("DOCUSIGN_CLIENT_SECRET")
    resp = requests.post(
        f"https://{_auth_server()}/oauth/token",
        data={
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": _redirect_uri(),
        },
        auth=(client_id, client_secret),
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
def refresh_access_token(
    refresh_token: str | None = None, *, timeout: float = 30.0
) -> dict:
    """Obtain new DocuSign OAuth tokens using a refresh token.

    Args:
        refresh_token: Refresh token to use; when omitted or empty, the value
            of ``DOCUSIGN_REFRESH_TOKEN`` from the environment is used.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without a timeout ``requests`` may block indefinitely on an
            unresponsive server.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        RuntimeError: If the client id/secret are not set, or no refresh
            token is available.
        requests.HTTPError: If the endpoint answers with an error status.
        requests.Timeout: If no response arrives within *timeout* seconds.
    """
    client_id = _required_env("DOCUSIGN_CLIENT_ID")
    client_secret = _required_env("DOCUSIGN_CLIENT_SECRET")
    refresh_token = refresh_token or os.getenv("DOCUSIGN_REFRESH_TOKEN")
    if not refresh_token:
        raise RuntimeError(
            "No DocuSign refresh token found. Run: python3 src/docusign_auth.py --authorize"
        )
    resp = requests.post(
        f"https://{_auth_server()}/oauth/token",
        data={
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        auth=(client_id, client_secret),
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
def save_code_token_exchange(code: str) -> str:
    """Exchange *code* for tokens, persist them, and return the access token."""
    return _persist_token_data(exchange_code_for_token(code))
def session_from_token_data(token_data: dict, current_session: dict | None = None) -> dict:
"""
Merge DocuSign OAuth token data into a web-session dict without writing .env.
"""
session = dict(current_session or {})
session["docusign_access_token"] = token_data["access_token"]
session["docusign_token_expiry"] = int(time.time()) + int(token_data.get("expires_in", 3600))
session["docusign_auth_mode"] = "session_oauth"
if token_data.get("refresh_token"):
session["docusign_refresh_token"] = token_data["refresh_token"]
return session
def session_has_valid_access_token(session: dict) -> bool:
    """Report whether *session* holds an access token that has not yet expired.

    The token counts as expired ``TOKEN_EXPIRY_BUFFER`` seconds before its
    stored expiry. A missing token, a missing expiry, or an expiry that
    cannot be converted to ``int`` all yield ``False``.
    """
    access = session.get("docusign_access_token")
    expires = session.get("docusign_token_expiry")
    if not (access and expires):
        return False
    try:
        deadline = int(expires) - TOKEN_EXPIRY_BUFFER
    except (TypeError, ValueError):
        return False
    return int(time.time()) < deadline
def get_access_token() -> str:
    """Return a valid DocuSign access token using cached or refreshed OAuth tokens.

    The cached ``DOCUSIGN_ACCESS_TOKEN`` is returned while the current time is
    more than ``TOKEN_EXPIRY_BUFFER`` seconds before ``DOCUSIGN_TOKEN_EXPIRY``.
    Otherwise the token is refreshed and the new values are persisted.

    Returns:
        An access token string.

    Raises:
        RuntimeError: If a refresh is needed but credentials or a refresh
            token are missing (raised by ``refresh_access_token``).
    """
    cached_token = os.getenv("DOCUSIGN_ACCESS_TOKEN")
    cached_expiry = os.getenv("DOCUSIGN_TOKEN_EXPIRY")
    if cached_token and cached_expiry:
        try:
            expires_at = int(cached_expiry)
        except ValueError:
            # A hand-edited or corrupted expiry must not crash the caller;
            # treat the cached token as expired and fall through to refresh.
            expires_at = None
        if expires_at is not None and int(time.time()) < expires_at - TOKEN_EXPIRY_BUFFER:
            return cached_token
    token_data = refresh_access_token()
    return _persist_token_data(token_data)
def run_authorize_flow():
    """Run the interactive browser-based authorization from the terminal.

    Opens the authorization URL in a browser, asks the user to paste the URL
    they were redirected to, extracts the ``code`` query parameter, and
    exchanges it for tokens that are then persisted. Exits with status 1
    when the pasted URL carries an ``error`` parameter or no ``code``.
    """
    auth_url = build_authorization_url()
    print("\nOpening browser for DocuSign authorization...")
    print(f"\nIf the browser doesn't open, go to:\n{auth_url}\n")
    webbrowser.open(auth_url)
    print("Log in, approve access, then paste the full redirect URL here.\n")

    pasted = input("Paste the redirect URL: ").strip()
    query = parse_qs(urlparse(pasted).query)

    if "error" in query:
        # Prefer the human-readable description when one is present.
        fallback = query.get("error", ["unknown"])
        message = query.get("error_description", fallback)[0]
        print(f"ERROR: {message}")
        sys.exit(1)

    codes = query.get("code")
    if not codes:
        print("ERROR: No authorization code found in the URL.")
        sys.exit(1)

    print("Exchanging code for token...")
    save_code_token_exchange(codes[0])
    print("Authorization complete. Access and refresh tokens were saved to .env.")
def main():
    """Command-line entry point.

    With ``--authorize`` (or its alias ``--consent``) the interactive
    authorization flow is run; otherwise a valid access token is obtained and
    its first 20 characters are printed.
    """
    cli = argparse.ArgumentParser(description="DocuSign authentication helper")
    flag_specs = (
        ("--authorize", "Run the Auth Code Grant flow and store refresh credentials"),
        ("--consent", "Backward-compatible alias for --authorize"),
    )
    for flag, help_text in flag_specs:
        cli.add_argument(flag, action="store_true", help=help_text)
    opts = cli.parse_args()

    if not (opts.authorize or opts.consent):
        access_token = get_access_token()
        print(f"Access token: {access_token[:20]}... (valid for ~1 hour)")
        return
    run_authorize_flow()


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()