Skip to content

API reference

Auto-generated from docstrings via mkdocstrings.

Top-level

tuiwright

tuiwright — Playwright-style end-to-end testing for TUI applications.

Drives any TUI binary under a real PTY + terminal emulator, with cell-grid and PNG snapshot regression.

Quick start:

async def test_app(tui, snapshot):
    # Quick-start example: drive an app end to end, then compare the screen
    # to a stored snapshot. `tui` and `snapshot` are test fixtures
    # (presumably a TuiSession and a snapshot comparator -- confirm against
    # the plugin's fixture definitions).
    await tui.start("myapp")  # spawn the application under test
    await tui.wait_for_text("Ready")  # wait until "Ready" shows up on screen
    await tui.type("hello")  # send the text as keystrokes
    await tui.press("enter")  # send a single named key
    assert tui.screen == snapshot  # regression check on the rendered screen

Cell dataclass

A single rendered terminal cell.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True, slots=True)
class Cell:
    """One character position of the rendered terminal grid.

    Holds the displayed character, its foreground/background colours and
    the boolean text attributes. Instances are immutable.
    """

    char: str = " "
    fg: Color = _DEFAULT_COLOR
    bg: Color = _DEFAULT_COLOR
    bold: bool = False
    italic: bool = False
    underline: bool = False
    reverse: bool = False
    strike: bool = False
    blink: bool = False

    def has_default_attrs(self) -> bool:
        """Return ``True`` when both colours are default and no attribute flag is set."""
        # Colours first: any non-default colour settles the answer.
        if not (self.fg.is_default and self.bg.is_default):
            return False
        flags = (
            self.bold,
            self.italic,
            self.underline,
            self.reverse,
            self.strike,
            self.blink,
        )
        return not any(flags)

Color dataclass

A foreground or background colour.

name is one of "default", "black", "red" … or the empty string if rgb is set. rgb is a hex triple "ff8800" (no leading #) for true-colour cells, or None otherwise.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True, slots=True)
class Color:
    """A foreground or background colour.

    ``name`` is one of ``"default"``, ``"black"``, ``"red"`` … or the empty
    string if ``rgb`` is set. ``rgb`` is a hex triple ``"ff8800"`` (no
    leading ``#``) for true-colour cells, or ``None`` otherwise.
    """

    name: str = "default"
    rgb: str | None = None

    @classmethod
    def from_pyte(cls, value: str) -> Color:
        # pyte uses "default" sentinel and 6-char hex for true colour.
        if value == "default":
            return _DEFAULT_COLOR
        if len(value) == 6 and all(c in "0123456789abcdefABCDEF" for c in value):
            return cls(name="", rgb=value.lower())
        return cls(name=value)

    @property
    def is_default(self) -> bool:
        return self.name == "default" and self.rgb is None

Region dataclass

A rectangular sub-view of a Screen.

Bounds are inclusive on top/left and exclusive on bottom/right (standard Python slice convention).

Source code in src/tuiwright/screen.py
@dataclass(frozen=True)
class Region:
    """A rectangular sub-view of a :class:`Screen`.

    Bounds are *inclusive* on top/left and *exclusive* on bottom/right
    (standard Python slice convention). The region keeps a reference to
    the screen it was cut from and reads cells from it on demand.
    """

    screen: Screen
    top: int
    bottom: int
    left: int
    right: int
    title: str | None = None

    @property
    def rows(self) -> int:
        """Number of rows covered by the region (``bottom - top``)."""
        return self.bottom - self.top

    @property
    def cols(self) -> int:
        """Number of columns covered by the region (``right - left``)."""
        return self.right - self.left

    @property
    def text(self) -> str:
        """All region rows joined with newlines, trailing spaces stripped per row."""
        return "\n".join(self.row(i) for i in range(self.rows))

    def row(self, i: int) -> str:
        """Return region row ``i`` (0-based, relative to ``top``), right-stripped.

        Raises:
            IndexError: if ``i`` is not in ``range(self.rows)``. Without this
                check a negative or too-large index would silently return
                text from screen rows that lie outside the region.
        """
        if not 0 <= i < self.rows:
            raise IndexError(
                f"row index {i} out of range for region with {self.rows} rows"
            )
        cells = self.screen.cells[self.top + i][self.left : self.right]
        return "".join(c.char for c in cells).rstrip()

    def contains(self, needle: str | re.Pattern[str], *, regex: bool = False) -> bool:
        """Return ``True`` if any single row of the region matches ``needle``.

        ``needle`` is matched literally unless it is already a compiled
        pattern or ``regex=True`` is given. Rows are searched one at a time
        (after right-stripping), so a match never spans two rows.
        """
        if isinstance(needle, re.Pattern):
            pat = needle
        elif regex:
            pat = re.compile(needle)
        else:
            pat = re.compile(re.escape(needle))
        return any(pat.search(self.row(i)) for i in range(self.rows))

Screen dataclass

An immutable snapshot of the terminal grid.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True)
class Screen:
    """A frozen capture of the terminal grid at one moment.

    ``cells`` holds one tuple of :class:`Cell` per row; ``cursor`` and
    ``modes`` are copied from the emulator at capture time.
    """

    rows: int
    cols: int
    cells: tuple[tuple[Cell, ...], ...]
    cursor: Cursor
    modes: frozenset[int] = field(default_factory=frozenset)

    # -- construction --------------------------------------------------

    @classmethod
    def from_emulator(cls, emu: Emulator) -> Screen:
        """Build a snapshot from the emulator's current cells, cursor and modes."""
        from tuiwright._emulator import Emulator as _Emu  # noqa: F401

        def convert(pc) -> Cell:
            # Map one emulator cell onto our Cell; empty data becomes a space.
            return Cell(
                char=pc.data or " ",
                fg=Color.from_pyte(pc.fg),
                bg=Color.from_pyte(pc.bg),
                bold=bool(pc.bold),
                italic=bool(pc.italics),
                underline=bool(pc.underscore),
                reverse=bool(pc.reverse),
                strike=bool(pc.strikethrough),
                blink=bool(pc.blink),
            )

        grid = tuple(tuple(convert(pc) for pc in line) for line in emu.cells())
        src_cursor = emu.screen.cursor
        cursor = Cursor(
            row=src_cursor.y,
            col=src_cursor.x,
            hidden=bool(src_cursor.hidden),
        )
        return cls(
            rows=emu.rows,
            cols=emu.cols,
            cells=grid,
            cursor=cursor,
            modes=emu.private_modes,
        )

    # -- text views ----------------------------------------------------

    @property
    def text(self) -> str:
        """Every row, right-stripped, joined by newline characters."""
        lines = [self.row(index) for index in range(self.rows)]
        return "\n".join(lines)

    def row(self, i: int) -> str:
        """Return row ``i`` as text with trailing whitespace removed."""
        return self.row_padded(i).rstrip()

    def row_padded(self, i: int) -> str:
        """Return row ``i`` as text at full width (nothing stripped)."""
        return "".join(c.char for c in self.cells[i])

    def cell(self, row: int, col: int) -> Cell:
        """Return the cell at ``(row, col)``."""
        line = self.cells[row]
        return line[col]

    # -- search --------------------------------------------------------

    def find(self, needle: str | re.Pattern[str], *, regex: bool = False) -> list[Position]:
        """Return the start position of every match, row by row.

        ``needle`` is matched literally unless it is a compiled pattern or
        ``regex=True``. Each row is searched at full (unstripped) width.
        """
        if isinstance(needle, re.Pattern):
            pattern = needle
        elif regex:
            pattern = re.compile(needle)
        else:
            pattern = re.compile(re.escape(needle))
        return [
            Position(r, m.start())
            for r in range(self.rows)
            for m in pattern.finditer(self.row_padded(r))
        ]

    def row_containing(
        self, needle: str | re.Pattern[str], *, regex: bool = False
    ) -> int | None:
        """Return the row index of the first match, or ``None`` if there is none."""
        matches = self.find(needle, regex=regex)
        if not matches:
            return None
        return matches[0].row

    def contains(self, needle: str | re.Pattern[str], *, regex: bool = False) -> bool:
        """Return ``True`` if ``needle`` matches anywhere on the screen."""
        return len(self.find(needle, regex=regex)) > 0

    # -- regions -------------------------------------------------------

    def region(
        self,
        *,
        title: str | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
    ) -> Region:
        """Return a :class:`Region` selected by title or by explicit bounds.

        A ``title`` takes precedence over ``rows``/``cols``.

        Raises:
            LookupError: ``title`` was given but no titled region matched.
            ValueError: no ``title`` and ``rows`` or ``cols`` is missing.
        """
        if title is not None:
            match = _find_titled_region(self, title)
            if match is not None:
                return match
            raise LookupError(f"no titled region matching {title!r}")
        if rows is None or cols is None:
            raise ValueError("either title= or both rows= and cols= must be given")
        top, bottom = rows
        left, right = cols
        return Region(screen=self, top=top, bottom=bottom, left=left, right=right, title=None)
text property
text: str

All rows joined with \n, trailing spaces stripped per row.

TuiSession

Drives a TUI binary under a PTY.

Use it as an async context manager or call start() / stop() explicitly. After start, the methods that inject input do not wait on the child process; they are declared async only because most flows interleave them with await wait_for_* calls.

Source code in src/tuiwright/session.py
class TuiSession:
    """Drives a TUI binary under a PTY.

    Use it as an async context manager or call :meth:`start` / :meth:`stop`
    explicitly. After start, every method that injects input is sync from
    the caller's perspective (no awaiting on the child) but ``async``
    because most flows interleave with ``await wait_for_*``.

    Entering the context manager does not spawn anything; call
    :meth:`start` inside the ``async with`` body. Leaving it calls
    :meth:`stop`.
    """

    def __init__(self, config: TuiConfig | None = None) -> None:
        """Create an idle session; nothing is spawned until :meth:`start`."""
        self.config = config or TuiConfig()
        self._pty = PtyTransport()
        self._emu = Emulator(cols=self.config.cols, rows=self.config.rows)
        self._recorder: CastRecorder | None = None
        self._cast_path: Path | None = None
        self._started = False
        self._screen_changed = asyncio.Event()
        self._mouse_warned = False
        self._argv: list[str] = []

    # -- lifecycle -----------------------------------------------------

    async def __aenter__(self) -> TuiSession:
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.stop()

    async def start(
        self,
        cmd: str | list[str],
        *,
        env: dict[str, str] | None = None,
        cwd: str | os.PathLike[str] | None = None,
        cols: int | None = None,
        rows: int | None = None,
        cast_path: Path | str | None = None,
    ) -> None:
        """Spawn ``cmd`` under the PTY and begin recording.

        Args:
            cmd: Command line; a string is split with :func:`shlex.split`.
            env: Extra environment variables layered over ``os.environ``.
            cwd: Working directory for the child, if any.
            cols: Terminal width; overrides (and updates) ``config.cols``.
            rows: Terminal height; overrides (and updates) ``config.rows``.
            cast_path: Where to write the cast file. Defaults to a fresh
                file under ``config.cast_dir`` or the system temp dir.

        Raises:
            RuntimeError: if the session is already started.
        """
        if self._started:
            raise RuntimeError("session already started; create a new one")
        if cols is not None:
            self.config.cols = cols
        if rows is not None:
            self.config.rows = rows
        self._emu = Emulator(cols=self.config.cols, rows=self.config.rows)
        self._argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)

        # Cast file: explicit > config.cast_dir > tempdir.
        if cast_path is not None:
            self._cast_path = Path(cast_path)
        else:
            base = self.config.cast_dir or Path(tempfile.gettempdir()) / "tuiwright"
            base.mkdir(parents=True, exist_ok=True)
            self._cast_path = base / f"session-{uuid.uuid4().hex[:12]}.cast"

        full_env = {**os.environ, **(env or {})}
        full_env.setdefault("TERM", "xterm-256color")
        full_env.setdefault("COLORTERM", "truecolor")
        # Disable any prompt-toolkit / pager that would mess with size.
        full_env.setdefault("PAGER", "cat")
        full_env.setdefault("LESS", "-FRX")

        recorder = CastRecorder(
            self._cast_path,
            cols=self.config.cols,
            rows=self.config.rows,
            env=full_env,
            title=" ".join(self._argv),
        )
        self._recorder = recorder
        # Hook listeners BEFORE spawning so we don't miss the first chunk
        # (PTYs can emit data the same loop tick the reader is registered).
        self._pty.add_listener(self._on_output)
        try:
            await self._pty.spawn(
                self._argv,
                env=full_env,
                cwd=str(cwd) if cwd is not None else None,
                cols=self.config.cols,
                rows=self.config.rows,
            )
        except BaseException:
            # stop() returns early while _started is False, so nothing else
            # would ever close the recorder if the spawn fails.
            recorder.close()
            self._recorder = None
            raise
        self._started = True

    async def stop(self, *, timeout: float = 2.0) -> int:
        """Stop the child and close the recorder.

        Returns ``0`` immediately if the session is not started; otherwise
        returns whatever the PTY transport's ``stop`` returns (presumably
        the child's exit status -- confirm against ``PtyTransport``).
        """
        if not self._started:
            return 0
        try:
            return await self._pty.stop(timeout=timeout)
        finally:
            if self._recorder is not None:
                self._recorder.close()
            self._started = False

    # -- listeners -----------------------------------------------------

    def _on_output(self, chunk: bytes) -> None:
        """Feed child output to the emulator and recorder, then wake waiters."""
        self._emu.feed(chunk)
        if self._recorder is not None:
            self._recorder.record_output(chunk)
        # Wake any wait_for_* waiters.
        self._screen_changed.set()
        # Re-arm. A wait coroutine that consumed the event must rebuild
        # its reference before re-awaiting; we use a fresh Event each
        # tick to avoid races.
        self._screen_changed = asyncio.Event()

    def _record_input(self, data: bytes) -> None:
        """Log ``data`` as input in the cast file, if a recorder is active."""
        if self._recorder is not None:
            self._recorder.record_input(data)

    # -- properties ----------------------------------------------------

    @property
    def screen(self) -> Screen:
        """A fresh :class:`Screen` snapshot built from the emulator on each access."""
        return Screen.from_emulator(self._emu)

    @property
    def cast_path(self) -> Path:
        """Path of the cast file; raises ``RuntimeError`` before :meth:`start`."""
        if self._cast_path is None:
            raise RuntimeError("session not started yet")
        return self._cast_path

    @property
    def alive(self) -> bool:
        """Whether the PTY transport reports itself as alive."""
        return self._pty.alive

    # -- input: keys / text --------------------------------------------

    async def press(self, key: str) -> None:
        """Encode ``key`` with ``encode_key`` and send it to the child."""
        self._require_started()
        data = encode_key(key)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)  # let the loop service the write/read

    async def type(self, text: str, *, delay: float = 0.0) -> None:
        """Send each character as its UTF-8 bytes.

        ``delay`` is seconds between characters — useful for apps that
        debounce input or for emulating a human typist in demos. With
        ``delay <= 0`` the whole text is written in a single chunk.
        """
        self._require_started()
        if delay <= 0:
            data = text.encode("utf-8")
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(0)
            return
        for ch in text:
            data = ch.encode("utf-8")
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(delay)

    async def paste(self, text: str) -> None:
        """Send ``text`` as a bracketed paste, or type it if the app has not enabled that mode."""
        self._require_started()
        if not self._emu.is_bracketed_paste():
            # The app didn't enable bracketed paste, so the brackets would
            # leak into the buffer. Fall back to plain type.
            await self.type(text)
            return
        data = encode_paste(text)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)

    # -- input: mouse ---------------------------------------------------

    async def click(
        self,
        row: int,
        col: int,
        *,
        button: str = "left",
        modifiers: tuple[str, ...] = (),
    ) -> None:
        """Send a press followed by a release at 0-based ``(row, col)``."""
        self._require_started()
        self._check_mouse_enabled()
        mods = _modtuple_to_mod(modifiers)
        # 0-based → 1-based on the wire.
        down = encode_mouse(
            button=button, row=row + 1, col=col + 1, pressed=True, modifiers=mods
        )
        up = encode_mouse(
            button=button, row=row + 1, col=col + 1, pressed=False, modifiers=mods
        )
        self._record_input(down + up)
        self._pty.write_bytes(down)
        # Small gap so apps that distinguish press/release see two events.
        await asyncio.sleep(0)
        self._pty.write_bytes(up)
        await asyncio.sleep(0)

    async def double_click(
        self, row: int, col: int, *, button: str = "left", interval: float = 0.05
    ) -> None:
        """Click twice at ``(row, col)`` with ``interval`` seconds in between."""
        await self.click(row, col, button=button)
        await asyncio.sleep(interval)
        await self.click(row, col, button=button)

    async def drag(
        self,
        from_row: int,
        from_col: int,
        to_row: int,
        to_col: int,
        *,
        button: str = "left",
        steps: int = 4,
    ) -> None:
        """Press at the start cell, emit ``steps`` motion events, release at the end cell.

        Intermediate positions are linearly interpolated with integer
        division; the last motion event lands on ``(to_row, to_col)``.
        """
        self._require_started()
        self._check_mouse_enabled()
        btn = resolve_button(button)
        down = encode_mouse(button=btn, row=from_row + 1, col=from_col + 1, pressed=True)
        self._record_input(down)
        self._pty.write_bytes(down)
        await asyncio.sleep(0)
        for i in range(1, steps + 1):
            r = from_row + (to_row - from_row) * i // steps
            c = from_col + (to_col - from_col) * i // steps
            move = encode_mouse(
                button=btn, row=r + 1, col=c + 1, pressed=True, motion=True
            )
            self._record_input(move)
            self._pty.write_bytes(move)
            await asyncio.sleep(0)
        up = encode_mouse(button=btn, row=to_row + 1, col=to_col + 1, pressed=False)
        self._record_input(up)
        self._pty.write_bytes(up)
        await asyncio.sleep(0)

    async def scroll(
        self,
        row: int,
        col: int,
        *,
        direction: str = "down",
        lines: int = 1,
    ) -> None:
        """Send ``lines`` wheel events at ``(row, col)``.

        Any ``direction`` other than ``"down"`` is treated as up.
        """
        self._require_started()
        self._check_mouse_enabled()
        button = "wheel_down" if direction == "down" else "wheel_up"
        for _ in range(lines):
            data = encode_mouse(button=button, row=row + 1, col=col + 1, pressed=True)
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(0)

    async def hover(self, row: int, col: int) -> None:
        """Send a motion-only mouse event at 0-based ``(row, col)``."""
        self._require_started()
        self._check_mouse_enabled()
        # Motion-only event uses button code 3 (release) + motion bit (32),
        # i.e. 35. The encoder doesn't expose that directly, so the
        # sequence is built inline (0-based → 1-based on the wire).
        from tuiwright._input import CSI

        msg = CSI + b"<35;" + str(col + 1).encode() + b";" + str(row + 1).encode() + b"M"
        self._record_input(msg)
        self._pty.write_bytes(msg)
        await asyncio.sleep(0)

    # -- input: resize / focus -----------------------------------------

    async def resize(self, cols: int, rows: int) -> None:
        """Resize the PTY and the emulator, and store the new size in the config."""
        self._require_started()
        self._pty.resize(cols, rows)
        self._emu.resize(cols, rows)
        self.config.cols = cols
        self.config.rows = rows
        await asyncio.sleep(0)

    async def focus(self, in_: bool = True) -> None:
        """Send a focus-in (or focus-out) event if the app enabled focus reporting."""
        self._require_started()
        if not self._emu.is_focus_events():
            return  # No-op if the app didn't enable focus reporting.
        data = encode_focus(in_)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)

    # -- waits ----------------------------------------------------------

    async def wait_for_text(
        self,
        needle: str | re.Pattern[str],
        *,
        timeout: float | None = None,
        region: Region | None = None,
        regex: bool = False,
    ) -> Match[str]:
        """Wait until ``needle`` appears on screen and return the match.

        Args:
            needle: Literal text, or a pattern if compiled / ``regex=True``.
            timeout: Seconds to wait; defaults to ``config.default_timeout``.
            region: Restrict the search to this region's title or bounds,
                re-evaluated against the live screen on every poll.
            regex: Treat a string ``needle`` as a regular expression.

        Raises:
            TuiTimeoutError: if no match appears before the timeout.
        """
        timeout = timeout if timeout is not None else self.config.default_timeout
        pattern = needle if isinstance(needle, re.Pattern) else (
            re.compile(needle) if regex else re.compile(re.escape(needle))
        )
        # A Region is a snapshot of one Screen. To keep polling honest, we
        # re-derive a Region with the same bounds (or title lookup) from
        # the current screen on every iteration.
        bounds = None if region is None else (region.title, region.top, region.bottom, region.left, region.right)

        def check() -> Match[str] | None:
            if bounds is None:
                target = self.screen.text
            else:
                title, top, bottom, left, right = bounds
                live_screen = self.screen
                if title is not None:
                    try:
                        live = live_screen.region(title=title)
                        target = live.text
                    except LookupError:
                        target = ""
                else:
                    target = "\n".join(
                        "".join(c.char for c in live_screen.cells[r][left:right]).rstrip()
                        for r in range(top, min(bottom, live_screen.rows))
                    )
            return pattern.search(target)

        return await self._poll_until(check, timeout=timeout, description=f"text {needle!r}")

    async def wait_for_predicate(
        self,
        predicate: Callable[[Screen], bool | Awaitable[bool]],
        *,
        timeout: float | None = None,
        description: str = "predicate",
    ) -> None:
        """Wait until ``predicate(screen)`` is truthy.

        The predicate may return a plain value or any awaitable (coroutine,
        Future, Task, ...); awaitables are awaited before being tested.

        Raises:
            TuiTimeoutError: if the predicate never holds before the timeout.
        """
        import inspect

        timeout = timeout if timeout is not None else self.config.default_timeout

        async def check() -> bool:
            res = predicate(self.screen)
            # isawaitable (not iscoroutine): a Future or Task is a truthy
            # object, so it must be awaited rather than passed to bool().
            if inspect.isawaitable(res):
                res = await res
            return bool(res)

        await self._poll_until_async(check, timeout=timeout, description=description)

    async def wait_for_stable(
        self,
        *,
        quiet_ms: int | None = None,
        timeout: float | None = None,
        ignore_cursor: bool = True,
    ) -> None:
        """Wait until the *visible* screen state has stopped changing.

        This is content-based, not byte-based: a TUI that pulses a
        cursor blink, ticks a spinner, or polls a status indicator will
        still settle as long as the rendered text + attributes are
        stable. Set ``ignore_cursor=False`` to also require the cursor
        position / visibility to stop changing (rarely needed; cursor
        blink is the most common reason for false negatives).
        """
        quiet = (quiet_ms if quiet_ms is not None else self.config.stable_quiet_ms) / 1000.0
        timeout = timeout if timeout is not None else self.config.default_timeout
        deadline = time.monotonic() + timeout

        def fingerprint() -> tuple[object, ...]:
            screen = self.screen
            # Compare the cell grid (chars + attrs) and the active DEC
            # modes. Cursor visibility / position is usually noise — we
            # only include it when the caller asks.
            base: tuple[object, ...] = (screen.cells, screen.modes)
            if not ignore_cursor:
                return (*base, screen.cursor)
            return base

        last_state = fingerprint()
        last_change = time.monotonic()
        while True:
            await asyncio.sleep(self.config.poll_interval)
            now = time.monotonic()
            current = fingerprint()
            if current != last_state:
                last_state = current
                last_change = now
            elif now - last_change >= quiet:
                return
            if now >= deadline:
                raise TuiTimeoutError(
                    f"screen did not stabilise within {timeout}s "
                    f"(quiet_ms={int(quiet * 1000)})"
                )

    # -- queries / assertions ------------------------------------------

    def region(
        self,
        *,
        title: str | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
    ) -> Region:
        """Shortcut for ``self.screen.region(...)`` on a fresh snapshot."""
        return self.screen.region(title=title, rows=rows, cols=cols)

    async def assert_region(
        self,
        *,
        title: str | None = None,
        contains: str | re.Pattern[str] | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
        regex: bool = False,
    ) -> None:
        """Assert that the selected region contains ``contains``.

        Looks the region up once on the current screen (no waiting). With
        ``contains=None`` only the region lookup itself is exercised.

        Raises:
            AssertionError: the region does not contain ``contains``.
        """
        reg = self.region(title=title, rows=rows, cols=cols)
        if contains is not None and not reg.contains(contains, regex=regex):
            raise AssertionError(
                f"region {title or (rows, cols)!r} does not contain {contains!r}.\n"
                f"--- region text ---\n{reg.text}\n--- end ---"
            )

    # -- rendering ------------------------------------------------------

    def png(self, *, out_path: Path | str | None = None, theme: str = "asciinema") -> bytes:
        """Render the current cast position to a PNG via ``agg``.

        Returns the PNG bytes; if ``out_path`` is given, also writes there.
        Raises :class:`FileNotFoundError` if ``agg`` is not installed,
        :class:`RuntimeError` if the session has no cast file yet, and
        :class:`subprocess.CalledProcessError` if ``agg`` exits non-zero.
        """
        if self._cast_path is None:
            raise RuntimeError("session not started")
        agg = self.config.agg_path or shutil.which("agg")
        if agg is None:
            raise FileNotFoundError(
                "agg (asciinema-agg) is not installed or not on PATH. "
                "Install via `cargo install --git https://github.com/asciinema/agg` "
                "or `brew install agg`, or set TuiConfig.agg_path."
            )
        # agg streams to stdout when given '-' as output, but the v1 CLI
        # requires a file argument. Use a tempfile.
        use_temp = not out_path
        if use_temp:
            fd, tmp_name = tempfile.mkstemp(suffix=".png")
            # mkstemp hands back an open descriptor; agg writes by path, so
            # close ours right away instead of leaking one per call.
            os.close(fd)
            target = Path(tmp_name)
        else:
            target = Path(out_path)
        try:
            subprocess.run(
                [agg, "--theme", theme, "--cols", str(self.config.cols),
                 "--rows", str(self.config.rows), str(self._cast_path), str(target)],
                check=True,
                capture_output=True,
            )
            return target.read_bytes()
        finally:
            # Same test as above, so a temp file is always cleaned up.
            if use_temp:
                try:
                    target.unlink()
                except OSError:
                    pass

    # -- internals ------------------------------------------------------

    def _require_started(self) -> None:
        """Raise ``RuntimeError`` unless :meth:`start` has completed."""
        if not self._started:
            raise RuntimeError("call start() before sending input")

    def _check_mouse_enabled(self) -> None:
        """Raise or warn when the app has not enabled mouse tracking.

        With ``config.strict_mouse`` set this raises ``RuntimeError``;
        otherwise it emits a warning the first time only.
        """
        if self._emu.is_mouse_tracking():
            return
        if self.config.strict_mouse:
            raise RuntimeError(
                "mouse input sent but the app has not enabled mouse tracking "
                "(modes 1000/1002/1003). Set TuiConfig.strict_mouse=False to warn instead."
            )
        if not self._mouse_warned:
            import warnings
            warnings.warn(
                "Sending mouse event but the app has not enabled mouse tracking. "
                "The app likely won't see this input. Wait for the app to fully start.",
                stacklevel=3,
            )
            self._mouse_warned = True

    def _timeout_error(self, timeout: float, description: str) -> TuiTimeoutError:
        """Build the timeout exception, including a dump of the current screen."""
        return TuiTimeoutError(
            f"timed out after {timeout}s waiting for {description}.\n"
            f"--- last screen ---\n{self.screen.text}\n--- end ---"
        )

    async def _wait_for_change(self, deadline: float) -> None:
        """Sleep until output arrives, one poll interval passes, or ``deadline`` hits."""
        try:
            await asyncio.wait_for(
                self._screen_changed.wait(),
                timeout=min(self.config.poll_interval, deadline - time.monotonic()),
            )
        except (TimeoutError, asyncio.TimeoutError):
            # Before Python 3.11 asyncio.TimeoutError is not the builtin
            # TimeoutError, so both are named to keep polling there too.
            pass

    async def _poll_until(
        self,
        check: Callable[[], Any],
        *,
        timeout: float,
        description: str,
    ) -> Any:
        """Call ``check`` until it returns a truthy value and return that value.

        Raises:
            TuiTimeoutError: if ``timeout`` seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while True:
            result = check()
            if result:
                return result
            if time.monotonic() >= deadline:
                raise self._timeout_error(timeout, description)
            await self._wait_for_change(deadline)

    async def _poll_until_async(
        self,
        check: Callable[[], Awaitable[bool]],
        *,
        timeout: float,
        description: str,
    ) -> None:
        """Await ``check`` repeatedly until it returns a truthy value.

        Raises:
            TuiTimeoutError: if ``timeout`` seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while True:
            if await check():
                return
            if time.monotonic() >= deadline:
                raise self._timeout_error(timeout, description)
            await self._wait_for_change(deadline)
type async
type(text: str, *, delay: float = 0.0) -> None

Send each character as its UTF-8 bytes.

delay is seconds between characters — useful for apps that debounce input or for emulating a human typist in demos.

Source code in src/tuiwright/session.py
async def type(self, text: str, *, delay: float = 0.0) -> None:
    """Send ``text`` to the child as UTF-8 bytes.

    With ``delay <= 0`` the whole text goes out as one chunk. Otherwise
    each character is sent on its own, followed by a pause of ``delay``
    seconds — useful for apps that debounce input or for emulating a
    human typist in demos.
    """
    self._require_started()
    burst = delay <= 0
    # One piece (the full text) in burst mode, one piece per character otherwise.
    pieces = [text] if burst else text
    pause = 0 if burst else delay
    for piece in pieces:
        payload = piece.encode("utf-8")
        self._record_input(payload)
        self._pty.write_bytes(payload)
        await asyncio.sleep(pause)
wait_for_stable async
wait_for_stable(
    *,
    quiet_ms: int | None = None,
    timeout: float | None = None,
    ignore_cursor: bool = True,
) -> None

Wait until the visible screen state has stopped changing.

This is content-based, not byte-based: a TUI that pulses a cursor blink, ticks a spinner, or polls a status indicator will still settle as long as the rendered text + attributes are stable. Set ignore_cursor=False to also require the cursor position / visibility to stop changing (rarely needed; cursor blink is the most common reason for false negatives).

Source code in src/tuiwright/session.py
async def wait_for_stable(
    self,
    *,
    quiet_ms: int | None = None,
    timeout: float | None = None,
    ignore_cursor: bool = True,
) -> None:
    """Block until the rendered screen has been unchanged for ``quiet_ms``.

    Stability is judged on content, not on bytes received: the cell grid
    (characters and attributes) and the active modes are compared between
    polls. Cursor position / visibility is ignored unless
    ``ignore_cursor=False`` is passed. ``quiet_ms`` and ``timeout`` fall
    back to ``config.stable_quiet_ms`` and ``config.default_timeout``.

    Raises:
        TuiTimeoutError: the screen kept changing until ``timeout`` expired.
    """
    if quiet_ms is None:
        quiet_ms = self.config.stable_quiet_ms
    quiet = quiet_ms / 1000.0
    if timeout is None:
        timeout = self.config.default_timeout
    deadline = time.monotonic() + timeout

    def capture() -> tuple[object, ...]:
        # What counts as "the screen" for comparison purposes.
        snap = self.screen
        if ignore_cursor:
            return (snap.cells, snap.modes)
        return (snap.cells, snap.modes, snap.cursor)

    previous = capture()
    changed_at = time.monotonic()
    while True:
        await asyncio.sleep(self.config.poll_interval)
        tick = time.monotonic()
        latest = capture()
        if latest != previous:
            # Still moving: restart the quiet window from this poll.
            previous, changed_at = latest, tick
        elif tick - changed_at >= quiet:
            return
        if tick >= deadline:
            raise TuiTimeoutError(
                f"screen did not stabilise within {timeout}s "
                f"(quiet_ms={int(quiet * 1000)})"
            )
png
png(*, out_path: Path | str | None = None, theme: str = 'asciinema') -> bytes

Render the current cast position to a PNG via agg.

Returns the PNG bytes; if out_path is given, also writes there. Raises FileNotFoundError if agg is not installed.

Source code in src/tuiwright/session.py
def png(self, *, out_path: Path | str | None = None, theme: str = "asciinema") -> bytes:
    """Render the current cast position to a PNG via ``agg``.

    Args:
        out_path: Where to write the PNG. When omitted (or empty) a
            temporary file is used and removed again before returning.
        theme: Value passed to ``agg --theme``.

    Returns:
        The PNG bytes produced by ``agg``.

    Raises:
        RuntimeError: If no cast path is set (session never started).
        FileNotFoundError: If ``agg`` is not installed.
        subprocess.CalledProcessError: If ``agg`` exits non-zero.
    """
    if self._cast_path is None:
        raise RuntimeError("session not started")
    agg = self.config.agg_path or shutil.which("agg")
    if agg is None:
        raise FileNotFoundError(
            "agg (asciinema-agg) is not installed or not on PATH. "
            "Install via `cargo install --git https://github.com/asciinema/agg` "
            "or `brew install agg`, or set TuiConfig.agg_path."
        )
    # agg streams to stdout when given '-' as output, but the v1 CLI
    # requires a file argument. Use a tempfile.
    use_temp = not out_path
    if use_temp:
        # Only the name is needed; leaving the `with` block closes the
        # handle so no file descriptor is leaked per call.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            target = Path(tmp.name)
    else:
        target = Path(out_path)
    try:
        subprocess.run(
            [agg, "--theme", theme, "--cols", str(self.config.cols),
             "--rows", str(self.config.rows), str(self._cast_path), str(target)],
            check=True,
            capture_output=True,
        )
        return target.read_bytes()
    finally:
        # Clean up exactly when a temp file was created above.
        if use_temp:
            try:
                target.unlink()
            except OSError:
                pass

TuiTimeoutError

Bases: TimeoutError

Raised when a wait_for_* call exceeds its timeout.

Source code in src/tuiwright/session.py
class TuiTimeoutError(TimeoutError):
    """Signals that a ``wait_for_*`` call ran past its timeout.

    Subclasses the builtin :class:`TimeoutError`, so ``except TimeoutError``
    handlers catch it too.
    """

Session

TuiSession

Drives a TUI binary under a PTY.

Use it as an async context manager, or call start() / stop() explicitly. After start(), every method that injects input is effectively synchronous from the caller's perspective (it never waits on the child process), but is declared async because most flows interleave these calls with await wait_for_*.

Source code in src/tuiwright/session.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
class TuiSession:
    """Drives a TUI binary under a PTY.

    Use it as an async context manager or call :meth:`start` / :meth:`stop`
    explicitly. After start, every method that injects input is sync from
    the caller's perspective (no awaiting on the child) but ``async``
    because most flows interleave with ``await wait_for_*``.
    """

    def __init__(self, config: TuiConfig | None = None) -> None:
        """Create an idle session; nothing is spawned until :meth:`start`."""
        self.config = config or TuiConfig()
        self._pty = PtyTransport()
        self._emu = Emulator(cols=self.config.cols, rows=self.config.rows)
        self._recorder: CastRecorder | None = None
        self._cast_path: Path | None = None
        self._started = False
        self._screen_changed = asyncio.Event()
        self._mouse_warned = False
        self._argv: list[str] = []

    # -- lifecycle -----------------------------------------------------

    async def __aenter__(self) -> TuiSession:
        return self

    async def __aexit__(self, *exc: object) -> None:
        await self.stop()

    async def start(
        self,
        cmd: str | list[str],
        *,
        env: dict[str, str] | None = None,
        cwd: str | os.PathLike[str] | None = None,
        cols: int | None = None,
        rows: int | None = None,
        cast_path: Path | str | None = None,
    ) -> None:
        """Spawn ``cmd`` under the PTY and start recording its output.

        Args:
            cmd: Command line; a string is split with :func:`shlex.split`.
            env: Extra environment variables layered over ``os.environ``.
            cwd: Working directory for the child, if any.
            cols: Overrides ``config.cols`` when given.
            rows: Overrides ``config.rows`` when given.
            cast_path: Explicit cast file; otherwise one is created under
                ``config.cast_dir`` or a ``tuiwright`` temp directory.

        Raises:
            RuntimeError: If the session was already started.
        """
        if self._started:
            raise RuntimeError("session already started; create a new one")
        if cols is not None:
            self.config.cols = cols
        if rows is not None:
            self.config.rows = rows
        self._emu = Emulator(cols=self.config.cols, rows=self.config.rows)
        self._argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)

        # Cast file: explicit > config.cast_dir > tempdir.
        if cast_path is not None:
            self._cast_path = Path(cast_path)
        else:
            base = self.config.cast_dir or Path(tempfile.gettempdir()) / "tuiwright"
            base.mkdir(parents=True, exist_ok=True)
            self._cast_path = base / f"session-{uuid.uuid4().hex[:12]}.cast"

        full_env = {**os.environ, **(env or {})}
        full_env.setdefault("TERM", "xterm-256color")
        full_env.setdefault("COLORTERM", "truecolor")
        # Disable any prompt-toolkit / pager that would mess with size.
        full_env.setdefault("PAGER", "cat")
        full_env.setdefault("LESS", "-FRX")

        recorder = CastRecorder(
            self._cast_path,
            cols=self.config.cols,
            rows=self.config.rows,
            env=full_env,
            title=" ".join(self._argv),
        )
        self._recorder = recorder
        # Hook listeners BEFORE spawning so we don't miss the first chunk
        # (PTYs can emit data the same loop tick the reader is registered).
        self._pty.add_listener(self._on_output)
        try:
            await self._pty.spawn(
                self._argv,
                env=full_env,
                cwd=str(cwd) if cwd is not None else None,
                cols=self.config.cols,
                rows=self.config.rows,
            )
        except BaseException:
            # stop() does nothing while the session is not started, so a
            # failed spawn must close the recorder here or it stays open.
            recorder.close()
            self._recorder = None
            raise
        self._started = True

    async def stop(self, *, timeout: float = 2.0) -> int:
        """Stop the child and close the recorder.

        Returns whatever the PTY transport's ``stop`` returns, or ``0`` when
        the session was never started.
        """
        if not self._started:
            return 0
        try:
            return await self._pty.stop(timeout=timeout)
        finally:
            if self._recorder is not None:
                self._recorder.close()
            self._started = False

    # -- listeners -----------------------------------------------------

    def _on_output(self, chunk: bytes) -> None:
        """Feed a chunk of child output to the emulator and the recorder."""
        self._emu.feed(chunk)
        if self._recorder is not None:
            self._recorder.record_output(chunk)
        # Wake any wait_for_* waiters.
        self._screen_changed.set()
        # Re-arm. A wait coroutine that consumed the event must rebuild
        # its reference before re-awaiting; we use a fresh Event each
        # tick to avoid races.
        self._screen_changed = asyncio.Event()

    def _record_input(self, data: bytes) -> None:
        """Log injected input in the cast recording, if one is active."""
        if self._recorder is not None:
            self._recorder.record_input(data)

    # -- properties ----------------------------------------------------

    @property
    def screen(self) -> Screen:
        """A fresh :class:`Screen` snapshot built from the emulator."""
        return Screen.from_emulator(self._emu)

    @property
    def cast_path(self) -> Path:
        """Path of the cast file; raises ``RuntimeError`` before start."""
        if self._cast_path is None:
            raise RuntimeError("session not started yet")
        return self._cast_path

    @property
    def alive(self) -> bool:
        """Liveness as reported by the PTY transport."""
        return self._pty.alive

    # -- input: keys / text --------------------------------------------

    async def press(self, key: str) -> None:
        """Encode ``key`` with ``encode_key`` and write it to the PTY."""
        self._require_started()
        data = encode_key(key)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)  # let the loop service the write/read

    async def type(self, text: str, *, delay: float = 0.0) -> None:
        """Send each character as its UTF-8 bytes.

        ``delay`` is seconds between characters — useful for apps that
        debounce input or for emulating a human typist in demos. With a
        non-positive ``delay`` the whole string is written in one chunk.
        """
        self._require_started()
        if delay <= 0:
            data = text.encode("utf-8")
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(0)
            return
        for ch in text:
            data = ch.encode("utf-8")
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(delay)

    async def paste(self, text: str) -> None:
        """Send ``text`` as a bracketed paste, or type it if unsupported."""
        self._require_started()
        if not self._emu.is_bracketed_paste():
            # The app didn't enable bracketed paste, so the brackets would
            # leak into the buffer. Fall back to plain type.
            await self.type(text)
            return
        data = encode_paste(text)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)

    # -- input: mouse ---------------------------------------------------

    async def click(
        self,
        row: int,
        col: int,
        *,
        button: str = "left",
        modifiers: tuple[str, ...] = (),
    ) -> None:
        """Send a press followed by a release at 0-based ``(row, col)``."""
        self._require_started()
        self._check_mouse_enabled()
        mods = _modtuple_to_mod(modifiers)
        # 0-based → 1-based on the wire.
        down = encode_mouse(
            button=button, row=row + 1, col=col + 1, pressed=True, modifiers=mods
        )
        up = encode_mouse(
            button=button, row=row + 1, col=col + 1, pressed=False, modifiers=mods
        )
        self._record_input(down + up)
        self._pty.write_bytes(down)
        # Small gap so apps that distinguish press/release see two events.
        await asyncio.sleep(0)
        self._pty.write_bytes(up)
        await asyncio.sleep(0)

    async def double_click(
        self, row: int, col: int, *, button: str = "left", interval: float = 0.05
    ) -> None:
        """Click twice at the same cell, ``interval`` seconds apart."""
        await self.click(row, col, button=button)
        await asyncio.sleep(interval)
        await self.click(row, col, button=button)

    async def drag(
        self,
        from_row: int,
        from_col: int,
        to_row: int,
        to_col: int,
        *,
        button: str = "left",
        steps: int = 4,
    ) -> None:
        """Press at the start cell, emit ``steps`` motion events, release at the end.

        Intermediate positions are linearly interpolated with integer
        division; coordinates are 0-based.
        """
        self._require_started()
        self._check_mouse_enabled()
        btn = resolve_button(button)
        down = encode_mouse(button=btn, row=from_row + 1, col=from_col + 1, pressed=True)
        self._record_input(down)
        self._pty.write_bytes(down)
        await asyncio.sleep(0)
        for i in range(1, steps + 1):
            r = from_row + (to_row - from_row) * i // steps
            c = from_col + (to_col - from_col) * i // steps
            move = encode_mouse(
                button=btn, row=r + 1, col=c + 1, pressed=True, motion=True
            )
            self._record_input(move)
            self._pty.write_bytes(move)
            await asyncio.sleep(0)
        up = encode_mouse(button=btn, row=to_row + 1, col=to_col + 1, pressed=False)
        self._record_input(up)
        self._pty.write_bytes(up)
        await asyncio.sleep(0)

    async def scroll(
        self,
        row: int,
        col: int,
        *,
        direction: str = "down",
        lines: int = 1,
    ) -> None:
        """Send ``lines`` wheel events at 0-based ``(row, col)``.

        Any ``direction`` other than ``"down"`` is sent as wheel-up.
        """
        self._require_started()
        self._check_mouse_enabled()
        button = "wheel_down" if direction == "down" else "wheel_up"
        for _ in range(lines):
            data = encode_mouse(button=button, row=row + 1, col=col + 1, pressed=True)
            self._record_input(data)
            self._pty.write_bytes(data)
            await asyncio.sleep(0)

    async def hover(self, row: int, col: int) -> None:
        """Send a motion-only mouse event at 0-based ``(row, col)``."""
        self._require_started()
        self._check_mouse_enabled()
        # Motion-only event uses button code 3 (release) + motion bit (32),
        # i.e. 35 for "no button motion". The encoder doesn't expose that
        # directly, so the sequence is built inline.
        from tuiwright._input import CSI
        msg = CSI + b"<35;" + str(col + 1).encode() + b";" + str(row + 1).encode() + b"M"
        self._record_input(msg)
        self._pty.write_bytes(msg)
        await asyncio.sleep(0)

    # -- input: resize / focus -----------------------------------------

    async def resize(self, cols: int, rows: int) -> None:
        """Resize the PTY and the emulator, and store the size in the config."""
        self._require_started()
        self._pty.resize(cols, rows)
        self._emu.resize(cols, rows)
        self.config.cols = cols
        self.config.rows = rows
        await asyncio.sleep(0)

    async def focus(self, in_: bool = True) -> None:
        """Send a focus-in (or focus-out) event if the app asked for them."""
        self._require_started()
        if not self._emu.is_focus_events():
            return  # No-op if the app didn't enable focus reporting.
        data = encode_focus(in_)
        self._record_input(data)
        self._pty.write_bytes(data)
        await asyncio.sleep(0)

    # -- waits ----------------------------------------------------------

    async def wait_for_text(
        self,
        needle: str | re.Pattern[str],
        *,
        timeout: float | None = None,
        region: Region | None = None,
        regex: bool = False,
    ) -> Match[str]:
        """Wait until ``needle`` appears on screen and return the match.

        Args:
            needle: Literal text, a regex source (with ``regex=True``), or a
                compiled pattern.
            timeout: Seconds to wait; defaults to ``config.default_timeout``.
            region: Restrict the search to this region's title or bounds,
                re-resolved against the live screen on every poll.
            regex: Treat a string ``needle`` as a regular expression.

        Raises:
            TuiTimeoutError: If the text does not appear in time.
        """
        timeout = timeout if timeout is not None else self.config.default_timeout
        pattern = needle if isinstance(needle, re.Pattern) else (
            re.compile(needle) if regex else re.compile(re.escape(needle))
        )
        # A Region is a snapshot of one Screen. To keep polling honest, we
        # re-derive a Region with the same bounds (or title lookup) from
        # the current screen on every iteration.
        bounds = None if region is None else (region.title, region.top, region.bottom, region.left, region.right)

        def check() -> Match[str] | None:
            if bounds is None:
                target = self.screen.text
            else:
                title, top, bottom, left, right = bounds
                live_screen = self.screen
                if title is not None:
                    try:
                        live = live_screen.region(title=title)
                        target = live.text
                    except LookupError:
                        # The titled region is not on screen (yet).
                        target = ""
                else:
                    target = "\n".join(
                        "".join(c.char for c in live_screen.cells[r][left:right]).rstrip()
                        for r in range(top, min(bottom, live_screen.rows))
                    )
            return pattern.search(target)

        return await self._poll_until(check, timeout=timeout, description=f"text {needle!r}")

    async def wait_for_predicate(
        self,
        predicate: Callable[[Screen], bool | Awaitable[bool]],
        *,
        timeout: float | None = None,
        description: str = "predicate",
    ) -> None:
        """Wait until ``predicate(screen)`` is truthy.

        The predicate may return a plain value or any awaitable (coroutine,
        Future, Task, ...); awaitables are awaited before the truth test.

        Raises:
            TuiTimeoutError: If the predicate never becomes truthy in time.
        """
        import inspect

        timeout = timeout if timeout is not None else self.config.default_timeout

        async def check() -> bool:
            res = predicate(self.screen)
            # isawaitable (not iscoroutine): a Future or Task must be awaited
            # too, otherwise the object itself would always count as truthy.
            if inspect.isawaitable(res):
                res = await res
            return bool(res)

        await self._poll_until_async(check, timeout=timeout, description=description)

    async def wait_for_stable(
        self,
        *,
        quiet_ms: int | None = None,
        timeout: float | None = None,
        ignore_cursor: bool = True,
    ) -> None:
        """Wait until the *visible* screen state has stopped changing.

        This is content-based, not byte-based: a TUI that pulses a
        cursor blink, ticks a spinner, or polls a status indicator will
        still settle as long as the rendered text + attributes are
        stable. Set ``ignore_cursor=False`` to also require the cursor
        position / visibility to stop changing (rarely needed; cursor
        blink is the most common reason for false negatives).
        """
        quiet = (quiet_ms if quiet_ms is not None else self.config.stable_quiet_ms) / 1000.0
        timeout = timeout if timeout is not None else self.config.default_timeout
        deadline = time.monotonic() + timeout

        def fingerprint() -> tuple[object, ...]:
            screen = self.screen
            # Compare the cell grid (chars + attrs) and the active DEC
            # modes. Cursor visibility / position is usually noise — we
            # only include it when the caller asks.
            base: tuple[object, ...] = (screen.cells, screen.modes)
            if not ignore_cursor:
                return (*base, screen.cursor)
            return base

        last_state = fingerprint()
        last_change = time.monotonic()
        while True:
            await asyncio.sleep(self.config.poll_interval)
            now = time.monotonic()
            current = fingerprint()
            if current != last_state:
                last_state = current
                last_change = now
            elif now - last_change >= quiet:
                return
            if now >= deadline:
                raise TuiTimeoutError(
                    f"screen did not stabilise within {timeout}s "
                    f"(quiet_ms={int(quiet * 1000)})"
                )

    # -- queries / assertions ------------------------------------------

    def region(
        self,
        *,
        title: str | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
    ) -> Region:
        """Shortcut for ``self.screen.region(...)`` on a fresh snapshot."""
        return self.screen.region(title=title, rows=rows, cols=cols)

    async def assert_region(
        self,
        *,
        title: str | None = None,
        contains: str | re.Pattern[str] | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
        regex: bool = False,
    ) -> None:
        """Raise ``AssertionError`` if the region lacks ``contains``.

        The region is resolved once against the current screen; no waiting
        is performed.
        """
        reg = self.region(title=title, rows=rows, cols=cols)
        if contains is not None and not reg.contains(contains, regex=regex):
            raise AssertionError(
                f"region {title or (rows, cols)!r} does not contain {contains!r}.\n"
                f"--- region text ---\n{reg.text}\n--- end ---"
            )

    # -- rendering ------------------------------------------------------

    def png(self, *, out_path: Path | str | None = None, theme: str = "asciinema") -> bytes:
        """Render the current cast position to a PNG via ``agg``.

        Returns the PNG bytes; if ``out_path`` is given, also writes there.
        Raises :class:`FileNotFoundError` if ``agg`` is not installed.
        """
        if self._cast_path is None:
            raise RuntimeError("session not started")
        agg = self.config.agg_path or shutil.which("agg")
        if agg is None:
            raise FileNotFoundError(
                "agg (asciinema-agg) is not installed or not on PATH. "
                "Install via `cargo install --git https://github.com/asciinema/agg` "
                "or `brew install agg`, or set TuiConfig.agg_path."
            )
        # agg streams to stdout when given '-' as output, but the v1 CLI
        # requires a file argument. Use a tempfile.
        use_temp = not out_path
        if use_temp:
            # Only the name is needed; leaving the `with` block closes the
            # handle so no file descriptor is leaked per call.
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
                target = Path(tmp.name)
        else:
            target = Path(out_path)
        try:
            subprocess.run(
                [agg, "--theme", theme, "--cols", str(self.config.cols),
                 "--rows", str(self.config.rows), str(self._cast_path), str(target)],
                check=True,
                capture_output=True,
            )
            return target.read_bytes()
        finally:
            # Clean up exactly when a temp file was created above.
            if use_temp:
                try:
                    target.unlink()
                except OSError:
                    pass

    # -- internals ------------------------------------------------------

    def _require_started(self) -> None:
        """Raise ``RuntimeError`` unless :meth:`start` has completed."""
        if not self._started:
            raise RuntimeError("call start() before sending input")

    def _check_mouse_enabled(self) -> None:
        """Raise or warn (once per session) if mouse tracking is off."""
        if self._emu.is_mouse_tracking():
            return
        if self.config.strict_mouse:
            raise RuntimeError(
                "mouse input sent but the app has not enabled mouse tracking "
                "(modes 1000/1002/1003). Set TuiConfig.strict_mouse=False to warn instead."
            )
        if not self._mouse_warned:
            import warnings
            warnings.warn(
                "Sending mouse event but the app has not enabled mouse tracking. "
                "The app likely won't see this input. Wait for the app to fully start.",
                stacklevel=3,
            )
            self._mouse_warned = True

    def _timeout_error(self, timeout: float, description: str) -> TuiTimeoutError:
        """Build the timeout exception, including the last screen text."""
        return TuiTimeoutError(
            f"timed out after {timeout}s waiting for {description}.\n"
            f"--- last screen ---\n{self.screen.text}\n--- end ---"
        )

    async def _await_screen_change(self, deadline: float) -> None:
        """Wait for new output, one poll interval, or ``deadline``, whichever is first."""
        try:
            await asyncio.wait_for(
                self._screen_changed.wait(),
                timeout=min(self.config.poll_interval, deadline - time.monotonic()),
            )
        except (TimeoutError, asyncio.TimeoutError):
            # Before Python 3.11 asyncio.TimeoutError is a separate class
            # from the builtin; catching both keeps the poll loop running.
            pass

    async def _poll_until(
        self,
        check: Callable[[], Any],
        *,
        timeout: float,
        description: str,
    ) -> Any:
        """Call ``check`` until it returns something truthy and return that value.

        Raises:
            TuiTimeoutError: If ``timeout`` seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while True:
            result = check()
            if result:
                return result
            if time.monotonic() >= deadline:
                raise self._timeout_error(timeout, description)
            await self._await_screen_change(deadline)

    async def _poll_until_async(
        self,
        check: Callable[[], Awaitable[bool]],
        *,
        timeout: float,
        description: str,
    ) -> None:
        """Await ``check`` repeatedly until it yields a truthy value.

        Raises:
            TuiTimeoutError: If ``timeout`` seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while True:
            if await check():
                return
            if time.monotonic() >= deadline:
                raise self._timeout_error(timeout, description)
            await self._await_screen_change(deadline)

type async

type(text: str, *, delay: float = 0.0) -> None

Send each character as its UTF-8 bytes.

delay is seconds between characters — useful for apps that debounce input or for emulating a human typist in demos.

Source code in src/tuiwright/session.py
async def type(self, text: str, *, delay: float = 0.0) -> None:
    """Write ``text`` to the child as UTF-8.

    With a positive ``delay`` (seconds) every character is sent on its
    own, followed by a pause of that length; this suits apps that debounce
    input or demos imitating a human typist. Otherwise the whole string
    goes out as a single chunk.
    """
    self._require_started()
    paced = not (delay <= 0)
    # One chunk for the unpaced case, one chunk per character otherwise.
    chunks = list(text) if paced else [text]
    pause = delay if paced else 0
    for chunk in chunks:
        payload = chunk.encode("utf-8")
        self._record_input(payload)
        self._pty.write_bytes(payload)
        await asyncio.sleep(pause)

wait_for_stable async

wait_for_stable(
    *,
    quiet_ms: int | None = None,
    timeout: float | None = None,
    ignore_cursor: bool = True,
) -> None

Wait until the visible screen state has stopped changing.

This is content-based, not byte-based: a TUI that pulses a cursor blink, ticks a spinner, or polls a status indicator will still settle as long as the rendered text + attributes are stable. Set ignore_cursor=False to also require the cursor position / visibility to stop changing (rarely needed; cursor blink is the most common reason for false negatives).

Source code in src/tuiwright/session.py
async def wait_for_stable(
    self,
    *,
    quiet_ms: int | None = None,
    timeout: float | None = None,
    ignore_cursor: bool = True,
) -> None:
    """Block until the rendered screen has been unchanged for a quiet period.

    Stability is judged on screen content (cell grid plus active modes),
    not on raw bytes received. Pass ``ignore_cursor=False`` to require the
    cursor to hold still as well. ``quiet_ms`` and ``timeout`` fall back to
    ``config.stable_quiet_ms`` and ``config.default_timeout``.

    Raises:
        TuiTimeoutError: If the screen keeps changing until the deadline.
    """
    if quiet_ms is None:
        quiet_ms = self.config.stable_quiet_ms
    quiet = quiet_ms / 1000.0
    if timeout is None:
        timeout = self.config.default_timeout
    deadline = time.monotonic() + timeout

    def snapshot() -> tuple[object, ...]:
        # Cell grid and DEC modes always count; the cursor only on request,
        # because its position / visibility is usually noise.
        scr = self.screen
        if ignore_cursor:
            return (scr.cells, scr.modes)
        return (scr.cells, scr.modes, scr.cursor)

    previous = snapshot()
    changed_at = time.monotonic()
    while True:
        await asyncio.sleep(self.config.poll_interval)
        now = time.monotonic()
        latest = snapshot()
        if latest != previous:
            # Something moved: restart the quiet window from this poll.
            previous, changed_at = latest, now
        elif now - changed_at >= quiet:
            return
        if now >= deadline:
            raise TuiTimeoutError(
                f"screen did not stabilise within {timeout}s "
                f"(quiet_ms={int(quiet * 1000)})"
            )

png

png(*, out_path: Path | str | None = None, theme: str = 'asciinema') -> bytes

Render the current cast position to a PNG via agg.

Returns the PNG bytes; if out_path is given, the image is also written there. Raises FileNotFoundError if agg is not installed.

Source code in src/tuiwright/session.py
def png(self, *, out_path: Path | str | None = None, theme: str = "asciinema") -> bytes:
    """Render the current cast position to a PNG via ``agg``.

    Args:
        out_path: Where to write the PNG. When omitted (or empty) a
            temporary file is used and removed again before returning.
        theme: Value passed to ``agg --theme``.

    Returns:
        The PNG bytes produced by ``agg``.

    Raises:
        RuntimeError: If no cast path is set (session never started).
        FileNotFoundError: If ``agg`` is not installed.
        subprocess.CalledProcessError: If ``agg`` exits non-zero.
    """
    if self._cast_path is None:
        raise RuntimeError("session not started")
    agg = self.config.agg_path or shutil.which("agg")
    if agg is None:
        raise FileNotFoundError(
            "agg (asciinema-agg) is not installed or not on PATH. "
            "Install via `cargo install --git https://github.com/asciinema/agg` "
            "or `brew install agg`, or set TuiConfig.agg_path."
        )
    # agg streams to stdout when given '-' as output, but the v1 CLI
    # requires a file argument. Use a tempfile.
    use_temp = not out_path
    if use_temp:
        # Only the name is needed; leaving the `with` block closes the
        # handle so no file descriptor is leaked per call.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            target = Path(tmp.name)
    else:
        target = Path(out_path)
    try:
        subprocess.run(
            [agg, "--theme", theme, "--cols", str(self.config.cols),
             "--rows", str(self.config.rows), str(self._cast_path), str(target)],
            check=True,
            capture_output=True,
        )
        return target.read_bytes()
    finally:
        # Clean up exactly when a temp file was created above.
        if use_temp:
            try:
                target.unlink()
            except OSError:
                pass

TuiConfig dataclass

Source code in src/tuiwright/session.py
@dataclass
class TuiConfig:
    """Settings read by a TUI session: terminal size, wait timing, tool paths."""

    cols: int = 80  # Terminal width handed to the emulator and the PTY.
    rows: int = 24  # Terminal height handed to the emulator and the PTY.
    default_timeout: float = 5.0  # Seconds; used by waits when no timeout is passed.
    poll_interval: float = _POLL_SECONDS  # Seconds between polls while waiting.
    stable_quiet_ms: int = 50  # Milliseconds without change for wait_for_stable.
    agg_path: str | None = None  # Discovered lazily from PATH if None.
    cast_dir: Path | None = None  # A "tuiwright" dir under the system tempdir if None.
    strict_mouse: bool = False   # Raise instead of warning on mouse-mode mismatch.

TuiTimeoutError

Bases: TimeoutError

Raised when a wait_for_* call exceeds its timeout.

Source code in src/tuiwright/session.py
class TuiTimeoutError(TimeoutError):
    """Signals that a ``wait_for_*`` call ran past its timeout.

    Subclasses the builtin :class:`TimeoutError`, so ``except TimeoutError``
    handlers catch it too.
    """

Screen model

Screen dataclass

An immutable snapshot of the terminal grid.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True)
class Screen:
    """Frozen capture of the terminal grid at one moment."""

    rows: int
    cols: int
    cells: tuple[tuple[Cell, ...], ...]
    cursor: Cursor
    modes: frozenset[int] = field(default_factory=frozenset)

    # -- construction --------------------------------------------------

    @classmethod
    def from_emulator(cls, emu: Emulator) -> Screen:
        """Build a snapshot from the emulator's cells, cursor and modes."""
        from tuiwright._emulator import Emulator as _Emu  # noqa: F401

        def convert(pc) -> Cell:
            # Map one emulator cell onto our immutable Cell; empty data
            # is rendered as a single space.
            return Cell(
                char=pc.data or " ",
                fg=Color.from_pyte(pc.fg),
                bg=Color.from_pyte(pc.bg),
                bold=bool(pc.bold),
                italic=bool(pc.italics),
                underline=bool(pc.underscore),
                reverse=bool(pc.reverse),
                strike=bool(pc.strikethrough),
                blink=bool(pc.blink),
            )

        grid = tuple(tuple(convert(pc) for pc in line) for line in emu.cells())
        raw_cursor = emu.screen.cursor
        cursor = Cursor(
            row=raw_cursor.y,
            col=raw_cursor.x,
            hidden=bool(raw_cursor.hidden),
        )
        return cls(
            rows=emu.rows,
            cols=emu.cols,
            cells=grid,
            cursor=cursor,
            modes=emu.private_modes,
        )

    # -- text views ----------------------------------------------------

    @property
    def text(self) -> str:
        """Every row, right-stripped, joined with ``\\n``."""
        lines = [self.row(index) for index in range(self.rows)]
        return "\n".join(lines)

    def row(self, i: int) -> str:
        """Row ``i`` as text with trailing whitespace removed."""
        chars = [cell.char for cell in self.cells[i]]
        return "".join(chars).rstrip()

    def row_padded(self, i: int) -> str:
        """Row ``i`` as text, trailing whitespace kept."""
        chars = [cell.char for cell in self.cells[i]]
        return "".join(chars)

    def cell(self, row: int, col: int) -> Cell:
        """The cell at ``(row, col)``."""
        line = self.cells[row]
        return line[col]

    # -- search --------------------------------------------------------

    def find(self, needle: str | re.Pattern[str], *, regex: bool = False) -> list[Position]:
        """Positions of every match of ``needle``, searched row by row.

        A string ``needle`` is matched literally unless ``regex`` is true;
        a compiled pattern is used as given.
        """
        if isinstance(needle, re.Pattern):
            pattern = needle
        elif regex:
            pattern = re.compile(needle)
        else:
            pattern = re.compile(re.escape(needle))
        return [
            Position(r, hit.start())
            for r in range(self.rows)
            for hit in pattern.finditer(self.row_padded(r))
        ]

    def row_containing(
        self, needle: str | re.Pattern[str], *, regex: bool = False
    ) -> int | None:
        """Row index of the first match, or ``None`` when nothing matches."""
        hits = self.find(needle, regex=regex)
        if not hits:
            return None
        return hits[0].row

    def contains(self, needle: str | re.Pattern[str], *, regex: bool = False) -> bool:
        """Whether ``needle`` matches anywhere on the screen."""
        return len(self.find(needle, regex=regex)) > 0

    # -- regions -------------------------------------------------------

    def region(
        self,
        *,
        title: str | None = None,
        rows: tuple[int, int] | None = None,
        cols: tuple[int, int] | None = None,
    ) -> Region:
        """Select a region by ``title``, or by explicit ``rows`` and ``cols``.

        Raises:
            LookupError: If no titled region matches ``title``.
            ValueError: If ``title`` is absent and ``rows``/``cols`` are not both given.
        """
        if title is None:
            if rows is None or cols is None:
                raise ValueError("either title= or both rows= and cols= must be given")
            (top, bottom), (left, right) = rows, cols
            return Region(screen=self, top=top, bottom=bottom, left=left, right=right, title=None)
        match = _find_titled_region(self, title)
        if match is None:
            raise LookupError(f"no titled region matching {title!r}")
        return match

text property

text: str

All rows joined with \n, trailing spaces stripped per row.

Region dataclass

A rectangular sub-view of a Screen.

Bounds are inclusive on top/left and exclusive on bottom/right (standard Python slice convention).

Source code in src/tuiwright/screen.py
@dataclass(frozen=True)
class Region:
    """A rectangular sub-view of a :class:`Screen`.

    Bounds are *inclusive* on top/left and *exclusive* on bottom/right
    (standard Python slice convention).
    """

    screen: Screen  # screen whose ``cells`` grid this view reads from
    top: int  # first screen row of the region (inclusive)
    bottom: int  # screen row just past the region (exclusive)
    left: int  # first screen column of the region (inclusive)
    right: int  # screen column just past the region (exclusive)
    title: str | None = None

    @property
    def rows(self) -> int:
        """Number of rows covered by the region (``bottom - top``)."""
        return self.bottom - self.top

    @property
    def cols(self) -> int:
        """Number of columns covered by the region (``right - left``)."""
        return self.right - self.left

    @property
    def text(self) -> str:
        """All region rows joined with ``\\n``, each stripped of trailing whitespace."""
        return "\n".join(self.row(i) for i in range(self.rows))

    def row(self, i: int) -> str:
        """Return the text of region row ``i`` with trailing whitespace stripped.

        Args:
            i: Row index relative to the region's top edge. Negative values
                count back from the region's last row, as with a list.

        Raises:
            IndexError: ``i`` does not name a row inside the region.
        """
        height = self.rows
        index = i + height if i < 0 else i
        # Without this check an out-of-range index would silently read a
        # screen row that lies outside the region (or wrap to the end of the
        # screen for negative values).
        if not 0 <= index < height:
            raise IndexError(f"region row index {i} out of range for {height} rows")
        cells = self.screen.cells[self.top + index][self.left : self.right]
        return "".join(c.char for c in cells).rstrip()

    def contains(self, needle: str | re.Pattern[str], *, regex: bool = False) -> bool:
        """Report whether any single region row matches ``needle``.

        A compiled pattern is used as-is. A string is compiled as a regular
        expression when ``regex`` is true and matched literally otherwise.
        Rows are searched one at a time using the stripped text from
        :meth:`row`, so a match never spans two rows.
        """
        pat = needle if isinstance(needle, re.Pattern) else (
            re.compile(needle) if regex else re.compile(re.escape(needle))
        )
        return any(pat.search(self.row(i)) for i in range(self.rows))

Cell dataclass

A single rendered terminal cell.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True, slots=True)
class Cell:
    """One rendered terminal cell: a character plus its colours and style flags."""

    char: str = " "
    fg: Color = _DEFAULT_COLOR
    bg: Color = _DEFAULT_COLOR
    bold: bool = False
    italic: bool = False
    underline: bool = False
    reverse: bool = False
    strike: bool = False
    blink: bool = False

    def has_default_attrs(self) -> bool:
        """Report whether both colours are default and no style flag is set.

        The character itself is not considered.
        """
        if not self.fg.is_default:
            return False
        if not self.bg.is_default:
            return False
        style_flags = (
            self.bold,
            self.italic,
            self.underline,
            self.reverse,
            self.strike,
            self.blink,
        )
        return not any(style_flags)

Color dataclass

A foreground or background colour.

name is one of "default", "black", "red" … or the empty string if rgb is set. rgb is a hex triple "ff8800" (no leading #) for true-colour cells, or None otherwise.

Source code in src/tuiwright/screen.py
@dataclass(frozen=True, slots=True)
class Color:
    """A foreground or background colour.

    ``name`` is one of ``"default"``, ``"black"``, ``"red"`` … or the empty
    string if ``rgb`` is set. ``rgb`` is a hex triple ``"ff8800"`` (no
    leading ``#``) for true-colour cells, or ``None`` otherwise.
    """

    name: str = "default"
    rgb: str | None = None

    @classmethod
    def from_pyte(cls, value: str) -> Color:
        # pyte uses "default" sentinel and 6-char hex for true colour.
        if value == "default":
            return _DEFAULT_COLOR
        if len(value) == 6 and all(c in "0123456789abcdefABCDEF" for c in value):
            return cls(name="", rgb=value.lower())
        return cls(name=value)

    @property
    def is_default(self) -> bool:
        return self.name == "default" and self.rgb is None

Cursor dataclass

Source code in src/tuiwright/screen.py
@dataclass(frozen=True, slots=True)
class Cursor:
    """An immutable cursor record: a row/column pair plus a ``hidden`` flag."""

    row: int
    col: int
    hidden: bool = False  # defaults to not hidden

Position

Bases: NamedTuple

Source code in src/tuiwright/screen.py
class Position(NamedTuple):
    """A ``(row, col)`` pair; ``Screen.find`` returns one per match."""

    row: int
    col: int

Snapshot extensions

ScreenSnapshotExtension

Bases: SingleFileSnapshotExtension

Snapshots `Screen` values to a reviewable text+JSON file.

Source code in src/tuiwright/_snapshot/cells.py
class ScreenSnapshotExtension(SingleFileSnapshotExtension):
    """Snapshot extension that stores :class:`Screen` values as text files.

    Snapshots use the ``screen`` file extension and are written in text mode.
    """

    file_extension = "screen"
    _write_mode = WriteMode.TEXT

    def serialize(  # type: ignore[override]
        self,
        data: SerializableData,
        *,
        exclude: PropertyFilter | None = None,
        include: PropertyFilter | None = None,
        matcher: PropertyMatcher | None = None,
    ) -> SerializedData:
        """Serialize a :class:`Screen` via ``_serialize_screen``.

        ``exclude``, ``include`` and ``matcher`` are accepted for signature
        compatibility but are not used here.

        Raises:
            TypeError: ``data`` is not a :class:`Screen`.
        """
        if isinstance(data, Screen):
            return _serialize_screen(data)
        raise TypeError(
            f"ScreenSnapshotExtension only accepts Screen values, got {type(data).__name__}"
        )

PNGSnapshotExtension

Bases: SingleFileSnapshotExtension

Snapshots raw PNG bytes; pixel-diffs on read.

Source code in src/tuiwright/_snapshot/png.py
class PNGSnapshotExtension(SingleFileSnapshotExtension):
    """Snapshot extension for PNG images, compared pixel by pixel.

    Snapshots use the ``png`` file extension and are written in binary mode.
    ``threshold`` and ``include_anti_alias`` are forwarded to ``pixelmatch``
    when two snapshots are not byte-identical.
    """

    file_extension = "png"
    _write_mode = WriteMode.BINARY

    threshold: float = 0.1
    include_anti_alias: bool = True

    def serialize(  # type: ignore[override]
        self,
        data: SerializableData,
        *,
        exclude: PropertyFilter | None = None,
        include: PropertyFilter | None = None,
        matcher: PropertyMatcher | None = None,
    ) -> SerializedData:
        """Return the PNG payload as ``bytes``.

        ``bytes``/``bytearray`` input is returned as ``bytes``. ``str`` or
        ``Path`` input is treated as a filesystem path and the file's bytes
        are read. ``exclude``, ``include`` and ``matcher`` are accepted for
        signature compatibility but are not used here.

        Raises:
            TypeError: ``data`` is none of the supported types.
        """
        if isinstance(data, (str, Path)):
            return Path(data).read_bytes()
        if isinstance(data, (bytes, bytearray)):
            return bytes(data)
        raise TypeError(
            f"PNGSnapshotExtension expects bytes or a path, got {type(data).__name__}"
        )

    def matches(  # type: ignore[override]
        self,
        *,
        serialized_data: SerializedData,
        snapshot_data: SerializedData,
    ) -> bool:
        """Decide whether the new image matches the stored snapshot.

        Byte-identical payloads match immediately. Otherwise both payloads
        are decoded to RGBA and compared with ``pixelmatch``; they match only
        when the reported mismatch count is zero. Payloads that fail to
        decode, or images of different sizes, never match.
        """
        # Fast path: identical bytes need no decoding.
        if serialized_data == snapshot_data:
            return True

        # Decode both payloads; any decoding failure counts as a mismatch.
        try:
            actual, expected = [
                Image.open(io.BytesIO(blob)).convert("RGBA")
                for blob in (serialized_data, snapshot_data)
            ]
        except Exception:
            return False

        if actual.size != expected.size:
            return False

        # pixelmatch is handed an image of the same size for its diff output.
        diff_canvas = Image.new("RGBA", actual.size)
        mismatch_count = pixelmatch(
            expected,
            actual,
            diff_canvas,
            threshold=self.threshold,
            includeAA=self.include_anti_alias,
        )
        return mismatch_count == 0