Skip to content

Verification API

Sender verification via probe-based challenge or SPKI-based TOFU.

VerifyingHandler

VerifyingHandler

VerifyingHandler(
    wrapped: MessageHandler,
    verifier: ProbeVerifier
    | SPKIVerifier
    | CombinedVerifier,
    mode: VerificationMode = VerificationMode.OPTIONAL,
    method: VerificationMethod = VerificationMethod.PROBE,
)

Bases: MessageHandler

Wraps a MessageHandler to verify sender addresses before delivery.

Zero-length messages (verification probes) are always forwarded to the wrapped handler without verification to avoid infinite loops.

Source code in src/titlani/verification/handler.py
def __init__(
    self,
    wrapped: MessageHandler,
    verifier: ProbeVerifier | SPKIVerifier | CombinedVerifier,
    mode: VerificationMode = VerificationMode.OPTIONAL,
    method: VerificationMethod = VerificationMethod.PROBE,
) -> None:
    """Record the wrapped handler and the verification configuration.

    Nothing is verified here; the arguments are only stored on the instance.

    Args:
        wrapped: The message handler this object wraps.
        verifier: The verifier object to keep alongside the handler.
        mode: Verification mode; defaults to ``VerificationMode.OPTIONAL``.
        method: Verification method; defaults to ``VerificationMethod.PROBE``.
    """
    # Policy settings.
    self.mode, self.method = mode, method
    # Collaborating objects.
    self.wrapped, self.verifier = wrapped, verifier

ProbeVerifier

ProbeVerifier

ProbeVerifier(
    cache: SenderVerificationCache,
    identity_cert: Path,
    identity_key: Path,
    port: int = DEFAULT_PORT,
    timeout: float = 10.0,
)

Verify senders by probing their server with a zero-length request.

A successful probe (status 20) means the sender's server recognises the mailbox and returns its certificate fingerprint. Results are cached, so a sender is not probed again while its cache entry is still within the TTL.

Source code in src/titlani/verification/verifier.py
def __init__(
    self,
    cache: SenderVerificationCache,
    identity_cert: Path,
    identity_key: Path,
    port: int = DEFAULT_PORT,
    timeout: float = 10.0,
) -> None:
    """Keep the cache, identity file paths and connection settings.

    No network or file access happens here; every argument is stored as-is.

    Args:
        cache: Verification cache kept on the instance.
        identity_cert: Path to the identity certificate
            (presumably presented when probing -- confirm against the caller).
        identity_key: Path to the matching identity key.
        port: Port number to keep; defaults to ``DEFAULT_PORT``.
        timeout: Timeout value (presumably seconds -- confirm).
    """
    # Connection parameters.
    self.port, self.timeout = port, timeout
    # Identity material; only the paths are stored, nothing is read yet.
    self.identity_cert, self.identity_key = identity_cert, identity_key
    self.cache = cache

SPKIVerifier

SPKIVerifier

SPKIVerifier(
    cache: SenderVerificationCache,
    port: int = DEFAULT_PORT,
    timeout: float = 10.0,
    on_spki_change: str = "reject",
)

Verify senders by caching their server's SPKI hash (TOFU model).

On first contact with a server, connects via TLS, extracts the SPKI hash, and caches it. While the cache entry is valid (within TTL), subsequent verifications return immediately without a network call. When the entry expires, the server is re-checked and its current SPKI compared against the last known value.

Source code in src/titlani/verification/spki_verifier.py
def __init__(
    self,
    cache: SenderVerificationCache,
    port: int = DEFAULT_PORT,
    timeout: float = 10.0,
    on_spki_change: str = "reject",
) -> None:
    """Keep the cache, connection settings and SPKI-change policy.

    Every argument is stored unchanged; nothing is validated here.

    Args:
        cache: Verification cache kept on the instance.
        port: Port number to keep; defaults to ``DEFAULT_PORT``.
        timeout: Timeout value (presumably seconds -- confirm).
        on_spki_change: Policy name, ``"reject"`` by default. Presumably
            selects what happens when a server's SPKI no longer matches the
            cached one -- the accepted values are not visible here; confirm.
    """
    # Connection parameters.
    self.port, self.timeout = port, timeout
    # Cache and policy.
    self.cache, self.on_spki_change = cache, on_spki_change

SenderVerificationCache

SenderVerificationCache

SenderVerificationCache(
    db_path: Path | None = None,
    ttl_seconds: int = _DEFAULT_TTL,
)

SQLite-backed cache for probe fingerprints and server SPKI hashes.

Tables:

- verified_senders: maps sender addresses to probe fingerprints
- server_spki: maps hostnames to SPKI hashes (for SPKI verification)

Source code in src/titlani/verification/cache.py
def __init__(
    self,
    db_path: Path | None = None,
    ttl_seconds: int = _DEFAULT_TTL,
) -> None:
    """Open the cache database and make sure its tables exist.

    Args:
        db_path: Location of the SQLite file. ``None`` selects an in-memory
            database.
        ttl_seconds: Lifetime of a cache entry, in seconds.
    """
    self._db_path = ":memory:" if db_path is None else str(db_path)
    self._ttl_seconds = ttl_seconds

    # check_same_thread=False lets the connection be used from threads other
    # than the one that created it.
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    conn = self._conn

    on_disk = self._db_path != ":memory:"
    if on_disk:
        # Restrict the database file to its owner.
        # NOTE(review): the mode is tightened only after connect() has
        # opened the file -- confirm this window is acceptable.
        os.chmod(self._db_path, 0o600)

    conn.execute("PRAGMA journal_mode=WAL")

    # Sender address -> probe fingerprint.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS verified_senders (
            address TEXT PRIMARY KEY,
            fingerprint TEXT NOT NULL,
            verified_at TEXT NOT NULL
        )
        """
    )
    # Hostname -> SPKI hash.
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS server_spki (
            hostname TEXT PRIMARY KEY,
            spki_hash TEXT NOT NULL,
            verified_at TEXT NOT NULL
        )
        """
    )
    conn.commit()

get_fingerprint

get_fingerprint(address: str) -> str | None

Return cached fingerprint for address, or None if missing/expired.

Source code in src/titlani/verification/cache.py
def get_fingerprint(self, address: str) -> str | None:
    """Return cached fingerprint for *address*, or None if missing/expired."""
    cutoff = datetime.now(UTC) - timedelta(seconds=self._ttl_seconds)
    cur = self._conn.execute(
        "SELECT fingerprint FROM verified_senders "
        "WHERE address = ? AND verified_at >= ?",
        (address, cutoff.isoformat()),
    )
    row = cur.fetchone()
    return row[0] if row else None

add_verified

add_verified(address: str, fingerprint: str) -> None

Store (or update) a verified sender.

Source code in src/titlani/verification/cache.py
def add_verified(self, address: str, fingerprint: str) -> None:
    """Insert *address* with *fingerprint*, or refresh the existing row.

    An existing row for the same address has both its fingerprint and its
    ``verified_at`` timestamp overwritten. The change is committed
    immediately.
    """
    upsert_sql = """
        INSERT INTO verified_senders (address, fingerprint, verified_at)
        VALUES (?, ?, ?)
        ON CONFLICT(address) DO UPDATE SET
            fingerprint = excluded.fingerprint,
            verified_at = excluded.verified_at
        """
    # The verification time is recorded as a timezone-aware UTC ISO string.
    stamp = datetime.now(UTC).isoformat()
    row = (address, fingerprint, stamp)
    self._conn.execute(upsert_sql, row)
    self._conn.commit()
    logger.debug("verification_cache_write", address=address)

revoke

revoke(address: str) -> bool

Remove address from cache. Returns True if it existed.

Source code in src/titlani/verification/cache.py
def revoke(self, address: str) -> bool:
    """Delete the cached entry for *address* from ``verified_senders``.

    Returns:
        ``True`` if a row was deleted, ``False`` if there was nothing to
        remove.
    """
    deleted = self._conn.execute(
        "DELETE FROM verified_senders WHERE address = ?",
        (address,),
    ).rowcount
    self._conn.commit()

    existed = deleted > 0
    if existed:
        # Only log when something was actually removed.
        logger.info("verification_cache_revoked", address=address)
    return existed

list_verified

list_verified() -> list[tuple[str, str, datetime]]

Return all verified senders as (address, fingerprint, verified_at).

Source code in src/titlani/verification/cache.py
def list_verified(
    self,
) -> list[tuple[str, str, datetime]]:
    """Return all verified senders as (address, fingerprint, verified_at)."""
    cur = self._conn.execute(
        "SELECT address, fingerprint, verified_at "
        "FROM verified_senders ORDER BY verified_at DESC"
    )
    results: list[tuple[str, str, datetime]] = []
    for address, fingerprint, ts_str in cur.fetchall():
        results.append((address, fingerprint, datetime.fromisoformat(ts_str)))
    return results

cleanup

cleanup() -> int

Remove expired entries from all tables. Returns total purged rows.

Source code in src/titlani/verification/cache.py
def cleanup(self) -> int:
    """Remove expired entries from all tables. Returns total purged rows."""
    cutoff = datetime.now(UTC) - timedelta(seconds=self._ttl_seconds)
    cur1 = self._conn.execute(
        "DELETE FROM verified_senders WHERE verified_at < ?",
        (cutoff.isoformat(),),
    )
    cur2 = self._conn.execute(
        "DELETE FROM server_spki WHERE verified_at < ?",
        (cutoff.isoformat(),),
    )
    self._conn.commit()
    total = cur1.rowcount + cur2.rowcount
    if total > 0:
        logger.info("verification_cache_cleanup", expired_count=total)
    return total

get_server_spki

get_server_spki(hostname: str) -> str | None

Return cached SPKI hash for hostname, or None if missing/expired.

Source code in src/titlani/verification/cache.py
def get_server_spki(self, hostname: str) -> str | None:
    """Return cached SPKI hash for *hostname*, or None if missing/expired."""
    cutoff = datetime.now(UTC) - timedelta(seconds=self._ttl_seconds)
    cur = self._conn.execute(
        "SELECT spki_hash FROM server_spki WHERE hostname = ? AND verified_at >= ?",
        (hostname, cutoff.isoformat()),
    )
    row = cur.fetchone()
    return row[0] if row else None

get_last_server_spki

get_last_server_spki(hostname: str) -> str | None

Return last known SPKI hash for hostname, ignoring TTL.

Used during re-verification after cache expiry to detect key changes.

Source code in src/titlani/verification/cache.py
def get_last_server_spki(self, hostname: str) -> str | None:
    """Return last known SPKI hash for *hostname*, ignoring TTL.

    Used during re-verification after cache expiry to detect key changes.
    """
    cur = self._conn.execute(
        "SELECT spki_hash FROM server_spki WHERE hostname = ?",
        (hostname,),
    )
    row = cur.fetchone()
    return row[0] if row else None

add_server_spki

add_server_spki(hostname: str, spki_hash: str) -> None

Store (or update) a server SPKI hash.

Source code in src/titlani/verification/cache.py
def add_server_spki(self, hostname: str, spki_hash: str) -> None:
    """Insert the SPKI hash for *hostname*, or refresh the existing row.

    An existing row for the same hostname has both its hash and its
    ``verified_at`` timestamp overwritten. The change is committed
    immediately.
    """
    upsert_sql = """
        INSERT INTO server_spki (hostname, spki_hash, verified_at)
        VALUES (?, ?, ?)
        ON CONFLICT(hostname) DO UPDATE SET
            spki_hash = excluded.spki_hash,
            verified_at = excluded.verified_at
        """
    # The verification time is recorded as a timezone-aware UTC ISO string.
    stamp = datetime.now(UTC).isoformat()
    row = (hostname, spki_hash, stamp)
    self._conn.execute(upsert_sql, row)
    self._conn.commit()
    logger.debug("spki_cache_write", hostname=hostname)

list_server_spki

list_server_spki() -> list[tuple[str, str, datetime]]

Return all server SPKI entries as (hostname, spki_hash, verified_at).

Source code in src/titlani/verification/cache.py
def list_server_spki(self) -> list[tuple[str, str, datetime]]:
    """Return all server SPKI entries as (hostname, spki_hash, verified_at)."""
    cur = self._conn.execute(
        "SELECT hostname, spki_hash, verified_at "
        "FROM server_spki ORDER BY verified_at DESC"
    )
    results: list[tuple[str, str, datetime]] = []
    for hostname, spki_hash, ts_str in cur.fetchall():
        results.append((hostname, spki_hash, datetime.fromisoformat(ts_str)))
    return results

clear_server_spki

clear_server_spki() -> int

Remove all server SPKI entries. Returns count of removed rows.

Source code in src/titlani/verification/cache.py
def clear_server_spki(self) -> int:
    """Delete every row of ``server_spki`` and commit.

    Returns:
        The number of rows that were deleted.
    """
    removed = self._conn.execute("DELETE FROM server_spki").rowcount
    self._conn.commit()
    if removed > 0:
        # Only log when the table actually held entries.
        logger.info(
            "spki_cache_cleared",
            count=removed,
        )
    return removed

VerificationMode

VerificationMode

Bases: StrEnum

VerificationMethod

VerificationMethod

Bases: StrEnum

VerificationResult

VerificationResult dataclass

VerificationResult(
    verified: bool,
    fingerprint: str | None = None,
    cached: bool = False,
    reason: str | None = None,
    checks: dict[str, VerificationResult] | None = None,
)