blob: 290c99179278289f8e242514368f6476f388095c [file] [edit]
import argparse
import datetime
import io
import os
import re
from email.message import EmailMessage
from typing import Any, Dict
from unittest import mock
import pytest
import b4
import b4.review
from b4.review import tracking as review_tracking
from b4.review_tui._modals import SnoozeScreen
from b4.review_tui._tracking_app import _format_attestation, _format_snooze_until
class TestGetReviewDataDir:
"""Tests for get_review_data_dir()."""
def test_creates_directory(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify the review data directory is created."""
reviewdir = review_tracking.get_review_data_dir()
assert os.path.isdir(reviewdir)
assert reviewdir.endswith('b4/review')
class TestDbOperations:
"""Tests for database operations."""
def test_init_db_creates_schema(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify init_db creates the expected schema."""
conn = review_tracking.init_db('test-init')
cursor = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
)
tables = [row[0] for row in cursor.fetchall()]
assert 'schema_version' in tables
assert 'series' in tables
conn.close()
def test_init_db_sets_version(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify schema version is set."""
conn = review_tracking.init_db('test-version')
cursor = conn.execute('SELECT version FROM schema_version')
version = cursor.fetchone()[0]
assert version == review_tracking.SCHEMA_VERSION
conn.close()
def test_db_exists(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify db_exists returns correct values."""
assert not review_tracking.db_exists('nonexistent')
review_tracking.init_db('exists-test').close()
assert review_tracking.db_exists('exists-test')
def test_get_db_raises_for_missing(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify get_db raises FileNotFoundError for missing database."""
with pytest.raises(FileNotFoundError):
review_tracking.get_db('does-not-exist')
def test_add_series_to_db(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify series can be added to the database."""
conn = review_tracking.init_db('add-series-test')
track_id = review_tracking.add_series_to_db(
conn,
change_id='test-change-id',
revision=1,
subject='Test series subject',
sender_name='Test Author',
sender_email='author@example.com',
sent_at='2024-01-15T10:00:00+00:00',
message_id='test-msgid@example.com',
num_patches=3,
)
assert track_id == 1
cursor = conn.execute(
'SELECT track_id, change_id, subject FROM series WHERE change_id = ?',
('test-change-id',),
)
row = cursor.fetchone()
assert row is not None
assert row[0] == track_id
assert row[2] == 'Test series subject'
conn.close()
def test_add_series_with_pw_series_id(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify series can be added with patchwork series ID."""
conn = review_tracking.init_db('pw-series-test')
track_id = review_tracking.add_series_to_db(
conn,
change_id='test-change-id',
revision=1,
subject='Test subject',
sender_name='Test Author',
sender_email='author@example.com',
sent_at='2024-01-15T10:00:00+00:00',
message_id='test-msgid@example.com',
num_patches=3,
pw_series_id=12345,
)
cursor = conn.execute(
'SELECT pw_series_id FROM series WHERE track_id = ?', (track_id,)
)
row = cursor.fetchone()
assert row[0] == 12345
conn.close()
def test_add_series_multiple_revisions(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify multiple revisions can be tracked for the same change-id."""
conn = review_tracking.init_db('multi-rev-test')
# Add v1
track_id_v1 = review_tracking.add_series_to_db(
conn,
'change-123',
1,
'Subject v1',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid-v1@example.com',
3,
)
# Add v2
track_id_v2 = review_tracking.add_series_to_db(
conn,
'change-123',
2,
'Subject v2',
'Author',
'a@example.com',
'2024-01-16T10:00:00+00:00',
'msgid-v2@example.com',
4,
)
# Different track_ids
assert track_id_v1 != track_id_v2
cursor = conn.execute(
'SELECT track_id, revision, num_patches FROM series WHERE change_id = ? ORDER BY revision',
('change-123',),
)
rows = cursor.fetchall()
assert len(rows) == 2
assert rows[0] == (track_id_v1, 1, 3)
assert rows[1] == (track_id_v2, 2, 4)
conn.close()
def test_add_series_upsert(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify adding same (change_id, revision) updates the record."""
conn = review_tracking.init_db('upsert-test')
track_id_1 = review_tracking.add_series_to_db(
conn,
'change-456',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid-old@example.com',
3,
)
track_id_2 = review_tracking.add_series_to_db(
conn,
'change-456',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid-new@example.com',
5,
)
# Same track_id after upsert
assert track_id_1 == track_id_2
cursor = conn.execute(
'SELECT track_id, message_id, num_patches FROM series WHERE change_id = ? AND revision = ?',
('change-456', 1),
)
row = cursor.fetchone()
assert row == (track_id_1, 'msgid-new@example.com', 5)
conn.close()
def test_get_tracked_pw_series_ids(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify get_tracked_pw_series_ids returns correct IDs."""
conn = review_tracking.init_db('pw-ids-test')
# Add series with pw_series_id
review_tracking.add_series_to_db(
conn,
'change-1',
1,
'Subject 1',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
pw_series_id=100,
)
review_tracking.add_series_to_db(
conn,
'change-2',
1,
'Subject 2',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
pw_series_id=200,
)
# Add series without pw_series_id
review_tracking.add_series_to_db(
conn,
'change-3',
1,
'Subject 3',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
)
conn.close()
ids = review_tracking.get_tracked_pw_series_ids('pw-ids-test')
assert ids == {100, 200}
def test_get_tracked_pw_series_ids_nonexistent_db(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify get_tracked_pw_series_ids returns empty set for missing db."""
ids = review_tracking.get_tracked_pw_series_ids('nonexistent-project')
assert ids == set()
def test_is_pw_series_tracked(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify is_pw_series_tracked works correctly."""
conn = review_tracking.init_db('is-tracked-test')
review_tracking.add_series_to_db(
conn,
'change-1',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
pw_series_id=12345,
)
conn.close()
assert review_tracking.is_pw_series_tracked('is-tracked-test', 12345) is True
assert review_tracking.is_pw_series_tracked('is-tracked-test', 99999) is False
def test_is_pw_series_tracked_nonexistent_db(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify is_pw_series_tracked returns False for missing db."""
assert review_tracking.is_pw_series_tracked('nonexistent', 12345) is False
def test_get_all_tracked_series(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify get_all_tracked_series returns all series with correct fields."""
conn = review_tracking.init_db('all-series-test')
review_tracking.add_series_to_db(
conn,
'change-1',
1,
'First series',
'Author One',
'one@example.com',
'2024-01-15T10:00:00+00:00',
'msgid-1@example.com',
3,
)
review_tracking.add_series_to_db(
conn,
'change-2',
2,
'Second series',
'Author Two',
'two@example.com',
'2024-01-16T10:00:00+00:00',
'msgid-2@example.com',
5,
)
conn.close()
result = review_tracking.get_all_tracked_series('all-series-test')
assert len(result) == 2
# Results are ordered by added_at DESC, so the second one is first
assert result[0]['subject'] == 'Second series'
assert result[0]['revision'] == 2
assert result[0]['sender_name'] == 'Author Two'
assert result[0]['status'] == 'new'
assert result[1]['subject'] == 'First series'
assert result[1]['revision'] == 1
def test_get_all_tracked_series_nonexistent_db(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify get_all_tracked_series returns empty list for missing db."""
result = review_tracking.get_all_tracked_series('nonexistent-project')
assert result == []
class TestRepoMetadata:
"""Tests for repository metadata operations."""
def test_save_and_get_repo_metadata(self, gitdir: str) -> None:
"""Verify metadata can be saved and retrieved."""
git_dir = os.path.join(gitdir, '.git')
review_tracking.save_repo_metadata(git_dir, 'test-project')
metadata_path = review_tracking.get_repo_metadata_path(git_dir)
assert os.path.exists(metadata_path)
identifier = review_tracking.get_repo_identifier(gitdir)
assert identifier == 'test-project'
def test_get_repo_identifier_returns_none_for_missing(self, gitdir: str) -> None:
"""Verify get_repo_identifier returns None when no metadata exists."""
identifier = review_tracking.get_repo_identifier(gitdir)
assert identifier is None
def test_get_repo_identifier_resolves_from_worktree(self, gitdir: str) -> None:
"""Verify get_repo_identifier resolves identifier from worktree."""
# Enroll the main repo
git_dir = os.path.join(gitdir, '.git')
review_tracking.save_repo_metadata(git_dir, 'worktree-project')
# Create a real worktree
worktree_dir = os.path.join(str(os.path.dirname(gitdir)), 'worktree')
out, _logstr = b4.git_run_command(
gitdir, ['worktree', 'add', worktree_dir, '-b', 'wt-branch']
)
assert out == 0
identifier = review_tracking.get_repo_identifier(worktree_dir)
assert identifier == 'worktree-project'
class TestResolveIdentifier:
"""Tests for resolve_identifier()."""
def test_uses_cmdargs_identifier(self, gitdir: str) -> None:
"""Verify command line identifier takes precedence."""
# Set up repo metadata
git_dir = os.path.join(gitdir, '.git')
review_tracking.save_repo_metadata(git_dir, 'repo-identifier')
cmdargs = argparse.Namespace(identifier='cmdline-identifier')
result = review_tracking.resolve_identifier(cmdargs, gitdir)
assert result == 'cmdline-identifier'
def test_falls_back_to_repo_metadata(self, gitdir: str) -> None:
"""Verify falls back to repo metadata when no cmdargs identifier."""
git_dir = os.path.join(gitdir, '.git')
review_tracking.save_repo_metadata(git_dir, 'repo-identifier')
cmdargs = argparse.Namespace(identifier=None)
result = review_tracking.resolve_identifier(cmdargs, gitdir)
assert result == 'repo-identifier'
def test_returns_none_when_no_identifier(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify returns None when no identifier available."""
cmdargs = argparse.Namespace(identifier=None)
# Pass a non-git directory
result = review_tracking.resolve_identifier(cmdargs, str(tmp_path))
assert result is None
class TestCmdEnroll:
"""Tests for cmd_enroll()."""
def test_enroll_creates_database(self, gitdir: str) -> None:
"""Verify enroll creates the database."""
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='enroll-test')
review_tracking.cmd_enroll(cmdargs)
assert review_tracking.db_exists('enroll-test')
def test_enroll_creates_metadata_file(self, gitdir: str) -> None:
"""Verify enroll creates metadata file in .git directory."""
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='metadata-test')
review_tracking.cmd_enroll(cmdargs)
metadata_path = os.path.join(gitdir, '.git', 'b4-review', 'metadata.json')
assert os.path.exists(metadata_path)
def test_enroll_uses_dirname_as_default_identifier(self, gitdir: str) -> None:
"""Verify enroll uses directory name as default identifier."""
cmdargs = argparse.Namespace(repo_path=gitdir, identifier=None)
review_tracking.cmd_enroll(cmdargs)
dirname = os.path.basename(gitdir)
assert review_tracking.db_exists(dirname)
def test_enroll_uses_current_directory_when_no_path(self, gitdir: str) -> None:
"""Verify enroll uses current directory when no path specified."""
# gitdir fixture already changes cwd to the test repo
cmdargs = argparse.Namespace(repo_path=None, identifier='current-dir-test')
review_tracking.cmd_enroll(cmdargs)
assert review_tracking.db_exists('current-dir-test')
metadata_path = os.path.join(gitdir, '.git', 'b4-review', 'metadata.json')
assert os.path.exists(metadata_path)
def test_enroll_fails_when_no_path_and_not_in_repo(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify enroll fails when no path and not in a git repo."""
# Change to a non-git directory
non_git_dir = os.path.join(str(tmp_path), 'not-a-repo')
os.makedirs(non_git_dir)
oldcwd = os.getcwd()
os.chdir(non_git_dir)
try:
cmdargs = argparse.Namespace(repo_path=None, identifier='test')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs)
assert exc_info.value.code == 1
finally:
os.chdir(oldcwd)
def test_enroll_fails_for_nonexistent_path(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify enroll fails for non-existent paths."""
cmdargs = argparse.Namespace(repo_path='/nonexistent/path', identifier='test')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs)
assert exc_info.value.code == 1
def test_enroll_fails_for_non_git_directory(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify enroll fails for non-git directories."""
non_git_dir = os.path.join(str(tmp_path), 'not-a-repo')
os.makedirs(non_git_dir)
cmdargs = argparse.Namespace(repo_path=non_git_dir, identifier='test')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs)
assert exc_info.value.code == 1
def test_enroll_fails_when_repo_already_enrolled(self, gitdir: str) -> None:
"""Verify enroll fails when repository already has metadata."""
# First enrollment
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='first-id')
review_tracking.cmd_enroll(cmdargs)
# Second enrollment of same repo should fail
cmdargs2 = argparse.Namespace(repo_path=gitdir, identifier='second-id')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs2)
assert exc_info.value.code == 1
@mock.patch('builtins.input', return_value='y')
def test_enroll_reuses_existing_db_when_confirmed(
self, mock_input: mock.Mock, gitdir: str, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify enroll can reuse existing database for different repo."""
# Create database via first enrollment
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='shared-db')
review_tracking.cmd_enroll(cmdargs)
# Create a second git repo
second_repo = os.path.join(str(tmp_path), 'second-repo')
b4.git_run_command(None, ['init', second_repo])
# Enroll second repo with same identifier - user confirms
cmdargs2 = argparse.Namespace(repo_path=second_repo, identifier='shared-db')
review_tracking.cmd_enroll(cmdargs2)
# Metadata file should exist in second repo's .git
metadata_path = os.path.join(second_repo, '.git', 'b4-review', 'metadata.json')
assert os.path.exists(metadata_path)
mock_input.assert_called_once()
@mock.patch('builtins.input', return_value='n')
def test_enroll_aborts_when_existing_db_declined(
self, mock_input: mock.Mock, gitdir: str, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify enroll aborts when user declines to use existing database."""
# Create database via first enrollment
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='declined-db')
review_tracking.cmd_enroll(cmdargs)
# Create a second git repo
second_repo = os.path.join(str(tmp_path), 'second-repo')
b4.git_run_command(None, ['init', second_repo])
# Enroll second repo with same identifier - user declines
cmdargs2 = argparse.Namespace(repo_path=second_repo, identifier='declined-db')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs2)
# Exit code 0 for user-initiated cancellation
assert exc_info.value.code == 0
# Metadata file should NOT exist in second repo
metadata_path = os.path.join(second_repo, '.git', 'b4-review', 'metadata.json')
assert not os.path.exists(metadata_path)
def test_enroll_from_worktree_writes_metadata_to_common_dir(
self, gitdir: str
) -> None:
"""Verify enroll from a worktree writes metadata to the shared .git."""
# Create a real worktree
worktree_dir = os.path.join(str(os.path.dirname(gitdir)), 'worktree')
out, _logstr = b4.git_run_command(
gitdir, ['worktree', 'add', worktree_dir, '-b', 'wt-branch']
)
assert out == 0
cmdargs = argparse.Namespace(repo_path=worktree_dir, identifier='worktree-test')
review_tracking.cmd_enroll(cmdargs)
# Database should be created
assert review_tracking.db_exists('worktree-test')
# Metadata should exist in the main repo's .git directory
metadata_path = os.path.join(gitdir, '.git', 'b4-review', 'metadata.json')
assert os.path.exists(metadata_path)
def test_enroll_from_worktree_already_enrolled(self, gitdir: str) -> None:
"""Verify enrolling from worktree exits 0 when repo already enrolled."""
# Enroll the main repo first
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='main-id')
review_tracking.cmd_enroll(cmdargs)
# Create a real worktree
worktree_dir = os.path.join(str(os.path.dirname(gitdir)), 'worktree')
out, _logstr = b4.git_run_command(
gitdir, ['worktree', 'add', worktree_dir, '-b', 'wt-branch']
)
assert out == 0
# Enrolling from worktree with same identifier should exit 0
cmdargs2 = argparse.Namespace(repo_path=worktree_dir, identifier='main-id')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs2)
assert exc_info.value.code == 0
def test_enroll_from_worktree_conflicting_identifier(self, gitdir: str) -> None:
"""Verify enrolling from worktree fails with a different identifier."""
# Enroll the main repo first
cmdargs = argparse.Namespace(repo_path=gitdir, identifier='main-id')
review_tracking.cmd_enroll(cmdargs)
# Create a real worktree
worktree_dir = os.path.join(str(os.path.dirname(gitdir)), 'worktree')
out, _logstr = b4.git_run_command(
gitdir, ['worktree', 'add', worktree_dir, '-b', 'wt-branch']
)
assert out == 0
# Enrolling from worktree with different identifier should fail
cmdargs2 = argparse.Namespace(repo_path=worktree_dir, identifier='different-id')
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_enroll(cmdargs2)
assert exc_info.value.code == 1
class TestCmdTrack:
"""Tests for cmd_track()."""
def _make_mock_lore_message(
self,
msgid: str = 'test-msgid@example.com',
fromname: str = 'Test Author',
fromemail: str = 'author@example.com',
subject: str = 'Test patch',
date: datetime.datetime = datetime.datetime(
2024, 1, 15, 10, 0, 0, tzinfo=datetime.timezone.utc
),
) -> mock.Mock:
"""Create a mock LoreMessage."""
lmsg = mock.Mock()
lmsg.msgid = msgid
lmsg.fromname = fromname
lmsg.fromemail = fromemail
lmsg.subject = subject
lmsg.date = date
lmsg.lsubject.get_slug.return_value = 'test-series'
return lmsg
def _make_mock_lore_series(
self,
revision: int = 1,
expected: int = 3,
change_id: str | None = 'test-change-id',
has_cover: bool = True,
cover_msgid: str = 'cover@example.com',
first_patch_msgid: str = 'patch1@example.com',
fromname: str = 'Test Author',
fromemail: str = 'author@example.com',
subject: str = 'Test series',
) -> mock.Mock:
"""Create a mock LoreSeries."""
lser = mock.Mock()
lser.revision = revision
lser.expected = expected
lser.change_id = change_id
lser.has_cover = has_cover
lser.fromname = fromname
lser.fromemail = fromemail
lser.subject = subject
lser.fingerprint = 'mock-fingerprint-0123456789ab'
# Set up patches list
cover = self._make_mock_lore_message(cover_msgid) if has_cover else None
patch1 = self._make_mock_lore_message(first_patch_msgid)
lser.patches = [cover, patch1, None, None] # Cover + 3 patches (2 missing)
return lser
@mock.patch('b4.retrieve_messages')
@mock.patch('b4.LoreMailbox')
def test_track_with_change_id(
self, mock_mailbox_class: mock.Mock, mock_retrieve: mock.Mock, gitdir: str
) -> None:
"""Verify tracking a series with a change-id."""
# Set up enrolled project
cmdargs_enroll = argparse.Namespace(repo_path=gitdir, identifier='track-test')
review_tracking.cmd_enroll(cmdargs_enroll)
# Mock the series retrieval
mock_msg = mock.Mock()
mock_retrieve.return_value = ('test-msgid', [mock_msg])
mock_lser = self._make_mock_lore_series(change_id='real-change-id')
mock_mailbox = mock.Mock()
mock_mailbox.series = {1: mock_lser}
mock_mailbox.get_series.return_value = mock_lser
mock_mailbox_class.return_value = mock_mailbox
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier='track-test',
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
review_tracking.cmd_track(cmdargs)
# Verify it was added to database
conn = review_tracking.get_db('track-test')
cursor = conn.execute('SELECT change_id, revision FROM series')
row = cursor.fetchone()
assert row['change_id'] == 'real-change-id'
assert row['revision'] == 1
conn.close()
@mock.patch('b4.retrieve_messages')
@mock.patch('b4.LoreMailbox')
def test_track_generates_change_id_without_change_id(
self, mock_mailbox_class: mock.Mock, mock_retrieve: mock.Mock, gitdir: str
) -> None:
"""Verify tracking generates a change-id when series has none."""
cmdargs_enroll = argparse.Namespace(repo_path=gitdir, identifier='noid-test')
review_tracking.cmd_enroll(cmdargs_enroll)
mock_msg = mock.Mock()
mock_retrieve.return_value = ('test-msgid', [mock_msg])
mock_lser = self._make_mock_lore_series(change_id=None)
mock_mailbox = mock.Mock()
mock_mailbox.series = {1: mock_lser}
mock_mailbox.get_series.return_value = mock_lser
mock_mailbox_class.return_value = mock_mailbox
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier='noid-test',
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
review_tracking.cmd_track(cmdargs)
conn = review_tracking.get_db('noid-test')
cursor = conn.execute('SELECT change_id FROM series')
row = cursor.fetchone()
# Format: YYYYMMDD-slug-fingerprint[:12]
change_id = row['change_id']
assert change_id.startswith('20240115-')
assert 'test-series' in change_id
conn.close()
@mock.patch('b4.retrieve_messages')
@mock.patch('b4.LoreMailbox')
def test_track_uses_first_patch_without_cover(
self, mock_mailbox_class: mock.Mock, mock_retrieve: mock.Mock, gitdir: str
) -> None:
"""Verify tracking uses first patch msgid when no cover letter."""
cmdargs_enroll = argparse.Namespace(
repo_path=gitdir, identifier='no-cover-test'
)
review_tracking.cmd_enroll(cmdargs_enroll)
mock_msg = mock.Mock()
mock_retrieve.return_value = ('test-msgid', [mock_msg])
mock_lser = self._make_mock_lore_series(
has_cover=False, first_patch_msgid='first-patch@example.com'
)
mock_mailbox = mock.Mock()
mock_mailbox.series = {1: mock_lser}
mock_mailbox.get_series.return_value = mock_lser
mock_mailbox_class.return_value = mock_mailbox
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier='no-cover-test',
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
review_tracking.cmd_track(cmdargs)
conn = review_tracking.get_db('no-cover-test')
cursor = conn.execute('SELECT message_id FROM series')
row = cursor.fetchone()
assert row['message_id'] == 'first-patch@example.com'
conn.close()
@mock.patch('b4.review.tracking.resolve_identifier', return_value=None)
def test_track_fails_without_identifier(
self, mock_resolve: mock.Mock, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify track fails when no identifier can be resolved."""
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier=None,
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_track(cmdargs)
assert exc_info.value.code == 1
@mock.patch('b4.retrieve_messages')
def test_track_fails_for_unenrolled_project(
self, mock_retrieve: mock.Mock, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify track fails when project is not enrolled."""
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier='not-enrolled',
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_track(cmdargs)
assert exc_info.value.code == 1
@mock.patch('b4.retrieve_messages')
def test_track_fails_when_retrieval_fails(
self, mock_retrieve: mock.Mock, gitdir: str
) -> None:
"""Verify track fails when series retrieval fails."""
cmdargs_enroll = argparse.Namespace(
repo_path=gitdir, identifier='retrieval-fail'
)
review_tracking.cmd_enroll(cmdargs_enroll)
mock_retrieve.return_value = (None, None)
cmdargs = argparse.Namespace(
series_id='test-msgid@example.com',
identifier='retrieval-fail',
msgid=None,
noparent=False,
wantname=None,
wantver=None,
)
with pytest.raises(SystemExit) as exc_info:
review_tracking.cmd_track(cmdargs)
assert exc_info.value.code == 1
class TestRevisions:
"""Tests for revision tracking helpers."""
def test_add_revision(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify a revision can be added and retrieved."""
conn = review_tracking.init_db('rev-add-test')
review_tracking.add_revision(
conn,
'change-abc',
1,
'msgid-v1@example.com',
subject='Test v1',
link='https://lore.kernel.org/r/msgid-v1',
)
revs = review_tracking.get_revisions(conn, 'change-abc')
assert len(revs) == 1
assert revs[0]['change_id'] == 'change-abc'
assert revs[0]['revision'] == 1
assert revs[0]['message_id'] == 'msgid-v1@example.com'
assert revs[0]['subject'] == 'Test v1'
assert revs[0]['link'] == 'https://lore.kernel.org/r/msgid-v1'
assert revs[0]['found_at'] is not None
conn.close()
def test_add_revision_idempotent(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify inserting the same revision twice results in one row."""
conn = review_tracking.init_db('rev-idem-test')
review_tracking.add_revision(conn, 'change-abc', 1, 'msgid-v1@example.com')
review_tracking.add_revision(conn, 'change-abc', 1, 'msgid-v1-dup@example.com')
revs = review_tracking.get_revisions(conn, 'change-abc')
assert len(revs) == 1
# First insert wins (INSERT OR IGNORE)
assert revs[0]['message_id'] == 'msgid-v1@example.com'
conn.close()
def test_get_revisions_ordered(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify revisions are returned in ascending order."""
conn = review_tracking.init_db('rev-order-test')
review_tracking.add_revision(conn, 'change-abc', 1, 'msgid-v1@example.com')
review_tracking.add_revision(conn, 'change-abc', 3, 'msgid-v3@example.com')
review_tracking.add_revision(conn, 'change-abc', 2, 'msgid-v2@example.com')
revs = review_tracking.get_revisions(conn, 'change-abc')
assert len(revs) == 3
assert [r['revision'] for r in revs] == [1, 2, 3]
conn.close()
def test_get_newest_revision(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify get_newest_revision returns the maximum version."""
conn = review_tracking.init_db('rev-newest-test')
review_tracking.add_revision(conn, 'change-abc', 1, 'msgid-v1@example.com')
review_tracking.add_revision(conn, 'change-abc', 3, 'msgid-v3@example.com')
review_tracking.add_revision(conn, 'change-abc', 2, 'msgid-v2@example.com')
assert review_tracking.get_newest_revision(conn, 'change-abc') == 3
conn.close()
def test_get_newest_revision_empty(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify get_newest_revision returns None when no revisions exist."""
conn = review_tracking.init_db('rev-empty-test')
assert review_tracking.get_newest_revision(conn, 'nonexistent') is None
conn.close()
def test_get_all_newest_revisions(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify bulk newest-revision query returns all change_ids."""
conn = review_tracking.init_db('rev-bulk-newest-test')
review_tracking.add_revision(conn, 'change-a', 1, 'a-v1@example.com')
review_tracking.add_revision(conn, 'change-a', 3, 'a-v3@example.com')
review_tracking.add_revision(conn, 'change-b', 2, 'b-v2@example.com')
result = review_tracking.get_all_newest_revisions(conn)
assert result == {'change-a': 3, 'change-b': 2}
conn.close()
def test_get_all_newest_revisions_empty(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify bulk newest-revision query returns empty dict with no data."""
conn = review_tracking.init_db('rev-bulk-newest-empty-test')
assert review_tracking.get_all_newest_revisions(conn) == {}
conn.close()
def test_get_all_revision_counts(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify bulk revision-count query returns correct counts."""
conn = review_tracking.init_db('rev-bulk-count-test')
review_tracking.add_revision(conn, 'change-a', 1, 'a-v1@example.com')
review_tracking.add_revision(conn, 'change-a', 2, 'a-v2@example.com')
review_tracking.add_revision(conn, 'change-a', 3, 'a-v3@example.com')
review_tracking.add_revision(conn, 'change-b', 1, 'b-v1@example.com')
result = review_tracking.get_all_revision_counts(conn)
assert result == {'change-a': 3, 'change-b': 1}
conn.close()
def test_get_all_revisions_grouped(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify bulk grouped revisions returns correct per-change-id lists."""
conn = review_tracking.init_db('rev-bulk-grouped-test')
review_tracking.add_revision(
conn, 'change-a', 2, 'a-v2@example.com', subject='A v2'
)
review_tracking.add_revision(
conn, 'change-a', 1, 'a-v1@example.com', subject='A v1'
)
review_tracking.add_revision(
conn, 'change-b', 1, 'b-v1@example.com', subject='B v1'
)
result = review_tracking.get_all_revisions_grouped(conn)
assert set(result.keys()) == {'change-a', 'change-b'}
# change-a should be sorted ascending
assert [r['revision'] for r in result['change-a']] == [1, 2]
assert len(result['change-b']) == 1
conn.close()
def test_get_all_revisions_grouped_empty(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify bulk grouped revisions returns empty dict with no data."""
conn = review_tracking.init_db('rev-bulk-grouped-empty-test')
assert review_tracking.get_all_revisions_grouped(conn) == {}
conn.close()
def test_delete_series(self, tmp_path: pytest.TempPathFactory) -> None:
"""Verify delete_series removes series and revisions for a change_id."""
conn = review_tracking.init_db('del-series-test')
# Add a series with revisions
review_tracking.add_series_to_db(
conn,
'change-del',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
)
review_tracking.add_revision(conn, 'change-del', 1, 'msgid-v1@example.com')
review_tracking.add_revision(conn, 'change-del', 2, 'msgid-v2@example.com')
# Add another series that should not be affected
review_tracking.add_series_to_db(
conn,
'change-keep',
1,
'Keep',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'keep@example.com',
1,
)
review_tracking.add_revision(conn, 'change-keep', 1, 'keep-v1@example.com')
review_tracking.delete_series(conn, 'change-del')
# Deleted change_id should be gone from both tables
cursor = conn.execute(
'SELECT * FROM series WHERE change_id = ?', ('change-del',)
)
assert cursor.fetchone() is None
assert review_tracking.get_revisions(conn, 'change-del') == []
# Other change_id should be untouched
cursor = conn.execute(
'SELECT * FROM series WHERE change_id = ?', ('change-keep',)
)
assert cursor.fetchone() is not None
assert len(review_tracking.get_revisions(conn, 'change-keep')) == 1
conn.close()
class TestUpdateSeriesStatus:
"""Tests for update_series_status()."""
def test_updates_existing_series(self, tmp_path: pytest.TempPathFactory) -> None:
conn = review_tracking.init_db('status-update-test')
review_tracking.add_series_to_db(
conn,
'change-status',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
)
review_tracking.update_series_status(conn, 'change-status', 'reviewing')
cursor = conn.execute(
'SELECT status FROM series WHERE change_id = ?', ('change-status',)
)
assert cursor.fetchone()[0] == 'reviewing'
conn.close()
def test_noop_for_nonexistent_change_id(
self, tmp_path: pytest.TempPathFactory
) -> None:
conn = review_tracking.init_db('status-noop-test')
# Should not raise
review_tracking.update_series_status(conn, 'nonexistent', 'reviewing')
conn.close()
class TestGitGetCommonDir:
"""Tests for git_get_common_dir()."""
def test_returns_git_dir_for_main_repo(self, gitdir: str) -> None:
"""Verify git_get_common_dir returns .git path for a normal repo."""
result = b4.git_get_common_dir(gitdir)
assert result is not None
expected = os.path.join(gitdir, '.git')
assert os.path.normpath(result) == os.path.normpath(expected)
def test_returns_shared_git_dir_from_worktree(self, gitdir: str) -> None:
"""Verify git_get_common_dir returns the shared .git from a worktree."""
worktree_dir = os.path.join(str(os.path.dirname(gitdir)), 'worktree')
out, _logstr = b4.git_run_command(
gitdir, ['worktree', 'add', worktree_dir, '-b', 'wt-branch']
)
assert out == 0
result = b4.git_get_common_dir(worktree_dir)
assert result is not None
expected = os.path.join(gitdir, '.git')
assert os.path.normpath(result) == os.path.normpath(expected)
def test_returns_none_for_non_git_dir(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify git_get_common_dir returns None outside a git repo."""
non_git = os.path.join(str(tmp_path), 'not-a-repo')
os.makedirs(non_git)
result = b4.git_get_common_dir(non_git)
assert result is None
class TestReviewTargetBranch:
"""Tests for review-target-branch config."""
def test_default_config_has_review_target_branch(self) -> None:
"""Verify review-target-branch is in DEFAULT_CONFIG."""
assert 'review-target-branch' in b4.DEFAULT_CONFIG
assert b4.DEFAULT_CONFIG['review-target-branch'] is None
def _create_review_branch(
topdir: str, change_id: str, tracking_data: Dict[str, Any]
) -> str:
"""Helper: create a b4/review/<change_id> branch with a tracking commit."""
branch = f'b4/review/{change_id}'
cover_text = f'Cover letter for {change_id}'
commit_msg = cover_text + '\n\n' + b4.review.make_review_magic_json(tracking_data)
# Create an orphan-ish branch off current HEAD
b4.git_run_command(topdir, ['branch', branch])
# Create a tracking commit on it via commit-tree
ecode, tree = b4.git_run_command(topdir, ['rev-parse', f'{branch}^{{tree}}'])
assert ecode == 0
tree = tree.strip()
ecode, parent = b4.git_run_command(topdir, ['rev-parse', branch])
assert ecode == 0
parent = parent.strip()
ecode, new_sha = b4.git_run_command(
topdir,
['commit-tree', tree, '-p', parent, '-F', '-'],
stdin=commit_msg.encode(),
)
assert ecode == 0
new_sha = new_sha.strip()
ecode, _ = b4.git_run_command(
topdir, ['update-ref', f'refs/heads/{branch}', new_sha]
)
assert ecode == 0
return branch
class TestUpdateTrackingStatus:
"""Tests for update_tracking_status() helper."""
def test_updates_status(self, gitdir: str) -> None:
"""Verify update_tracking_status writes status to tracking commit."""
tracking_data = {
'series': {
'identifier': 'test-proj',
'status': 'reviewing',
'revision': 1,
'change-id': 'status-test',
'subject': 'Test',
'fromname': 'Author',
'fromemail': 'a@example.com',
'expected': 1,
'complete': True,
'base-commit': 'abc123',
'prerequisite-commits': [],
'first-patch-commit': 'def456',
'header-info': {},
'link': '',
},
'followups': [],
'patches': [],
}
branch = _create_review_branch(gitdir, 'status-test', tracking_data)
result = b4.review.update_tracking_status(gitdir, branch, 'replied')
assert result is True
# Read back and verify
_cover, trk = b4.review.load_tracking(gitdir, branch)
assert trk['series']['status'] == 'replied'
def test_round_trip(self, gitdir: str) -> None:
"""Verify status survives a write-then-read round-trip."""
tracking_data = {
'series': {
'identifier': 'test-proj',
'status': 'reviewing',
'revision': 2,
'change-id': 'roundtrip-test',
'subject': 'Roundtrip',
'fromname': 'Author',
'fromemail': 'a@example.com',
'expected': 3,
'complete': True,
'base-commit': 'abc123',
'prerequisite-commits': [],
'first-patch-commit': 'def456',
'header-info': {},
'link': '',
},
'followups': [],
'patches': [],
}
branch = _create_review_branch(gitdir, 'roundtrip-test', tracking_data)
for new_status in ('replied', 'waiting', 'accepted', 'thanked'):
b4.review.update_tracking_status(gitdir, branch, new_status)
_cover, trk = b4.review.load_tracking(gitdir, branch)
assert trk['series']['status'] == new_status
def test_returns_false_for_missing_branch(self, gitdir: str) -> None:
"""Verify update_tracking_status returns False for non-existent branch."""
result = b4.review.update_tracking_status(
gitdir, 'b4/review/nonexistent', 'replied'
)
assert result is False
class TestGetReviewBranches:
"""Tests for get_review_branches()."""
def test_lists_review_branches(self, gitdir: str) -> None:
"""Verify get_review_branches finds b4/review/* branches."""
tracking_data: Dict[str, Any] = {
'series': {
'identifier': 'test-proj',
'status': 'reviewing',
'revision': 1,
'change-id': 'branch-list-1',
'subject': 'Test 1',
'fromname': 'A',
'fromemail': 'a@example.com',
'expected': 1,
'complete': True,
'base-commit': 'abc',
'prerequisite-commits': [],
'first-patch-commit': 'def',
'header-info': {},
'link': '',
},
'followups': [],
'patches': [],
}
_create_review_branch(gitdir, 'branch-list-1', tracking_data)
tracking_data['series']['change-id'] = 'branch-list-2'
_create_review_branch(gitdir, 'branch-list-2', tracking_data)
branches = review_tracking.get_review_branches(gitdir)
names = set(branches)
assert 'b4/review/branch-list-1' in names
assert 'b4/review/branch-list-2' in names
def test_returns_empty_when_none(self, gitdir: str) -> None:
"""Verify get_review_branches returns empty list with no review branches."""
branches = review_tracking.get_review_branches(gitdir)
assert branches == []
class TestRescanBranches:
"""Tests for rescan_branches()."""
def _make_tracking_data(
self,
change_id: str,
identifier: str = 'rescan-proj',
status: str = 'reviewing',
revision: int = 1,
subject: str = 'Test series',
) -> Dict[str, Any]:
return {
'series': {
'identifier': identifier,
'status': status,
'revision': revision,
'change-id': change_id,
'subject': subject,
'fromname': 'Test Author',
'fromemail': 'author@example.com',
'expected': 3,
'complete': True,
'base-commit': 'abc123',
'prerequisite-commits': [],
'first-patch-commit': 'def456',
'header-info': {
'msgid': f'{change_id}@example.com',
'sentdate': 'Mon, 15 Jan 2024 10:00:00 +0000',
},
'link': f'https://lore.kernel.org/r/{change_id}',
},
'followups': [],
'patches': [],
}
def test_rescan_single_branch(self, gitdir: str) -> None:
"""Verify rescan populates DB from a single branch."""
identifier = 'rescan-single'
review_tracking.init_db(identifier).close()
tracking_data = self._make_tracking_data(
'single-change', identifier=identifier, status='replied'
)
branch = _create_review_branch(gitdir, 'single-change', tracking_data)
review_tracking.rescan_branches(identifier, gitdir, branch=branch)
conn = review_tracking.get_db(identifier)
cursor = conn.execute(
'SELECT change_id, status, revision FROM series WHERE change_id = ?',
('single-change',),
)
row = cursor.fetchone()
assert row is not None
assert row['change_id'] == 'single-change'
assert row['status'] == 'replied'
assert row['revision'] == 1
conn.close()
def test_rescan_marks_gone(self, gitdir: str) -> None:
"""Verify full rescan marks missing branches as 'gone'."""
identifier = 'rescan-gone'
conn = review_tracking.init_db(identifier)
# Add a series to DB with 'reviewing' status but no corresponding branch
review_tracking.add_series_to_db(
conn,
'gone-change',
1,
'Gone series',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
)
review_tracking.update_series_status(conn, 'gone-change', 'reviewing')
conn.close()
review_tracking.rescan_branches(identifier, gitdir)
conn = review_tracking.get_db(identifier)
cursor = conn.execute(
'SELECT status FROM series WHERE change_id = ?', ('gone-change',)
)
row = cursor.fetchone()
assert row['status'] == 'gone'
conn.close()
def test_rescan_skips_mismatched_identifier(self, gitdir: str) -> None:
"""Verify rescan skips branches with a different identifier."""
identifier = 'rescan-mismatch'
review_tracking.init_db(identifier).close()
# Create branch with a different identifier
tracking_data = self._make_tracking_data(
'mismatch-change', identifier='other-project'
)
_create_review_branch(gitdir, 'mismatch-change', tracking_data)
review_tracking.rescan_branches(identifier, gitdir)
conn = review_tracking.get_db(identifier)
cursor = conn.execute(
'SELECT * FROM series WHERE change_id = ?', ('mismatch-change',)
)
row = cursor.fetchone()
assert row is None
conn.close()
def test_rescan_preserves_non_active_statuses(self, gitdir: str) -> None:
"""Verify full rescan does not mark accepted/thanked series as gone."""
identifier = 'rescan-preserve'
conn = review_tracking.init_db(identifier)
# Add an 'accepted' series with no branch — should NOT become 'gone'
review_tracking.add_series_to_db(
conn,
'accepted-change',
1,
'Accepted',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'msgid@example.com',
3,
)
review_tracking.update_series_status(conn, 'accepted-change', 'accepted')
conn.close()
review_tracking.rescan_branches(identifier, gitdir)
conn = review_tracking.get_db(identifier)
cursor = conn.execute(
'SELECT status FROM series WHERE change_id = ?', ('accepted-change',)
)
row = cursor.fetchone()
assert row['status'] == 'accepted'
conn.close()
def test_rescan_all_branches(self, gitdir: str) -> None:
"""Verify full rescan processes all review branches."""
identifier = 'rescan-all'
review_tracking.init_db(identifier).close()
for i in range(3):
cid = f'all-change-{i}'
tracking_data = self._make_tracking_data(cid, identifier=identifier)
_create_review_branch(gitdir, cid, tracking_data)
review_tracking.rescan_branches(identifier, gitdir)
conn = review_tracking.get_db(identifier)
cursor = conn.execute('SELECT COUNT(*) FROM series')
count = cursor.fetchone()[0]
assert count == 3
conn.close()
def test_sha_skips_unchanged_branch(self, gitdir: str) -> None:
"""Verify that a second rescan with no branch changes reports changed=0."""
identifier = 'rescan-sha-skip'
review_tracking.init_db(identifier).close()
tracking_data = self._make_tracking_data('sha-skip', identifier=identifier)
_create_review_branch(gitdir, 'sha-skip', tracking_data)
# First rescan: new branch, should be processed.
result = review_tracking.rescan_branches(identifier, gitdir)
assert result['changed'] == 1
# Second rescan: branch unchanged, should be skipped entirely.
result = review_tracking.rescan_branches(identifier, gitdir)
assert result['changed'] == 0
assert result['gone'] == 0
def test_sha_detects_changed_branch(self, gitdir: str) -> None:
"""Verify that updating a branch's tracking commit triggers a re-read."""
identifier = 'rescan-sha-change'
review_tracking.init_db(identifier).close()
tracking_data = self._make_tracking_data(
'sha-change', identifier=identifier, status='reviewing'
)
branch = _create_review_branch(gitdir, 'sha-change', tracking_data)
# First rescan: registers the branch with status 'reviewing'.
result = review_tracking.rescan_branches(identifier, gitdir)
assert result['changed'] == 1
# Amend the tracking commit on the branch with a different status.
tracking_data['series']['status'] = 'replied'
new_msg = 'Cover\n\n' + b4.review.make_review_magic_json(tracking_data)
_ecode, tree = b4.git_run_command(gitdir, ['rev-parse', f'{branch}^{{tree}}'])
tree = tree.strip()
_ecode, parent = b4.git_run_command(gitdir, ['rev-parse', branch])
parent = parent.strip()
_ecode, new_sha = b4.git_run_command(
gitdir,
['commit-tree', tree, '-p', parent, '-F', '-'],
stdin=new_msg.encode(),
)
b4.git_run_command(
gitdir, ['update-ref', f'refs/heads/{branch}', new_sha.strip()]
)
# Second rescan: SHA changed, should re-read and update status.
result = review_tracking.rescan_branches(identifier, gitdir)
assert result['changed'] == 1
conn = review_tracking.get_db(identifier)
row = conn.execute(
'SELECT status FROM series WHERE change_id = ?', ('sha-change',)
).fetchone()
assert row['status'] == 'replied'
conn.close()
def _make_test_mbox(n: int, date: str = 'Mon, 15 Jan 2024 10:00:00 +0000') -> bytes:
"""Build a minimal mbox with *n* unique messages (each has a distinct Message-ID)."""
parts = []
for i in range(n):
parts.append(
f'From foo@example.com Mon Jan 01 00:00:00 2024\n'
f'Message-ID: <msg{i}@example.com>\n'
f'Date: {date}\n\n'.encode()
)
return b''.join(parts)
class TestFollowupCounts:
"""Tests for message_count / seen_message_count tracking."""
def test_schema_has_followup_columns(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify fresh DB has message_count, seen_message_count, last_update_check, last_activity_at."""
conn = review_tracking.init_db('fc-schema-test')
cursor = conn.execute('PRAGMA table_info(series)')
col_names = {row[1] for row in cursor.fetchall()}
assert 'message_count' in col_names
assert 'seen_message_count' in col_names
assert 'last_update_check' in col_names
assert 'last_activity_at' in col_names
conn.close()
def test_migration_adds_followup_columns(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""Verify v1 DB gets followup/update columns during migration."""
import sqlite3 as _sqlite3
db_path = review_tracking.get_db_path('fc-migration-test')
# Manually build a schema-version 1 database (no branch_sha, no followup cols)
raw = _sqlite3.connect(db_path)
raw.executescript("""
CREATE TABLE schema_version (version INTEGER PRIMARY KEY);
CREATE TABLE series (
track_id INTEGER PRIMARY KEY,
change_id TEXT NOT NULL,
revision INTEGER NOT NULL,
status TEXT DEFAULT 'new',
UNIQUE (change_id, revision)
);
""")
raw.execute('INSERT INTO schema_version (version) VALUES (1)')
raw.commit()
raw.close()
# open via get_db which triggers migration
conn = review_tracking.get_db('fc-migration-test')
cursor = conn.execute('PRAGMA table_info(series)')
col_names = {row[1] for row in cursor.fetchall()}
assert 'branch_sha' in col_names
assert 'message_count' in col_names
assert 'seen_message_count' in col_names
assert 'last_update_check' in col_names
assert 'last_activity_at' in col_names
assert 'has_cover' not in col_names
row = conn.execute('SELECT version FROM schema_version').fetchone()
assert row[0] == review_tracking.SCHEMA_VERSION
conn.close()
@mock.patch('b4.review.tracking._fetch_thread_mbox_bytes')
def test_first_fetch_initialises_seen(
self, mock_mbox_bytes: mock.Mock, tmp_path: pytest.TempPathFactory
) -> None:
"""First update_message_counts sets seen = count (no badge shown yet)."""
# 9 unique messages in the thread
mock_mbox_bytes.return_value = _make_test_mbox(9)
conn = review_tracking.init_db('fc-first-test')
review_tracking.add_series_to_db(
conn,
'fc-change',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'cover@example.com',
3,
)
conn.close()
series_list = [
{
'change_id': 'fc-change',
'revision': 1,
'message_id': 'cover@example.com',
'num_patches': 3,
'status': 'new',
}
]
result = review_tracking.update_message_counts('fc-first-test', series_list)
assert result['updated'] == 1
assert result['errors'] == 0
conn = review_tracking.get_db('fc-first-test')
row = conn.execute(
'SELECT message_count, seen_message_count, last_update_check, last_activity_at'
' FROM series WHERE change_id = ?',
('fc-change',),
).fetchone()
assert row['message_count'] == 9
# First fetch: seen initialised to same value — no badge yet
assert row['seen_message_count'] == 9
assert row['last_update_check'] is not None
assert row['last_activity_at'] == '2024-01-15T10:00:00+00:00'
conn.close()
@mock.patch('b4.review.tracking._fetch_new_since')
@mock.patch('b4.review.tracking._fetch_thread_mbox_bytes')
def test_incremental_fetch_adds_new_count(
self,
mock_fetch: mock.Mock,
mock_new_since: mock.Mock,
tmp_path: pytest.TempPathFactory,
) -> None:
"""Incremental update adds new message count and keeps seen unchanged."""
# 9 unique messages in the thread
mock_fetch.return_value = _make_test_mbox(9)
# incremental: 3 new messages, with a newer activity date
mock_new_since.return_value = (3, '2024-02-01T00:00:00+00:00')
conn = review_tracking.init_db('fc-incr-test')
review_tracking.add_series_to_db(
conn,
'fc-change2',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'cover2@example.com',
3,
)
conn.close()
series_list = [
{
'change_id': 'fc-change2',
'revision': 1,
'message_id': 'cover2@example.com',
'num_patches': 3,
'status': 'reviewing',
}
]
# First fetch: seen = count = 9, last_update_check set
review_tracking.update_message_counts('fc-incr-test', series_list)
# Incremental: 3 new messages since last check
result = review_tracking.update_message_counts('fc-incr-test', series_list)
assert result['updated'] == 1
conn = review_tracking.get_db('fc-incr-test')
row = conn.execute(
'SELECT message_count, seen_message_count, last_activity_at FROM series'
' WHERE change_id = ?',
('fc-change2',),
).fetchone()
assert row['message_count'] == 12 # 9 + 3
assert row['seen_message_count'] == 9 # badge shows +3
assert row['last_activity_at'] == '2024-02-01T00:00:00+00:00'
conn.close()
@mock.patch('b4.review.tracking._fetch_new_since')
@mock.patch('b4.review.tracking._fetch_thread_mbox_bytes')
def test_incremental_noop_makes_no_db_write(
self,
mock_fetch: mock.Mock,
mock_new_since: mock.Mock,
tmp_path: pytest.TempPathFactory,
) -> None:
"""Incremental update with zero new messages writes nothing to the DB."""
# 9 unique messages in the thread
mock_fetch.return_value = _make_test_mbox(9)
mock_new_since.return_value = (0, None) # no new messages
conn = review_tracking.init_db('fc-noop-test')
review_tracking.add_series_to_db(
conn,
'fc-change3',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'cover3@example.com',
3,
)
conn.close()
series_list = [
{
'change_id': 'fc-change3',
'revision': 1,
'message_id': 'cover3@example.com',
'num_patches': 3,
'status': 'reviewing',
}
]
# First fetch sets the baseline
review_tracking.update_message_counts('fc-noop-test', series_list)
import os
db_path = review_tracking.get_db_path('fc-noop-test')
mtime_before = os.path.getmtime(db_path)
# Incremental no-op — should not touch the DB at all
result = review_tracking.update_message_counts('fc-noop-test', series_list)
assert result['updated'] == 0
assert result['errors'] == 0
assert os.path.getmtime(db_path) == mtime_before
def test_mark_all_messages_seen_clears_badge(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""mark_all_messages_seen sets seen_message_count = message_count."""
conn = review_tracking.init_db('fc-seen-test')
review_tracking.add_series_to_db(
conn,
'fc-seen',
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'cover3@example.com',
3,
)
# Manually set a delta
conn.execute(
'UPDATE series SET message_count = 10, seen_message_count = 6'
' WHERE change_id = ?',
('fc-seen',),
)
conn.commit()
review_tracking.mark_all_messages_seen(conn, 'fc-seen', 1)
conn.close()
# Reopen with get_db to get row_factory for named column access
conn = review_tracking.get_db('fc-seen-test')
row = conn.execute(
'SELECT message_count, seen_message_count FROM series WHERE change_id = ?',
('fc-seen',),
).fetchone()
assert row['message_count'] == 10
assert row['seen_message_count'] == 10
conn.close()
def test_followup_fetch_skips_offline(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""fetch_thread_message_count and _fetch_thread_mbox_bytes return None offline."""
# can_network is False in test fixture — no mock needed
assert review_tracking._fetch_thread_mbox_bytes('any@example.com') is None
assert review_tracking.fetch_thread_message_count('any@example.com') is None
def test_update_message_counts_skips_terminal_statuses(
self, tmp_path: pytest.TempPathFactory
) -> None:
"""update_message_counts skips archived/accepted/thanked series."""
conn = review_tracking.init_db('fc-skip-test')
for status in ('archived', 'accepted', 'thanked'):
cid = f'fc-{status}'
review_tracking.add_series_to_db(
conn,
cid,
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
f'{cid}@example.com',
3,
)
review_tracking.update_series_status(conn, cid, status)
conn.close()
series_list = [
{
'change_id': f'fc-{s}',
'revision': 1,
'message_id': f'fc-{s}@example.com',
'num_patches': 3,
'status': s,
}
for s in ('archived', 'accepted', 'thanked')
]
result = review_tracking.update_message_counts('fc-skip-test', series_list)
# None fetched — all skipped, no errors
assert result['updated'] == 0
assert result['errors'] == 0
def _make_mbox_bytes(num_msgs: int, prefix: str = 'msg') -> bytes:
"""Return valid mboxrd bytes containing *num_msgs* messages with unique IDs."""
result = b''
for i in range(num_msgs):
result += (
f'From sender@example.com Mon Jan 15 10:00:00 2024\n'
f'Message-ID: <{prefix}-{i}@example.com>\n'
f'From: Test Author <author@example.com>\n'
f'Date: Mon, 15 Jan 2024 10:00:00 +0000\n'
f'Subject: Test message {i}\n'
f'\n'
f'Body of message {i}\n'
f'\n'
).encode()
return result
def _make_test_msg(msgid: str = 'test@example.com') -> EmailMessage:
"""Return a minimal EmailMessage suitable for passing to _store_thread_blob."""
msg = EmailMessage()
msg['Message-ID'] = f'<{msgid}>'
msg['From'] = 'Test Author <author@example.com>'
msg['Date'] = 'Mon, 15 Jan 2024 10:00:00 +0000'
msg['Subject'] = 'Test'
msg.set_payload('Hello world\n')
return msg
def _make_blob_tracking_data(
change_id: str, identifier: str = 'blob-proj'
) -> Dict[str, Any]:
"""Return a minimal tracking dict for blob tests."""
return {
'series': {
'identifier': identifier,
'status': 'reviewing',
'revision': 1,
'change-id': change_id,
'subject': 'Test series',
'fromname': 'Test Author',
'fromemail': 'author@example.com',
'expected': 3,
'complete': True,
'base-commit': 'abc123',
'prerequisite-commits': [],
'first-patch-commit': 'def456',
'header-info': {
'msgid': f'{change_id}@example.com',
'sentdate': 'Mon, 15 Jan 2024 10:00:00 +0000',
},
'link': f'https://lore.kernel.org/r/{change_id}',
},
'followups': [],
'patches': [],
}
class TestFollowupBlob:
"""Tests for _store_thread_blob() and get_thread_mbox()."""
def test_store_thread_blob_writes_blob_and_updates_tracking_json(
self, gitdir: str
) -> None:
"""_store_thread_blob serializes msgs via save_mboxrd_mbox and records SHA."""
change_id = 'blob-write-test'
_create_review_branch(gitdir, change_id, _make_blob_tracking_data(change_id))
msgs = [_make_test_msg('cover@example.com')]
blob_sha = review_tracking._store_thread_blob(gitdir, change_id, msgs)
assert blob_sha is not None
# Blob content must equal what save_mboxrd_mbox produces for those messages
expected_buf = io.BytesIO()
b4.save_mboxrd_mbox(msgs, expected_buf)
ecode, content = b4.git_run_command(
gitdir, ['cat-file', 'blob', blob_sha], decode=False
)
assert ecode == 0
assert content == expected_buf.getvalue()
# Tracking commit JSON must carry the blob SHA
_cover, loaded = b4.review.load_tracking(gitdir, f'b4/review/{change_id}')
assert loaded['series']['thread-blob'] == blob_sha
def test_store_thread_blob_skips_update_when_sha_unchanged(
self, gitdir: str
) -> None:
"""_store_thread_blob avoids a new tracking commit when SHA is unchanged."""
change_id = 'blob-nochurn-test'
_create_review_branch(gitdir, change_id, _make_blob_tracking_data(change_id))
msgs = [_make_test_msg('nochurn@example.com')]
sha1 = review_tracking._store_thread_blob(gitdir, change_id, msgs)
assert sha1 is not None
ecode, tip1 = b4.git_run_command(
gitdir, ['rev-parse', f'b4/review/{change_id}']
)
assert ecode == 0
# Second call with identical messages — SHA and branch tip unchanged
sha2 = review_tracking._store_thread_blob(gitdir, change_id, msgs)
assert sha2 == sha1
ecode, tip2 = b4.git_run_command(
gitdir, ['rev-parse', f'b4/review/{change_id}']
)
assert ecode == 0
assert tip2.strip() == tip1.strip()
def test_get_thread_mbox_returns_bytes(self, gitdir: str) -> None:
"""get_thread_mbox returns the exact bytes written to the blob."""
sample = b'From mboxrd@z Thu Jan 1 00:00:00 1970\nSubject: hi\n\nbody\n'
ecode, blob_sha = b4.git_run_command(
gitdir, ['hash-object', '-w', '--stdin'], stdin=sample
)
assert ecode == 0
result = review_tracking.get_thread_mbox(gitdir, blob_sha.strip())
assert result == sample
def test_get_thread_mbox_returns_none_for_missing_sha(self, gitdir: str) -> None:
"""get_thread_mbox returns None (not an exception) for a bogus SHA."""
result = review_tracking.get_thread_mbox(gitdir, 'deadbeef' * 5)
assert result is None
@mock.patch('b4.review.tracking._fetch_thread_mbox_bytes')
def test_update_message_counts_stores_blob_on_first_fetch(
self, mock_mbox: mock.Mock, gitdir: str
) -> None:
"""update_message_counts writes a thread blob on the first fetch."""
mock_mbox.return_value = _make_mbox_bytes(9, prefix='ff')
change_id = 'blob-first-fetch'
_create_review_branch(
gitdir, change_id, _make_blob_tracking_data(change_id, 'blob-ff-proj')
)
conn = review_tracking.init_db('blob-ff-proj')
review_tracking.add_series_to_db(
conn,
change_id,
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'blob-first@example.com',
3,
)
conn.close()
series_list = [
{
'change_id': change_id,
'revision': 1,
'message_id': 'blob-first@example.com',
'num_patches': 3,
'status': 'reviewing',
}
]
review_tracking.update_message_counts(
'blob-ff-proj', series_list, topdir=gitdir
)
_cover, loaded = b4.review.load_tracking(gitdir, f'b4/review/{change_id}')
blob_sha = loaded['series'].get('thread-blob')
assert blob_sha is not None
# Blob must be readable
ecode, _ = b4.git_run_command(
gitdir, ['cat-file', 'blob', blob_sha], decode=False
)
assert ecode == 0
@mock.patch('b4.review.tracking._fetch_new_since')
@mock.patch('b4.review.tracking._fetch_thread_mbox_bytes')
def test_update_message_counts_updates_blob_on_incremental(
self, mock_fetch: mock.Mock, mock_new_since: mock.Mock, gitdir: str
) -> None:
"""update_message_counts replaces the blob when new replies arrive."""
# Different prefixes → different Message-IDs → different blobs
initial_mbox = _make_mbox_bytes(5, prefix='init')
larger_mbox = _make_mbox_bytes(8, prefix='upd')
mock_fetch.return_value = initial_mbox
mock_new_since.return_value = (3, '2024-02-01T00:00:00+00:00')
change_id = 'blob-incr-test'
_create_review_branch(
gitdir, change_id, _make_blob_tracking_data(change_id, 'blob-incr-proj')
)
conn = review_tracking.init_db('blob-incr-proj')
review_tracking.add_series_to_db(
conn,
change_id,
1,
'Subject',
'Author',
'a@example.com',
'2024-01-15T10:00:00+00:00',
'blob-incr@example.com',
3,
)
conn.close()
series_list = [
{
'change_id': change_id,
'revision': 1,
'message_id': 'blob-incr@example.com',
'num_patches': 3,
'status': 'reviewing',
}
]
# First fetch — stores initial blob
review_tracking.update_message_counts(
'blob-incr-proj', series_list, topdir=gitdir
)
_cover, loaded = b4.review.load_tracking(gitdir, f'b4/review/{change_id}')
sha_initial = loaded['series'].get('thread-blob')
assert sha_initial is not None
# Incremental — _fetch_thread_mbox_bytes now returns the larger mbox
mock_fetch.return_value = larger_mbox
review_tracking.update_message_counts(
'blob-incr-proj', series_list, topdir=gitdir
)
_cover, loaded = b4.review.load_tracking(gitdir, f'b4/review/{change_id}')
sha_updated = loaded['series'].get('thread-blob')
assert sha_updated is not None
assert sha_updated != sha_initial
class TestPatchState:
"""Tests for _get_patch_state() and _set_patch_state()."""
_EMAIL = 'reviewer@example.com'
_USERCFG: b4.ConfigDictT = {'email': _EMAIL, 'name': 'Test Reviewer'}
def _make_target(self, review_data: Dict[str, Any] | None = None) -> Dict[str, Any]:
"""Return a minimal target dict, optionally with review data."""
if review_data is None:
return {}
return {'reviews': {self._EMAIL: {'name': 'Test Reviewer', **review_data}}}
def test_no_data(self) -> None:
"""Empty reviews dict → no state."""
target = self._make_target()
assert b4.review._get_patch_state(target, self._USERCFG) == ''
def test_note_only(self) -> None:
"""A private note alone never triggers 'draft'."""
target = self._make_target({'note': 'just a private note'})
assert b4.review._get_patch_state(target, self._USERCFG) == ''
def test_comments(self) -> None:
"""Inline comments list → 'draft'."""
target = self._make_target(
{'comments': [{'path': 'a.c', 'line': 1, 'text': 'hi'}]}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'draft'
def test_reply(self) -> None:
"""Non-empty reply text → 'draft'."""
target = self._make_target({'reply': 'Looks good overall but...'})
assert b4.review._get_patch_state(target, self._USERCFG) == 'draft'
def test_reviewed_by(self) -> None:
"""Reviewed-by trailer → 'done'."""
target = self._make_target(
{'trailers': ['Reviewed-by: Test Reviewer <reviewer@example.com>']}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'done'
def test_acked_by(self) -> None:
"""Acked-by trailer → 'done'."""
target = self._make_target(
{'trailers': ['Acked-by: Test Reviewer <reviewer@example.com>']}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'done'
def test_nacked_by_alone(self) -> None:
"""NACKed-by trailer alone → 'draft' (explanation required)."""
target = self._make_target(
{'trailers': ['NACKed-by: Test Reviewer <reviewer@example.com>']}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'draft'
def test_nacked_by_with_acked(self) -> None:
"""NACK wins over Acked-by — result is still 'draft'."""
target = self._make_target(
{
'trailers': [
'NACKed-by: Test Reviewer <reviewer@example.com>',
'Acked-by: Test Reviewer <reviewer@example.com>',
]
}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'draft'
def test_explicit_done(self) -> None:
"""Stored patch-state=done with no other content → 'done'."""
target = self._make_target({'patch-state': 'done'})
assert b4.review._get_patch_state(target, self._USERCFG) == 'done'
def test_explicit_skip(self) -> None:
"""Stored patch-state=skip → 'skip'."""
target = self._make_target({'patch-state': 'skip'})
assert b4.review._get_patch_state(target, self._USERCFG) == 'skip'
def test_explicit_done_beats_nack(self) -> None:
"""Explicit done overrides a NACKed-by trailer (human override wins)."""
target = self._make_target(
{
'patch-state': 'done',
'trailers': ['NACKed-by: Test Reviewer <reviewer@example.com>'],
}
)
assert b4.review._get_patch_state(target, self._USERCFG) == 'done'
def test_set_and_clear(self) -> None:
"""_set_patch_state done then clear → state '' and entry cleaned up."""
target: dict[str, Any] = {}
b4.review._set_patch_state(target, self._USERCFG, 'done')
assert b4.review._get_patch_state(target, self._USERCFG) == 'done'
b4.review._set_patch_state(target, self._USERCFG, '')
assert b4.review._get_patch_state(target, self._USERCFG) == ''
# The review entry should have been cleaned up entirely
assert not target.get('reviews', {})
def test_set_skip_preserves_entry(self) -> None:
"""_set_patch_state skip keeps the review entry (not GC'd)."""
target: dict[str, Any] = {}
b4.review._set_patch_state(target, self._USERCFG, 'skip')
assert b4.review._get_patch_state(target, self._USERCFG) == 'skip'
# Entry must still be present so the skip is persisted
assert self._USERCFG['email'] in target.get('reviews', {})
class TestBuildReplyFromComments:
"""Tests for _build_reply_from_comments() context-limiting logic."""
# Minimal diff with a 40-line added hunk so we can test truncation.
# Lines are numbered from 1 in the diff (b_line), matching comment['line'].
_DIFF = (
'diff --git a/foo.py b/foo.py\n'
'--- a/foo.py\n'
'+++ b/foo.py\n'
'@@ -0,0 +1,40 @@\n' + ''.join(f'+line{i}\n' for i in range(1, 41))
)
def _make_comment(self, line: int, text: str) -> dict[str, Any]:
return {'path': 'b/foo.py', 'line': line, 'text': text}
def _call(self, comments: list[dict[str, Any]]) -> list[str]:
result = b4.review._build_reply_from_comments(self._DIFF, comments, [])
return result.splitlines()
def _skip_markers(self, lines: list[str]) -> list[str]:
"""Return all skip-marker lines from the output."""
return [line for line in lines if line.startswith('> [ ... skip')]
def test_short_hunk_no_skip_marker(self) -> None:
"""Comment within 5 lines of hunk start → no skip marker of any kind."""
lines = self._call([self._make_comment(3, 'nice')])
assert not self._skip_markers(lines)
# @@ header always present
assert any('@@ -0,0 +1,40 @@' in line for line in lines)
# All 3 added lines quoted
assert '> +line1' in lines
assert '> +line2' in lines
assert '> +line3' in lines
# Line 4 not quoted
assert '> +line4' not in lines
def test_few_skipped_lines_included_directly(self) -> None:
"""Gap of fewer than 3 lines → lines included, no skip marker."""
# Comment on line 8: window_start=3, skipped=2 (lines 1-2 before window)
lines = self._call([self._make_comment(8, 'here')])
assert not self._skip_markers(lines)
# Lines 1 and 2 should be included directly (gap < 3)
assert '> +line1' in lines
assert '> +line2' in lines
assert '> +line8' in lines
def test_distant_comment_gets_skip_marker(self) -> None:
"""Comment far from hunk start → skip marker with line count inserted."""
lines = self._call([self._make_comment(20, 'issue here')])
markers = self._skip_markers(lines)
assert len(markers) == 1
assert 'skip 14 lines' in markers[0]
# @@ header present
assert any('@@ -0,0 +1,40 @@' in line for line in lines)
# Only lines 15-20 quoted (5 context + the commented line)
assert '> +line15' in lines
assert '> +line20' in lines
# Line 14 not quoted
assert '> +line14' not in lines
# Comment text present
assert 'issue here' in lines
def test_two_distant_comments_skip_marker_between(self) -> None:
"""Two comments far apart produce exactly one skip marker between them.
Comment at line 2 is close to the @@ header (no leading marker);
comment at line 30 is far enough from line 2 to need a gap marker.
"""
comments = [
self._make_comment(2, 'first comment'),
self._make_comment(30, 'second comment'),
]
lines = self._call(comments)
assert len(self._skip_markers(lines)) == 1
assert 'first comment' in lines
assert 'second comment' in lines
def test_two_comments_both_distant_two_skip_markers(self) -> None:
"""Two comments both far from hunk start and each other → two skip markers."""
comments = [
self._make_comment(20, 'first comment'),
self._make_comment(40, 'second comment'),
]
lines = self._call(comments)
assert len(self._skip_markers(lines)) == 2
assert 'first comment' in lines
assert 'second comment' in lines
def test_two_adjacent_comments_no_extra_skip_marker(self) -> None:
"""Two comments close together → only one skip marker (before first window)."""
comments = [
self._make_comment(20, 'a'),
self._make_comment(22, 'b'),
]
lines = self._call(comments)
assert len(self._skip_markers(lines)) == 1
# Context window for comment at 20: lines 15-20
# Comment at 22: lines 21-22 (adjacent, no second skip)
assert '> +line15' in lines
assert '> +line22' in lines
def test_hunk_header_always_present(self) -> None:
"""The @@ hunk header is always included even for a comment on line 20."""
lines = self._call([self._make_comment(20, 'end')])
assert any('@@ -0,0 +1,40 @@' in line for line in lines)
assert self._skip_markers(lines)
assert '> +line20' in lines
assert '> +line14' not in lines
def test_no_duplicate_lines_between_comments(self) -> None:
"""Lines are never quoted twice when two comments share context."""
comments = [
self._make_comment(8, 'x'),
self._make_comment(10, 'y'),
]
lines = self._call(comments)
quoted = [line for line in lines if line.startswith('> +')]
# Each quoted diff line should appear exactly once
assert len(quoted) == len(set(quoted))
class TestFormatSnoozeUntil:
"""Tests for the _format_snooze_until() display helper."""
def test_date_only_string(self) -> None:
"""Date-only values get an 'until' prefix for backward compat."""
assert _format_snooze_until('2026-04-01') == 'until 2026-04-01'
def test_expired_datetime(self) -> None:
"""A datetime in the past returns 'expired'."""
past = (
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
).isoformat()
assert _format_snooze_until(past) == 'expired'
def test_future_days_hours_minutes(self) -> None:
"""A datetime ~1d 2h 30m in the future shows all three components."""
target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
days=1, hours=2, minutes=30, seconds=30
)
result = _format_snooze_until(target.isoformat())
assert result.startswith('wakes in 1d 2h 30m')
assert '(' in result # contains the local date/time
def test_future_hours_only(self) -> None:
"""A datetime exactly 3h in the future shows hours (and maybe minutes)."""
target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
hours=3, seconds=30
)
result = _format_snooze_until(target.isoformat())
assert 'wakes in' in result
assert '3h' in result
assert re.search(r'\(\d{4}-\d{2}-\d{2} \d{2}:\d{2}\)', result)
def test_future_minutes_only(self) -> None:
"""A datetime 45m in the future shows only minutes."""
target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
minutes=45, seconds=30
)
result = _format_snooze_until(target.isoformat())
assert 'wakes in 45m' in result
assert 'd' not in result.split('(')[0]
assert 'h' not in result.split('(')[0]
def test_future_less_than_one_minute(self) -> None:
"""A datetime <1m away shows '<1m'."""
target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=20
)
result = _format_snooze_until(target.isoformat())
assert 'wakes in <1m' in result
def test_tag_value(self) -> None:
"""A tag: prefixed value shows 'until tag <tagname>'."""
assert _format_snooze_until('tag:v6.15-rc3') == 'until tag v6.15-rc3'
def test_invalid_datetime_with_T(self) -> None:
"""A string containing T that isn't a valid datetime returns as-is."""
assert _format_snooze_until('NOT_A_DATE') == 'NOT_A_DATE'
def test_local_time_shown(self) -> None:
"""The parenthesised local time uses YYYY-MM-DD HH:MM format."""
target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
hours=6
)
result = _format_snooze_until(target.isoformat())
local_dt = target.astimezone()
expected_str = local_dt.strftime('%Y-%m-%d %H:%M')
assert expected_str in result
class TestSnoozeDurationRegex:
"""Tests for SnoozeScreen._DURATION_RE pattern matching."""
@pytest.mark.parametrize(
'input_str,expected_value,expected_unit',
[
('30m', 30, 'm'),
('3h', 3, 'h'),
('1d', 1, 'd'),
('2w', 2, 'w'),
('7', 7, ''),
('30 m', 30, 'm'),
('3H', 3, 'H'),
('1D', 1, 'D'),
('2W', 2, 'W'),
('45M', 45, 'M'),
],
)
def test_valid_durations(
self, input_str: str, expected_value: int, expected_unit: str
) -> None:
"""Valid duration strings are parsed correctly."""
m = SnoozeScreen._DURATION_RE.match(input_str)
assert m is not None
assert int(m.group(1)) == expected_value
assert m.group(2) == expected_unit
@pytest.mark.parametrize(
'input_str',
[
'abc',
'3x',
'h3',
'm',
'',
'3.5h',
'-1d',
'3hh',
],
)
def test_invalid_durations(self, input_str: str) -> None:
"""Invalid duration strings are rejected."""
assert SnoozeScreen._DURATION_RE.match(input_str) is None
class TestGetExpiredSnoozedDatetime:
"""Verify get_expired_snoozed() works with full ISO datetimes."""
def _make_snoozed_series(
self, conn: Any, change_id: str, snoozed_until: str
) -> None:
"""Insert a snoozed series with a given wake-up time."""
review_tracking.add_series_to_db(
conn,
change_id=change_id,
revision=1,
subject='Test subject',
sender_name='Test Author',
sender_email='test@example.com',
sent_at='2026-01-01T10:00:00+00:00',
message_id=f'{change_id}@example.com',
num_patches=1,
)
review_tracking.snooze_series(conn, change_id, snoozed_until)
def test_past_datetime_is_expired(self) -> None:
"""A series snoozed until a past datetime shows up as expired."""
conn = review_tracking.init_db('snooze-past-dt')
past = (
datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=5)
).isoformat()
self._make_snoozed_series(conn, 'past-dt-id', past)
expired = review_tracking.get_expired_snoozed(conn)
assert len(expired) == 1
assert expired[0]['change_id'] == 'past-dt-id'
conn.close()
def test_future_datetime_not_expired(self) -> None:
"""A series snoozed until a future datetime does not show up."""
conn = review_tracking.init_db('snooze-future-dt')
future = (
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)
).isoformat()
self._make_snoozed_series(conn, 'future-dt-id', future)
expired = review_tracking.get_expired_snoozed(conn)
assert len(expired) == 0
conn.close()
def test_past_date_only_is_expired(self) -> None:
"""A legacy date-only value in the past still works."""
conn = review_tracking.init_db('snooze-past-date')
yesterday = (
datetime.datetime.now(datetime.timezone.utc).date()
- datetime.timedelta(days=1)
).isoformat()
self._make_snoozed_series(conn, 'past-date-id', yesterday)
expired = review_tracking.get_expired_snoozed(conn)
assert len(expired) == 1
assert expired[0]['change_id'] == 'past-date-id'
conn.close()
def test_future_date_only_not_expired(self) -> None:
"""A legacy date-only value in the future still works."""
conn = review_tracking.init_db('snooze-future-date')
tomorrow = (
datetime.datetime.now(datetime.timezone.utc).date()
+ datetime.timedelta(days=2)
).isoformat()
self._make_snoozed_series(conn, 'future-date-id', tomorrow)
expired = review_tracking.get_expired_snoozed(conn)
assert len(expired) == 0
conn.close()
def test_mixed_date_and_datetime(self) -> None:
"""Both legacy date-only and new datetime values handled together."""
conn = review_tracking.init_db('snooze-mixed')
past_dt = (
datetime.datetime.now(datetime.timezone.utc)
- datetime.timedelta(minutes=30)
).isoformat()
yesterday = (
datetime.datetime.now(datetime.timezone.utc).date()
- datetime.timedelta(days=1)
).isoformat()
future_dt = (
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=5)
).isoformat()
self._make_snoozed_series(conn, 'expired-dt', past_dt)
self._make_snoozed_series(conn, 'expired-date', yesterday)
self._make_snoozed_series(conn, 'still-sleeping', future_dt)
expired = review_tracking.get_expired_snoozed(conn)
expired_ids = {e['change_id'] for e in expired}
assert expired_ids == {'expired-dt', 'expired-date'}
conn.close()
def test_tag_snoozed_not_in_expired(self) -> None:
"""Tag-based snoozed entries don't appear in time-based expiry results."""
conn = review_tracking.init_db('snooze-tag-not-expired')
self._make_snoozed_series(conn, 'tag-id', 'tag:v6.15-rc3')
expired = review_tracking.get_expired_snoozed(conn)
assert len(expired) == 0
conn.close()
def test_get_tag_snoozed(self) -> None:
"""get_tag_snoozed returns only tag: prefixed entries."""
conn = review_tracking.init_db('snooze-tag-query')
# Add a tag-based snooze
self._make_snoozed_series(conn, 'tag-id', 'tag:v6.15-rc3')
# Add a time-based snooze
future = (
datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=2)
).isoformat()
self._make_snoozed_series(conn, 'time-id', future)
tag_results = review_tracking.get_tag_snoozed(conn)
assert len(tag_results) == 1
assert tag_results[0]['change_id'] == 'tag-id'
assert tag_results[0]['snoozed_until'] == 'tag:v6.15-rc3'
conn.close()
# -- Tests for attestation DB operations -------------------------------------
class TestAttestationDb:
"""Tests for attestation storage and schema migration."""
def _add_test_series(
self, conn: Any, change_id: str = 'att-test-id', revision: int = 1
) -> int:
"""Insert a minimal series row and return its track_id."""
return review_tracking.add_series_to_db(
conn,
change_id=change_id,
revision=revision,
subject='Test attestation series',
sender_name='Test Author',
sender_email='author@example.com',
sent_at='2024-06-01T10:00:00+00:00',
message_id='att-test@example.com',
num_patches=2,
)
def test_new_series_has_pending_attestation(self) -> None:
"""Newly added series default to 'pending' attestation."""
conn = review_tracking.init_db('att-default')
self._add_test_series(conn)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'att-test-id'"
).fetchone()
assert row[0] == 'pending'
conn.close()
def test_update_attestation_stores_value(self) -> None:
"""update_attestation() writes the value to the DB."""
ident = 'att-update'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
conn.close()
review_tracking.update_attestation(
ident, 'att-test-id', 1, 'signed:DKIM/kernel.org'
)
conn = review_tracking.get_db(ident)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'att-test-id'"
).fetchone()
assert row[0] == 'signed:DKIM/kernel.org'
conn.close()
def test_update_attestation_none_value(self) -> None:
"""update_attestation() can store None (policy off)."""
ident = 'att-none'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
conn.close()
review_tracking.update_attestation(ident, 'att-test-id', 1, None)
conn = review_tracking.get_db(ident)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'att-test-id'"
).fetchone()
assert row[0] is None
conn.close()
def test_update_attestation_overwrite(self) -> None:
"""update_attestation() overwrites a previous value."""
ident = 'att-overwrite'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
conn.close()
review_tracking.update_attestation(ident, 'att-test-id', 1, 'none')
review_tracking.update_attestation(
ident, 'att-test-id', 1, 'signed:DKIM/kernel.org'
)
conn = review_tracking.get_db(ident)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'att-test-id'"
).fetchone()
assert row[0] == 'signed:DKIM/kernel.org'
conn.close()
def test_update_attestation_wrong_revision_no_crash(self) -> None:
"""update_attestation() for a non-existent revision silently does nothing."""
ident = 'att-wrong-rev'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
conn.close()
# revision 99 doesn't exist — should not raise
review_tracking.update_attestation(
ident, 'att-test-id', 99, 'signed:DKIM/kernel.org'
)
conn = review_tracking.get_db(ident)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'att-test-id'"
).fetchone()
assert row[0] == 'pending' # unchanged
conn.close()
def test_get_all_tracked_series_includes_attestation(self) -> None:
"""get_all_tracked_series() includes the attestation field."""
ident = 'att-listing'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
conn.close()
review_tracking.update_attestation(
ident, 'att-test-id', 1, 'nokey:ed25519/dev@example.com'
)
series_list = review_tracking.get_all_tracked_series(ident)
assert len(series_list) == 1
assert series_list[0]['attestation'] == 'nokey:ed25519/dev@example.com'
def test_get_all_tracked_series_includes_snoozed_until(self) -> None:
"""get_all_tracked_series() includes snoozed_until for snoozed series."""
ident = 'snoozed-listing'
conn = review_tracking.init_db(ident)
self._add_test_series(conn)
review_tracking.snooze_series(
conn, 'att-test-id', '2026-06-01T00:00:00', revision=1
)
conn.close()
series_list = review_tracking.get_all_tracked_series(ident)
assert len(series_list) == 1
assert series_list[0]['snoozed_until'] == '2026-06-01T00:00:00'
def test_schema_v4_migration_adds_attestation(self) -> None:
"""Migrating from schema v4 adds the attestation column."""
import sqlite3
ident = 'att-migrate-v4'
# Create a v4-style database manually
db_path = review_tracking.get_db_path(ident)
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = sqlite3.connect(db_path)
# Create the tables without the attestation column
conn.executescript("""
CREATE TABLE schema_version (version INTEGER PRIMARY KEY);
INSERT INTO schema_version VALUES (4);
CREATE TABLE series (
track_id INTEGER PRIMARY KEY,
change_id TEXT NOT NULL,
revision INTEGER NOT NULL,
subject TEXT,
sender_name TEXT,
sender_email TEXT,
sent_at TEXT,
added_at TEXT,
message_id TEXT,
num_patches INTEGER,
pw_series_id INTEGER,
status TEXT DEFAULT 'new',
fingerprint TEXT,
branch_sha TEXT,
message_count INT,
seen_message_count INT,
last_update_check TEXT,
last_activity_at TEXT,
snoozed_until TEXT,
UNIQUE (change_id, revision)
);
INSERT INTO series (change_id, revision, subject) VALUES ('migrate-id', 1, 'Test');
""")
conn.close()
# Opening via get_db triggers migration
conn = review_tracking.get_db(ident)
row = conn.execute(
"SELECT attestation FROM series WHERE change_id = 'migrate-id'"
).fetchone()
assert row[0] == 'pending'
version = conn.execute('SELECT version FROM schema_version').fetchone()[0]
assert version == review_tracking.SCHEMA_VERSION
conn.close()
# -- Tests for _format_attestation() display helper --------------------------
class TestFormatAttestation:
"""Tests for the _format_attestation() display helper."""
def test_pending_returns_none(self) -> None:
"""'pending' produces no display text."""
assert _format_attestation('pending') is None
def test_none_string_returns_none(self) -> None:
"""'none' (no signatures) produces no display text."""
assert _format_attestation('none') is None
def test_empty_string_returns_none(self) -> None:
"""Empty string produces no display text."""
assert _format_attestation('') is None
def test_signed_dkim(self) -> None:
"""A signed DKIM entry shows a checkmark and identity."""
text = _format_attestation('signed:DKIM/kernel.org')
assert text is not None
plain = text.plain
assert '\u2714' in plain # ✔
assert 'DKIM/kernel.org' in plain
def test_nokey_shows_question_mark(self) -> None:
"""A nokey entry shows a question mark, identity and '(no key)' hint."""
text = _format_attestation('nokey:ed25519/user@example.com')
assert text is not None
plain = text.plain
assert '?' in plain
assert 'ed25519/user@example.com' in plain
assert '(no key)' in plain
def test_badsig_shows_cross(self) -> None:
"""A badsig entry shows a cross mark, identity and failure hint."""
text = _format_attestation('badsig:ed25519/user@example.com')
assert text is not None
plain = text.plain
assert '\u2718' in plain # ✘
assert 'ed25519/user@example.com' in plain
assert '(signature failed)' in plain
def test_multiple_attestors_comma_separated(self) -> None:
"""Multiple attestors are comma-separated in the output."""
text = _format_attestation(
'signed:DKIM/kernel.org;nokey:ed25519/dev@example.com'
)
assert text is not None
plain = text.plain
assert ', ' in plain
assert 'DKIM/kernel.org' in plain
assert 'ed25519/dev@example.com' in plain
def test_unknown_status_shown_as_is(self) -> None:
"""An unrecognised status entry is shown verbatim."""
text = _format_attestation('mystery:foo/bar')
assert text is not None
assert 'mystery:foo/bar' in text.plain
def test_entry_without_colon_shown_as_is(self) -> None:
"""An entry with no colon separator is shown verbatim."""
text = _format_attestation('weirdvalue')
assert text is not None
assert 'weirdvalue' in text.plain