blob: 6e518fc34d4781bd625e28075a3463f6f8ac8a1e [file] [edit]
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
import email.message
from typing import List, Optional, Tuple
from unittest import mock
import b4
# ---------------------------------------------------------------------------
# Helpers for building synthetic EmailMessage objects
# ---------------------------------------------------------------------------
def _make_msg(
msgid: str,
subject: str,
from_addr: str = 'Test Author <test@example.com>',
date: str = 'Mon, 23 Mar 2026 12:00:00 +0000',
in_reply_to: Optional[str] = None,
references: Optional[str] = None,
body: str = 'Hello\n',
) -> email.message.EmailMessage:
msg = email.message.EmailMessage()
msg['Message-ID'] = f'<{msgid}>'
msg['Subject'] = subject
msg['From'] = from_addr
msg['Date'] = date
if in_reply_to:
msg['In-Reply-To'] = f'<{in_reply_to}>'
if references:
msg['References'] = references
msg.set_payload(body, 'utf-8')
return msg
# ===========================================================================
# parse_msgid tests
# ===========================================================================
class TestParseMsgid:
def test_bare_msgid(self) -> None:
assert (
b4.parse_msgid('20260323041505.2088-1-user@example.com')
== '20260323041505.2088-1-user@example.com'
)
def test_angle_brackets(self) -> None:
assert (
b4.parse_msgid('<20260323041505.2088-1-user@example.com>')
== '20260323041505.2088-1-user@example.com'
)
def test_lore_url(self) -> None:
result = b4.parse_msgid(
'https://lore.kernel.org/all/20260323041505.2088-1-user@example.com/'
)
assert result == '20260323041505.2088-1-user@example.com'
def test_lore_url_with_r_shorthand(self) -> None:
result = b4.parse_msgid(
'https://lore.kernel.org/r/20260323041505.2088-1-user@example.com'
)
assert result == '20260323041505.2088-1-user@example.com'
def test_lore_url_percent_encoded(self) -> None:
result = b4.parse_msgid('https://lore.kernel.org/all/abc%2Bdef@example.com/')
assert result == 'abc+def@example.com'
def test_patchwork_url(self) -> None:
result = b4.parse_msgid(
'https://patchwork.kernel.org/project/linux-mm/patch/20260323041505.2088-1-user@example.com/'
)
assert result == '20260323041505.2088-1-user@example.com'
def test_id_prefix(self) -> None:
assert (
b4.parse_msgid('id:20260323041505.2088-1-user@example.com')
== '20260323041505.2088-1-user@example.com'
)
def test_rfc822msgid_prefix(self) -> None:
assert (
b4.parse_msgid('rfc822msgid:20260323041505.2088-1-user@example.com')
== '20260323041505.2088-1-user@example.com'
)
def test_whitespace_stripped(self) -> None:
assert b4.parse_msgid(' <foo@bar.com> ') == 'foo@bar.com'
def test_generic_http_url_with_at_sign(self) -> None:
result = b4.parse_msgid('https://some.archive.org/msg/20260323.abc@example.com')
assert result == '20260323.abc@example.com'
def test_non_url_passthrough(self) -> None:
"""A bare msgid without special prefixes passes through unchanged."""
assert b4.parse_msgid('simple-msgid@host') == 'simple-msgid@host'
def test_midmask_override_for_foreign_server(self) -> None:
"""When a URL points to a different server, midmask is overridden."""
old_midmask = b4.MAIN_CONFIG.get('midmask')
b4.parse_msgid('https://other.archive.org/linux-mm/foo@bar.com/')
assert b4.MAIN_CONFIG['midmask'] == 'https://other.archive.org/linux-mm/%s'
# Restore
b4.MAIN_CONFIG['midmask'] = old_midmask
# ===========================================================================
# LoreSeries.rewrite_subject_counter tests
# ===========================================================================
class TestRewriteSubjectCounter:
def test_basic_rewrite(self) -> None:
msg = _make_msg('a@b', '[PATCH 1/1] Fix the frobnicator')
b4.LoreSeries.rewrite_subject_counter(msg, 2, 5)
assert msg['Subject'] == '[PATCH 2/5] Fix the frobnicator'
def test_preserves_rfc_prefix(self) -> None:
msg = _make_msg('a@b', '[PATCH RFC 1/1] Add new feature')
b4.LoreSeries.rewrite_subject_counter(msg, 3, 7)
assert msg['Subject'] == '[PATCH RFC 3/7] Add new feature'
def test_preserves_version(self) -> None:
msg = _make_msg('a@b', '[PATCH v3 1/2] Refactor widgets')
b4.LoreSeries.rewrite_subject_counter(msg, 1, 4)
assert msg['Subject'] == '[PATCH v3 1/4] Refactor widgets'
def test_zero_pads_counter(self) -> None:
msg = _make_msg('a@b', '[PATCH 1/1] Something')
b4.LoreSeries.rewrite_subject_counter(msg, 2, 15)
assert msg['Subject'] == '[PATCH 02/15] Something'
def test_cover_letter(self) -> None:
msg = _make_msg('a@b', '[PATCH 0/3] Cover letter subject')
b4.LoreSeries.rewrite_subject_counter(msg, 0, 5)
assert msg['Subject'] == '[PATCH 0/5] Cover letter subject'
def test_bare_subject_gets_patch_prefix(self) -> None:
msg = _make_msg('a@b', 'Fix the thing')
b4.LoreSeries.rewrite_subject_counter(msg, 1, 3)
assert msg['Subject'] == '[PATCH 1/3] Fix the thing'
# ===========================================================================
# LoreSeries.identify_cover_letter tests
# ===========================================================================
class TestIdentifyCoverLetter:
def test_cover_detected(self) -> None:
cover = _make_msg('cover@x', '[PATCH 0/2] Series title')
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second')
all_msgs = [cover, p1, p2]
msgids = ['cover@x', 'p1@x', 'p2@x']
cover_msgid, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
assert cover_msgid == 'cover@x'
assert len(patch_msgs) == 2
assert b4.LoreMessage.get_clean_msgid(patch_msgs[0]) == 'p1@x'
assert b4.LoreMessage.get_clean_msgid(patch_msgs[1]) == 'p2@x'
def test_no_cover(self) -> None:
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second')
all_msgs = [p1, p2]
msgids = ['p1@x', 'p2@x']
cover_msgid, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
assert cover_msgid is None
assert len(patch_msgs) == 2
def test_preserves_cli_order(self) -> None:
p1 = _make_msg('p1@x', '[PATCH 1/3] First')
p2 = _make_msg('p2@x', '[PATCH 2/3] Second')
p3 = _make_msg('p3@x', '[PATCH 3/3] Third')
all_msgs = [p1, p2, p3]
# Pass in reverse order
msgids = ['p3@x', 'p1@x', 'p2@x']
_, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
result_ids = [b4.LoreMessage.get_clean_msgid(m) for m in patch_msgs]
assert result_ids == ['p3@x', 'p1@x', 'p2@x']
def test_child_messages_ignored(self) -> None:
"""Messages present in all_msgs but not in msgids should not appear in patch_msgs."""
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
reply = _make_msg('reply@x', 'Re: [PATCH 1/2] First', in_reply_to='p1@x')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second')
all_msgs = [p1, reply, p2]
msgids = ['p1@x', 'p2@x']
cover_msgid, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
assert cover_msgid is None
assert len(patch_msgs) == 2
def test_inferred_counter_zero_not_cover(self) -> None:
"""A message with inferred counter 0 (e.g. just [PATCH]) should not be treated as cover."""
# A bare [PATCH] message gets counter=1, expected=1, counters_inferred=True
# but a message that somehow parses as counter=0 with inferred counters should not
# be treated as a cover letter
p1 = _make_msg('p1@x', '[PATCH] Single patch fix')
p2 = _make_msg('p2@x', '[PATCH] Another fix')
all_msgs = [p1, p2]
msgids = ['p1@x', 'p2@x']
cover_msgid, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
assert cover_msgid is None
assert len(patch_msgs) == 2
# ===========================================================================
# LoreSeries.renumber_patches tests
# ===========================================================================
class TestRenumberPatches:
def test_all_explicit_correct_expected(self) -> None:
"""When all counters are explicit and expected is correct, nothing changes."""
msgs = [
_make_msg('a@x', '[PATCH 1/2] First'),
_make_msg('b@x', '[PATCH 2/2] Second'),
]
b4.LoreSeries.renumber_patches(msgs)
assert b4.LoreSubject(msgs[0]['Subject']).counter == 1
assert b4.LoreSubject(msgs[0]['Subject']).expected == 2
assert b4.LoreSubject(msgs[1]['Subject']).counter == 2
assert b4.LoreSubject(msgs[1]['Subject']).expected == 2
def test_all_explicit_wrong_expected(self) -> None:
"""When all counters are explicit but expected is wrong, fix expected only."""
msgs = [
_make_msg('a@x', '[PATCH 1/1] First'),
_make_msg('b@x', '[PATCH 2/2] Second'),
_make_msg('c@x', '[PATCH 3/3] Third'),
]
b4.LoreSeries.renumber_patches(msgs)
for msg in msgs:
assert b4.LoreSubject(msg['Subject']).expected == 3
# Counters preserved
assert b4.LoreSubject(msgs[0]['Subject']).counter == 1
assert b4.LoreSubject(msgs[1]['Subject']).counter == 2
assert b4.LoreSubject(msgs[2]['Subject']).counter == 3
def test_inferred_counters_renumbered(self) -> None:
"""When counters are inferred, renumber based on list order."""
msgs = [
_make_msg('a@x', '[PATCH] Alpha fix'),
_make_msg('b@x', '[PATCH] Beta fix'),
_make_msg('c@x', '[PATCH] Gamma fix'),
]
b4.LoreSeries.renumber_patches(msgs)
for i, msg in enumerate(msgs, 1):
subj = b4.LoreSubject(msg['Subject'])
assert subj.counter == i
assert subj.expected == 3
def test_mixed_explicit_and_inferred(self) -> None:
"""When some counters are inferred, renumber everything based on order."""
msgs = [
_make_msg('a@x', '[PATCH 1/2] Has counter'),
_make_msg('b@x', '[PATCH] No counter'),
]
b4.LoreSeries.renumber_patches(msgs)
assert b4.LoreSubject(msgs[0]['Subject']).counter == 1
assert b4.LoreSubject(msgs[0]['Subject']).expected == 2
assert b4.LoreSubject(msgs[1]['Subject']).counter == 2
assert b4.LoreSubject(msgs[1]['Subject']).expected == 2
def test_empty_list(self) -> None:
"""Empty list should be a no-op."""
b4.LoreSeries.renumber_patches([])
def test_preserves_version_during_renumber(self) -> None:
msgs = [
_make_msg('a@x', '[PATCH v2] First fix'),
_make_msg('b@x', '[PATCH v2] Second fix'),
]
b4.LoreSeries.renumber_patches(msgs)
for msg in msgs:
subj = b4.LoreSubject(msg['Subject'])
assert subj.revision == 2
assert subj.expected == 2
# ===========================================================================
# LoreSeries.rethread_messages tests
# ===========================================================================
class TestRethreadMessages:
def test_patches_threaded_to_cover(self) -> None:
cover = _make_msg('cover@x', '[PATCH 0/2] Cover')
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second')
all_msgs = [cover, p1, p2]
b4.LoreSeries.rethread_messages(all_msgs, 'cover@x', {'p1@x', 'p2@x'})
# Cover has no threading headers
assert cover.get('In-Reply-To') is None
assert cover.get('References') is None
# Patches thread to cover
assert p1['In-Reply-To'] == '<cover@x>'
assert p1['References'] == '<cover@x>'
assert p2['In-Reply-To'] == '<cover@x>'
assert p2['References'] == '<cover@x>'
def test_child_messages_untouched(self) -> None:
cover = _make_msg('cover@x', '[PATCH 0/2] Cover')
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
reply = _make_msg(
'reply@x', 'Re: [PATCH 1/2] First', in_reply_to='p1@x', references='<p1@x>'
)
all_msgs = [cover, p1, reply]
b4.LoreSeries.rethread_messages(all_msgs, 'cover@x', {'p1@x'})
# Reply should keep its original threading
assert reply['In-Reply-To'] == '<p1@x>'
assert reply['References'] == '<p1@x>'
def test_strips_old_threading_from_cover(self) -> None:
"""If the cover had pre-existing threading, it should be stripped."""
cover = _make_msg(
'cover@x',
'[PATCH 0/2] Cover',
in_reply_to='old-parent@x',
references='<old-parent@x>',
)
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
all_msgs = [cover, p1]
b4.LoreSeries.rethread_messages(all_msgs, 'cover@x', {'p1@x'})
assert cover.get('In-Reply-To') is None
assert cover.get('References') is None
def test_replaces_old_threading_on_patches(self) -> None:
"""Patches should lose their old threading and get cover as parent."""
cover = _make_msg('cover@x', '[PATCH 0/1] Cover')
p1 = _make_msg(
'p1@x',
'[PATCH 1/1] Fix',
in_reply_to='unrelated@x',
references='<unrelated@x> <other@x>',
)
all_msgs = [cover, p1]
b4.LoreSeries.rethread_messages(all_msgs, 'cover@x', {'p1@x'})
assert p1['In-Reply-To'] == '<cover@x>'
assert p1['References'] == '<cover@x>'
def test_threads_under_first_patch_when_no_cover(self) -> None:
"""Without a cover, patches 2+ should thread under patch 1."""
p1 = _make_msg('p1@x', '[PATCH 1/2] First')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second')
all_msgs = [p1, p2]
# p1 is the root, only p2 needs rethreading
b4.LoreSeries.rethread_messages(all_msgs, 'p1@x', {'p2@x'})
# p1 (root) has no threading headers
assert p1.get('In-Reply-To') is None
assert p1.get('References') is None
# p2 threads to p1
assert p2['In-Reply-To'] == '<p1@x>'
assert p2['References'] == '<p1@x>'
# ===========================================================================
# Integration: full rethread pipeline with LoreMailbox
# ===========================================================================
class TestRethreadIntegration:
@staticmethod
def _run_pipeline(
msgids: List[str], all_msgs: List[email.message.EmailMessage]
) -> Tuple[str, b4.LoreSeries]:
"""Run the rethread pipeline and feed into LoreMailbox."""
cover_msgid, all_msgs = b4.LoreSeries.rethread_series(msgids, all_msgs)
lmbx = b4.LoreMailbox()
for msg in all_msgs:
lmbx.add_message(msg)
lser = lmbx.get_series(codereview_trailers=False)
assert lser is not None
return cover_msgid, lser
def test_numbered_patches_with_cover(self) -> None:
"""Properly numbered patches with a cover letter should produce a complete series."""
cover = _make_msg(
'cover@x',
'[PATCH 0/2] Widget overhaul',
body='This series overhauls widgets.\n',
)
p1 = _make_msg(
'p1@x',
'[PATCH 1/2] Refactor widget core',
body='---\n widget.c | 10 +\n 1 file changed\n\ndiff --git a/widget.c b/widget.c\n--- a/widget.c\n+++ b/widget.c\n@@ -1 +1 @@\n-old\n+new\n',
)
p2 = _make_msg(
'p2@x',
'[PATCH 2/2] Add widget tests',
body='---\n test.c | 5 +\n 1 file changed\n\ndiff --git a/test.c b/test.c\n--- a/test.c\n+++ b/test.c\n@@ -1 +1 @@\n-old\n+new\n',
)
all_msgs = [cover, p1, p2]
msgids = ['cover@x', 'p1@x', 'p2@x']
cover_msgid, lser = self._run_pipeline(msgids, all_msgs)
assert cover_msgid == 'cover@x'
assert lser.complete
assert lser.has_cover
assert lser.expected == 2
def test_unnumbered_patches_no_cover(self) -> None:
"""Unnumbered patches without a cover should get renumbered and threaded under patch 1."""
p1 = _make_msg(
'p1@x',
'[PATCH] Add alpha feature',
body='---\n a.c | 1 +\n 1 file changed\n\ndiff --git a/a.c b/a.c\n--- a/a.c\n+++ b/a.c\n@@ -1 +1 @@\n-old\n+new\n',
)
p2 = _make_msg(
'p2@x',
'[PATCH] Add beta feature',
body='---\n b.c | 1 +\n 1 file changed\n\ndiff --git a/b.c b/b.c\n--- a/b.c\n+++ b/b.c\n@@ -1 +1 @@\n-old\n+new\n',
)
all_msgs = [p1, p2]
msgids = ['p1@x', 'p2@x']
root_msgid, lser = self._run_pipeline(msgids, all_msgs)
assert root_msgid == 'p1@x'
assert lser.complete
assert not lser.has_cover
assert lser.expected == 2
def test_followup_trailers_preserved(self) -> None:
"""Review replies should be associated with the correct patch after rethreading."""
p1 = _make_msg(
'p1@x',
'[PATCH 1/2] First patch',
body='---\n a.c | 1 +\n 1 file changed\n\ndiff --git a/a.c b/a.c\n--- a/a.c\n+++ b/a.c\n@@ -1 +1 @@\n-old\n+new\n',
)
review = _make_msg(
'rev@x',
'Re: [PATCH 1/2] First patch',
in_reply_to='p1@x',
references='<p1@x>',
from_addr='Reviewer <rev@example.com>',
body='Looks good.\n\nReviewed-by: Reviewer <rev@example.com>\n',
)
p2 = _make_msg(
'p2@x',
'[PATCH 2/2] Second patch',
body='---\n b.c | 1 +\n 1 file changed\n\ndiff --git a/b.c b/b.c\n--- a/b.c\n+++ b/b.c\n@@ -1 +1 @@\n-old\n+new\n',
)
all_msgs = [p1, review, p2]
msgids = ['p1@x', 'p2@x']
_, lser = self._run_pipeline(msgids, all_msgs)
assert lser.complete
# The review trailer should have landed on patch 1
assert lser.patches[1] is not None
has_reviewed_by = any(
t.name == 'Reviewed-by' for t in lser.patches[1].followup_trailers
)
assert has_reviewed_by
def test_wrong_expected_fixed(self) -> None:
"""Patches claiming 1/1 each should have expected fixed to match actual count."""
p1 = _make_msg(
'p1@x',
'[PATCH 1/1] First',
body='---\n a.c | 1 +\n 1 file changed\n\ndiff --git a/a.c b/a.c\n--- a/a.c\n+++ b/a.c\n@@ -1 +1 @@\n-old\n+new\n',
)
p2 = _make_msg(
'p2@x',
'[PATCH 1/1] Second',
body='---\n b.c | 1 +\n 1 file changed\n\ndiff --git a/b.c b/b.c\n--- a/b.c\n+++ b/b.c\n@@ -1 +1 @@\n-old\n+new\n',
)
all_msgs = [p1, p2]
msgids = ['p1@x', 'p2@x']
# Both say 1/1 but there are 2 patches. After renumber_series,
# expected should be 2 for both.
_, patch_msgs = b4.LoreSeries.identify_cover_letter(all_msgs, msgids)
b4.LoreSeries.renumber_patches(patch_msgs)
for msg in patch_msgs:
subj = b4.LoreSubject(msg['Subject'])
assert subj.expected == 2
# ===========================================================================
# discover_rethread_series tests (mock network calls)
# ===========================================================================
class TestDiscoverRethreadSeries:
@staticmethod
def _mock_discover(
seed_msg: email.message.EmailMessage,
search_results: List[email.message.EmailMessage],
) -> List[str]:
"""Run discover_rethread_series with mocked network calls."""
seed_msgid = b4.LoreMessage.get_clean_msgid(seed_msg)
assert seed_msgid is not None
with (
mock.patch('b4.get_pi_thread_by_msgid', return_value=[seed_msg]),
mock.patch('b4.get_pi_search_results', return_value=search_results),
):
return b4.discover_rethread_series(seed_msgid)
def test_discovers_numbered_series(self) -> None:
"""From a single [PATCH 1/3], discover the full series."""
seed = _make_msg('p1@x', '[PATCH 1/3] First fix')
p2 = _make_msg('p2@x', '[PATCH 2/3] Second fix')
p3 = _make_msg('p3@x', '[PATCH 3/3] Third fix')
result = self._mock_discover(seed, [seed, p2, p3])
assert result == ['p1@x', 'p2@x', 'p3@x']
def test_discovers_cover_letter(self) -> None:
"""If a cover letter exists, it should be found and listed first."""
seed = _make_msg('p1@x', '[PATCH 1/2] First fix')
cover = _make_msg('c@x', '[PATCH 0/2] Series title')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second fix')
result = self._mock_discover(seed, [cover, seed, p2])
assert result == ['c@x', 'p1@x', 'p2@x']
def test_filters_different_expected(self) -> None:
"""Patches with a different expected count should be excluded."""
seed = _make_msg('p1@x', '[PATCH 1/3] My fix')
match = _make_msg('p2@x', '[PATCH 2/3] My fix part 2')
other = _make_msg('o@x', '[PATCH 1/5] Unrelated series')
result = self._mock_discover(seed, [seed, match, other])
assert 'o@x' not in result
assert result == ['p1@x', 'p2@x']
def test_filters_different_revision(self) -> None:
"""Patches with a different version should be excluded."""
seed = _make_msg('p1@x', '[PATCH v2 1/2] Fix widget')
match = _make_msg('p2@x', '[PATCH v2 2/2] Fix gadget')
v1 = _make_msg('old@x', '[PATCH v1 1/2] Fix widget')
result = self._mock_discover(seed, [seed, match, v1])
assert 'old@x' not in result
assert result == ['p1@x', 'p2@x']
def test_filters_replies(self) -> None:
"""Reply messages should be excluded."""
seed = _make_msg('p1@x', '[PATCH 1/2] First fix')
p2 = _make_msg('p2@x', '[PATCH 2/2] Second fix')
reply = _make_msg('r@x', 'Re: [PATCH 1/2] First fix')
result = self._mock_discover(seed, [seed, p2, reply])
assert 'r@x' not in result
assert result == ['p1@x', 'p2@x']
def test_bare_patch_discovery(self) -> None:
"""Bare [PATCH] messages should discover other bare [PATCH] messages."""
seed = _make_msg('p1@x', '[PATCH] Alpha fix')
p2 = _make_msg('p2@x', '[PATCH] Beta fix')
numbered = _make_msg('n@x', '[PATCH 1/3] Numbered patch')
result = self._mock_discover(seed, [seed, p2, numbered])
assert 'n@x' not in result
assert set(result) == {'p1@x', 'p2@x'}
def test_no_results_returns_seed_only(self) -> None:
"""If search returns nothing, return only the seed msgid."""
seed = _make_msg('p1@x', '[PATCH 1/2] Fix')
seed_msgid = b4.LoreMessage.get_clean_msgid(seed)
assert seed_msgid is not None
with (
mock.patch('b4.get_pi_thread_by_msgid', return_value=[seed]),
mock.patch('b4.get_pi_search_results', return_value=None),
):
result = b4.discover_rethread_series(seed_msgid)
assert result == ['p1@x']