| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2024 by the Linux Foundation |
| # |
| """Integration tests for the TrackingApp TUI. |
| |
| Uses real SQLite databases (via b4.review.tracking) and git repos |
| (via the gitdir fixture), but no network access. Tests exercise |
| core user workflows: series listing, navigation, filtering, |
| status transitions, and modal interactions. |
| """ |
| |
| import pathlib |
| from typing import Any, Dict, List, Optional |
| from unittest.mock import patch |
| |
| import pytest |
| from textual.widgets import Input, ListView, Static |
| |
| import b4 |
| import b4.review |
| import b4.review.tracking as tracking |
| from b4.review_tui._modals import ( |
| ActionItem, |
| ActionScreen, |
| ConfirmScreen, |
| HelpScreen, |
| LimitScreen, |
| SnoozeScreen, |
| TargetBranchScreen, |
| ) |
| from b4.review_tui._tracking_app import TrackedSeriesItem, TrackingApp |
| |
| # --------------------------------------------------------------------------- |
| # Compat helper — Textual ≥ 1.0 (pip) uses Static.content, |
| # older builds (e.g. Fedora 43 package) still use Static.renderable. |
| # --------------------------------------------------------------------------- |
| |
| |
| def _static_text(widget: Any) -> str: |
| """Return the text content of a Static widget across Textual versions.""" |
| if hasattr(widget, 'content'): |
| return str(widget.content) |
| return str(widget.renderable) |
| |
| |
| # --------------------------------------------------------------------------- |
| # Helpers |
| # --------------------------------------------------------------------------- |
| |
| |
def _seed_db(identifier: str, series_list: List[Dict[str, Any]]) -> None:
    """Create and populate a tracking database with test series.

    Args:
        identifier: Name of the tracking database, handed to
            ``tracking.init_db``.
        series_list: One dict per series. ``change_id`` is required; the
            remaining ``add_series_to_db`` fields fall back to fixed test
            defaults. The optional ``status``, ``message_count`` and
            ``seen_message_count`` keys are applied with direct UPDATE
            statements once the row has been added.
    """
    conn = tracking.init_db(identifier)
    # Close the connection even when seeding fails part-way, so a broken
    # fixture does not leave an open handle behind.
    try:
        for s in series_list:
            change_id = s['change_id']
            revision = s.get('revision', 1)
            tracking.add_series_to_db(
                conn,
                change_id=change_id,
                revision=revision,
                subject=s.get('subject', '[PATCH] test'),
                sender_name=s.get('sender_name', 'Test Author'),
                sender_email=s.get('sender_email', 'author@example.com'),
                sent_at=s.get('sent_at', '2026-01-15T10:00:00+00:00'),
                message_id=s.get('message_id', f'{change_id}@example.com'),
                num_patches=s.get('num_patches', 1),
            )
            # Set status if specified (add_series_to_db always starts as 'new')
            status = s.get('status')
            if status and status != 'new':
                conn.execute(
                    'UPDATE series SET status = ? WHERE change_id = ? AND revision = ?',
                    (status, change_id, revision),
                )
                conn.commit()
            # Set message counts if specified; the seen count defaults to
            # the total, i.e. nothing unread.
            mc = s.get('message_count')
            if mc is not None:
                conn.execute(
                    'UPDATE series SET message_count = ?, seen_message_count = ? '
                    'WHERE change_id = ? AND revision = ?',
                    (
                        mc,
                        s.get('seen_message_count', mc),
                        change_id,
                        revision,
                    ),
                )
                conn.commit()
    finally:
        conn.close()
| |
| |
def _create_review_branch(
    gitdir: str,
    change_id: str,
    identifier: str = 'test-project',
    revision: int = 1,
    status: str = 'reviewing',
    subject: str = 'Test series',
    sender_name: str = 'Test Author',
    sender_email: str = 'test@example.com',
) -> str:
    """Build a fake ``b4/review/<change_id>`` branch carrying a tracking commit.

    The branch is created at the current HEAD. A commit that reuses the
    branch's tree and has HEAD as its parent is then written with
    ``git commit-tree``; its message is *subject* followed by the output of
    ``b4.review.make_review_magic_json`` for the tracking metadata. Finally
    the branch ref is moved onto that commit.

    Every git invocation is asserted to exit with status 0.

    Returns:
        The name of the created branch.
    """

    def git(args: List[str], **kwargs: Any) -> str:
        """Run one git command in *gitdir* and return its raw output."""
        ecode, out = b4.git_run_command(gitdir, args, **kwargs)
        assert ecode == 0
        return out

    ref = f'b4/review/{change_id}'

    # The current HEAD serves both as branch point and as recorded base.
    head = git(['rev-parse', 'HEAD']).strip()
    git(['branch', ref, head])

    metadata = {
        'series': {
            'identifier': identifier,
            'change-id': change_id,
            'revision': revision,
            'status': status,
            'subject': subject,
            'fromname': sender_name,
            'fromemail': sender_email,
            'expected': 1,
            'complete': True,
            'base-commit': head,
            'prerequisite-commits': [],
            'first-patch-commit': head,
            'header-info': {},
        },
        'followups': [],
        'patches': [],
    }
    message = f'{subject}\n\n{b4.review.make_review_magic_json(metadata)}'

    # Same tree as the branch tip, so the tracking commit changes no files.
    tree_sha = git(['rev-parse', f'{ref}^{{tree}}']).strip()
    commit_sha = git(
        ['commit-tree', tree_sha, '-p', head],
        stdin=message.encode(),
    ).strip()
    git(['update-ref', f'refs/heads/{ref}', commit_sha])
    return ref
| |
| |
# Default seed data: three series with distinct subject prefixes, senders,
# patch counts and send dates. Only alpha carries explicit message counts
# and only charlie a non-default revision.
_SERIES_ALPHA: Dict[str, Any] = {
    'change_id': 'test-change-alpha',
    'subject': '[PATCH net-next] alpha: add widget support',
    'sender_name': 'Alice Author',
    'sender_email': 'alice@example.com',
    'sent_at': '2026-03-10T10:00:00+00:00',
    'message_id': 'alpha-v1@example.com',
    'num_patches': 3,
    'message_count': 5,
    'seen_message_count': 3,
}

_SERIES_BRAVO: Dict[str, Any] = {
    'change_id': 'test-change-bravo',
    'subject': '[PATCH drm] bravo: fix cursor rendering',
    'sender_name': 'Bob Builder',
    'sender_email': 'bob@example.com',
    'sent_at': '2026-03-09T08:00:00+00:00',
    'message_id': 'bravo-v1@example.com',
    'num_patches': 1,
}

_SERIES_CHARLIE: Dict[str, Any] = {
    'change_id': 'test-change-charlie',
    'subject': '[PATCH v2 bpf] charlie: verifier refactor',
    'sender_name': 'Charlie Coder',
    'sender_email': 'charlie@example.com',
    'sent_at': '2026-03-08T12:00:00+00:00',
    'message_id': 'charlie-v2@example.com',
    'num_patches': 7,
    'revision': 2,
}

# Order matters: tests rely on alpha being inserted first and charlie last.
SAMPLE_SERIES: List[Dict[str, Any]] = [
    _SERIES_ALPHA,
    _SERIES_BRAVO,
    _SERIES_CHARLIE,
]
| |
| |
| # --------------------------------------------------------------------------- |
| # Tests |
| # --------------------------------------------------------------------------- |
| |
| |
class TestTrackingAppStartup:
    """Startup behaviour: what the list and the title show after launch."""

    @pytest.mark.asyncio
    async def test_empty_database(self, tmp_path: pathlib.Path) -> None:
        """With nothing tracked, the '#tracking-empty' widget is present."""
        identifier = 'test-empty'
        _seed_db(identifier, [])

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # The placeholder carrying the "no tracked series" message
            placeholders = app.query('#tracking-empty')
            assert len(placeholders) > 0

    @pytest.mark.asyncio
    async def test_series_listed(self, tmp_path: pathlib.Path) -> None:
        """Each seeded series gets one entry in the list view."""
        identifier = 'test-listing'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            assert len(listing.children) == 3

    @pytest.mark.asyncio
    async def test_title_shows_identifier_and_count(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The left title widget contains the database identifier."""
        identifier = 'test-title'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            title_text = _static_text(app.query_one('#title-left', Static))
            assert 'test-title' in title_text

    @pytest.mark.asyncio
    async def test_series_sorted_by_added_at(self, tmp_path: pathlib.Path) -> None:
        """Series should appear newest-tracked-first (by added_at)."""
        identifier = 'test-sort'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            subjects = [
                child.series['subject']
                for child in listing.children
                if isinstance(child, TrackedSeriesItem)
            ]
            # All are 'new' (actionable); sorted by added_at desc, so the
            # reverse of the insertion order alpha, bravo, charlie.
            for position, keyword in enumerate(('charlie', 'bravo', 'alpha')):
                assert keyword in subjects[position]
| |
| |
class TestTrackingNavigation:
    """Keyboard navigation on the main tracking list."""

    @pytest.mark.asyncio
    async def test_jk_navigation(self, tmp_path: pathlib.Path) -> None:
        """'j' moves the cursor down one row and 'k' moves it back up."""
        identifier = 'test-nav'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            # The cursor starts on the first row.
            assert listing.index == 0

            # (key to press, cursor index expected afterwards)
            for key, expected_index in (('j', 1), ('j', 2), ('k', 1)):
                await pilot.press(key)
                await pilot.pause()
                assert listing.index == expected_index

    @pytest.mark.asyncio
    async def test_help_opens_and_closes(self, tmp_path: pathlib.Path) -> None:
        """'?' pushes the HelpScreen and 'q' dismisses it again."""
        identifier = 'test-help'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            await pilot.press('question_mark')
            await pilot.pause()
            assert isinstance(app.screen, HelpScreen)

            await pilot.press('q')
            await pilot.pause()
            assert not isinstance(app.screen, HelpScreen)
| |
| |
class TestTrackingLimit:
    """Tests for the limit/filter functionality.

    Every test opens the limit dialog with 'l', submits a pattern and then
    inspects the list or the title of the main screen.
    """

    @staticmethod
    async def _submit_limit(app: TrackingApp, pilot: Any, pattern: str) -> None:
        """Fill '#limit-input' on the current screen and confirm with Enter.

        The limit dialog must already be open; callers press 'l' first.
        """
        field = app.screen.query_one('#limit-input', Input)
        field.value = pattern
        await pilot.press('enter')
        await pilot.pause()

    @staticmethod
    def _visible_series(app: TrackingApp) -> List[TrackedSeriesItem]:
        """Return the TrackedSeriesItem rows currently in '#tracking-list'."""
        listing = app.query_one('#tracking-list', ListView)
        return [c for c in listing.children if isinstance(c, TrackedSeriesItem)]

    @pytest.mark.asyncio
    async def test_limit_filters_by_subject(self, tmp_path: pathlib.Path) -> None:
        """A pattern found in one subject leaves only that series listed."""
        _seed_db('test-limit', SAMPLE_SERIES)

        app = TrackingApp('test-limit')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = app.query_one('#tracking-list', ListView)
            assert len(lv.children) == 3

            # Open limit dialog and filter for 'drm'
            await pilot.press('l')
            await pilot.pause()
            assert isinstance(app.screen, LimitScreen)
            await self._submit_limit(app, pilot, 'drm')

            # Should now show only the 'bravo' series
            items = self._visible_series(app)
            assert len(items) == 1
            assert 'bravo' in items[0].series['subject']

    @pytest.mark.asyncio
    async def test_limit_filters_by_sender(self, tmp_path: pathlib.Path) -> None:
        """A pattern taken from a sender name selects that sender's series."""
        _seed_db('test-limit-sender', SAMPLE_SERIES)

        app = TrackingApp('test-limit-sender')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            await pilot.press('l')
            await pilot.pause()
            await self._submit_limit(app, pilot, 'Charlie')

            items = self._visible_series(app)
            assert len(items) == 1
            assert 'charlie' in items[0].series['subject']

    @pytest.mark.asyncio
    async def test_clear_limit(self, tmp_path: pathlib.Path) -> None:
        """Submitting an empty pattern brings back the full list."""
        _seed_db('test-limit-clear', SAMPLE_SERIES)

        app = TrackingApp('test-limit-clear')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            # Apply a filter
            await pilot.press('l')
            await pilot.pause()
            await self._submit_limit(app, pilot, 'alpha')
            assert len(self._visible_series(app)) == 1

            # Clear the filter
            await pilot.press('l')
            await pilot.pause()
            await self._submit_limit(app, pilot, '')
            assert len(self._visible_series(app)) == 3

    @pytest.mark.asyncio
    async def test_limit_title_shows_count(self, tmp_path: pathlib.Path) -> None:
        """With a limit active, the title text contains the limit pattern."""
        _seed_db('test-limit-title', SAMPLE_SERIES)

        app = TrackingApp('test-limit-title')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            await pilot.press('l')
            await pilot.pause()
            await self._submit_limit(app, pilot, 'alpha')

            title = app.query_one('#title-left', Static)
            assert 'alpha' in _static_text(title)
| |
| |
class TestTrackingLimitPrefixes:
    """Tests for s: and t: prefix filters in the limit dialog."""

    @pytest.mark.asyncio
    async def test_limit_by_status(self, tmp_path: pathlib.Path) -> None:
        """s:snoozed should show only snoozed series."""
        _seed_db(
            'test-limit-status',
            [
                {
                    'change_id': 'ls-new',
                    'subject': '[PATCH] new one',
                    'message_id': 'lsn@ex.com',
                },
                {
                    'change_id': 'ls-snoozed',
                    'subject': '[PATCH] snoozed one',
                    'status': 'snoozed',
                    'message_id': 'lss@ex.com',
                },
            ],
        )

        app = TrackingApp('test-limit-status')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('l')
            await pilot.pause()

            # Input comes from the module-level textual.widgets import.
            inp = app.screen.query_one('#limit-input', Input)
            inp.value = 's:snoozed'
            await pilot.press('enter')
            await pilot.pause()

            lv = app.query_one('#tracking-list', ListView)
            items = [c for c in lv.children if isinstance(c, TrackedSeriesItem)]
            assert len(items) == 1
            assert items[0].series['status'] == 'snoozed'

    def test_matches_limit_status_substring(self) -> None:
        """s:re should match both reviewing and replied."""
        m = TrackingApp._matches_limit
        assert m({'status': 'reviewing'}, 's:re')
        assert m({'status': 'replied'}, 's:re')
        assert not m({'status': 'new'}, 's:re')
        assert not m({'status': 'snoozed'}, 's:re')

    def test_matches_limit_target_branch(self) -> None:
        """t:next should match series with target_branch containing 'next'."""
        m = TrackingApp._matches_limit
        assert m({'target_branch': 'net-next'}, 't:next')
        assert m({'target_branch': 'bpf-next'}, 't:next')
        assert not m({'target_branch': 'bpf'}, 't:next')
        # A missing or None target branch never matches a t: filter.
        assert not m({'target_branch': None}, 't:next')
        assert not m({}, 't:next')

    def test_matches_limit_combined(self) -> None:
        """s:new bpf should match new series with 'bpf' in subject."""
        m = TrackingApp._matches_limit
        series_new_bpf = {'status': 'new', 'subject': '[PATCH bpf] fix verifier'}
        series_new_net = {'status': 'new', 'subject': '[PATCH net] fix routing'}
        series_snoozed_bpf = {'status': 'snoozed', 'subject': '[PATCH bpf] old'}
        assert m(series_new_bpf, 's:new bpf')
        assert not m(series_new_net, 's:new bpf')
        assert not m(series_snoozed_bpf, 's:new bpf')
| |
| |
class TestTrackingStatusGroups:
    """How series of different statuses are ordered and which are hidden."""

    @pytest.mark.asyncio
    async def test_actionable_before_non_actionable(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Actionable series (new) should appear before non-actionable (snoozed).

        We use only statuses that don't require a real review branch
        (new, snoozed) to avoid the background rescan worker marking
        them as 'gone'.
        """
        identifier = 'test-groups'
        # Insertion order is snoozed, B, A.
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'snoozed-1',
                    'subject': '[PATCH] snoozed series',
                    'status': 'snoozed',
                    'sent_at': '2026-03-10T11:00:00+00:00',
                    'message_id': 'snoozed@ex.com',
                },
                {
                    'change_id': 'new-2',
                    'subject': '[PATCH] new series B',
                    'sent_at': '2026-03-10T09:00:00+00:00',
                    'message_id': 'new2@ex.com',
                },
                {
                    'change_id': 'new-1',
                    'subject': '[PATCH] new series A',
                    'sent_at': '2026-03-10T10:00:00+00:00',
                    'message_id': 'new1@ex.com',
                },
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            rows = [
                child.series
                for child in listing.children
                if isinstance(child, TrackedSeriesItem)
            ]
            # actionable (new) before non-actionable (snoozed); within new,
            # by added_at desc
            assert [row['status'] for row in rows] == ['new', 'new', 'snoozed']
            assert 'new series A' in rows[0]['subject']
            assert 'new series B' in rows[1]['subject']

    @pytest.mark.asyncio
    async def test_archived_not_shown(self, tmp_path: pathlib.Path) -> None:
        """A series with status 'archived' is left out of the list."""
        identifier = 'test-archived'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'visible-1',
                    'subject': '[PATCH] visible',
                    'message_id': 'vis@ex.com',
                },
                {
                    'change_id': 'archived-1',
                    'subject': '[PATCH] archived',
                    'status': 'archived',
                    'message_id': 'arch@ex.com',
                },
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            shown = [
                child
                for child in listing.children
                if isinstance(child, TrackedSeriesItem)
            ]
            assert len(shown) == 1
            assert shown[0].series['change_id'] == 'visible-1'
| |
| |
class TestTrackingFocusChangeId:
    """Tests for the focus_change_id startup parameter."""

    @pytest.mark.asyncio
    async def test_focus_on_specific_series(self, tmp_path: pathlib.Path) -> None:
        """Focusing the series that sorts first keeps the cursor on row 0."""
        _seed_db('test-focus', SAMPLE_SERIES)

        app = TrackingApp('test-focus', focus_change_id='test-change-charlie')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = app.query_one('#tracking-list', ListView)
            assert lv.index == 0  # charlie is 1st (last inserted, added_at desc)

    @pytest.mark.asyncio
    async def test_focus_on_non_first_series(self, tmp_path: pathlib.Path) -> None:
        """Focusing a series further down the list must move the cursor.

        Row 0 is where the cursor starts in a freshly opened list, so a
        target on row 0 cannot show that focus_change_id had any effect.
        Alpha is inserted first and therefore is not the first row.
        """
        target = 'test-change-alpha'
        _seed_db('test-focus-last', SAMPLE_SERIES)

        app = TrackingApp('test-focus-last', focus_change_id=target)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = app.query_one('#tracking-list', ListView)
            # Locate the target row instead of hard-coding its position.
            expected = next(
                (
                    position
                    for position, child in enumerate(lv.children)
                    if isinstance(child, TrackedSeriesItem)
                    and child.series['change_id'] == target
                ),
                None,
            )
            assert expected is not None
            assert expected != 0
            assert lv.index == expected
| |
| |
class TestTrackingQuit:
    """Leaving the app from the main list."""

    @pytest.mark.asyncio
    async def test_q_exits(self, tmp_path: pathlib.Path) -> None:
        """After 'q' is pressed on the list, ``return_value`` is None."""
        identifier = 'test-quit'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('q')
            await pilot.pause()
            # App should have exited (return value is None)
            assert app.return_value is None
| |
| |
| # --------------------------------------------------------------------------- |
| # Tests with real git repos (review branches) |
| # --------------------------------------------------------------------------- |
| |
| |
class TestTrackingWithReviewBranch:
    """Tests that use the gitdir fixture for real review branches."""

    @staticmethod
    def _tracked_app(
        gitdir: str,
        identifier: str,
        change_id: str,
        subject: str,
        message_id: str,
        status: str = 'reviewing',
        **extra: Any,
    ) -> TrackingApp:
        """Create the review branch, seed one matching series, build the app.

        The same *status* is written to the branch's tracking commit and to
        the database row. *extra* entries are added to the seeded series
        dict (e.g. message counts).
        """
        _create_review_branch(gitdir, change_id, identifier=identifier, status=status)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': subject,
                    'status': status,
                    'message_id': message_id,
                    **extra,
                }
            ],
        )
        return TrackingApp(identifier)

    @staticmethod
    def _fetch_one(identifier: str, query: str, change_id: str) -> Any:
        """Run *query* with *change_id* as its only parameter; return one row."""
        conn = tracking.get_db(identifier)
        try:
            return conn.execute(query, (change_id,)).fetchone()
        finally:
            conn.close()

    @pytest.mark.asyncio
    async def test_reviewing_status_with_branch(self, gitdir: str) -> None:
        """Series with a real review branch should appear as 'reviewing'."""
        app = self._tracked_app(
            gitdir,
            'test-reviewing',
            'test-review-branch-1',
            subject='[PATCH] series with review branch',
            message_id='review-branch-1@ex.com',
        )
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = app.query_one('#tracking-list', ListView)
            items = [c for c in lv.children if isinstance(c, TrackedSeriesItem)]
            assert len(items) == 1
            assert items[0].series['status'] == 'reviewing'

    @pytest.mark.asyncio
    async def test_review_exits_app_with_branch_name(self, gitdir: str) -> None:
        """Pressing 'r' on a reviewing series should exit with branch name."""
        change_id = 'test-exit-branch'
        app = self._tracked_app(
            gitdir,
            'test-review-exit',
            change_id,
            subject='[PATCH] exit test',
            message_id='exit@ex.com',
        )
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('r')
            await pilot.pause()
            # App should exit with the branch name
            assert app.return_value == f'b4/review/{change_id}'

    @pytest.mark.asyncio
    async def test_enter_on_reviewing_exits_to_review(self, gitdir: str) -> None:
        """Enter on a 'reviewing' series should go directly to review mode."""
        change_id = 'test-enter-branch'
        app = self._tracked_app(
            gitdir,
            'test-enter-review',
            change_id,
            subject='[PATCH] enter test',
            message_id='enter@ex.com',
        )
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('enter')
            await pilot.pause()
            assert app.return_value == f'b4/review/{change_id}'

    @pytest.mark.asyncio
    async def test_waiting_to_reviewing_on_review(self, gitdir: str) -> None:
        """Pressing 'r' on a waiting series should change it to reviewing."""
        identifier = 'test-wait-review'
        change_id = 'test-waiting-branch'
        app = self._tracked_app(
            gitdir,
            identifier,
            change_id,
            subject='[PATCH] waiting test',
            message_id='waiting@ex.com',
            status='waiting',
        )
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # 'r' on the waiting series exits the app to review mode
            await pilot.press('r')
            await pilot.pause()
            assert app.return_value == f'b4/review/{change_id}'

        # Verify status was updated in DB
        row = self._fetch_one(
            identifier,
            'SELECT status FROM series WHERE change_id = ?',
            change_id,
        )
        assert row is not None
        assert row[0] == 'reviewing'

    @pytest.mark.asyncio
    async def test_messages_marked_seen_on_review(self, gitdir: str) -> None:
        """Entering review should mark all messages as seen."""
        identifier = 'test-seen'
        change_id = 'test-seen-branch'
        # Seeded with 7 unseen messages (10 total, 3 seen).
        app = self._tracked_app(
            gitdir,
            identifier,
            change_id,
            subject='[PATCH] seen test',
            message_id='seen@ex.com',
            message_count=10,
            seen_message_count=3,
        )
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('r')
            await pilot.pause()

        # Verify message counts are equal in DB
        row = self._fetch_one(
            identifier,
            'SELECT message_count, seen_message_count FROM series WHERE change_id = ?',
            change_id,
        )
        assert row is not None
        assert row[0] == row[1]  # seen should equal total
| |
| |
class TestTrackingActionMenu:
    """Tests for the context-sensitive action menu."""

    @staticmethod
    async def _open_menu_keys(app: TrackingApp, pilot: Any) -> List[Any]:
        """Press 'a', check the ActionScreen opened, return its action keys.

        The keys are the ``key`` attributes of the ActionItem children of
        '#action-list', in list order.
        """
        await pilot.press('a')
        await pilot.pause()
        assert isinstance(app.screen, ActionScreen)
        menu = app.screen.query_one('#action-list', ListView)
        return [c.key for c in menu.children if isinstance(c, ActionItem)]

    @pytest.mark.asyncio
    async def test_action_menu_for_new_series(self, tmp_path: pathlib.Path) -> None:
        """New series should show review/abandon/snooze actions."""
        _seed_db(
            'test-action-new',
            [
                {
                    'change_id': 'new-action-1',
                    'subject': '[PATCH] new action test',
                    'message_id': 'action-new@ex.com',
                }
            ],
        )

        app = TrackingApp('test-action-new')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            actions = await self._open_menu_keys(app, pilot)
            assert 'review' in actions
            assert 'abandon' in actions
            assert 'snooze' in actions
            # These should NOT be available for 'new'
            assert 'take' not in actions
            assert 'rebase' not in actions

            # Cancel
            await pilot.press('escape')
            await pilot.pause()
            assert not isinstance(app.screen, ActionScreen)

    @pytest.mark.asyncio
    async def test_action_menu_for_reviewing(self, gitdir: str) -> None:
        """Reviewing series should show take/rebase/waiting/snooze actions."""
        identifier = 'test-action-reviewing'
        change_id = 'reviewing-action-1'
        _create_review_branch(gitdir, change_id, identifier=identifier)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] reviewing action test',
                    'status': 'reviewing',
                    'message_id': 'action-rev@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            actions = await self._open_menu_keys(app, pilot)
            assert 'take' in actions
            assert 'rebase' in actions
            assert 'waiting' in actions
            assert 'snooze' in actions

            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_action_menu_for_snoozed(self, tmp_path: pathlib.Path) -> None:
        """Snoozed series should show unsnooze/abandon actions."""
        _seed_db(
            'test-action-snoozed',
            [
                {
                    'change_id': 'snoozed-action-1',
                    'subject': '[PATCH] snoozed action test',
                    'status': 'snoozed',
                    'message_id': 'action-snz@ex.com',
                }
            ],
        )

        app = TrackingApp('test-action-snoozed')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            actions = await self._open_menu_keys(app, pilot)
            assert 'unsnooze' in actions
            assert 'abandon' in actions
            # Should NOT have take/rebase/snooze
            assert 'take' not in actions
            assert 'rebase' not in actions
            assert 'snooze' not in actions

            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_enter_on_new_opens_action_menu(self, tmp_path: pathlib.Path) -> None:
        """Enter on a 'new' series should open action menu (not review)."""
        _seed_db(
            'test-enter-new',
            [
                {
                    'change_id': 'enter-new-1',
                    'subject': '[PATCH] enter new test',
                    'message_id': 'enter-new@ex.com',
                }
            ],
        )

        app = TrackingApp('test-enter-new')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('enter')
            await pilot.pause()
            # Should open action menu, not exit the app
            assert isinstance(app.screen, ActionScreen)

            await pilot.press('escape')
| |
| |
class TestTrackingUpgradeNewSeries:
    """Tests for upgrading a new (not checked-out) series to a newer revision."""

    @staticmethod
    def _seed_v12_with_v13(
        identifier: str,
        change_id: str,
        topic: str,
        message_count: Optional[int] = None,
        seen_message_count: Optional[int] = None,
    ) -> None:
        """Track revision 12 of a series and register revision 13 for it.

        *topic* is the subject text after the ``[PATCH vN]`` prefix. When
        *message_count* is given, both message counters are written to the
        series row before the newer revision is added.
        """
        conn = tracking.init_db(identifier)
        # Release the connection even if one of the setup calls fails.
        try:
            tracking.add_series_to_db(
                conn,
                change_id=change_id,
                revision=12,
                subject=f'[PATCH v12] {topic}',
                sender_name='Test',
                sender_email='t@ex.com',
                sent_at='2026-01-15T10:00:00+00:00',
                message_id='v12@ex.com',
                num_patches=2,
            )
            if message_count is not None:
                conn.execute(
                    'UPDATE series SET message_count = ?, seen_message_count = ?'
                    ' WHERE change_id = ?',
                    (message_count, seen_message_count, change_id),
                )
                conn.commit()
            # Add v13 to the revisions table so has_newer is set
            tracking.add_revision(
                conn, change_id, 13, 'v13@ex.com', subject=f'[PATCH v13] {topic}'
            )
        finally:
            conn.close()

    @staticmethod
    async def _open_menu(app: TrackingApp, pilot: Any) -> ListView:
        """Press 'a', check the ActionScreen opened, return '#action-list'."""
        await pilot.press('a')
        await pilot.pause()
        assert isinstance(app.screen, ActionScreen)
        return app.screen.query_one('#action-list', ListView)

    @pytest.mark.asyncio
    async def test_action_menu_shows_upgrade_for_new_with_newer(
        self, tmp_path: pathlib.Path
    ) -> None:
        """New series with a newer revision available should offer upgrade."""
        identifier = 'test-upgrade-new'
        self._seed_v12_with_v13(identifier, 'upgrade-new-1', 'test upgrade')

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = await self._open_menu(app, pilot)
            actions = [c.key for c in lv.children if isinstance(c, ActionItem)]
            assert 'upgrade' in actions
            assert 'review' in actions
            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_action_menu_no_upgrade_without_newer(
        self, tmp_path: pathlib.Path
    ) -> None:
        """New series without newer revisions should not offer upgrade."""
        _seed_db(
            'test-upgrade-none',
            [
                {
                    'change_id': 'upgrade-none-1',
                    'subject': '[PATCH] no newer test',
                    'message_id': 'only@ex.com',
                }
            ],
        )

        app = TrackingApp('test-upgrade-none')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = await self._open_menu(app, pilot)
            actions = [c.key for c in lv.children if isinstance(c, ActionItem)]
            assert 'upgrade' not in actions
            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_upgrade_switches_revision(self, tmp_path: pathlib.Path) -> None:
        """Upgrade on a new series should update the DB to the newer revision."""
        identifier = 'test-upgrade-switch'
        change_id = 'upgrade-switch-1'
        # Non-null message counts so we can verify they get reset
        self._seed_v12_with_v13(
            identifier,
            change_id,
            'switch test',
            message_count=6,
            seen_message_count=4,
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            lv = await self._open_menu(app, pilot)

            # Highlight 'upgrade'. Fail here if it is missing: otherwise
            # Enter would activate whichever action happens to be selected.
            for position, child in enumerate(lv.children):
                if isinstance(child, ActionItem) and child.key == 'upgrade':
                    lv.index = position
                    break
            else:
                pytest.fail("'upgrade' action missing from the action menu")
            await pilot.press('enter')
            await pilot.pause()

        # Verify the DB was updated to v13 with counts reset
        conn = tracking.get_db(identifier)
        try:
            row = conn.execute(
                'SELECT revision, message_id, message_count,'
                ' seen_message_count FROM series'
                ' WHERE change_id = ?',
                (change_id,),
            ).fetchone()
        finally:
            conn.close()
        assert row is not None
        assert row[0] == 13
        assert row[1] == 'v13@ex.com'
        assert row[2] is None  # message_count reset
        assert row[3] is None  # seen_message_count reset
| |
| |
| class TestTrackingSnooze: |
| """Tests for the snooze workflow.""" |
| |
@pytest.mark.asyncio
async def test_snooze_new_series(self, tmp_path: pathlib.Path) -> None:
    """Snoozing a new series until a tag is written to the database.

    Walks the action menu ('a', then 's') into the SnoozeScreen, fills in
    a tag, confirms with ctrl+y and checks the stored status and
    ``snoozed_until`` value.
    """
    identifier = 'test-snooze'
    _seed_db(
        identifier,
        [
            {
                'change_id': 'snooze-test-1',
                'subject': '[PATCH] snooze me',
                'message_id': 'snooze@ex.com',
            }
        ],
    )

    app = TrackingApp(identifier)
    async with app.run_test(size=(120, 30)) as pilot:
        await pilot.pause()

        # Action menu first, then the snooze shortcut
        await pilot.press('a')
        await pilot.pause()
        assert isinstance(app.screen, ActionScreen)
        await pilot.press('s')
        await pilot.pause()

        # The snooze dialog should now be on top
        assert isinstance(app.screen, SnoozeScreen)

        # Snooze until a tag and confirm
        tag_field = app.screen.query_one('#snooze-tag', Input)
        tag_field.value = 'v6.15-rc1'
        await pilot.press('ctrl+y')
        await pilot.pause()

        # The dialog is gone again
        assert not isinstance(app.screen, SnoozeScreen)

    # Check what was persisted.
    # https://github.com/python/mypy/issues/9457:
    # app.screen is stale-narrowed across await.
    conn = tracking.get_db(identifier)  # type: ignore[unreachable]
    stored = conn.execute(
        'SELECT status, snoozed_until FROM series WHERE change_id = ?',
        ('snooze-test-1',),
    ).fetchone()
    conn.close()
    assert stored[0] == 'snoozed'
    assert stored[1] == 'tag:v6.15-rc1'
| |
| @pytest.mark.asyncio |
| async def test_snooze_cancel(self, tmp_path: pathlib.Path) -> None: |
| """Cancelling snooze should leave the series unchanged.""" |
| identifier = 'test-snooze-cancel' |
| _seed_db( |
| identifier, |
| [ |
| { |
| 'change_id': 'snooze-cancel-1', |
| 'subject': '[PATCH] do not snooze', |
| 'message_id': 'nosnooze@ex.com', |
| } |
| ], |
| ) |
| |
| app = TrackingApp(identifier) |
| async with app.run_test(size=(120, 30)) as pilot: |
| await pilot.pause() |
| await pilot.press('a') |
| await pilot.pause() |
| await pilot.press('s') |
| await pilot.pause() |
| assert isinstance(app.screen, SnoozeScreen) |
| |
| await pilot.press('escape') |
| await pilot.pause() |
| |
| # Verify status unchanged |
| conn = tracking.get_db(identifier) |
| cursor = conn.execute( |
| 'SELECT status FROM series WHERE change_id = ?', ('snooze-cancel-1',) |
| ) |
| row = cursor.fetchone() |
| conn.close() |
| assert row[0] == 'new' |
| |
| @pytest.mark.asyncio |
| async def test_snooze_with_review_branch(self, gitdir: str) -> None: |
| """Snoozing a reviewing series should also update the tracking commit.""" |
| identifier = 'test-snooze-branch' |
| change_id = 'snooze-branch-1' |
| _create_review_branch(gitdir, change_id, identifier=identifier) |
| _seed_db( |
| identifier, |
| [ |
| { |
| 'change_id': change_id, |
| 'subject': '[PATCH] snooze branch test', |
| 'status': 'reviewing', |
| 'message_id': 'snzbr@ex.com', |
| } |
| ], |
| ) |
| |
| app = TrackingApp(identifier) |
| async with app.run_test(size=(120, 30)) as pilot: |
| await pilot.pause() |
| await pilot.press('a') |
| await pilot.pause() |
| await pilot.press('s') |
| await pilot.pause() |
| assert isinstance(app.screen, SnoozeScreen) |
| |
| dur_input = app.screen.query_one('#snooze-duration', Input) |
| dur_input.value = '2w' |
| await pilot.press('ctrl+y') |
| await pilot.pause() |
| |
| # Verify DB |
| conn = tracking.get_db(identifier) |
| cursor = conn.execute( |
| 'SELECT status FROM series WHERE change_id = ?', (change_id,) |
| ) |
| row = cursor.fetchone() |
| conn.close() |
| assert row[0] == 'snoozed' |
| |
| # Verify tracking commit was updated |
| _cover_text, trk = b4.review.load_tracking(gitdir, f'b4/review/{change_id}') |
| assert trk['series']['status'] == 'snoozed' |
| assert 'snoozed' in trk['series'] |
| assert trk['series']['snoozed']['previous_state'] == 'reviewing' |
| |
| |
class TestTrackingAbandon:
    """Tests for the abandon workflow.

    Abandon is reached through the action menu ('a') and its 'A' shortcut,
    and must be confirmed in a ConfirmScreen before anything is removed.
    """

    @pytest.mark.asyncio
    async def test_abandon_new_series(self, tmp_path: pathlib.Path) -> None:
        """Abandoning a new series should remove it from the DB."""
        identifier = 'test-abandon'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'keep-1',
                    'subject': '[PATCH] keep me',
                    'sent_at': '2026-03-10T11:00:00+00:00',
                    'message_id': 'keep@ex.com',
                },
                {
                    'change_id': 'abandon-1',
                    'subject': '[PATCH] abandon me',
                    'sent_at': '2026-03-10T12:00:00+00:00',
                    'message_id': 'abandon@ex.com',
                },
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # First series is 'abandon-1' (last inserted, added_at desc)
            assert app._selected_series is not None
            assert app._selected_series['change_id'] == 'abandon-1'

            await pilot.press('a')
            await pilot.pause()
            assert isinstance(app.screen, ActionScreen)

            # Select 'abandon' from the menu
            await pilot.press('A')  # shortcut for abandon
            await pilot.pause()

            # Should show confirm dialog
            assert isinstance(app.screen, ConfirmScreen)

            await pilot.press('y')
            await pilot.pause()

        # Verify the abandoned series is gone from DB
        conn = tracking.get_db(identifier)
        try:
            cursor = conn.execute('SELECT change_id FROM series')
            remaining = [row[0] for row in cursor.fetchall()]
        finally:
            conn.close()
        assert 'keep-1' in remaining
        assert 'abandon-1' not in remaining

    @pytest.mark.asyncio
    async def test_abandon_cancel(self, tmp_path: pathlib.Path) -> None:
        """Cancelling abandon should leave the series intact."""
        identifier = 'test-abandon-cancel'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'noabandon-1',
                    'subject': '[PATCH] do not abandon',
                    'message_id': 'noabandon@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            await pilot.press('A')
            await pilot.pause()
            assert isinstance(app.screen, ConfirmScreen)

            # Back out of the confirmation dialog
            await pilot.press('escape')
            await pilot.pause()

        # Still in DB. Close the handle before asserting so that a failing
        # assertion cannot leak the connection.
        conn = tracking.get_db(identifier)
        try:
            row = conn.execute(
                'SELECT change_id FROM series WHERE change_id = ?',
                ('noabandon-1',),
            ).fetchone()
        finally:
            conn.close()
        assert row is not None

    @pytest.mark.asyncio
    async def test_abandon_with_branch_deletes_branch(self, gitdir: str) -> None:
        """Abandoning a series with a review branch should delete the branch."""
        identifier = 'test-abandon-branch'
        change_id = 'abandon-branch-1'
        branch_name = _create_review_branch(gitdir, change_id, identifier=identifier)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] abandon with branch',
                    'status': 'reviewing',
                    'message_id': 'abr@ex.com',
                }
            ],
        )

        # Verify branch exists before
        assert b4.git_branch_exists(gitdir, branch_name)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            # For reviewing, abandon is in the action list
            assert isinstance(app.screen, ActionScreen)
            await pilot.press('A')
            await pilot.pause()
            assert isinstance(app.screen, ConfirmScreen)
            await pilot.press('y')
            await pilot.pause()

        # Branch should be gone
        assert not b4.git_branch_exists(gitdir, branch_name)

        # DB should be clean. Close before asserting so a failure does not
        # leave the connection open.
        conn = tracking.get_db(identifier)
        try:
            row = conn.execute(
                'SELECT change_id FROM series WHERE change_id = ?', (change_id,)
            ).fetchone()
        finally:
            conn.close()
        assert row is None
| |
| |
class TestTrackingWaiting:
    """Tests for the 'mark as waiting' workflow.

    Both tests open the action menu ('a') and use the 'w' shortcut, then
    read the resulting status back through the shared _get_db_status helper,
    which also asserts that the series row exists.
    """

    @pytest.mark.asyncio
    async def test_mark_as_waiting(self, gitdir: str) -> None:
        """Marking a reviewing series as waiting should update DB and tracking."""
        identifier = 'test-waiting'
        change_id = 'waiting-test-1'
        _create_review_branch(gitdir, change_id, identifier=identifier)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] wait for v2',
                    'status': 'reviewing',
                    'message_id': 'wait@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            assert isinstance(app.screen, ActionScreen)

            await pilot.press('w')  # shortcut for waiting
            await pilot.pause()

        # Verify DB status
        assert _get_db_status(identifier, change_id) == 'waiting'

        # Verify tracking commit
        _cover_text, trk = b4.review.load_tracking(gitdir, f'b4/review/{change_id}')
        assert trk['series']['status'] == 'waiting'

    @pytest.mark.asyncio
    async def test_mark_new_as_waiting(self, gitdir: str) -> None:
        """Marking a new (unimported) series as waiting should update DB only."""
        identifier = 'test-new-waiting'
        change_id = 'new-waiting-1'
        # No review branch is created here: the series only exists in the DB.
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] needs v2',
                    'message_id': 'newwait@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            assert isinstance(app.screen, ActionScreen)

            await pilot.press('w')  # shortcut for waiting
            await pilot.pause()

        # Verify DB status changed
        assert _get_db_status(identifier, change_id) == 'waiting'
| |
| |
class TestTrackingDetailPanel:
    """Tests for the detail panel shown on series highlight."""

    @pytest.mark.asyncio
    async def test_detail_panel_shows_on_highlight(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The details panel should carry a height once a series is highlighted."""
        from textual.containers import Vertical

        identifier = 'test-detail'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            details = app.query_one('#details-panel', Vertical)
            # Intended to confirm the panel is auto-shown on first highlight;
            # the check itself only requires that a height style is set.
            assert details.styles.height is not None

    @pytest.mark.asyncio
    async def test_detail_panel_hides_on_escape(self, tmp_path: pathlib.Path) -> None:
        """Pressing escape should collapse the details panel to zero height."""
        from textual.containers import Vertical

        identifier = 'test-detail-hide'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            await pilot.press('escape')
            await pilot.pause()

            details = app.query_one('#details-panel', Vertical)
            height = details.styles.height
            assert height is not None
            assert height.value == 0

    @pytest.mark.asyncio
    async def test_detail_panel_updates_on_navigation(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Navigating to a different series should update the detail panel."""
        identifier = 'test-detail-nav'
        _seed_db(identifier, SAMPLE_SERIES)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Initially on charlie (last inserted, added_at desc)
            selected = app._selected_series
            assert selected is not None
            assert 'charlie' in selected.get('subject', '')

            # One step down lands on bravo
            await pilot.press('j')
            await pilot.pause()
            selected = app._selected_series
            assert selected is not None
            assert 'bravo' in selected.get('subject', '')

            # A second step down lands on alpha
            await pilot.press('j')
            await pilot.pause()
            selected = app._selected_series
            assert selected is not None
            assert 'alpha' in selected.get('subject', '')
| |
| |
class TestTrackingMultipleSeries:
    """Tests for workflows involving multiple series."""

    @pytest.mark.asyncio
    async def test_mixed_statuses_with_branches(self, gitdir: str) -> None:
        """App should correctly display a mix of new and reviewing series."""
        identifier = 'test-mixed'
        change_id_rev = 'mixed-reviewing-1'
        _create_review_branch(
            gitdir, change_id_rev, identifier=identifier, subject='Reviewing series'
        )
        seeded = [
            {
                'change_id': change_id_rev,
                'subject': '[PATCH] reviewing series',
                'status': 'reviewing',
                'sent_at': '2026-03-10T12:00:00+00:00',
                'message_id': 'rev@ex.com',
            },
            {
                'change_id': 'mixed-new-1',
                'subject': '[PATCH] new series',
                'sent_at': '2026-03-10T11:00:00+00:00',
                'message_id': 'new@ex.com',
            },
            {
                'change_id': 'mixed-snoozed-1',
                'subject': '[PATCH] snoozed series',
                'status': 'snoozed',
                'sent_at': '2026-03-10T10:00:00+00:00',
                'message_id': 'snz@ex.com',
            },
        ]
        _seed_db(identifier, seeded)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            rows = [w for w in listing.children if isinstance(w, TrackedSeriesItem)]
            assert len(rows) == 3

            # Both reviewing and new are actionable (top), snoozed is not
            # (bottom). Within actionable, sorted by added_at desc (new was
            # inserted after reviewing).
            shown = [entry.series['status'] for entry in rows]
            assert shown == ['new', 'reviewing', 'snoozed']

    @pytest.mark.asyncio
    async def test_navigate_and_review_second_series(self, gitdir: str) -> None:
        """Pressing r on the highlighted reviewing series should enter review.

        The reviewing series is seeded second but is expected at the top of
        the list, so no cursor movement is needed before pressing 'r'.
        """
        identifier = 'test-nav-review'
        change_id = 'nav-review-target'
        _create_review_branch(
            gitdir, change_id, identifier=identifier, subject='Target series'
        )
        seeded = [
            {
                'change_id': 'nav-review-first',
                'subject': '[PATCH] first (new)',
                'sent_at': '2026-03-10T12:00:00+00:00',
                'message_id': 'first@ex.com',
            },
            {
                'change_id': change_id,
                'subject': '[PATCH] target (reviewing)',
                'status': 'reviewing',
                'sent_at': '2026-03-10T11:00:00+00:00',
                'message_id': 'target@ex.com',
            },
        ]
        _seed_db(identifier, seeded)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            listing = app.query_one('#tracking-list', ListView)
            # The reviewing series is expected as the first list entry
            rows = [w for w in listing.children if isinstance(w, TrackedSeriesItem)]
            assert rows[0].series['status'] == 'reviewing'

            # Index 0 is already highlighted, so 'r' acts on it directly
            await pilot.press('r')
            await pilot.pause()
            assert app.return_value == f'b4/review/{change_id}'
| |
| |
class TestTrackingSnoozeRemembersChoice:
    """Tests for snooze remembering last choices within a session."""

    @pytest.mark.asyncio
    async def test_snooze_remembers_last_input(self, tmp_path: pathlib.Path) -> None:
        """Second snooze should pre-populate with the first snooze's input."""
        identifier = 'test-snooze-memory'
        seeded = [
            {
                'change_id': f'mem-{num}',
                'subject': f'[PATCH] {label}',
                'message_id': f'mem{num}@ex.com',
            }
            for num, label in ((1, 'first'), (2, 'second'))
        ]
        _seed_db(identifier, seeded)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()

            # First round: snooze the highlighted series until a tag
            await pilot.press('a')
            await pilot.pause()
            await pilot.press('s')
            await pilot.pause()
            first_tag = app.screen.query_one('#snooze-tag', Input)
            first_tag.value = 'v6.15-rc3'
            await pilot.press('ctrl+y')
            await pilot.pause()
            await pilot.pause()

            # The cursor may still sit on the series that was just snoozed.
            # Nudge it down, and back up if that did not change the
            # selection, so the second snooze targets the other series.
            current = app._selected_series
            first_cid = current.get('change_id') if current else None
            if current and current.get('status') == 'snoozed':
                await pilot.press('down')
                await pilot.pause()
                # Re-read the selection: it may have changed across the await
                current = app._selected_series
                if current and current.get('change_id') == first_cid:
                    await pilot.press('up')
                    await pilot.pause()

            # Second round: open the snooze dialog again
            await pilot.press('a')
            await pilot.pause()
            assert isinstance(app.screen, ActionScreen)
            await pilot.press('s')
            await pilot.pause()
            assert isinstance(app.screen, SnoozeScreen)
            # The tag field should be pre-populated
            second_tag = app.screen.query_one('#snooze-tag', Input)
            assert second_tag.value == 'v6.15-rc3'

            await pilot.press('escape')
| |
| |
| # --------------------------------------------------------------------------- |
| # Lifecycle / state-machine tests |
| # --------------------------------------------------------------------------- |
| |
| |
def _get_db_status(identifier: str, change_id: str) -> str:
    """Read the current status of a series from the tracking database.

    Opens the database for *identifier*, looks up the row whose change_id
    matches, and returns its status column as a string. Fails with an
    AssertionError when no such series exists.
    """
    conn = tracking.get_db(identifier)
    try:
        row = conn.execute(
            'SELECT status FROM series WHERE change_id = ?', (change_id,)
        ).fetchone()
    finally:
        # Release the handle even when the query itself raises.
        conn.close()
    assert row is not None, f'Series {change_id} not found in DB'
    return str(row[0])
| |
| |
def _get_action_keys(app: TrackingApp) -> List[str]:
    """Return the keys of the ActionItem entries on the open ActionScreen.

    Asserts that the app's current screen is an ActionScreen, then collects
    the key of every ActionItem child of its '#action-list' ListView, in
    list order.
    """
    screen = app.screen
    assert isinstance(screen, ActionScreen)
    action_list = screen.query_one('#action-list', ListView)
    keys: List[str] = []
    for entry in action_list.children:
        if isinstance(entry, ActionItem):
            keys.append(entry.key)
    return keys
| |
| |
class TestSeriesLifecycle:
    """End-to-end lifecycle: new → reviewing → waiting → reviewing
    → snoozed → unsnoozed → accepted (seeded) → archived (mocked).

    Drives every transition that the TUI can perform headlessly, and
    seeds the DB directly for transitions requiring network or
    external processes (take, thank). Status is read back through
    _get_db_status and, for series with a review branch, through
    b4.review.load_tracking.
    """

    @pytest.mark.asyncio
    async def test_full_lifecycle(self, gitdir: str) -> None:
        """Drive a single series through the reviewing → waiting →
        reviewing → snoozed → unsnoozed → accepted → archived lifecycle.

        We start from 'reviewing' because new → reviewing requires
        network calls (_checkout_new_series). For accepted → archived
        we mock _archive_branch to avoid tar/file I/O.
        """
        identifier = 'test-lifecycle'
        change_id = 'lifecycle-series-1'
        branch_name = f'b4/review/{change_id}'

        # Seed series as 'reviewing' with a real review branch
        _create_review_branch(
            gitdir, change_id, identifier=identifier, status='reviewing'
        )
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] lifecycle test series',
                    'sender_name': 'Lifecycle Author',
                    'sender_email': 'lifecycle@example.com',
                    'status': 'reviewing',
                    'message_id': 'lifecycle@ex.com',
                }
            ],
        )

        # === Phase 1: reviewing → waiting ===
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Verify action menu for 'reviewing'
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert 'take' in actions
            assert 'rebase' in actions
            assert 'waiting' in actions
            assert 'snooze' in actions
            assert 'abandon' in actions
            # Select 'waiting'
            await pilot.press('w')
            await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'waiting'

        # Verify tracking commit also updated
        _cover_text, trk = b4.review.load_tracking(gitdir, branch_name)
        assert trk['series']['status'] == 'waiting'

        # === Phase 2: waiting → reviewing (re-review) ===
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Verify action menu for 'waiting'
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert 'review' in actions
            assert 'abandon' in actions
            assert 'archive' in actions
            assert 'take' not in actions
            assert 'snooze' not in actions
            await pilot.press('escape')
            await pilot.pause()

            # Press 'r' to re-review → exits app
            await pilot.press('r')
            await pilot.pause()
            assert app.return_value == branch_name

        assert _get_db_status(identifier, change_id) == 'reviewing'

        # === Phase 3: reviewing → snoozed ===
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            await pilot.press('s')  # snooze shortcut
            await pilot.pause()
            assert isinstance(app.screen, SnoozeScreen)

            tag_input = app.screen.query_one('#snooze-tag', Input)
            tag_input.value = 'v6.15-rc1'
            await pilot.press('ctrl+y')
            await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'snoozed'

        # Verify tracking commit stores previous_state
        _cover_text, trk = b4.review.load_tracking(gitdir, branch_name)
        assert trk['series']['status'] == 'snoozed'
        assert trk['series']['snoozed']['previous_state'] == 'reviewing'

        # === Phase 4: snoozed → unsnoozed (back to reviewing) ===
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Verify action menu for 'snoozed'
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert 'unsnooze' in actions
            assert 'abandon' in actions
            assert 'archive' in actions
            assert 'snooze' not in actions
            assert 'take' not in actions

            # Select 'unsnooze' via shortcut
            await pilot.press('u')
            await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'reviewing'

        # Verify tracking commit restored
        _cover_text, trk = b4.review.load_tracking(gitdir, branch_name)
        assert trk['series']['status'] == 'reviewing'
        assert 'snoozed' not in trk['series']

        # === Phase 5: reviewing → accepted (seed directly) ===
        # The real 'take' flow needs suspend + am + editor, so we seed.
        conn = tracking.get_db(identifier)
        try:
            conn.execute(
                'UPDATE series SET status = ? WHERE change_id = ?',
                ('accepted', change_id),
            )
            conn.commit()
        finally:
            # Do not leak the handle if the update fails.
            conn.close()
        # Also update the tracking commit
        b4.review.update_tracking_status(gitdir, branch_name, 'accepted')

        # === Phase 6: verify 'accepted' action menu ===
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert 'thank' in actions
            assert 'abandon' in actions
            assert 'archive' in actions
            # Should NOT have take/snooze/etc, but review is allowed
            assert 'review' in actions
            assert 'take' not in actions
            assert 'snooze' not in actions
            assert 'waiting' not in actions
            await pilot.press('escape')

        # === Phase 7: accepted → archived (mock _archive_branch) ===
        def _mock_archive(
            self_app: TrackingApp,
            cid: str,
            rev: Optional[int],
            rbranch: str,
            pw_series_id: Optional[int] = None,
            notify: bool = True,
        ) -> bool:
            """Simplified archive: just update DB status."""
            aconn = tracking.get_db(self_app._identifier)
            try:
                tracking.update_series_status(aconn, cid, 'archived', revision=rev)
            finally:
                aconn.close()
            return True

        app = TrackingApp(identifier)
        with patch.object(TrackingApp, '_archive_branch', _mock_archive):
            async with app.run_test(size=(120, 30)) as pilot:
                await pilot.pause()
                await pilot.press('a')
                await pilot.pause()
                actions = _get_action_keys(app)
                assert 'archive' in actions
                # Select 'archive'
                await pilot.press('x')  # shortcut for archive
                await pilot.pause()
                # Should show confirmation dialog
                assert isinstance(app.screen, ConfirmScreen)
                await pilot.press('y')
                await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'archived'

    @pytest.mark.asyncio
    async def test_new_directly_to_snoozed(self, tmp_path: pathlib.Path) -> None:
        """A new series can be snoozed without ever entering review."""
        identifier = 'test-lifecycle-snooze-new'
        change_id = 'direct-snooze-1'
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] snooze from new',
                    'message_id': 'ds@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            await pilot.press('s')
            await pilot.pause()
            assert isinstance(app.screen, SnoozeScreen)
            dur_input = app.screen.query_one('#snooze-duration', Input)
            dur_input.value = '3d'
            await pilot.press('ctrl+y')
            await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'snoozed'

    @pytest.mark.asyncio
    async def test_thanked_to_archived(self, gitdir: str) -> None:
        """A thanked series can only be archived.

        Only the action menu is checked; the archive itself is not run.
        """
        identifier = 'test-lifecycle-thanked'
        change_id = 'thanked-series-1'
        _create_review_branch(
            gitdir, change_id, identifier=identifier, status='thanked'
        )
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] thanked ready for archive',
                    'status': 'thanked',
                    'message_id': 'thanked@ex.com',
                }
            ],
        )

        # Verify action menu: only 'archive' should be available
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert actions == ['archive']

            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_accepted_action_menu(self, gitdir: str) -> None:
        """Accepted series should show review, thank, abandon, and archive."""
        identifier = 'test-lifecycle-accepted'
        change_id = 'accepted-menu-1'
        _create_review_branch(
            gitdir, change_id, identifier=identifier, status='accepted'
        )
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] accepted series menu test',
                    'status': 'accepted',
                    'message_id': 'acc@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            # Order is not significant here, only the set of offered actions
            assert set(actions) == {'review', 'thank', 'abandon', 'archive'}
            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_gone_series_actions(self, tmp_path: pathlib.Path) -> None:
        """A 'gone' series (branch deleted externally) should allow
        review and abandon."""
        identifier = 'test-lifecycle-gone'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'gone-1',
                    'subject': '[PATCH] gone series',
                    'status': 'gone',
                    'message_id': 'gone@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            actions = _get_action_keys(app)
            assert actions == ['review', 'abandon']
            await pilot.press('escape')

    @pytest.mark.asyncio
    async def test_snooze_roundtrip_preserves_previous_state(self, gitdir: str) -> None:
        """Unsnooze should restore the previous state stored in tracking.

        We seed a series as 'snoozed' with previous_state='waiting' in
        the tracking commit, then unsnooze via the TUI and verify it
        restores to 'waiting' (not the default 'reviewing').
        """
        identifier = 'test-lifecycle-snooze-wait'
        change_id = 'snooze-wait-1'
        branch_name = f'b4/review/{change_id}'
        # Create branch with 'snoozed' status + snoozed metadata
        _create_review_branch(
            gitdir, change_id, identifier=identifier, status='snoozed'
        )
        # Manually inject snoozed.previous_state into tracking commit
        cover_text, trk = b4.review.load_tracking(gitdir, branch_name)
        trk['series']['snoozed'] = {
            'until': 'tag:v99',
            'previous_state': 'waiting',
        }
        b4.review.save_tracking_ref(gitdir, branch_name, cover_text, trk)

        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] waiting then snoozed',
                    'status': 'snoozed',
                    'message_id': 'sw@ex.com',
                }
            ],
        )

        # Unsnooze should restore to 'waiting'
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('a')
            await pilot.pause()
            await pilot.press('u')
            await pilot.pause()

        assert _get_db_status(identifier, change_id) == 'waiting'

        # Verify tracking commit cleaned up
        _cover_text, trk = b4.review.load_tracking(gitdir, branch_name)
        assert trk['series']['status'] == 'waiting'
        assert 'snoozed' not in trk['series']

    @pytest.mark.asyncio
    async def test_abandon_from_any_branch_state(self, gitdir: str) -> None:
        """Abandon should work from the reviewing and snoozed states.

        Each status gets its own identifier, review branch and database, so
        the iterations do not interfere with each other.
        """
        # NOTE(review): the 'waiting' menu also offers 'abandon' (asserted in
        # test_full_lifecycle) but that state is not exercised here.
        for status in ('reviewing', 'snoozed'):
            identifier = f'test-lifecycle-abandon-{status}'
            change_id = f'abandon-{status}'
            _create_review_branch(
                gitdir, change_id, identifier=identifier, status=status
            )
            _seed_db(
                identifier,
                [
                    {
                        'change_id': change_id,
                        'subject': f'[PATCH] abandon from {status}',
                        'status': status,
                        'message_id': f'ab-{status}@ex.com',
                    }
                ],
            )

            app = TrackingApp(identifier)
            async with app.run_test(size=(120, 30)) as pilot:
                await pilot.pause()
                await pilot.press('a')
                await pilot.pause()
                actions = _get_action_keys(app)
                assert 'abandon' in actions, f'abandon missing for {status}'
                await pilot.press('A')  # abandon shortcut
                await pilot.pause()
                assert isinstance(app.screen, ConfirmScreen)
                await pilot.press('y')
                await pilot.pause()

            # Verify series removed from DB. The handle is closed before the
            # assertion so a failure cannot leak the connection.
            conn = tracking.get_db(identifier)
            try:
                row = conn.execute(
                    'SELECT change_id FROM series WHERE change_id = ?',
                    (change_id,),
                ).fetchone()
            finally:
                conn.close()
            assert row is None, f'Series should be gone after abandon from {status}'

            # Verify branch deleted
            branch_name = f'b4/review/{change_id}'
            assert not b4.git_branch_exists(gitdir, branch_name), (
                f'Branch should be deleted after abandon from {status}'
            )
| |
| |
@patch('b4.review.tracking.get_review_target_branches', return_value=['master'])
class TestTargetBranch:
    """Tests for per-series target branch tracking.

    The class-level ``patch`` decorates every ``test_*`` method and hands
    the replacement mock to it as an extra positional argument, so each
    test declares a ``_mock_branches`` parameter ahead of its fixtures.
    """

    @pytest.mark.asyncio
    async def test_set_target_branch_from_new(
        self, _mock_branches: Any, gitdir: str
    ) -> None:
        """Press t on a new series, type a branch, confirm — DB is updated."""
        identifier = 'test-target-new'
        change_id = 'target-new-1'
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] target branch test',
                    'message_id': 'target-new@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Press t to open target branch dialog
            await pilot.press('t')
            await pilot.pause()
            assert isinstance(app.screen, TargetBranchScreen)

            # Type branch name and confirm
            inp = app.screen.query_one('#target-branch-input', Input)
            inp.value = 'master'
            await pilot.pause()
            # Use ctrl+y to confirm; the branch existence check is stubbed
            # out because 'master' need not exist in the test repo.
            with patch('b4.git_branch_exists', return_value=True):
                await pilot.press('ctrl+y')
                await pilot.pause()

            # Verify DB updated
            conn = tracking.get_db(identifier)
            target = tracking.get_target_branch(conn, change_id)
            conn.close()
            assert target == 'master'

    @pytest.mark.asyncio
    async def test_set_target_branch_from_reviewing(
        self, _mock_branches: Any, gitdir: str
    ) -> None:
        """Set target on a reviewing series — tracking commit updated too."""
        identifier = 'test-target-rev'
        change_id = 'target-rev-1'
        _create_review_branch(gitdir, change_id, identifier=identifier)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] target reviewing test',
                    'status': 'reviewing',
                    'message_id': 'target-rev@ex.com',
                }
            ],
        )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('t')
            await pilot.pause()
            assert isinstance(app.screen, TargetBranchScreen)

            inp = app.screen.query_one('#target-branch-input', Input)
            inp.value = 'master'
            await pilot.pause()
            with patch('b4.git_branch_exists', return_value=True):
                await pilot.press('ctrl+y')
                await pilot.pause()

            # Verify DB updated
            conn = tracking.get_db(identifier)
            target = tracking.get_target_branch(conn, change_id)
            conn.close()
            assert target == 'master'

            # Verify tracking commit updated
            review_branch = f'b4/review/{change_id}'
            _cover, trk = b4.review.load_tracking(gitdir, review_branch)
            assert trk['series'].get('target-branch') == 'master'

    @pytest.mark.asyncio
    async def test_target_branch_in_details(
        self, _mock_branches: Any, gitdir: str
    ) -> None:
        """Verify detail panel shows Target: row when target is set."""
        # The class-level patch passes its mock positionally, so it needs
        # its own parameter here; without one it collides with ``gitdir``
        # and the real fixture is never requested.
        identifier = 'test-target-detail'
        change_id = 'target-detail-1'
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] target detail test',
                    'message_id': 'target-detail@ex.com',
                }
            ],
        )
        # Set target in DB
        conn = tracking.get_db(identifier)
        tracking.update_target_branch(conn, change_id, 'sound/for-next')
        conn.close()

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Detail panel should be visible after selecting series
            target_widget = app.query_one('#detail-target', Static)
            text = _static_text(target_widget)
            assert 'sound/for-next' in text

    @pytest.mark.asyncio
    async def test_clear_target_branch(self, _mock_branches: Any, gitdir: str) -> None:
        """Ctrl+d in modal clears the target branch."""
        identifier = 'test-target-clear'
        change_id = 'target-clear-1'
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH] clear target test',
                    'message_id': 'target-clear@ex.com',
                }
            ],
        )
        # Set target first
        conn = tracking.get_db(identifier)
        tracking.update_target_branch(conn, change_id, 'old-branch')
        conn.close()

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            await pilot.press('t')
            await pilot.pause()
            assert isinstance(app.screen, TargetBranchScreen)
            # Ctrl+d to clear
            await pilot.press('ctrl+d')
            await pilot.pause()
            await pilot.pause()

            # Screen should be dismissed
            assert not isinstance(app.screen, TargetBranchScreen)

            # Verify DB cleared
            # https://github.com/python/mypy/issues/9457:
            # app.screen is stale-narrowed across await.
            conn = tracking.get_db(identifier)  # type: ignore[unreachable]
            target = tracking.get_target_branch(conn, change_id)
            conn.close()
            assert target is None
| |
| |
| # --------------------------------------------------------------------------- |
| # Helpers for update-revision tests |
| # --------------------------------------------------------------------------- |
| |
| |
def _make_mock_lser(
    revision: int = 2, expected: int = 1, complete: bool = False
) -> b4.LoreSeries:
    """Return a bare-bones LoreSeries for driving the _on_update_* callbacks.

    The series gets fixed author details and a patches list holding one
    MagicMock stand-in with ``msgid``, ``body`` and ``date`` set, so the
    Phase 3 metadata extraction succeeds.
    """
    from unittest.mock import MagicMock

    # Keyword arguments to MagicMock become attributes on the new mock.
    fake_patch = MagicMock(
        msgid='test-update-msgid@example.com',
        body='patch body',
        date=None,
    )

    series = b4.LoreSeries(revision, expected)
    series.complete = complete
    series.fromname = 'Test Author'
    series.fromemail = 'test@example.com'
    series.patches = [fake_patch]
    return series
| |
| |
def _setup_update_test(
    gitdir: str,
    identifier: str,
    change_id: str,
    current_rev: int = 1,
    target_rev: int = 2,
) -> str:
    """Prepare a review branch and tracking DB for update-revision tests.

    A 'reviewing' branch and a matching series row are created at
    *current_rev*; *target_rev* is then recorded as an additional revision.

    Returns the review branch name.
    """
    review_branch = _create_review_branch(
        gitdir,
        change_id,
        identifier=identifier,
        revision=current_rev,
        status='reviewing',
    )

    current_series = {
        'change_id': change_id,
        'subject': f'[PATCH v{current_rev}] update test',
        'revision': current_rev,
        'status': 'reviewing',
        'message_id': f'v{current_rev}@ex.com',
    }
    _seed_db(identifier, [current_series])

    # _do_update_revision looks the target revision up in the DB, so it
    # has to be registered there up front.
    target_msgid = f'v{target_rev}@ex.com'
    target_subject = f'[PATCH v{target_rev}] update test'
    db = tracking.get_db(identifier)
    tracking.add_revision(
        db, change_id, target_rev, target_msgid, subject=target_subject
    )
    db.close()

    return review_branch
| |
| |
class TestUpdateRevisionWorkflow:
    """Tests for the three-phase update-revision workflow.

    The refactored _do_update_revision uses a temporary upgrade branch
    so the old review branch is never modified until the new revision
    has been successfully applied.

    The Phase 3 tests call ``_on_update_base_selected`` directly while
    patching ``app.suspend`` with a no-op context manager and stubbing the
    git/review helpers named in each ``with`` block.
    """

    # --- Phase 1: _do_update_revision (DB lookup + worker push) ----------

    @pytest.mark.asyncio
    async def test_no_msgid_shows_error(self, gitdir: str) -> None:
        """Target revision without a message-id should notify an error."""
        from b4.review_tui._modals import WorkerScreen

        identifier = 'test-update-nomsgid'
        change_id = 'update-nomsgid-1'
        _create_review_branch(gitdir, change_id, identifier=identifier)
        _seed_db(
            identifier,
            [
                {
                    'change_id': change_id,
                    'subject': '[PATCH v1] no msgid test',
                    'status': 'reviewing',
                    'message_id': 'v1@ex.com',
                }
            ],
        )
        # Register v2 without a message-id
        conn = tracking.get_db(identifier)
        tracking.add_revision(
            conn, change_id, 2, '', subject='[PATCH v2] no msgid test'
        )
        conn.close()

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Call the method directly — worker should not be pushed
            app._do_update_revision(change_id, 1, 2)
            await pilot.pause()
            # Should stay on the main screen, not a WorkerScreen
            assert not isinstance(app.screen, WorkerScreen)

    # --- Phase 2: _on_update_prepared (base selection screen) ------------

    @pytest.mark.asyncio
    async def test_prepared_none_is_noop(self, tmp_path: pathlib.Path) -> None:
        """A None result (worker cancelled) should do nothing."""
        from b4.review_tui._modals import BaseSelectionScreen

        identifier = 'test-update-none'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'noop-1',
                    'subject': '[PATCH] noop',
                    'message_id': 'noop@ex.com',
                }
            ],
        )
        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            app._on_update_prepared(
                None, 'noop-1', 1, 2, 'v2@ex.com', 'subj', 'b4/review/noop-1'
            )
            await pilot.pause()
            # No BaseSelectionScreen should be pushed
            assert not isinstance(app.screen, BaseSelectionScreen)

    @pytest.mark.asyncio
    async def test_prepared_pushes_base_selection(self, tmp_path: pathlib.Path) -> None:
        """Successful worker result should push BaseSelectionScreen."""
        from b4.review_tui._modals import BaseSelectionScreen

        identifier = 'test-update-base'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'base-1',
                    'subject': '[PATCH] base select',
                    'message_id': 'base@ex.com',
                }
            ],
        )
        lser = _make_mock_lser()
        ambytes = b'fake mbox'
        result = (lser, ambytes, 'abc123456789', 'Guessed base: foo', 1)

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            app._on_update_prepared(
                result,
                'base-1',
                1,
                2,
                'v2@ex.com',
                '[PATCH v2] base select',
                'b4/review/base-1',
            )
            await pilot.pause()
            assert isinstance(app.screen, BaseSelectionScreen)

    # --- Phase 3: _on_update_base_selected (apply + swap) ----------------

    @pytest.mark.asyncio
    async def test_base_selected_none_cancels(self, tmp_path: pathlib.Path) -> None:
        """Passing None as base_sha should cancel the update."""
        identifier = 'test-update-cancel'
        _seed_db(
            identifier,
            [
                {
                    'change_id': 'cancel-1',
                    'subject': '[PATCH] cancel',
                    'message_id': 'cancel@ex.com',
                }
            ],
        )
        lser = _make_mock_lser()

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            app._on_update_base_selected(
                None,
                lser,
                b'mbox',
                1,
                'cancel-1',
                1,
                2,
                'v2@ex.com',
                'subj',
                'b4/review/cancel-1',
            )
            await pilot.pause()
            # App should still be running — not exited
            assert app.is_running

    @pytest.mark.asyncio
    async def test_apply_failure_preserves_old_branch(self, gitdir: str) -> None:
        """When git-am fails the old review branch must remain intact."""
        import contextlib

        identifier = 'test-update-fail'
        change_id = 'update-fail-1'
        review_branch = _setup_update_test(gitdir, identifier, change_id)
        upgrade_branch = f'b4/review/_tmp-{change_id}-v2-upgrade'

        # Snapshot old branch HEAD before the attempt
        ecode, old_head = b4.git_run_command(gitdir, ['rev-parse', review_branch])
        assert ecode == 0
        old_head = old_head.strip()

        lser = _make_mock_lser()

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            with (
                patch.object(app, 'suspend', return_value=contextlib.nullcontext()),
                patch.object(app, 'exit'),
                patch('b4.review_tui._tracking_app._wait_for_enter'),
                patch(
                    'b4.git_fetch_am_into_repo',
                    side_effect=RuntimeError('apply failed'),
                ),
            ):
                app._on_update_base_selected(
                    'HEAD',
                    lser,
                    b'mbox',
                    1,
                    change_id,
                    1,
                    2,
                    'v2@ex.com',
                    'subj',
                    review_branch,
                )
                await pilot.pause()

            # Old review branch must still exist with unchanged HEAD
            assert b4.git_branch_exists(gitdir, review_branch)
            ecode, cur_head = b4.git_run_command(gitdir, ['rev-parse', review_branch])
            assert ecode == 0
            assert cur_head.strip() == old_head

            # Upgrade branch must not exist
            assert not b4.git_branch_exists(gitdir, upgrade_branch)

            # DB should still show original revision
            conn = tracking.get_db(identifier)
            cursor = conn.execute(
                'SELECT revision, status FROM series WHERE change_id = ?', (change_id,)
            )
            row = cursor.fetchone()
            conn.close()
            # Fail with a clear assertion rather than a TypeError on indexing
            # if the series row has disappeared.
            assert row is not None
            assert row[0] == 1
            assert row[1] == 'reviewing'

    @pytest.mark.asyncio
    async def test_conflict_abort_preserves_old_branch(self, gitdir: str) -> None:
        """When user aborts conflict resolution the old branch stays."""
        import contextlib

        identifier = 'test-update-abort'
        change_id = 'update-abort-1'
        review_branch = _setup_update_test(gitdir, identifier, change_id)
        upgrade_branch = f'b4/review/_tmp-{change_id}-v2-upgrade'

        ecode, old_head = b4.git_run_command(gitdir, ['rev-parse', review_branch])
        assert ecode == 0
        old_head = old_head.strip()

        lser = _make_mock_lser()
        conflict = b4.AmConflictError('/tmp/fake-wt', 'conflict output')

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            with (
                patch.object(app, 'suspend', return_value=contextlib.nullcontext()),
                patch.object(app, 'exit'),
                patch('b4.review_tui._tracking_app._wait_for_enter'),
                patch('b4.git_fetch_am_into_repo', side_effect=conflict),
                # A False return stands in for the user aborting resolution.
                patch(
                    'b4.review_tui._tracking_app._resolve_worktree_am_conflict',
                    return_value=False,
                ),
            ):
                app._on_update_base_selected(
                    'HEAD',
                    lser,
                    b'mbox',
                    1,
                    change_id,
                    1,
                    2,
                    'v2@ex.com',
                    'subj',
                    review_branch,
                )
                await pilot.pause()

            # Old review branch must be untouched
            assert b4.git_branch_exists(gitdir, review_branch)
            ecode, cur_head = b4.git_run_command(gitdir, ['rev-parse', review_branch])
            assert ecode == 0
            assert cur_head.strip() == old_head

            # Upgrade branch must not linger
            assert not b4.git_branch_exists(gitdir, upgrade_branch)

    @pytest.mark.asyncio
    async def test_successful_upgrade_renames_branch(self, gitdir: str) -> None:
        """On success the upgrade branch replaces the old review branch."""
        import contextlib

        identifier = 'test-update-ok'
        change_id = 'update-ok-1'
        review_branch = _setup_update_test(gitdir, identifier, change_id)

        lser = _make_mock_lser()

        # Resolve HEAD up front; it is passed as the selected base commit
        ecode, base = b4.git_run_command(gitdir, ['rev-parse', 'HEAD'])
        assert ecode == 0
        base = base.strip()

        def _fake_create(
            topdir: str,
            branch: str,
            base_commit: str,
            lser_arg: b4.LoreSeries,
            linkurl: str,
            linkmask: str,
            num_prereqs: int = 0,
            identifier: Optional[str] = None,
            status: str = 'reviewing',
            **kwargs: Any,
        ) -> None:
            """Simulate create_review_branch by making a real branch."""
            branch_suffix = branch.removeprefix('b4/review/')
            _create_review_branch(
                topdir,
                branch_suffix,
                identifier=identifier or 'test',
                revision=2,
                status='reviewing',
            )

        def _mock_archive(
            self_app: TrackingApp,
            cid: str,
            rev: Optional[int],
            rbranch: str,
            pw_series_id: Optional[int] = None,
            notify: bool = True,
        ) -> bool:
            """Delete branch + mark archived in DB."""
            b4.git_run_command(gitdir, ['branch', '-D', rbranch])
            aconn = tracking.get_db(self_app._identifier)
            tracking.update_series_status(aconn, cid, 'archived', revision=rev)
            aconn.close()
            return True

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            with (
                patch.object(app, 'suspend', return_value=contextlib.nullcontext()),
                patch('b4.review_tui._tracking_app._wait_for_enter'),
                patch('b4.git_fetch_am_into_repo'),
                patch('b4.review.create_review_branch', side_effect=_fake_create),
                patch('b4.review.get_review_branch_patch_ids', return_value=[]),
                patch(
                    'b4.review.load_tracking',
                    return_value=('', {'series': {}, 'patches': []}),
                ),
                patch('b4.review.reanchor_patch_comments'),
                patch('b4.review.save_tracking_ref'),
                patch.object(TrackingApp, '_archive_branch', _mock_archive),
            ):
                app._on_update_base_selected(
                    base,
                    lser,
                    b'mbox',
                    1,
                    change_id,
                    1,
                    2,
                    'v2@ex.com',
                    '[PATCH v2] update test',
                    review_branch,
                )
                await pilot.pause()

            # Upgrade branch should be gone (was renamed)
            assert not b4.git_branch_exists(
                gitdir, f'b4/review/_tmp-{change_id}-v2-upgrade'
            )
            # Upgrade branch should have been renamed to review branch
            assert b4.git_branch_exists(gitdir, review_branch)

            # DB should show v2 as reviewing
            conn = tracking.get_db(identifier)
            cursor = conn.execute(
                'SELECT revision, status FROM series'
                ' WHERE change_id = ? AND revision = 2',
                (change_id,),
            )
            row = cursor.fetchone()
            conn.close()
            assert row is not None
            assert row[1] == 'reviewing'

            # Should return to tracking list, not exit to review
            assert app.is_running

    @pytest.mark.asyncio
    async def test_archive_failure_leaves_both_branches(self, gitdir: str) -> None:
        """If archiving fails, both branches are left for manual recovery."""
        import contextlib

        identifier = 'test-update-archfail'
        change_id = 'update-archfail-1'
        review_branch = _setup_update_test(gitdir, identifier, change_id)
        upgrade_branch = f'b4/review/_tmp-{change_id}-v2-upgrade'

        lser = _make_mock_lser()

        def _fake_create(topdir: str, branch: str, *args: Any, **kwargs: Any) -> None:
            """Simulate create_review_branch by making a real branch."""
            branch_suffix = branch.removeprefix('b4/review/')
            _create_review_branch(
                topdir,
                branch_suffix,
                identifier=identifier,
                revision=2,
                status='reviewing',
            )

        app = TrackingApp(identifier)
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            with (
                patch.object(app, 'suspend', return_value=contextlib.nullcontext()),
                patch.object(app, 'exit'),
                patch('b4.review_tui._tracking_app._wait_for_enter'),
                patch('b4.git_fetch_am_into_repo'),
                patch('b4.review.create_review_branch', side_effect=_fake_create),
                patch('b4.review.get_review_branch_patch_ids', return_value=[]),
                patch(
                    'b4.review.load_tracking',
                    return_value=('', {'series': {}, 'patches': []}),
                ),
                patch('b4.review.reanchor_patch_comments'),
                patch('b4.review.save_tracking_ref'),
                # Archiving reports failure
                patch.object(TrackingApp, '_archive_branch', return_value=False),
            ):
                app._on_update_base_selected(
                    'HEAD',
                    lser,
                    b'mbox',
                    1,
                    change_id,
                    1,
                    2,
                    'v2@ex.com',
                    'subj',
                    review_branch,
                )
                await pilot.pause()

            # Both branches should exist — user can recover manually
            assert b4.git_branch_exists(gitdir, review_branch)
            assert b4.git_branch_exists(gitdir, upgrade_branch)
| |
| |
class TestLoadSeriesCaching:
    """Tests for _load_series batching and caching.

    Each test seeds its own tracking database, starts the app, and then
    inspects the ``_cached_*`` attributes or ``_all_series`` directly.
    """

    @pytest.mark.asyncio
    async def test_caches_populated_on_first_load(self, tmp_path: pathlib.Path) -> None:
        """Caches should be populated after the initial _load_series call."""
        _seed_db('cache-pop', SAMPLE_SERIES)

        app = TrackingApp('cache-pop')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            assert app._cached_branch_tips is not None
            assert app._cached_newest_revisions is not None
            assert app._cached_revision_counts is not None

    @pytest.mark.asyncio
    async def test_caches_survive_db_poll_no_change(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Caches should persist when _check_db_changed finds no change."""
        _seed_db('cache-nochg', SAMPLE_SERIES)

        app = TrackingApp('cache-nochg')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Hold a real reference instead of a bare id(): ids are only
            # unique among live objects, so a rebuilt cache could reuse the
            # freed object's id and make the check pass by accident.
            tips_before = app._cached_branch_tips
            assert tips_before is not None
            app._check_db_changed()
            await pilot.pause()
            # Same object — cache was not rebuilt
            assert app._cached_branch_tips is tips_before

    @pytest.mark.asyncio
    async def test_full_invalidation_clears_all_caches(
        self, tmp_path: pathlib.Path
    ) -> None:
        """_invalidate_caches() without change_id clears everything."""
        _seed_db('cache-full-inv', SAMPLE_SERIES)

        app = TrackingApp('cache-full-inv')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            assert app._cached_branch_tips is not None
            app._invalidate_caches()
            assert app._cached_branch_tips is None
            # https://github.com/python/mypy/issues/9457:
            # app._cached_branch_tips is stale-narrowed across a method call.
            assert app._cached_newest_revisions is None  # type: ignore[unreachable]
            assert app._cached_revision_counts is None
            assert app._cached_art_counts is None

    @pytest.mark.asyncio
    async def test_selective_invalidation_keeps_other_caches(
        self, tmp_path: pathlib.Path
    ) -> None:
        """_invalidate_caches(change_id) only evicts that ART entry."""
        _seed_db('cache-sel-inv', SAMPLE_SERIES)

        app = TrackingApp('cache-sel-inv')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Manually populate ART cache with test data
            app._cached_art_counts = {
                'b4/review/test-change-alpha': (1, 2, 0),
                'b4/review/test-change-bravo': (0, 1, 0),
            }
            app._invalidate_caches('test-change-alpha')
            # Alpha evicted, bravo still there
            assert 'b4/review/test-change-alpha' not in app._cached_art_counts
            assert 'b4/review/test-change-bravo' in app._cached_art_counts
            # Other caches untouched
            assert app._cached_branch_tips is not None
            assert app._cached_newest_revisions is not None

    @pytest.mark.asyncio
    async def test_revisions_stashed_in_series(self, tmp_path: pathlib.Path) -> None:
        """_load_series should stash _revisions list in each series dict."""
        _seed_db('cache-revisions', SAMPLE_SERIES)
        # Add a revision record so there's something to find
        conn = tracking.get_db('cache-revisions')
        tracking.add_revision(conn, 'test-change-charlie', 1, 'charlie-v1@example.com')
        tracking.add_revision(conn, 'test-change-charlie', 2, 'charlie-v2@example.com')
        conn.close()

        app = TrackingApp('cache-revisions')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            # Find the charlie series and check its stashed revisions
            charlie = [
                s
                for s in app._all_series
                if s.get('change_id') == 'test-change-charlie'
            ]
            assert len(charlie) == 1
            revs = charlie[0].get('_revisions', [])
            assert len(revs) == 2
            assert [r['revision'] for r in revs] == [1, 2]

    @pytest.mark.asyncio
    async def test_snoozed_until_in_series(self, tmp_path: pathlib.Path) -> None:
        """_load_series should include snoozed_until from the DB."""
        series = [
            {
                'change_id': 'test-snooze-detail',
                'subject': '[PATCH] snooze test',
                'sender_name': 'Tester',
                'status': 'snoozed',
            }
        ]
        _seed_db('cache-snooze', series)
        # NOTE(review): the wake-up time is a fixed calendar date; confirm
        # that loading does not treat an elapsed snooze differently,
        # otherwise this test becomes date-sensitive.
        conn = tracking.get_db('cache-snooze')
        tracking.snooze_series(
            conn, 'test-snooze-detail', '2026-06-01T00:00:00', revision=1
        )
        conn.close()

        app = TrackingApp('cache-snooze')
        async with app.run_test(size=(120, 30)) as pilot:
            await pilot.pause()
            assert len(app._all_series) == 1
            assert app._all_series[0].get('snoozed_until') == '2026-06-01T00:00:00'