Skip to content

GMAP API

GMAP (Gemini Mailbox Access Protocol) components for serving mailbox contents over the Gemini protocol.

GmapHandler

GmapHandler

GmapHandler(
    mailbox_dir: Path,
    hostname: str,
    recipient_fps: dict[str, str] | None = None,
)

Routes GMAP requests and manages per-mailbox indices.

Source code in src/titlani/gmap/handler.py
def __init__(
    self,
    mailbox_dir: Path,
    hostname: str,
    recipient_fps: dict[str, str] | None = None,
) -> None:
    self.mailbox_dir = mailbox_dir
    self.hostname = hostname
    self.recipient_fps = recipient_fps or {}

GeminiRequest

GeminiRequest dataclass

GeminiRequest(
    url: str,
    hostname: str,
    path: str,
    query: str | None,
    client_cert: Certificate | None,
)

GeminiResponse

GeminiResponse dataclass

GeminiResponse(status: int, meta: str, body: bytes = b'')

parse_gemini_request

parse_gemini_request

parse_gemini_request(
    line: bytes, client_cert: Certificate | None = None
) -> GeminiRequest

Parse a Gemini request line into a GeminiRequest.

Format: gemini://hostname/path?query

Source code in src/titlani/gmap/handler.py
def parse_gemini_request(
    line: bytes,
    client_cert: x509.Certificate | None = None,
) -> GeminiRequest:
    """Parse a Gemini request line into a GeminiRequest.

    Format: gemini://hostname[:port][/path][?query]

    The host part ends at the first "/" or "?", whichever comes first, so
    a URL that has a query but no path (``gemini://host?q``) yields
    hostname ``host``, path ``/`` and query ``q``. A ``:port`` suffix is
    dropped from the hostname. A bracketed IPv6 literal keeps its brackets
    (``[::1]:1965`` -> ``[::1]``).

    Args:
        line: Raw request line as received; surrounding whitespace
            (including the trailing CRLF) is stripped after decoding.
        client_cert: Client certificate, passed through unchanged.

    Returns:
        A GeminiRequest whose ``url`` is the decoded, stripped line,
        ``path`` is at least ``"/"`` and ``query`` is ``None`` when the
        URL contains no "?".

    Raises:
        ValueError: If the line is not valid UTF-8 or does not start with
            ``gemini://``.
    """
    try:
        text = line.decode("utf-8").strip()
    except UnicodeDecodeError as e:
        raise ValueError(f"Invalid UTF-8 in request: {e}") from e

    scheme = "gemini://"
    if not text.startswith(scheme):
        raise ValueError(f"Not a Gemini URL: {text!r}")

    rest = text[len(scheme) :]

    # Split the query off first: a "?" ends the authority just like "/"
    # does, and any "/" that appears after it belongs to the query.
    target, question_mark, query_text = rest.partition("?")
    query = query_text if question_mark else None

    # Split host from path; an absent path is normalised to "/".
    authority, slash, remainder = target.partition("/")
    path = slash + remainder if slash else "/"

    # Strip the port. The colons inside a bracketed IPv6 literal are part
    # of the address, so only text after the closing bracket is dropped.
    closing = authority.find("]") if authority.startswith("[") else -1
    if closing != -1:
        hostname = authority[: closing + 1]
    else:
        hostname = authority.partition(":")[0]

    return GeminiRequest(
        url=text,
        hostname=hostname,
        path=path,
        query=query,
        client_cert=client_cert,
    )

GmapMailbox

GmapMailbox

GmapMailbox(mailbox_path: Path)

Manages the GMAP index for a single mailbox directory.

Source code in src/titlani/gmap/mailbox.py
def __init__(self, mailbox_path: Path) -> None:
    """Prepare an empty, not-yet-loaded index for one mailbox directory.

    Args:
        mailbox_path: Directory of the mailbox. The index file is
            ``.gmap.json`` directly inside it.
    """
    index_file = mailbox_path.joinpath(".gmap.json")

    self.mailbox_path = mailbox_path
    self.index_path = index_file
    # The in-memory index starts empty and is flagged as not loaded.
    self.messages: dict[str, MessageEntry] = {}
    self._loaded = False
    # Computed once here; sync_filesystem reads it when tagging new mail.
    self._is_list = is_mailing_list(mailbox_path)

load

load() -> None

Load index from disk. Creates empty index if missing.

Source code in src/titlani/gmap/mailbox.py
def load(self) -> None:
    """Load index from disk. Creates empty index if missing.

    An index file that cannot be interpreted is treated as corrupt: a
    warning is logged and the in-memory index is reset to empty. This
    covers invalid JSON, bytes that are not valid UTF-8, and valid JSON of
    the wrong shape (for example a top-level list, or a message entry that
    is not an object). Errors from reading the file itself (``OSError``)
    are not caught.

    Missing per-message fields fall back to no tags, an empty timestamp
    and an empty filename.
    """
    if not self.index_path.exists():
        self.messages = {}
        self._loaded = True
        return

    # Build into a local dict so a failure part-way through never leaves a
    # half-populated index on the instance.
    loaded: dict[str, MessageEntry] = {}
    try:
        data = json.loads(self.index_path.read_text("utf-8"))
        for msgid, info in data.get("messages", {}).items():
            loaded[msgid] = MessageEntry(
                tags=set(info.get("tags", [])),
                timestamp=info.get("timestamp", ""),
                filename=info.get("filename", ""),
            )
    except (ValueError, KeyError, TypeError, AttributeError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # AttributeError covers non-dict JSON values reaching .get/.items.
        logger.warning(
            "gmap_index_corrupt",
            path=str(self.index_path),
        )
        loaded = {}

    self.messages = loaded
    self._loaded = True

save

save() -> None

Write index to disk atomically (temp + rename).

Source code in src/titlani/gmap/mailbox.py
def save(self) -> None:
    """Write index to disk atomically (temp + rename).

    The index is serialised as indented JSON with message IDs and tags in
    sorted order, written to a temporary file inside the mailbox
    directory, and then moved over the index path with ``os.replace``.

    On any failure the error is logged, the temporary file is removed if
    it still exists, and the original exception is re-raised.
    """
    serialised = {}
    for msgid, entry in sorted(self.messages.items()):
        serialised[msgid] = {
            "tags": sorted(entry.tags),
            "timestamp": entry.timestamp,
            "filename": entry.filename,
        }
    document = {"version": INDEX_VERSION, "messages": serialised}
    content = json.dumps(document, indent=2) + "\n"

    # The temp file is created next to the index file, in the mailbox
    # directory, and later moved over it with os.replace.
    handle, scratch_path = tempfile.mkstemp(suffix=".tmp", dir=self.mailbox_path)
    handle_open = True
    try:
        os.write(handle, content.encode("utf-8"))
        os.close(handle)
        handle_open = False
        os.replace(scratch_path, self.index_path)
    except BaseException:
        logger.error(
            "gmap_index_save_failed",
            path=str(self.index_path),
        )
        if handle_open:
            # Best effort: the descriptor may already be invalid.
            try:
                os.close(handle)
            except OSError:
                pass
        if os.path.exists(scratch_path):
            os.unlink(scratch_path)
        raise

sync_filesystem

sync_filesystem() -> bool

Scan mailbox for new .gemmail files not in the index.

Auto-tags new messages with Inbox and Unread. Returns True if index was modified.

Source code in src/titlani/gmap/mailbox.py
def sync_filesystem(self) -> bool:
    """Reconcile the index with the message files in the mailbox.

    The index is loaded first if that has not happened yet. Files matching
    ``*.gemmail``, ``*.gemmail.new``, ``*.gemmail.enc`` and
    ``*.gemmail.enc.new`` are then examined, skipping dotfiles and
    filenames that were already indexed when the scan started:

    * If the file's message ID is already indexed under another filename,
      only the stored filename is updated and the tags are kept.
    * Otherwise a new entry is added, tagged Inbox and Unread, plus List
      when the mailbox is a mailing list.

    Finally ``_remove_stale_entries`` is called.

    Returns:
        True if the index was modified, otherwise False.
    """
    if not self._loaded:
        self.load()

    # Filenames indexed before the scan; taken once and not updated while
    # entries are added or renamed below.
    already_indexed = set()
    for entry in self.messages.values():
        already_indexed.add(entry.filename)

    changed = False
    patterns = (
        "*.gemmail",
        "*.gemmail.new",
        "*.gemmail.enc",
        "*.gemmail.enc.new",
    )
    for pattern in patterns:
        for candidate in self.mailbox_path.glob(pattern):
            name = candidate.name
            if name.startswith(".") or name in already_indexed:
                continue

            msgid = _filename_to_msgid(name)
            if not msgid:
                continue

            if msgid in self.messages:
                # Same message under a new filename (e.g. .new removed on
                # read): keep the tags, just point at the current file.
                self.messages[msgid].filename = name
                changed = True
            else:
                timestamp = _parse_timestamp_from_msgid(msgid)
                tags = {"Inbox", "Unread"}
                if self._is_list:
                    tags.add("List")
                self.messages[msgid] = MessageEntry(
                    tags=tags,
                    timestamp=timestamp,
                    filename=name,
                )
                changed = True
                logger.debug(
                    "gmap_indexed_new_message",
                    msgid=msgid,
                    filename=name,
                )

    # Stale-entry removal runs on every call and also counts as a change.
    if self._remove_stale_entries():
        changed = True

    return changed

list_all_msgids

list_all_msgids() -> list[str]

Return all message IDs (excluding Trash).

Source code in src/titlani/gmap/mailbox.py
def list_all_msgids(self) -> list[str]:
    """Return the IDs of every indexed message that is not tagged Trash.

    IDs come back in the iteration order of ``self.messages``.
    """
    visible: list[str] = []
    for msgid, entry in self.messages.items():
        if "Trash" in entry.tags:
            continue
        visible.append(msgid)
    return visible

list_by_tag

list_by_tag(
    tag: str, since: datetime | None = None
) -> list[str]

Return message IDs with the given tag.

Messages tagged Trash are excluded unless querying Trash itself.

Source code in src/titlani/gmap/mailbox.py
def list_by_tag(
    self,
    tag: str,
    since: datetime | None = None,
) -> list[str]:
    """Return message IDs with the given tag.

    Messages tagged Trash are excluded unless querying Trash itself.
    """
    result = []
    for msgid, entry in self.messages.items():
        if tag not in entry.tags:
            continue
        if tag != "Trash" and "Trash" in entry.tags:
            continue
        if since is not None:
            msg_time = _parse_iso_timestamp(entry.timestamp)
            if msg_time is not None and msg_time < since:
                continue
        result.append(msgid)
    return result

get_message_bytes

get_message_bytes(msgid: str) -> bytes | None

Read a message file and return its raw bytes.

Source code in src/titlani/gmap/mailbox.py
def get_message_bytes(self, msgid: str) -> bytes | None:
    """Read a message file and return its raw bytes."""
    entry = self.messages.get(msgid)
    if entry is None:
        return None
    filepath = self.mailbox_path / entry.filename
    if not filepath.exists():
        return None
    return filepath.read_bytes()

is_encrypted

is_encrypted(msgid: str) -> bool

Check if a message is encrypted (.gemmail.enc or .gemmail.enc.new).

Source code in src/titlani/gmap/mailbox.py
def is_encrypted(self, msgid: str) -> bool:
    """Check if a message is encrypted (.gemmail.enc or .gemmail.enc.new).

    The decision is based on the filename suffix only, so a plaintext
    message whose ID merely contains ``.enc`` is not reported as
    encrypted.

    Args:
        msgid: ID of the message to check.

    Returns:
        True if the indexed filename ends in ``.gemmail.enc`` or
        ``.gemmail.enc.new``; False otherwise, including when the message
        is not indexed.
    """
    entry = self.messages.get(msgid)
    if entry is None:
        return False
    return entry.filename.endswith((".gemmail.enc", ".gemmail.enc.new"))

add_tag

add_tag(msgid: str, tag: str) -> bool

Add a tag to a message. Returns True if the tag is valid and the message exists (even when the tag was already present); returns False otherwise.

Source code in src/titlani/gmap/mailbox.py
def add_tag(self, msgid: str, tag: str) -> bool:
    """Add a tag to a message.

    Args:
        msgid: ID of the message to tag.
        tag: Tag to add; must pass ``_valid_tag``.

    Returns:
        True when the tag is valid and the message is indexed, whether or
        not the tag was already present. False when the tag is rejected or
        the message is unknown.
    """
    # An invalid tag is rejected before the message is even looked up.
    entry = self.messages.get(msgid) if _valid_tag(tag) else None
    if entry is None:
        return False
    # An already-present tag still counts as success (the original comment
    # cites the spec here); the tag set is simply left as it is.
    if tag not in entry.tags:
        entry.tags.add(tag)
    return True

remove_tag

remove_tag(msgid: str, tag: str) -> bool

Remove a tag from a message. Returns True if the message exists (even when it did not carry the tag); returns False otherwise.

Source code in src/titlani/gmap/mailbox.py
def remove_tag(self, msgid: str, tag: str) -> bool:
    """Remove a tag from a message.

    Args:
        msgid: ID of the message to untag.
        tag: Tag to remove.

    Returns:
        True when the message is indexed, whether or not it carried the
        tag. False when the message is unknown.
    """
    entry = self.messages.get(msgid)
    known = entry is not None
    if known:
        # discard() is a no-op for an absent tag, which covers the
        # "already untagged" case without a separate branch.
        entry.tags.discard(tag)
    return known

delete_message

delete_message(msgid: str) -> bool

Permanently delete a message. Only succeeds if tagged Trash.

Source code in src/titlani/gmap/mailbox.py
def delete_message(self, msgid: str) -> bool:
    """Permanently delete a message. Only succeeds if tagged Trash.

    The message file is removed when the entry points at a regular file,
    and the entry is then dropped from the in-memory index. An entry
    without a usable file (missing file, or an empty filename, which
    resolves to the mailbox directory itself) is still removed from the
    index.

    Args:
        msgid: ID of the message to delete.

    Returns:
        True if the entry was removed from the index; False if the message
        is not indexed or is not tagged Trash.
    """
    entry = self.messages.get(msgid)
    if entry is None:
        return False
    if "Trash" not in entry.tags:
        return False

    filepath = self.mailbox_path / entry.filename
    # is_file() rather than exists(): never attempt to unlink a directory,
    # which is what an empty filename would resolve to.
    if filepath.is_file():
        try:
            filepath.unlink()
        except FileNotFoundError:
            # Removed by someone else between the check and the unlink;
            # the outcome we wanted has already happened.
            pass
        else:
            logger.info(
                "gmap_message_file_deleted",
                msgid=msgid,
                filename=entry.filename,
            )
    del self.messages[msgid]
    return True

MessageEntry

MessageEntry dataclass

MessageEntry(
    tags: set[str] = set(),
    timestamp: str = "",
    filename: str = "",
)

GeminiServerProtocol

GeminiServerProtocol

GeminiServerProtocol(
    request_handler: Callable[
        [GeminiRequest],
        GeminiResponse | Awaitable[GeminiResponse],
    ],
)

Bases: Protocol

Gemini protocol handler for GMAP requests.

Simpler than MisfinServerProtocol: single-phase (no body), accumulate until CRLF, parse URL, route to handler.

Source code in src/titlani/gmap/protocol.py
def __init__(
    self,
    request_handler: Callable[
        [GeminiRequest],
        GeminiResponse | Awaitable[GeminiResponse],
    ],
) -> None:
    """Initialise per-connection state for one Gemini exchange.

    Args:
        request_handler: Callable that takes a parsed GeminiRequest and
            returns a GeminiResponse, either directly or as an awaitable.
    """
    self.request_handler = request_handler

    # Connection details; unset until filled in elsewhere in the class
    # (presumably when the connection is made; confirm in protocol.py).
    self.transport: asyncio.Transport | None = None
    self.peer_name: tuple[str, int] | None = None

    # Incoming bytes collected so far; starts empty.
    self.buffer = b""
    self.received_first_byte = False

    # Timing and timeout bookkeeping, initially unset.
    self.request_start_time: float | None = None
    self.timeout_handle: asyncio.TimerHandle | None = None