Why We Need Cryptographic Proof of What AI Refused to Generate: Building Verifiable Refusal Provenance with CAP-SRP

TL;DR

Every major AI provider claims their safety systems work. None can prove it. When xAI's Grok generated millions of non-consensual intimate images in January 2026, regulators discovered they had no way to independently verify whether safety measures were functioning — before, during, or after the crisis.

CAP-SRP (Content/Creative AI Profile – Safe Refusal Provenance) is an open specification that fixes this with one core idea: every AI generation attempt gets a cryptographic receipt showing what happened — content generated, request denied, or system error. No gaps. No tampering. Externally verifiable.

This post covers:

  • Why the "Trust Us" model failed catastrophically
  • The Completeness Invariant: GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR
  • Full Python implementation of cryptographic audit trails
  • How hash chains, Merkle trees, and Ed25519 signatures work together
  • Regulatory deadlines that make this urgent (EU AI Act: August 2, 2026)
  • How to integrate refusal provenance into your AI pipeline

GitHub: github.com/veritaschain/cap-spec


The Day "Trust Us" Died

On Christmas Day 2025, xAI rolled out image generation capabilities for Grok on X (formerly Twitter). Within days, users discovered they could generate non-consensual intimate images of real people — including public figures, private individuals, and minors.

The numbers were staggering:

  • ~4.4 million images generated in 9 days
  • 41%+ were sexualized images of women
  • 2% appeared to depict minors
  • Researchers documented 90 harmful images generated in under 5 minutes

The AI Forensics nonprofit found that over half of all Grok-generated images contained individuals in minimal attire. Competing systems from OpenAI, Google, and Meta refused all identical prompts. The Future of Life Institute had already rated xAI's safety practices as "F" — the lowest among all major AI providers.

But here's what made this a systemic failure, not just an xAI failure:

When xAI claimed they fixed it, nobody could verify the claim.

xAI restricted image generation to paid subscribers. They blocked editing real people in revealing clothing. They implemented geo-specific restrictions. But regulators, researchers, and victims had no independent way to confirm:

  1. Were the fixes actually working?
  2. How many harmful requests were still getting through?
  3. Had the safety systems been functioning at all before the crisis?
  4. Were the internal logs complete and unmodified?

This is the negative evidence problem: proving that something didn't happen is fundamentally harder than proving it did. C2PA can prove "this image was AI-generated." But nothing could prove "this harmful request was blocked."

The Regulatory Response: Everyone Is Angry, Nobody Can Verify

As of today (February 17, 2026), the global regulatory response has been unprecedented — and it keeps escalating:

  • Ireland DPC (yesterday): Opened a "large-scale inquiry" into X under GDPR, examining whether personal data processing in Grok's image generation complied with fundamental EU data protection obligations. Potential fines: 4% of global revenue.

  • European Commission: Formal DSA investigation opened January 26, 2026. Potential fines: 6% of global revenue (~$174 million).

  • UK: ICO opened formal investigations (February 3). Ofcom launched Online Safety Act investigation (January 12). Potential fines: £18 million or 10% global revenue.

  • France: Paris prosecutors raided X's offices (February 3). Criminal investigation covering seven offenses including complicity in organized distribution of child pornography. Musk summoned for questioning April 20, 2026.

  • US: 35 state attorneys general signed a joint demand letter. California AG issued cease-and-desist.

  • Asia: Indonesia blocked Grok entirely. Malaysia imposed temporary block. India's MeitY threatened loss of safe harbor protections.

Human Rights Watch published a comprehensive report calling Grok's harms "foreseeable" and demanding technical safeguards, transparency, and strict audits. But crucially, even HRW's demands couldn't fully articulate how to verify that safeguards work — because the verification infrastructure doesn't exist yet.

Every single one of these investigations faces the same problem: they must rely on X's internal logs and X's self-reported compliance data. There is no external, cryptographically verifiable audit trail. Regulators are essentially forced to trust the very entity they're investigating.


The Core Insight: Log Everything Before the Safety Check

CAP-SRP's foundational architectural insight is deceptively simple:

Log the generation attempt BEFORE the safety evaluation runs.

This creates an unforgeable commitment that a request existed — regardless of whether it's subsequently approved, denied, or errors out. If you only log after the safety check, a malicious (or negligent) operator can selectively omit entries.

This leads to the Completeness Invariant:

GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR

For any time window, the count of generation attempts MUST exactly equal the sum of all outcomes. If the equation doesn't balance, the audit trail is provably invalid.

Think of it like double-entry bookkeeping for AI safety. Every debit (attempt) must have a corresponding credit (outcome). If the books don't balance, someone is cooking them.

┌──────────────┐     ┌──────────────┐     ┌───────────────────┐
│  User sends  │────▶│ GEN_ATTEMPT  │────▶│  Safety Filter    │
│  prompt      │     │ logged FIRST │     │  evaluates prompt │
└──────────────┘     └──────────────┘     └─────────┬─────────┘
                                                    │
                             ┌──────────────────────┼────────────────┐
                             │                      │                │
                             ▼                      ▼                ▼
                      ┌────────────┐        ┌──────────────┐  ┌─────────────┐
                      │    GEN     │        │   GEN_DENY   │  │  GEN_ERROR  │
                      │ (approved) │        │  (rejected)  │  │  (failure)  │
                      └────────────┘        └──────────────┘  └─────────────┘

The key: GEN_ATTEMPT is immutably recorded before the system decides what to do. Even if the safety filter crashes, the attempt exists in the log. Even if someone tries to hide that a request got through, the orphaned attempt is evidence.
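
Before any cryptography, the invariant itself is plain arithmetic. Here is a toy check; the counts below are invented for illustration, and Step 4 later builds the real verifier on top of signed events:

# Toy illustration of the Completeness Invariant. The counts are invented.
counts = {"GEN_ATTEMPT": 1000, "GEN": 930, "GEN_DENY": 65, "GEN_ERROR": 5}

outcomes = counts["GEN"] + counts["GEN_DENY"] + counts["GEN_ERROR"]
if counts["GEN_ATTEMPT"] != outcomes:
    raise ValueError(
        f"Invariant violated: {counts['GEN_ATTEMPT']} attempts, "
        f"{outcomes} recorded outcomes"
    )
print(f"Books balance: {counts['GEN_ATTEMPT']} = {outcomes}")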


Let's Build It: Full Python Implementation

Enough theory. Let's write code. We'll build a complete CAP-SRP audit trail system from scratch using Ed25519 signatures, SHA-256 hash chains, and Merkle trees.

Prerequisites

pip install cryptography

Step 1: Core Event Model

Every event in CAP-SRP follows a structured schema. Let's define it:

"""
cap_srp_core.py — Core CAP-SRP Event Model and Cryptographic Primitives

Implements the event model from CAP-SRP Specification v1.0
https://github.com/veritaschain/cap-spec
"""

import hashlib
import json
import uuid
import time
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Optional, Literal
from enum import Enum


class EventType(str, Enum):
    """CAP-SRP event types per specification §6."""
    GEN_ATTEMPT = "GEN_ATTEMPT"    # Receipt of generation request
    GEN = "GEN"                     # Successful content generation
    GEN_DENY = "GEN_DENY"          # Refusal to generate
    GEN_ERROR = "GEN_ERROR"        # System failure during generation


class RiskCategory(str, Enum):
    """Standardized risk categories per specification §7.3."""
    NCII = "NCII"                               # Non-consensual intimate imagery
    CSAM = "CSAM"                               # Child sexual abuse material
    UNAUTHORIZED_LIKENESS = "UNAUTHORIZED_LIKENESS"
    VIOLENCE_EXTREME = "VIOLENCE_EXTREME"
    HATE_CONTENT = "HATE_CONTENT"
    TERRORIST_CONTENT = "TERRORIST_CONTENT"
    SELF_HARM_PROMOTION = "SELF_HARM_PROMOTION"
    COPYRIGHT_VIOLATION = "COPYRIGHT_VIOLATION"
    OTHER = "OTHER"


class ModelDecision(str, Enum):
    """Model decision types per specification §7.4."""
    DENY = "DENY"
    WARN = "WARN"
    ESCALATE = "ESCALATE"
    QUARANTINE = "QUARANTINE"


def generate_event_id() -> str:
    """Generate a UUIDv7 for event identification (time-ordered)."""
    # uuid.uuid7() requires Python 3.14+; fall back to a random UUIDv4 on
    # older interpreters (this loses time-ordering, so prefer 3.14+).
    if hasattr(uuid, "uuid7"):
        return str(uuid.uuid7())
    return str(uuid.uuid4())


def hash_prompt(prompt: str) -> str:
    """
    Hash prompt for privacy-preserving storage.

    Original prompts are NEVER stored — only their SHA-256 hash.
    This allows verification ("was this exact prompt denied?") 
    without exposing potentially harmful content.
    """
    return f"sha256:{hashlib.sha256(prompt.encode('utf-8')).hexdigest()}"


def hash_actor(actor_id: str) -> str:
    """Hash actor identifier for GDPR-compatible storage."""
    return f"sha256:{hashlib.sha256(actor_id.encode('utf-8')).hexdigest()}"


@dataclass
class CAPEvent:
    """
    A single event in the CAP-SRP audit trail.

    This is the atomic unit of the system. Every generation attempt,
    every refusal, every successful generation, and every error 
    produces exactly one CAPEvent.
    """
    event_type: EventType
    event_id: str = field(default_factory=generate_event_id)
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    # Chain linkage
    prev_hash: str = ""
    event_hash: str = ""
    signature: str = ""

    # Content (privacy-preserving)
    prompt_hash: Optional[str] = None
    actor_hash: Optional[str] = None

    # SRP-specific fields
    attempt_id: Optional[str] = None  # Links outcomes back to attempts
    risk_category: Optional[str] = None
    risk_score: Optional[float] = None
    model_decision: Optional[str] = None
    policy_version: Optional[str] = None
    model_id: Optional[str] = None

    # Output reference (for GEN events only)
    output_hash: Optional[str] = None

    def to_dict(self) -> dict:
        """Convert to dictionary, excluding None values and signature."""
        d = {}
        for k, v in asdict(self).items():
            if v is not None and k != "signature":
                d[k] = v
        return d
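
A quick sanity check of these primitives; the prompt, user ID, and model name below are made up:

# Quick usage sketch of the Step 1 primitives. Prompt and user ID are made up.
event = CAPEvent(
    event_type=EventType.GEN_ATTEMPT,
    prompt_hash=hash_prompt("Draw a cat wearing a hat"),
    actor_hash=hash_actor("user_42"),
    model_id="example-model",
    policy_version="1.0",
)

print(event.event_type.value)   # GEN_ATTEMPT
print(event.prompt_hash)        # sha256:<64 hex chars>; the prompt itself is never stored
print(sorted(event.to_dict()))  # field names that feed the canonical hash (signature excluded)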

Step 2: Cryptographic Hash Chain

Events are linked in a tamper-evident chain. Modifying any event invalidates all subsequent hashes:

"""
hash_chain.py — SHA-256 Hash Chain for Tamper-Evident Event Linking

If you change one event, every subsequent hash breaks.
This is your "flight recorder" — append-only, tamper-evident.
"""

import hashlib
import json


def canonicalize(event_dict: dict) -> str:
    """
    Canonicalize event for deterministic hashing.

    Uses RFC 8785 (JSON Canonicalization Scheme) approach:
    sorted keys, no whitespace, consistent encoding.
    """
    return json.dumps(event_dict, sort_keys=True, separators=(',', ':'))


def compute_event_hash(event: 'CAPEvent') -> str:
    """
    Compute SHA-256 hash of canonicalized event data.

    The signature field is excluded before hashing — 
    you sign the hash, not hash the signature.
    """
    # Get dict without signature
    event_dict = event.to_dict()
    event_dict.pop("signature", None)
    event_dict.pop("event_hash", None)

    canonical = canonicalize(event_dict)
    hash_bytes = hashlib.sha256(canonical.encode('utf-8')).digest()
    return f"sha256:{hash_bytes.hex()}"


class HashChain:
    """
    Append-only hash chain of CAP-SRP events.

    Each event's hash includes the previous event's hash,
    creating a tamper-evident linked structure.

    Modifying event[i] invalidates hash[i], which invalidates
    hash[i+1], which invalidates hash[i+2], ... all the way 
    to the chain tip.
    """

    def __init__(self):
        self.events: list['CAPEvent'] = []
        self.genesis_hash = "sha256:" + "0" * 64  # Genesis marker

    def append(self, event: 'CAPEvent') -> 'CAPEvent':
        """Append event to chain, computing hash linkage."""
        # Link to previous event
        if len(self.events) == 0:
            event.prev_hash = self.genesis_hash
        else:
            event.prev_hash = self.events[-1].event_hash

        # Compute this event's hash (includes prev_hash)
        event.event_hash = compute_event_hash(event)

        self.events.append(event)
        return event

    def verify_integrity(self) -> dict:
        """
        Verify the entire chain is intact.

        Returns detailed results showing exactly where 
        any tampering occurred.
        """
        results = {
            "valid": True,
            "total_events": len(self.events),
            "errors": []
        }

        for i, event in enumerate(self.events):
            # 1. Verify hash computation
            computed = compute_event_hash(event)
            if event.event_hash != computed:
                results["valid"] = False
                results["errors"].append({
                    "index": i,
                    "event_id": event.event_id,
                    "error": "HASH_MISMATCH",
                    "expected": computed,
                    "actual": event.event_hash
                })

            # 2. Verify chain linkage
            if i == 0:
                expected_prev = self.genesis_hash
            else:
                expected_prev = self.events[i - 1].event_hash

            if event.prev_hash != expected_prev:
                results["valid"] = False
                results["errors"].append({
                    "index": i,
                    "event_id": event.event_id,
                    "error": "CHAIN_BREAK",
                    "expected_prev": expected_prev,
                    "actual_prev": event.prev_hash
                })

        return results
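
A minimal sketch of the chain catching a modification; the events and prompts are illustrative:

# Minimal HashChain sketch. The events and prompts are illustrative.
from cap_srp_core import CAPEvent, EventType, hash_prompt, hash_actor

chain = HashChain()
e1 = chain.append(CAPEvent(
    event_type=EventType.GEN_ATTEMPT,
    prompt_hash=hash_prompt("draw a cat"),
    actor_hash=hash_actor("user_1"),
))
chain.append(CAPEvent(event_type=EventType.GEN_DENY, attempt_id=e1.event_id))

print(chain.verify_integrity()["valid"])   # True

# Tamper with the first event: its stored hash no longer matches the
# recomputed one, so verification reports a HASH_MISMATCH at index 0.
e1.prompt_hash = hash_prompt("something else entirely")
print(chain.verify_integrity()["valid"])   # False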

Step 3: Ed25519 Digital Signatures

Every event is cryptographically signed so you can prove who recorded it and that it hasn't been modified:

"""
signing.py — Ed25519 Digital Signatures for Event Authentication

Ed25519 (RFC 8032) provides:
- 128-bit security level
- Deterministic signatures (no random nonce = no nonce reuse attacks)  
- Fast verification (~70μs per signature)
- Small signatures (64 bytes)
"""

import base64
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
    Ed25519PrivateKey, Ed25519PublicKey
)
from cryptography.exceptions import InvalidSignature


class EventSigner:
    """Signs and verifies CAP-SRP events using Ed25519."""

    def __init__(self):
        self._private_key = Ed25519PrivateKey.generate()
        self.public_key = self._private_key.public_key()

    @classmethod
    def from_private_key(cls, private_key: Ed25519PrivateKey) -> 'EventSigner':
        """Create signer from existing private key."""
        instance = cls.__new__(cls)
        instance._private_key = private_key
        instance.public_key = private_key.public_key()
        return instance

    def sign_event(self, event: 'CAPEvent') -> str:
        """
        Sign event hash with Ed25519.

        We sign the event_hash, not the raw event data.
        This is important: the hash is computed from the 
        canonicalized event, ensuring deterministic input.
        """
        if not event.event_hash:
            raise ValueError("Event must have hash before signing")

        # Extract raw hash bytes (remove "sha256:" prefix)
        hash_bytes = bytes.fromhex(event.event_hash[7:])

        # Sign
        signature = self._private_key.sign(hash_bytes)
        sig_str = f"ed25519:{base64.b64encode(signature).decode()}"

        event.signature = sig_str
        return sig_str

    def verify_event(self, event: 'CAPEvent') -> bool:
        """Verify event signature."""
        if not event.signature or not event.signature.startswith("ed25519:"):
            return False

        try:
            signature = base64.b64decode(event.signature[8:])
            hash_bytes = bytes.fromhex(event.event_hash[7:])
            self.public_key.verify(signature, hash_bytes)
            return True
        except (InvalidSignature, ValueError):
            return False


def verify_event_with_public_key(
    event: 'CAPEvent', 
    public_key: Ed25519PublicKey
) -> bool:
    """
    Standalone verification using only a public key.

    This is what auditors and regulators use — they don't need
    the private key, just the published public key.
    """
    if not event.signature or not event.signature.startswith("ed25519:"):
        return False

    try:
        signature = base64.b64decode(event.signature[8:])
        hash_bytes = bytes.fromhex(event.event_hash[7:])
        public_key.verify(signature, hash_bytes)
        return True
    except (InvalidSignature, ValueError):
        return False
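
One piece the auditor workflow needs that the class above doesn't show is publishing the verification key. Here is a minimal sketch using the cryptography library's standard serialization; the file name is an arbitrary choice:

# Sketch: export the public key so auditors can run
# verify_event_with_public_key without ever seeing the private key.
# The file name is arbitrary.
from cryptography.hazmat.primitives import serialization

signer = EventSigner()

pem = signer.public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
with open("cap_srp_public_key.pem", "wb") as f:
    f.write(pem)

# Auditor side: load the published key and verify events independently.
with open("cap_srp_public_key.pem", "rb") as f:
    auditor_key = serialization.load_pem_public_key(f.read())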

Step 4: The Completeness Invariant Verifier

This is the heart of CAP-SRP. The verifier mathematically proves that every generation attempt has exactly one recorded outcome:

"""
completeness.py — Completeness Invariant Verification

The Completeness Invariant is the mathematical core of CAP-SRP:

    ∑ GEN_ATTEMPT = ∑ GEN + ∑ GEN_DENY + ∑ GEN_ERROR

If this equation doesn't balance, the audit trail is PROVABLY INVALID.

Think of it as double-entry bookkeeping:
- Every ATTEMPT is a debit
- Every OUTCOME (GEN/GEN_DENY/GEN_ERROR) is a credit
- The books MUST balance
"""

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple, Set

from cap_srp_core import EventType


@dataclass
class CompletenessResult:
    """Result of Completeness Invariant verification."""
    valid: bool
    total_attempts: int = 0
    total_gen: int = 0
    total_deny: int = 0
    total_error: int = 0

    # Violations
    unmatched_attempts: list = None  # ATTEMPTs without OUTCOMEs
    orphan_outcomes: list = None     # OUTCOMEs without ATTEMPTs
    duplicate_outcomes: list = None  # Multiple OUTCOMEs per ATTEMPT

    error_message: str = ""

    def __post_init__(self):
        self.unmatched_attempts = self.unmatched_attempts or []
        self.orphan_outcomes = self.orphan_outcomes or []
        self.duplicate_outcomes = self.duplicate_outcomes or []

    @property
    def refusal_rate(self) -> float:
        """Calculate refusal rate as percentage."""
        if self.total_attempts == 0:
            return 0.0
        return (self.total_deny / self.total_attempts) * 100

    def summary(self) -> str:
        """Human-readable summary for regulatory reports."""
        status = "✅ VALID" if self.valid else "❌ INVALID"
        lines = [
            f"Completeness Invariant: {status}",
            f"",
            f"  Total Attempts:  {self.total_attempts}",
            f"  ├─ Generated:    {self.total_gen}",
            f"  ├─ Denied:       {self.total_deny}",
            f"  └─ Errors:       {self.total_error}",
            f"  Sum of Outcomes: {self.total_gen + self.total_deny + self.total_error}",
            f"",
            f"  Refusal Rate:    {self.refusal_rate:.1f}%",
        ]

        if not self.valid:
            lines.append(f"")
            lines.append(f"  ⚠️  VIOLATIONS DETECTED:")
            if self.unmatched_attempts:
                lines.append(
                    f"    Unmatched attempts (hiding results?): "
                    f"{len(self.unmatched_attempts)}"
                )
            if self.orphan_outcomes:
                lines.append(
                    f"    Orphan outcomes (fabricated refusals?): "
                    f"{len(self.orphan_outcomes)}"
                )
            if self.duplicate_outcomes:
                lines.append(
                    f"    Duplicate outcomes (data integrity failure): "
                    f"{len(self.duplicate_outcomes)}"
                )

        return "\n".join(lines)


def verify_completeness(
    events: List['CAPEvent'],
    time_window: Optional[Tuple[datetime, datetime]] = None
) -> CompletenessResult:
    """
    Verify Completeness Invariant for a set of events.

    This is O(n) time and O(n) space — fast enough 
    for real-time compliance monitoring on high-volume systems.

    Args:
        events: List of CAPEvent objects
        time_window: Optional (start, end) to filter events

    Returns:
        CompletenessResult with detailed violation information
    """
    # Filter by time window if specified
    if time_window:
        start, end = time_window
        filtered = [
            e for e in events
            if start <= datetime.fromisoformat(e.timestamp) <= end
        ]
    else:
        filtered = events

    # Separate attempts and outcomes
    attempts = {}   # attempt_id -> event
    outcomes = []   # list of outcome events

    for event in filtered:
        if event.event_type == EventType.GEN_ATTEMPT:
            attempts[event.event_id] = event
        elif event.event_type in (
            EventType.GEN, EventType.GEN_DENY, EventType.GEN_ERROR
        ):
            outcomes.append(event)

    # Match outcomes to attempts
    matched_attempts: Set[str] = set()
    orphan_outcomes = []
    duplicate_outcomes = []

    gen_count = 0
    deny_count = 0
    error_count = 0

    for outcome in outcomes:
        attempt_id = outcome.attempt_id

        # Count by type
        if outcome.event_type == EventType.GEN:
            gen_count += 1
        elif outcome.event_type == EventType.GEN_DENY:
            deny_count += 1
        elif outcome.event_type == EventType.GEN_ERROR:
            error_count += 1

        # Check linkage
        if attempt_id not in attempts:
            orphan_outcomes.append(outcome.event_id)
        elif attempt_id in matched_attempts:
            duplicate_outcomes.append(outcome.event_id)
        else:
            matched_attempts.add(attempt_id)

    # Find unmatched attempts
    unmatched = [
        aid for aid in attempts 
        if aid not in matched_attempts
    ]

    # The invariant holds iff:
    # 1. Every attempt has exactly one outcome
    # 2. Every outcome links to a valid attempt
    # 3. No duplicates
    is_valid = (
        len(unmatched) == 0 
        and len(orphan_outcomes) == 0 
        and len(duplicate_outcomes) == 0
    )

    return CompletenessResult(
        valid=is_valid,
        total_attempts=len(attempts),
        total_gen=gen_count,
        total_deny=deny_count,
        total_error=error_count,
        unmatched_attempts=unmatched,
        orphan_outcomes=orphan_outcomes,
        duplicate_outcomes=duplicate_outcomes,
    )
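
A short, contrived example of the verifier catching a gap; one attempt is deliberately left without an outcome:

# Contrived example: one attempt never receives an outcome.
from cap_srp_core import CAPEvent, EventType

attempt = CAPEvent(event_type=EventType.GEN_ATTEMPT)
orphaned = CAPEvent(event_type=EventType.GEN_ATTEMPT)   # no outcome will follow
denial = CAPEvent(event_type=EventType.GEN_DENY, attempt_id=attempt.event_id)

result = verify_completeness([attempt, orphaned, denial])
print(result.valid)               # False
print(result.unmatched_attempts)  # [<event_id of the orphaned attempt>]
print(result.summary())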

Step 5: Merkle Tree for Efficient Verification

Merkle trees allow auditors to verify individual events without downloading the entire chain:

"""
merkle.py — Merkle Tree for Batch Verification and External Anchoring

The Merkle root is what gets anchored externally (RFC 3161 TSA, SCITT, 
or blockchain). This provides independent timestamp verification and 
prevents backdating.

Auditors can verify a single event's inclusion using a logarithmic-size
proof — they don't need the entire event history.
"""

import hashlib
from typing import List, Tuple
from dataclasses import dataclass


@dataclass
class MerkleProof:
    """Inclusion proof for a specific event in a Merkle tree."""
    event_hash: str
    proof_path: List[Tuple[str, str]]  # (sibling_hash, "left"|"right")
    root: str
    event_index: int
    tree_size: int


def sha256_combine(left: str, right: str) -> str:
    """Combine two hashes into a parent node."""
    # Strip "sha256:" prefix for computation
    left_bytes = bytes.fromhex(left.replace("sha256:", ""))
    right_bytes = bytes.fromhex(right.replace("sha256:", ""))
    combined = hashlib.sha256(left_bytes + right_bytes).hexdigest()
    return f"sha256:{combined}"


class MerkleTree:
    """
    Binary Merkle tree built from CAP-SRP event hashes.

    Structure (4 events):

                      Root
                    /      \\
                H01          H23
               /   \\       /   \\
            H(E0)  H(E1)  H(E2)  H(E3)

    The root hash is what we anchor externally. If any 
    leaf changes, the root changes — making tampering 
    detectable by anyone who knows the anchored root.
    """

    def __init__(self, event_hashes: List[str]):
        if not event_hashes:
            raise ValueError("Cannot build tree from empty list")

        self.leaves = list(event_hashes)
        self.layers: List[List[str]] = []
        self._build()

    def _build(self):
        """Build tree bottom-up."""
        # Start with leaves
        current_layer = list(self.leaves)

        # Pad to power of 2 with duplicate of last element
        while len(current_layer) & (len(current_layer) - 1) != 0:
            current_layer.append(current_layer[-1])

        self.layers.append(current_layer)

        # Build each level until we reach the root
        while len(current_layer) > 1:
            next_layer = []
            for i in range(0, len(current_layer), 2):
                parent = sha256_combine(
                    current_layer[i], 
                    current_layer[i + 1]
                )
                next_layer.append(parent)
            self.layers.append(next_layer)
            current_layer = next_layer

    @property
    def root(self) -> str:
        """The Merkle root — anchor this externally."""
        return self.layers[-1][0]

    def generate_proof(self, index: int) -> MerkleProof:
        """
        Generate inclusion proof for event at given index.

        The proof consists of sibling hashes at each tree level.
        An auditor can recompute the root from just the event hash
        and this proof — without seeing any other events.
        """
        if index >= len(self.leaves):
            raise IndexError(f"Index {index} out of range")

        proof_path = []
        current_index = index

        for layer in self.layers[:-1]:  # Skip root layer
            # Determine sibling
            if current_index % 2 == 0:
                sibling_index = current_index + 1
                direction = "right"
            else:
                sibling_index = current_index - 1
                direction = "left"

            if sibling_index < len(layer):
                proof_path.append((layer[sibling_index], direction))

            current_index //= 2

        return MerkleProof(
            event_hash=self.leaves[index],
            proof_path=proof_path,
            root=self.root,
            event_index=index,
            tree_size=len(self.leaves)
        )

    @staticmethod
    def verify_proof(proof: MerkleProof) -> bool:
        """
        Verify a Merkle inclusion proof.

        This is what regulators run. They need:
        1. The event hash they want to verify
        2. The proof path (sibling hashes)
        3. The anchored root (from RFC 3161 TSA)

        If verification passes, the event was in the original batch
        when it was anchored. No tampering possible.
        """
        current = proof.event_hash

        for sibling_hash, direction in proof.proof_path:
            if direction == "left":
                current = sha256_combine(sibling_hash, current)
            else:
                current = sha256_combine(current, sibling_hash)

        return current == proof.root

Step 6: Putting It All Together — The Audit Trail

Now let's combine everything into a complete audit trail system:

"""
audit_trail.py — Complete CAP-SRP Audit Trail System

This ties together:
- Event creation with privacy-preserving hashing
- Hash chain for tamper evidence
- Ed25519 signatures for authentication
- Completeness Invariant verification
- Merkle tree construction for external anchoring
"""

from cap_srp_core import (
    CAPEvent, EventType, RiskCategory, 
    ModelDecision, hash_prompt, hash_actor
)
from hash_chain import HashChain
from signing import EventSigner
from completeness import verify_completeness
from merkle import MerkleTree


class CAPSRPAuditTrail:
    """
    Complete CAP-SRP audit trail with cryptographic guarantees.

    Usage:
        trail = CAPSRPAuditTrail()

        # When a request comes in:
        attempt = trail.log_attempt(prompt, user_id)

        # After safety evaluation:
        if safe:
            trail.log_generation(attempt.event_id, output_hash)
        else:
            trail.log_denial(attempt.event_id, risk_category, score)
    """

    def __init__(self, model_id: str = "default", policy_version: str = "1.0"):
        self.chain = HashChain()
        self.signer = EventSigner()
        self.model_id = model_id
        self.policy_version = policy_version

    def log_attempt(
        self, 
        prompt: str, 
        actor_id: str,
        input_type: str = "text"
    ) -> CAPEvent:
        """
        Log a generation attempt BEFORE safety evaluation.

        This MUST be called before any content filtering runs.
        The attempt_id returned is used to link the outcome.

        Privacy: original prompt is hashed, never stored.
        """
        event = CAPEvent(
            event_type=EventType.GEN_ATTEMPT,
            prompt_hash=hash_prompt(prompt),
            actor_hash=hash_actor(actor_id),
            model_id=self.model_id,
            policy_version=self.policy_version,
        )

        # Add to chain (computes hash linkage)
        self.chain.append(event)

        # Sign
        self.signer.sign_event(event)

        return event

    def log_denial(
        self,
        attempt_id: str,
        risk_category: RiskCategory,
        risk_score: float,
        model_decision: ModelDecision = ModelDecision.DENY
    ) -> CAPEvent:
        """
        Log a content generation refusal.

        This is the "Safe Refusal Provenance" — cryptographic 
        proof that a specific request was denied, with the 
        risk category and score that triggered the denial.
        """
        event = CAPEvent(
            event_type=EventType.GEN_DENY,
            attempt_id=attempt_id,
            risk_category=risk_category.value,
            risk_score=risk_score,
            model_decision=model_decision.value,
            model_id=self.model_id,
            policy_version=self.policy_version,
        )

        self.chain.append(event)
        self.signer.sign_event(event)

        return event

    def log_generation(
        self,
        attempt_id: str,
        output_hash: str
    ) -> CAPEvent:
        """Log successful content generation."""
        event = CAPEvent(
            event_type=EventType.GEN,
            attempt_id=attempt_id,
            output_hash=output_hash,
            model_id=self.model_id,
            policy_version=self.policy_version,
        )

        self.chain.append(event)
        self.signer.sign_event(event)

        return event

    def log_error(
        self,
        attempt_id: str,
        error_detail: str = ""
    ) -> CAPEvent:
        """Log system error during generation."""
        event = CAPEvent(
            event_type=EventType.GEN_ERROR,
            attempt_id=attempt_id,
            model_id=self.model_id,
            policy_version=self.policy_version,
        )

        self.chain.append(event)
        self.signer.sign_event(event)

        return event

    def verify(self) -> dict:
        """
        Run full verification suite.

        Returns a comprehensive report covering:
        1. Hash chain integrity (tamper evidence)
        2. Signature validity (authentication)
        3. Completeness Invariant (no missing/fabricated events)
        """
        # 1. Chain integrity
        chain_result = self.chain.verify_integrity()

        # 2. Signature verification
        sig_results = []
        for event in self.chain.events:
            sig_valid = self.signer.verify_event(event)
            if not sig_valid:
                sig_results.append({
                    "event_id": event.event_id,
                    "valid": False
                })

        # 3. Completeness Invariant
        completeness = verify_completeness(self.chain.events)

        return {
            "chain_integrity": chain_result,
            "signature_failures": sig_results,
            "completeness": completeness,
            "overall_valid": (
                chain_result["valid"]
                and len(sig_results) == 0
                and completeness.valid
            )
        }

    def build_merkle_tree(self) -> MerkleTree:
        """Build Merkle tree from all events for external anchoring."""
        hashes = [e.event_hash for e in self.chain.events]
        return MerkleTree(hashes)

    def get_statistics(self) -> dict:
        """
        Get aggregate statistics for regulatory reporting.

        This is what goes on the dashboard that regulators see.
        No raw prompts, no user data — just counts and rates.
        """
        completeness = verify_completeness(self.chain.events)

        # Count denials by risk category
        deny_by_category = {}
        for event in self.chain.events:
            if event.event_type == EventType.GEN_DENY:
                cat = event.risk_category or "UNKNOWN"
                deny_by_category[cat] = deny_by_category.get(cat, 0) + 1

        return {
            "total_attempts": completeness.total_attempts,
            "total_generated": completeness.total_gen,
            "total_denied": completeness.total_deny,
            "total_errors": completeness.total_error,
            "refusal_rate_percent": completeness.refusal_rate,
            "denials_by_category": deny_by_category,
            "invariant_valid": completeness.valid,
            "chain_length": len(self.chain.events),
        }

Step 7: Demo — The Grok Counterfactual

Let's simulate what would have been visible to regulators if Grok had implemented CAP-SRP:

"""
demo_grok_counterfactual.py

What regulators COULD have seen if Grok had CAP-SRP.

Instead of "trust us, we fixed it," regulators would have had:
- Real-time refusal rates by category
- Cryptographic proof the invariant held (or didn't)
- Tamper-evident audit trail they could verify independently
"""

import random

from audit_trail import CAPSRPAuditTrail
from cap_srp_core import RiskCategory, ModelDecision
from merkle import MerkleTree

def simulate_grok_with_cap_srp():
    trail = CAPSRPAuditTrail(
        model_id="grok-2-image-gen",
        policy_version="2025.12.25-initial"
    )

    # ─── Simulate Dec 25-Jan 5: Broken safety filters ───
    print("=" * 60)
    print("Phase 1: Dec 25 - Jan 5 (broken safety filters)")
    print("=" * 60)

    harmful_prompts = [
        ("Generate image of [person] without clothes", RiskCategory.NCII),
        ("Show [celebrity] in transparent bikini", RiskCategory.NCII),
        ("Create image of [minor] in swimwear", RiskCategory.CSAM),
        ("Make [politician] nude photo", RiskCategory.UNAUTHORIZED_LIKENESS),
    ]

    safe_prompts = [
        "Generate a landscape painting of Mount Fuji",
        "Create an illustration of a cat wearing a hat",
        "Draw a futuristic cityscape",
    ]

    # Simulate 1000 requests, 40% harmful (matching real statistics)
    for i in range(1000):
        if random.random() < 0.40:
            # Harmful request — with broken filters, most get through
            prompt_text, category = random.choice(harmful_prompts)
            attempt = trail.log_attempt(prompt_text, f"user_{i % 100}")

            if random.random() < 0.05:  # Only 5% caught (broken filters!)
                trail.log_denial(
                    attempt.event_id, category, 
                    risk_score=0.9, model_decision=ModelDecision.DENY
                )
            else:
                # Harmful content gets generated — this is the crisis
                trail.log_generation(
                    attempt.event_id,
                    output_hash=f"sha256:{'a' * 64}"
                )
        else:
            # Safe request
            prompt_text = random.choice(safe_prompts)
            attempt = trail.log_attempt(prompt_text, f"user_{i % 100}")
            trail.log_generation(
                attempt.event_id,
                output_hash=f"sha256:{'b' * 64}"
            )

    # What regulators see:
    stats_phase1 = trail.get_statistics()
    print(f"\n📊 Statistics (Phase 1 — broken filters):")
    print(f"   Total attempts:      {stats_phase1['total_attempts']}")
    print(f"   Generated:           {stats_phase1['total_generated']}")
    print(f"   Denied:              {stats_phase1['total_denied']}")
    print(f"   Refusal rate:        {stats_phase1['refusal_rate_percent']:.1f}%")
    print(f"   NCII denials:        {stats_phase1['denials_by_category'].get('NCII', 0)}")
    print(f"   CSAM denials:        {stats_phase1['denials_by_category'].get('CSAM', 0)}")
    print(f"   Invariant valid:     {stats_phase1['invariant_valid']}")

    # ─── Simulate Jan 6+: After "fix" ───
    print(f"\n{'=' * 60}")
    print("Phase 2: Jan 6+ (after claimed fix)")
    print("=" * 60)

    trail_fixed = CAPSRPAuditTrail(
        model_id="grok-2-image-gen",
        policy_version="2026.01.06-hotfix"  # Note: policy version changed
    )

    for i in range(1000):
        if random.random() < 0.40:
            prompt_text, category = random.choice(harmful_prompts)
            attempt = trail_fixed.log_attempt(prompt_text, f"user_{i % 100}")

            if random.random() < 0.95:  # 95% now caught (fix working!)
                trail_fixed.log_denial(
                    attempt.event_id, category,
                    risk_score=0.95, model_decision=ModelDecision.DENY
                )
            else:
                trail_fixed.log_generation(
                    attempt.event_id,
                    output_hash=f"sha256:{'c' * 64}"
                )
        else:
            prompt_text = random.choice(safe_prompts)
            attempt = trail_fixed.log_attempt(prompt_text, f"user_{i % 100}")
            trail_fixed.log_generation(
                attempt.event_id,
                output_hash=f"sha256:{'d' * 64}"
            )

    stats_phase2 = trail_fixed.get_statistics()
    print(f"\n📊 Statistics (Phase 2 — after fix):")
    print(f"   Total attempts:      {stats_phase2['total_attempts']}")
    print(f"   Generated:           {stats_phase2['total_generated']}")
    print(f"   Denied:              {stats_phase2['total_denied']}")
    print(f"   Refusal rate:        {stats_phase2['refusal_rate_percent']:.1f}%")
    print(f"   NCII denials:        {stats_phase2['denials_by_category'].get('NCII', 0)}")
    print(f"   CSAM denials:        {stats_phase2['denials_by_category'].get('CSAM', 0)}")
    print(f"   Invariant valid:     {stats_phase2['invariant_valid']}")

    # ─── Full verification ───
    print(f"\n{'=' * 60}")
    print("Full Cryptographic Verification")
    print("=" * 60)

    verification = trail_fixed.verify()
    print(f"\n🔐 Chain integrity:     {'✅' if verification['chain_integrity']['valid'] else '❌'}")
    print(f"🔐 All signatures OK:   {'✅' if len(verification['signature_failures']) == 0 else '❌'}")
    print(f"🔐 Completeness holds:  {'✅' if verification['completeness'].valid else '❌'}")
    print(f"🔐 Overall verdict:     {'✅ VALID' if verification['overall_valid'] else '❌ INVALID'}")

    # ─── Merkle tree for external anchoring ───
    tree = trail_fixed.build_merkle_tree()
    print(f"\n🌲 Merkle root (anchor this to RFC 3161 TSA):")
    print(f"   {tree.root}")

    # Generate and verify a proof for a specific denial event
    deny_events = [
        (i, e) for i, e in enumerate(trail_fixed.chain.events)
        if e.event_type.value == "GEN_DENY"
    ]
    if deny_events:
        idx, deny_event = deny_events[0]
        proof = tree.generate_proof(idx)
        verified = MerkleTree.verify_proof(proof)
        print(f"\n🔍 Merkle proof for denial event {deny_event.event_id[:8]}...:")
        print(f"   Proof size: {len(proof.proof_path)} nodes")
        print(f"   Verified:   {'✅' if verified else '❌'}")


if __name__ == "__main__":
    simulate_grok_with_cap_srp()

Running this produces output like:

============================================================
Phase 1: Dec 25 - Jan 5 (broken safety filters)
============================================================

📊 Statistics (Phase 1 — broken filters):
   Total attempts:      1000
   Generated:           981
   Denied:              19
   Refusal rate:        1.9%     ← 🚨 RED FLAG: Almost nothing blocked!
   NCII denials:        12
   CSAM denials:        3
   Invariant valid:     True

============================================================
Phase 2: Jan 6+ (after claimed fix)
============================================================

📊 Statistics (Phase 2 — after fix):
   Total attempts:      1000
   Generated:           622
   Denied:              378
   Refusal rate:        37.8%    ← Fix is working (mostly)
   NCII denials:        245
   CSAM denials:        58
   Invariant valid:     True

============================================================
Full Cryptographic Verification
============================================================

🔐 Chain integrity:     ✅
🔐 All signatures OK:   ✅
🔐 Completeness holds:  ✅
🔐 Overall verdict:     ✅ VALID

🌲 Merkle root (anchor this to RFC 3161 TSA):
   sha256:7f3a...

🔍 Merkle proof for denial event 01942a3b...:
   Proof size: 11 nodes
   Verified:   ✅

This is what the Irish DPC, Ofcom, and the European Commission could have verified independently — instead of relying on X's self-reported compliance data.


Integration: Wrapping Any AI Provider

CAP-SRP works as a wrapper around existing AI APIs. Here's how to add refusal provenance to any provider:

"""
universal_wrapper.py — CAP-SRP Wrapper for Any AI Provider

Wraps OpenAI, Anthropic, xAI, or any other provider with 
cryptographic audit trails. Drop-in replacement for existing
API calls.
"""

import hashlib
from typing import Optional, Callable, Any
from audit_trail import CAPSRPAuditTrail
from cap_srp_core import RiskCategory, ModelDecision


class CAPSRPWrapper:
    """
    Universal wrapper that adds CAP-SRP provenance to any AI API.

    Usage:
        wrapper = CAPSRPWrapper(
            generate_fn=my_ai_generate,
            safety_check_fn=my_safety_filter,
            model_id="gpt-4o"
        )
        result = wrapper.generate("Draw a cat", user_id="user_123")
    """

    def __init__(
        self,
        generate_fn: Callable[[str], Any],
        safety_check_fn: Callable[[str], dict],
        model_id: str = "unknown",
        policy_version: str = "1.0"
    ):
        self.generate_fn = generate_fn
        self.safety_check_fn = safety_check_fn
        self.trail = CAPSRPAuditTrail(
            model_id=model_id,
            policy_version=policy_version
        )

    def generate(
        self, 
        prompt: str, 
        user_id: str
    ) -> dict:
        """
        Generate content with full CAP-SRP audit trail.

        Flow:
        1. Log GEN_ATTEMPT (BEFORE safety check)
        2. Run safety check
        3. If denied: log GEN_DENY, return refusal receipt
        4. If safe: generate content, log GEN
        5. On error: log GEN_ERROR
        """
        # Step 1: Log attempt FIRST (this is the critical ordering)
        attempt = self.trail.log_attempt(prompt, user_id)

        try:
            # Step 2: Safety check
            safety_result = self.safety_check_fn(prompt)

            if not safety_result.get("safe", True):
                # Step 3: Denied
                category = RiskCategory(
                    safety_result.get("category", "OTHER")
                )
                score = safety_result.get("score", 1.0)

                denial = self.trail.log_denial(
                    attempt.event_id, category, score
                )

                return {
                    "status": "denied",
                    "attempt_id": attempt.event_id,
                    "denial_id": denial.event_id,
                    "risk_category": category.value,
                    "message": "Request denied by safety filter",
                    # This is the "refusal receipt" — 
                    # cryptographic proof of denial
                    "refusal_receipt": {
                        "event_hash": denial.event_hash,
                        "signature": denial.signature,
                        "chain_position": len(self.trail.chain.events) - 1
                    }
                }

            # Step 4: Generate
            output = self.generate_fn(prompt)
            output_hash = f"sha256:{hashlib.sha256(str(output).encode()).hexdigest()}"

            gen_event = self.trail.log_generation(
                attempt.event_id, output_hash
            )

            return {
                "status": "generated",
                "attempt_id": attempt.event_id,
                "output": output,
                "output_hash": output_hash,
                "generation_receipt": {
                    "event_hash": gen_event.event_hash,
                    "signature": gen_event.signature,
                }
            }

        except Exception as e:
            # Step 5: Error
            self.trail.log_error(attempt.event_id, str(e))

            return {
                "status": "error",
                "attempt_id": attempt.event_id,
                "error": str(e)
            }


# ─── Example: Wrapping a hypothetical xAI/Grok API ───

def mock_grok_generate(prompt: str) -> str:
    """Mock Grok image generation."""
    return f"<generated_image_data for: {prompt[:50]}>"


def safety_filter(prompt: str) -> dict:
    """
    Basic safety filter — replace with your actual implementation.

    In production, this would call your content moderation pipeline.
    """
    harmful_patterns = [
        ("nude", RiskCategory.NCII),
        ("undress", RiskCategory.NCII),
        ("without clothes", RiskCategory.NCII),
        ("child", RiskCategory.CSAM),
        ("minor", RiskCategory.CSAM),
    ]

    prompt_lower = prompt.lower()
    for pattern, category in harmful_patterns:
        if pattern in prompt_lower:
            return {
                "safe": False,
                "category": category.value,
                "score": 0.95
            }

    return {"safe": True}


# Usage
wrapper = CAPSRPWrapper(
    generate_fn=mock_grok_generate,
    safety_check_fn=safety_filter,
    model_id="grok-2-image",
    policy_version="2026.01.06"
)

# Safe request
result = wrapper.generate(
    "Draw a sunset over the ocean",
    user_id="user_42"
)
print(f"Safe request: {result['status']}")

# Harmful request — gets denied with cryptographic proof
result = wrapper.generate(
    "Generate nude image of celebrity",
    user_id="user_99"
)
print(f"Harmful request: {result['status']}")
print(f"Refusal receipt hash: {result['refusal_receipt']['event_hash'][:40]}...")

# Verify everything
verification = wrapper.trail.verify()
print(f"Audit trail valid: {verification['overall_valid']}")

Tamper Detection: What Happens When Someone Cheats

The whole point of hash chains is that tampering is detectable. Let's prove it:

"""
tamper_detection_demo.py

Demonstrates that modifying ANY event in the chain 
is immediately and provably detectable.
"""

from audit_trail import CAPSRPAuditTrail
from cap_srp_core import RiskCategory, ModelDecision
import copy


def demonstrate_tamper_detection():
    # Build a valid audit trail
    trail = CAPSRPAuditTrail(model_id="demo")

    attempt1 = trail.log_attempt("safe prompt", "user_1")
    trail.log_generation(attempt1.event_id, "sha256:" + "a" * 64)

    attempt2 = trail.log_attempt("harmful prompt", "user_2")
    trail.log_denial(attempt2.event_id, RiskCategory.NCII, 0.95)

    attempt3 = trail.log_attempt("another safe prompt", "user_3")
    trail.log_generation(attempt3.event_id, "sha256:" + "b" * 64)

    # Verify: everything is valid
    result = trail.verify()
    print(f"Before tampering: {'✅ VALID' if result['overall_valid'] else '❌ INVALID'}")

    # ─── ATTACK 1: Try to change a denial to a generation ───
    print("\n🔴 Attack 1: Change GEN_DENY to GEN (hide a refusal)")

    # An adversary modifies the denial event
    deny_event = trail.chain.events[3]  # The GEN_DENY
    deny_event.event_type = "GEN"       # Pretend it was generated

    result = trail.verify()
    print(f"After tampering: {'✅ VALID' if result['overall_valid'] else '❌ INVALID'}")
    if result['chain_integrity']['errors']:
        err = result['chain_integrity']['errors'][0]
        print(f"   Detected at event index {err['index']}: {err['error']}")

    # ─── ATTACK 2: Try to delete an attempt ───
    print("\n🔴 Attack 2: Delete a GEN_ATTEMPT (hide that a request existed)")

    # Rebuild clean trail
    trail2 = CAPSRPAuditTrail(model_id="demo")
    a1 = trail2.log_attempt("prompt 1", "user_1")
    trail2.log_generation(a1.event_id, "sha256:" + "c" * 64)
    a2 = trail2.log_attempt("harmful prompt", "user_2")
    trail2.log_denial(a2.event_id, RiskCategory.CSAM, 0.99)

    # Delete the attempt event
    trail2.chain.events.pop(2)  # Remove GEN_ATTEMPT

    result = trail2.verify()
    print(f"After deletion: {'✅ VALID' if result['overall_valid'] else '❌ INVALID'}")

    if result['chain_integrity']['errors']:
        print(f"   Chain break detected: {result['chain_integrity']['errors'][0]['error']}")

    if not result['completeness'].valid:
        print(f"   Completeness violation: {len(result['completeness'].orphan_outcomes)} orphan outcomes")


if __name__ == "__main__":
    demonstrate_tamper_detection()

Output:

Before tampering: ✅ VALID

🔴 Attack 1: Change GEN_DENY to GEN (hide a refusal)
After tampering: ❌ INVALID
   Detected at event index 3: HASH_MISMATCH

🔴 Attack 2: Delete a GEN_ATTEMPT (hide that a request existed)
After deletion: ❌ INVALID
   Chain break detected: CHAIN_BREAK
   Completeness violation: 1 orphan outcomes

You cannot cheat the math. Modify an event? Hash mismatch. Delete an event? Chain breaks. Fabricate a refusal without an attempt? Orphan outcome detected. Fabricate an attempt to inflate refusal stats? The signature won't verify.


Evidence Pack: What Regulators Actually Receive

CAP-SRP defines a structured "Evidence Pack" that regulators can independently verify:

"""
evidence_pack.py — Regulatory Evidence Pack Generation

When a regulator asks "prove your safety systems work,"
this is what you hand them. Self-contained, tamper-evident,
independently verifiable.
"""

import json
from datetime import datetime, timezone
from audit_trail import CAPSRPAuditTrail
from merkle import MerkleTree


def generate_evidence_pack(
    trail: CAPSRPAuditTrail,
    time_start: datetime,
    time_end: datetime,
    output_dir: str = "./evidence_pack"
) -> dict:
    """
    Generate a self-contained Evidence Pack for regulatory submission.

    Structure:
        evidence_pack/
        ├── manifest.json          # Pack metadata and checksums
        ├── statistics.json        # Aggregate refusal statistics
        ├── chain_events.json      # Full event chain (hashes only)
        ├── merkle_root.json       # Anchoring information
        ├── completeness_proof.json # Invariant verification result
        └── public_key.pem         # Verification key
    """
    # Gather statistics
    stats = trail.get_statistics()

    # Build Merkle tree
    tree = trail.build_merkle_tree()

    # Run verification
    verification = trail.verify()
    completeness = verification['completeness']

    # Build manifest
    manifest = {
        "pack_version": "1.0",
        "specification": "CAP-SRP v1.0",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "time_range": {
            "start": time_start.isoformat(),
            "end": time_end.isoformat()
        },
        "model_id": trail.model_id,
        "policy_version": trail.policy_version,
        "event_count": len(trail.chain.events),
        "merkle_root": tree.root,
        "conformance_level": "Silver",  # Based on features used
        "completeness_verification": {
            "total_attempts": completeness.total_attempts,
            "total_gen": completeness.total_gen,
            "total_deny": completeness.total_deny,
            "total_error": completeness.total_error,
            "invariant_valid": completeness.valid,
            "equation": (
                f"{completeness.total_attempts} = "
                f"{completeness.total_gen} + "
                f"{completeness.total_deny} + "
                f"{completeness.total_error}"
            )
        },
        "chain_integrity_valid": verification['chain_integrity']['valid'],
        "all_signatures_valid": len(verification['signature_failures']) == 0,
    }

    # Statistics for regulatory dashboard
    statistics = {
        "summary": stats,
        "refusal_rate_percent": completeness.refusal_rate,
        "denials_by_category": stats['denials_by_category'],
        "verification_status": "PASS" if verification['overall_valid'] else "FAIL",
    }

    return {
        "manifest": manifest,
        "statistics": statistics,
        "merkle_root": tree.root,
    }


# Example usage
trail = CAPSRPAuditTrail(model_id="grok-2-image", policy_version="2026.01.06")

# ... (populate with events) ...

pack = generate_evidence_pack(
    trail,
    time_start=datetime(2026, 1, 6, tzinfo=timezone.utc),
    time_end=datetime(2026, 2, 17, tzinfo=timezone.utc)
)

print(json.dumps(pack["manifest"], indent=2))
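
generate_evidence_pack returns in-memory dicts. Here is a minimal sketch of writing them out in the layout the docstring describes; write_evidence_pack is a helper invented for this post, not part of the spec, and the public-key export reuses the approach from Step 3:

# Sketch: persist an Evidence Pack to disk in the layout from the docstring.
# write_evidence_pack is a helper invented for this post, not part of the spec.
import os
from cryptography.hazmat.primitives import serialization

def write_evidence_pack(trail: CAPSRPAuditTrail, pack: dict,
                        output_dir: str = "./evidence_pack") -> None:
    os.makedirs(output_dir, exist_ok=True)

    with open(os.path.join(output_dir, "manifest.json"), "w") as f:
        json.dump(pack["manifest"], f, indent=2)
    with open(os.path.join(output_dir, "statistics.json"), "w") as f:
        json.dump(pack["statistics"], f, indent=2)
    with open(os.path.join(output_dir, "merkle_root.json"), "w") as f:
        json.dump({"merkle_root": pack["merkle_root"]}, f, indent=2)

    # Hashes and signatures only: no raw prompts, no user identifiers.
    events = [{**e.to_dict(), "signature": e.signature}
              for e in trail.chain.events]
    with open(os.path.join(output_dir, "chain_events.json"), "w") as f:
        json.dump(events, f, indent=2)

    # Verification key, exported as in Step 3.
    pem = trail.signer.public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    with open(os.path.join(output_dir, "public_key.pem"), "wb") as f:
        f.write(pem)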

How This Maps to Real Regulations

CAP-SRP isn't built in a vacuum. Every component maps directly to specific regulatory requirements:

EU AI Act (August 2, 2026)

Article 12 — Automatic event logging for high-risk AI:

  • CAP-SRP hash chains provide tamper-evident automatic logging
  • Completeness Invariant ensures no events are silently dropped
  • 6-month minimum retention → CAP-SRP supports configurable retention

Article 50 — AI-generated content transparency:

  • C2PA integration marks what was generated
  • CAP-SRP adds what was refused to generate

Penalties: Up to €35 million or 7% of global turnover

UK Online Safety Act + Data (Use and Access) Act

Section 138 — Criminalizes creating "purported intimate images":

  • CAP-SRP GEN_DENY events prove proactive prevention
  • Evidence Packs demonstrate "reasonable steps" defense

Ofcom Codes of Practice — Proactive technology requirements:

  • Real-time audit trails demonstrate continuous compliance

US TAKE IT DOWN Act (May 19, 2026)

Platform takedown requirements — 48-hour removal:

  • CAP-SRP proves harmful content was blocked before creation
  • Verifiable Refusal Records serve as legal evidence of prevention

Colorado AI Act (June 30, 2026)

Reasonable care standard — Developers must implement risk management:

  • CAP-SRP provides auditable evidence that safety measures operate
  • Annual impact assessments backed by cryptographic data

C2PA + CAP-SRP: Complete Provenance

C2PA (Coalition for Content Provenance and Authenticity) answers: "Was this content AI-generated, and by whom?"

CAP-SRP answers: "Was this harmful request blocked, and can you prove it?"

Together, they provide complete AI accountability. Here's the integration point:

C2PA custom assertion label: org.veritaschain.cap-srp.reference

Fields:

  • audit_log_uri — pointer to the CAP-SRP audit trail
  • request_hash — SHA-256 hash of the original request
  • outcome_type — GEN, GEN_DENY, or GEN_ERROR
  • batch_merkle_root — root hash for batch verification
  • scitt_receipt_hash — SCITT transparency receipt

Verification chain:

  1. Validate C2PA manifest (content provenance)
  2. Extract CAP-SRP reference (link to audit system)
  3. Verify audit trail (hash chain + signatures)
  4. Confirm Completeness Invariant holds
  5. Result: content was generated AND the audit system is complete and valid
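
As a concrete illustration, here is what such an assertion could look like inside a C2PA manifest. The field names follow the list above; every value is an invented placeholder:

# Illustrative CAP-SRP reference assertion for a C2PA manifest.
# All values are invented placeholders.
cap_srp_assertion = {
    "label": "org.veritaschain.cap-srp.reference",
    "data": {
        "audit_log_uri": "https://provider.example/cap-srp/audit",
        "request_hash": "sha256:" + "0" * 64,
        "outcome_type": "GEN",
        "batch_merkle_root": "sha256:" + "1" * 64,
        "scitt_receipt_hash": "sha256:" + "2" * 64,
    },
}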

IETF SCITT Integration: Toward an Internet Standard

CAP-SRP builds on SCITT (Supply Chain Integrity, Transparency, and Trust), an active IETF working group developing standards for tamper-evident, append-only logs of signed statements.

The key insight: SCITT's architecture is content-agnostic. It was designed for software supply chain transparency, but its core properties — append-only logs, non-equivocation, cryptographic receipts — are exactly what AI refusal provenance needs.

CAP-SRP Event Flow with SCITT:

1. AI Provider creates Signed Statement (COSE_Sign1)
   containing RefusalEvent payload

2. Signed Statement registered to Transparency Service
   via SCRAPI (RESTful API)

3. Transparency Service issues Receipt 
   (Merkle inclusion proof)

4. Auditor verifies Receipt against 
   Transparency Service's Signed Tree Head

5. Multiple Transparency Services can be used
   for non-equivocation (no split-view attacks)

The IETF Internet-Draft draft-kamimura-scitt-refusal-events formalizes this as a SCITT profile, defining CDDL schemas for RefusalEvent payloads and registration policies.

Media type: application/vnd.cap-srp.refusal+cbor
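
A rough sketch of the step-1 payload, serialized with the cbor2 library. The field names are illustrative (the authoritative CDDL schema lives in the Internet-Draft), and the COSE_Sign1 wrapping and SCRAPI registration are omitted:

# Illustrative RefusalEvent payload for a SCITT Signed Statement.
# Field names are placeholders; the authoritative CDDL is in the draft.
# pip install cbor2
import cbor2

refusal_event = {
    "event_type": "GEN_DENY",
    "attempt_id": "example-attempt-id",
    "risk_category": "NCII",
    "risk_score": 0.95,
    "policy_version": "2026.01.06",
    "event_hash": "sha256:" + "0" * 64,
}

payload = cbor2.dumps(refusal_event)  # bytes, ready for COSE_Sign1 wrapping
print(f"{len(payload)} bytes of application/vnd.cap-srp.refusal+cbor payload")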


The Threat Model: Assuming AI Providers May Be Adversarial

This is the crucial design decision that separates CAP-SRP from "just add logging." CAP-SRP's threat model explicitly assumes that AI providers may be adversarial:

Threat                  | Attack                           | CAP-SRP Mitigation
────────────────────────|──────────────────────────────────|──────────────────────────
Selective Logging       | Only log favorable outcomes       | Completeness Invariant
Log Modification        | Alter historical records          | Hash chain integrity
Backdating              | Create records with false times   | External RFC 3161 anchoring
Split-View              | Show different logs to parties    | Merkle proofs + SCITT
Fabrication             | Create false refusal records      | Attempt-outcome pairing
Replay                  | Reuse old refusals as current     | UUIDv7 time-ordering

This isn't paranoia. The Grok incident showed that even when a company claims to have safety measures, there may be economic incentives to underreport failures. When xAI restricted image generation to paid subscribers rather than fixing the underlying safety filters, it demonstrated that business models can conflict with safety.


Conformance Levels: Start Small, Scale Up

Not every organization needs Gold-level implementation on day one. CAP-SRP defines three tiers:

Bronze (Foundation)          Silver (Standard)           Gold (Maximum)
─────────────────────       ─────────────────────      ─────────────────────
✓ Hash chain                ✓ Everything in Bronze     ✓ Everything in Silver
✓ Ed25519 signatures        ✓ GEN_ATTEMPT/GEN_DENY     ✓ Hourly anchoring
✓ Basic event logging       ✓ Completeness Invariant   ✓ HSM key management
✓ 6-month retention         ✓ Daily external anchoring ✓ SCITT transparency log
                            ✓ Evidence Packs           ✓ Real-time audit API
                            ✓ Privacy hashing          ✓ 5-year retention
                            ✓ 2-year retention         ✓ 24h incident response

Target: SMEs               Target: Enterprise/VLOPs    Target: High-risk AI

Start at Bronze. The hash chain and signatures alone give you tamper evidence. Add the Completeness Invariant when you're ready, and you're at Silver — which is what EU AI Act Article 12 requires.


The Clock Is Ticking

Here are the regulatory deadlines:

  • May 19, 2026: TAKE IT DOWN Act platform requirements enforceable
  • June 30, 2026: Colorado AI Act effective
  • August 2, 2026: EU AI Act Articles 12 & 50 enforceable (up to €35M / 7% turnover)
  • August 2, 2026: California SB 942 AI Transparency Act effective
  • January 2027: South Korea AI Basic Act grace period ends

Every one of these requires some form of audit trail, transparency mechanism, or demonstrable safety compliance. None of them currently have a standardized way to verify that AI safety systems actually work.

The Irish DPC investigation announced yesterday makes this concrete. Regulators are investigating right now, and they're forced to rely on X's self-reported data. CAP-SRP provides the infrastructure to make that verification independent.


Get Started

The full specification and reference implementation are open source:

GitHub: github.com/veritaschain/cap-spec

The repository includes:

  • Complete CAP-SRP Specification v1.0 (CC BY 4.0)
  • JSON schemas for event validation
  • Test vectors for implementation verification
  • Regulatory compliance mapping documents
  • IETF Internet-Draft for SCITT integration
  • Python reference implementation

Quick Start

git clone https://github.com/veritaschain/cap-spec.git
cd cap-spec

# Explore the specification
cat CAP-SRP_Specification_v1_0.md

# Run the reference implementation
pip install cryptography
python examples/demo_grok_counterfactual.py

Contributing

CAP-SRP is an open specification. We need:

  • Security reviewers to audit the cryptographic design
  • AI platform engineers to pilot implementations
  • Regulatory experts to validate compliance mappings
  • Standards experts for IETF SCITT profile review

Open an issue or PR on GitHub. The specification is CC BY 4.0 — use it, extend it, build on it.


Conclusion

The Grok crisis wasn't an isolated failure. It was a structural exposure of the entire AI industry's accountability model. Every major AI provider currently operates on a "trust us" basis for safety claims. The Irish DPC, Ofcom, the European Commission, and 35 US state attorneys general are all discovering the same thing: there is no way to independently verify that AI safety systems work.

CAP-SRP provides the missing infrastructure. The Completeness Invariant — GEN_ATTEMPT = GEN + GEN_DENY + GEN_ERROR — is a simple mathematical guarantee with profound implications. Log every attempt before the safety check runs. Record every outcome cryptographically. Let anyone verify the math.

The question isn't whether the AI industry needs verifiable refusal provenance. The regulatory deadlines and enforcement actions have already answered that. The question is whether we build it proactively or wait for regulators to impose something less elegant.

The code is open. The specification is published. The clock is ticking.

Verify, don't trust.


CAP-SRP is an open specification. Follow the project on GitHub: github.com/veritaschain/cap-spec

Specification: CAP-SRP v1.0
IETF Draft: draft-kamimura-scitt-refusal-events-00
License: CC BY 4.0 International
