139 lines
4.6 KiB
Python
139 lines
4.6 KiB
Python
"""
|
|
Tests for Phase 12: security — log sanitization and audit trail.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
|
|
import pytest
|
|
|
|
from src.utils.log_sanitizer import (
|
|
SanitizingFilter,
|
|
install_sanitizing_filter,
|
|
redact,
|
|
redact_dict,
|
|
)
|
|
|
|
|
|
class TestRedact:
    """Exercise ``redact`` on free-form strings."""

    def test_bearer_token_redacted(self):
        """A bearer token in an Authorization header must be replaced by the marker."""
        header = "Authorization: Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.abc.def"
        sanitized = redact(header)
        assert "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9" not in sanitized
        assert "[REDACTED]" in sanitized

    def test_access_token_assignment_redacted(self):
        """A quoted ``access_token: "..."`` value must not survive redaction."""
        assignment = 'access_token: "super_secret_value_12345"'
        sanitized = redact(assignment)
        assert "super_secret_value_12345" not in sanitized
        assert "[REDACTED]" in sanitized

    def test_password_redacted(self):
        """The value of a ``password=...`` pair must be removed from the output."""
        sanitized = redact("password=hunter2supersecure")
        assert "hunter2supersecure" not in sanitized

    def test_safe_text_unchanged(self):
        """Text without secrets must come back exactly as given."""
        harmless = "Template migrated successfully: NDA v2"
        assert redact(harmless) == harmless

    def test_long_base64_redacted(self):
        """A long base64-looking run must be masked."""
        # Stand-in for a large PDF base64 payload ending up in a log line.
        payload = "A" * 600
        sanitized = redact(payload)
        assert "A" * 100 not in sanitized
        assert "[REDACTED]" in sanitized

    def test_short_base64_not_redacted(self):
        """A short base64 string (e.g. an ID) must be left readable."""
        encoded_id = "dGVzdA=="  # base64 of "test"
        sanitized = redact(encoded_id)
        assert "dGVzdA" in sanitized
|
|
|
|
|
|
class TestRedactDict:
    """Exercise ``redact_dict`` on flat, nested, and list-bearing mappings."""

    def test_token_key_redacted(self):
        """An ``access_token`` entry is masked while an unrelated key is kept."""
        payload = {"access_token": "secret123", "name": "My Template"}
        cleaned = redact_dict(payload)
        assert cleaned["access_token"] == "[REDACTED]"
        assert cleaned["name"] == "My Template"

    def test_nested_dict_redacted(self):
        """Sensitive keys inside a nested dict are masked as well."""
        payload = {"auth": {"token": "secret123", "user": "alice"}}
        auth_section = redact_dict(payload)["auth"]
        assert auth_section["token"] == "[REDACTED]"
        assert auth_section["user"] == "alice"

    def test_document_base64_redacted(self):
        """A ``documentBase64`` value is replaced by the marker."""
        payload = {"documentBase64": "A" * 200}
        cleaned = redact_dict(payload)
        assert cleaned["documentBase64"] == "[REDACTED]"

    def test_list_of_dicts_redacted(self):
        """Dicts held inside a list value are sanitized element by element."""
        payload = {"items": [{"token": "abc123xyz", "id": "1"}]}
        first_item = redact_dict(payload)["items"][0]
        assert first_item["token"] == "[REDACTED]"
        assert first_item["id"] == "1"

    def test_safe_dict_unchanged(self):
        """A mapping with no sensitive keys compares equal to its input."""
        payload = {"template_name": "NDA", "status": "success", "count": 3}
        assert redact_dict(payload) == payload
|
|
|
|
|
|
class TestSanitizingFilter:
    """Exercise ``SanitizingFilter`` and ``install_sanitizing_filter``."""

    def test_filter_redacts_log_message(self):
        """After filtering, ``record.msg`` must no longer contain the bearer token."""
        record = logging.LogRecord(
            name="test",
            level=logging.INFO,
            pathname="",
            lineno=0,
            msg="Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.payload.signature",
            args=(),
            exc_info=None,
        )
        f = SanitizingFilter()
        f.filter(record)
        assert "eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9" not in record.msg

    def test_filter_redacts_args(self):
        """After filtering, the %-format args must no longer contain the secret."""
        record = logging.LogRecord(
            name="test",
            level=logging.INFO,
            pathname="",
            lineno=0,
            msg="Token: %s",
            args=("access_token=supersecretvalue123456",),
            exc_info=None,
        )
        f = SanitizingFilter()
        f.filter(record)
        assert "supersecretvalue123456" not in str(record.args)

    def test_install_sanitizing_filter_idempotent(self):
        """Installing twice must leave exactly one ``SanitizingFilter`` on the root logger."""
        root = logging.getLogger()
        try:
            install_sanitizing_filter()
            install_sanitizing_filter()  # second call should not add duplicate
            sanitizing_filters = [
                f for f in root.filters if isinstance(f, SanitizingFilter)
            ]
            assert len(sanitizing_filters) == 1
        finally:
            # Clean up even when the assertion fails, so a broken install does
            # not leave filters on the root logger for later tests. Build a
            # separate list first because removeFilter mutates root.filters.
            leftovers = [f for f in root.filters if isinstance(f, SanitizingFilter)]
            for f in leftovers:
                root.removeFilter(f)
|
|
|
|
|
|
class TestPdfChecksum:
    """Check the SHA-256 checksum recorded for a normalized document."""

    def test_checksum_matches_content(self):
        """The stored ``checksum_sha256`` must equal the SHA-256 of the source file.

        Skipped when the ``downloads`` fixture folder is not present.
        """
        from pathlib import Path

        from src.services.mapping_service import adobe_folder_to_normalized

        downloads = Path(__file__).parent.parent / "downloads" / "David Tag Demo Form__CBJCHBCA"
        if not downloads.exists():
            pytest.skip("Downloads fixtures not present")

        norm, _ = adobe_folder_to_normalized(str(downloads))
        assert norm.documents, "Expected at least one document"

        doc = norm.documents[0]
        # Recompute checksum from source path to verify
        pdf_bytes = Path(doc.source_path).read_bytes()
        expected_checksum = hashlib.sha256(pdf_bytes).hexdigest()
        assert doc.checksum_sha256 == expected_checksum
|