blob: 2e59347cee9182f572f8785c682a2a8e1d238ea2 [file] [edit]
import email
import email.message
import email.parser
import email.policy
import email.utils
import io
import os
import pathlib
import socket
from typing import Any, Dict, List, Literal, Optional, Tuple
import pytest
import b4
@pytest.mark.parametrize(
'source,expected',
[
('good-valid-trusted', (True, True, True, 'B6C41CE35664996C', '1623274836')),
('good-valid-notrust', (True, True, False, 'B6C41CE35664996C', '1623274836')),
('good-invalid-notrust', (True, False, False, 'B6C41CE35664996C', None)),
('badsig', (False, False, False, 'B6C41CE35664996C', None)),
('no-pubkey', (False, False, False, None, None)),
],
)
def test_check_gpg_status(
sampledir: str,
source: str,
expected: Tuple[bool, bool, bool, Optional[str], Optional[str]],
) -> None:
with open(f'{sampledir}/gpg-{source}.txt', 'r') as fh:
status = fh.read()
assert b4.check_gpg_status(status) == expected
@pytest.mark.parametrize(
'source,regex,flags,ismbox',
[
(None, r'^From git@z ', 0, False),
(None, r'\n\nFrom git@z ', 0, False),
('save-7bit-clean', r'From: Unicôdé', 0, True),
# mailbox.mbox does not properly handle 8bit-clean headers
('save-8bit-clean', r'From: Unicôdé', 0, False),
],
)
def test_save_git_am_mbox(
sampledir: Optional[str],
tmp_path: pathlib.Path,
source: Optional[str],
regex: str,
flags: int,
ismbox: bool,
) -> None:
import re
msgs: List[email.message.EmailMessage]
if source is not None:
if ismbox:
msgs = b4.get_msgs_from_mailbox_or_maildir(f'{sampledir}/{source}.txt')
else:
with open(f'{sampledir}/{source}.txt', 'rb') as fh:
msg = email.parser.BytesParser(
policy=b4.emlpolicy, _class=email.message.EmailMessage
).parse(fh)
msgs = [msg]
else:
msgs = list()
for x in range(0, 3):
msg = email.message.EmailMessage()
msg.set_payload(f'Hello world {x}\n')
msg['Subject'] = f'Hello world {x}'
msg['From'] = f'Me{x} <me{x}@foo.bar>'
msgs.append(msg)
dest = os.path.join(tmp_path, 'out')
with open(dest, 'wb') as fh:
b4.save_git_am_mbox(msgs, fh)
with open(dest, 'r') as fh:
res = fh.read()
assert re.search(regex, res, flags=flags)
def _msgid_domain(msgid: str) -> str:
return msgid.strip('<>').rsplit('@', maxsplit=1)[1]
def test_make_msgid_avoids_host_domain_by_default() -> None:
stdlib_msgid = email.utils.make_msgid()
b4_msgid = b4.make_msgid(idstring='b4-test')
assert _msgid_domain(stdlib_msgid) == socket.getfqdn()
assert _msgid_domain(b4_msgid) == 'b4'
assert _msgid_domain(b4_msgid) != socket.getfqdn()
def test_make_msgid_custom_cmd_opt_in(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setitem(
b4.MAIN_CONFIG, 'custom-msgid-cmd', 'echo custom-1234@example.com'
)
# The custom command is only consulted when explicitly allowed.
assert b4.make_msgid(allow_custom_msgid_cmd=True) == '<custom-1234@example.com>'
# Without the opt-in, the built-in id is used and the command is ignored.
assert _msgid_domain(b4.make_msgid(idstring='b4-review')) == 'b4'
def test_make_msgid_custom_cmd_preserves_brackets(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setitem(
b4.MAIN_CONFIG, 'custom-msgid-cmd', 'echo <wrapped-5678@example.com>'
)
assert b4.make_msgid(allow_custom_msgid_cmd=True) == '<wrapped-5678@example.com>'
def test_make_msgid_custom_cmd_list_uses_first(
monkeypatch: pytest.MonkeyPatch,
) -> None:
# Config options defined multiple times arrive as a list; use the first.
monkeypatch.setitem(
b4.MAIN_CONFIG,
'custom-msgid-cmd',
['echo first@example.com', 'echo second@example.com'],
)
assert b4.make_msgid(allow_custom_msgid_cmd=True) == '<first@example.com>'
@pytest.mark.parametrize(
'cmdstr',
[
None, # unset
'false', # command fails
'true', # command succeeds but produces no output
],
)
def test_make_msgid_custom_cmd_falls_back(
monkeypatch: pytest.MonkeyPatch, cmdstr: Optional[str]
) -> None:
monkeypatch.setitem(b4.MAIN_CONFIG, 'custom-msgid-cmd', cmdstr)
msgid = b4.make_msgid(idstring='b4-ty', allow_custom_msgid_cmd=True)
assert _msgid_domain(msgid) == 'b4'
assert msgid.endswith('.b4-ty@b4>')
@pytest.mark.parametrize(
'source,expected',
[
(
'trailers-test-simple',
[
('person', 'Reported-by', '"Doe, Jane" <jane@example.com>', None),
('person', 'Reviewed-by', 'Bogus Bupkes <bogus@example.com>', None),
('utility', 'Fixes', 'abcdef01234567890', None),
('utility', 'Link', 'https://msgid.link/some@msgid.here', None),
],
),
(
'trailers-test-extinfo',
[
('person', 'Reported-by', 'Some, One <somewhere@example.com>', None),
(
'person',
'Reviewed-by',
'Bogus Bupkes <bogus@example.com>',
'[for the parts that are bogus]',
),
('utility', 'Fixes', 'abcdef01234567890', None),
(
'person',
'Tested-by',
'Some Person <bogus2@example.com>',
' [this person visually indented theirs]',
),
(
'utility',
'Link',
'https://msgid.link/some@msgid.here',
' # initial submission',
),
(
'person',
'Signed-off-by',
'Wrapped Persontrailer <broken@example.com>',
None,
),
],
),
],
)
def test_parse_trailers(
sampledir: str, source: str, expected: List[Tuple[str, str, str, Optional[str]]]
) -> None:
msgs = b4.get_msgs_from_mailbox_or_maildir(f'{sampledir}/{source}.txt')
for msg in msgs:
lmsg = b4.LoreMessage(msg)
_, _, trs, _, _ = b4.LoreMessage.get_body_parts(lmsg.body)
assert len(expected) == len(trs)
for tr in trs:
mytype, myname, myvalue, myextinfo = expected.pop(0)
assert tr.name == myname
assert tr.value == myvalue
assert tr.extinfo == myextinfo
assert tr.type == mytype
mytr = b4.LoreTrailer(name=myname, value=myvalue, extinfo=myextinfo)
assert tr == mytr
assert tr.extinfo == mytr.extinfo
@pytest.mark.parametrize(
'name,value,exp_type,exp_addr,exp_value',
[
# Simple name
(
'Signed-off-by',
'Simple Name <simple@example.com>',
'person',
('Simple Name', 'simple@example.com'),
'Simple Name <simple@example.com>',
),
# Double quotes in display name must be preserved
(
'Signed-off-by',
'Jane "JD" Doe <jd@example.com>',
'person',
('Jane "JD" Doe', 'jd@example.com'),
'Jane "JD" Doe <jd@example.com>',
),
# Outer RFC 2822 quotes around a name with comma
(
'Reported-by',
'"Doe, Jane" <jane@example.com>',
'person',
('"Doe, Jane"', 'jane@example.com'),
'"Doe, Jane" <jane@example.com>',
),
# Comma in name without quotes
(
'Reported-by',
'Some, One <somewhere@example.com>',
'person',
('Some, One', 'somewhere@example.com'),
'Some, One <somewhere@example.com>',
),
# Parentheses in display name
(
'Tested-by',
'Developer Foo (EXAMPLECORP) <dev@example.com>',
'person',
('Developer Foo (EXAMPLECORP)', 'dev@example.com'),
'Developer Foo (EXAMPLECORP) <dev@example.com>',
),
# Bare angle-bracket email
(
'Cc',
'<bare@example.com>',
'person',
('', 'bare@example.com'),
'bare@example.com',
),
# Bare email without angle brackets
(
'Cc',
'bare@example.com',
'person',
('', 'bare@example.com'),
'bare@example.com',
),
],
)
def test_trailer_addr_parsing(
name: str, value: str, exp_type: str, exp_addr: Tuple[str, str], exp_value: str
) -> None:
tr = b4.LoreTrailer(name=name, value=value)
assert tr.type == exp_type
assert tr.addr == exp_addr
assert tr.value == exp_value
@pytest.mark.parametrize(
'source,serargs,amargs,reference,b4cfg',
[
('single', {}, {}, 'defaults', {}),
('single', {}, {'noaddtrailers': True}, 'noadd', {}),
('single', {}, {'addmysob': True}, 'addmysob', {}),
('single', {}, {'addmysob': True, 'copyccs': True}, 'copyccs', {}),
('single', {}, {'addmysob': True, 'addlink': True}, 'addlink', {}),
(
'single',
{},
{'addmysob': True, 'addlink': True},
'addmsgid',
{'linktrailermask': 'Message-ID: <%s>'},
),
(
'single',
{},
{'addmysob': True, 'copyccs': True},
'ordered',
{'trailer-order': 'Cc,Tested*,Reviewed*,*'},
),
('single', {'sloppytrailers': True}, {'addmysob': True}, 'sloppy', {}),
('with-cover', {}, {'addmysob': True}, 'defaults', {}),
('with-cover', {}, {'addmysob': True, 'addlink': True}, 'addlink', {}),
('custody', {}, {'addmysob': True, 'copyccs': True}, 'unordered', {}),
(
'custody',
{},
{'addmysob': True, 'copyccs': True},
'ordered',
{'trailer-order': 'Cc,Fixes*,Link*,Suggested*,Reviewed*,Tested*,*'},
),
(
'custody',
{},
{'addmysob': True, 'copyccs': True},
'with-ignored',
{'trailers-ignore-from': 'followup-reviewer1@example.com'},
),
('partial-reroll', {}, {'addmysob': True}, 'defaults', {}),
('nore', {}, {}, 'defaults', {}),
('non-git-patch', {}, {}, 'defaults', {}),
('non-git-patch-with-comments', {}, {}, 'defaults', {}),
('with-diffstat', {}, {}, 'defaults', {}),
('name-parens', {}, {}, 'defaults', {}),
('bare-address', {}, {}, 'defaults', {}),
('stripped-lines', {}, {}, 'defaults', {}),
('htmljunk', {}, {}, 'defaults', {}),
],
)
def test_followup_trailers(
sampledir: str,
source: str,
serargs: Dict[str, Any],
amargs: Dict[str, Any],
reference: str,
b4cfg: Dict[str, Any],
) -> None:
b4.MAIN_CONFIG.update(b4cfg)
lmbx = b4.LoreMailbox()
for msg in b4.get_msgs_from_mailbox_or_maildir(
f'{sampledir}/trailers-followup-{source}.mbox'
):
lmbx.add_message(msg)
lser = lmbx.get_series(**serargs)
assert lser is not None
amsgs = lser.get_am_ready(**amargs)
ifh = io.BytesIO()
b4.save_git_am_mbox(amsgs, ifh)
with open(f'{sampledir}/trailers-followup-{source}-ref-{reference}.txt', 'r') as fh:
assert ifh.getvalue().decode() == fh.read()
@pytest.mark.parametrize(
'hval,verify,tr',
[
('short-ascii', 'short-ascii', 'encode'),
('short-unicôde', '=?utf-8?q?short-unic=C3=B4de?=', 'encode'),
# Long ascii
(
(
'Lorem ipsum dolor sit amet consectetur adipiscing elit '
'sed do eiusmod tempor incididunt ut labore et dolore magna aliqua'
),
(
'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do\n'
' eiusmod tempor incididunt ut labore et dolore magna aliqua'
),
'encode',
),
# Long unicode
(
(
'Lorem îpsum dolor sit amet consectetur adipiscing elît '
'sed do eiusmod tempôr incididunt ut labore et dolôre magna aliqua'
),
(
'=?utf-8?q?Lorem_=C3=AEpsum_dolor_sit_amet_consectetur_adipiscin?=\n'
' =?utf-8?q?g_el=C3=AEt_sed_do_eiusmod_temp=C3=B4r_incididunt_ut_labore_et?=\n'
' =?utf-8?q?_dol=C3=B4re_magna_aliqua?='
),
'encode',
),
# Exactly 75 long
(
'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu',
'Lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiu',
'encode',
),
# Unicode that breaks on escape boundary
(
'Lorem ipsum dolor sit amet consectetur adipiscin elît',
'=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipiscin_el?=\n =?utf-8?q?=C3=AEt?=',
'encode',
),
# Unicode that's just 1 too long
(
'Lorem ipsum dolor sit amet consectetur adipi elît',
'=?utf-8?q?Lorem_ipsum_dolor_sit_amet_consectetur_adipi_el=C3=AE?=\n =?utf-8?q?t?=',
'encode',
),
# A single address
('foo@example.com', 'foo@example.com', 'encode'),
# Two addresses
(
'foo@example.com, bar@example.com',
'foo@example.com, bar@example.com',
'encode',
),
# Mixed addresses
(
'foo@example.com, Foo Bar <bar@example.com>',
'foo@example.com, Foo Bar <bar@example.com>',
'encode',
),
# Mixed Unicode
(
'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>',
'foo@example.com, Foo Bar <bar@example.com>, \n =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>',
'encode',
),
(
'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quux, Foo" <quux@example.com>',
(
'foo@example.com, Foo Bar <bar@example.com>, \n'
' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, "Quux, Foo" <quux@example.com>'
),
'encode',
),
(
'01234567890123456789012345678901234567890123456789012345678901@example.org, ä <foo@example.org>',
(
'01234567890123456789012345678901234567890123456789012345678901@example.org, \n'
' =?utf-8?q?=C3=A4?= <foo@example.org>'
),
'encode',
),
# Test for https://github.com/python/cpython/issues/100900
(
'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>',
(
'foo@example.com, Foo Bar <bar@example.com>, \n'
' =?utf-8?q?F=C3=B4o_Baz?= <baz@example.com>, \n =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>'
),
'encode',
),
# Test preserve
(
'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, "Quûx, Foo" <quux@example.com>',
'foo@example.com, Foo Bar <bar@example.com>, Fôo Baz <baz@example.com>, \n "Quûx, Foo" <quux@example.com>',
'preserve',
),
# Test decode
(
'foo@example.com, Foo Bar <bar@example.com>, =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>',
'foo@example.com, Foo Bar <bar@example.com>, \n "Quûx, Foo" <quux@example.com>',
'decode',
),
# Test short message-id
(
'Message-ID: <20240319-short-message-id@example.com>',
'<20240319-short-message-id@example.com>',
'encode',
),
# Test long message-id
(
'Message-ID: <20240319-very-long-message-id-that-spans-multiple-lines-for-sure-because-longer-than-75-characters-abcde123456@longdomain.example.com>',
'<20240319-very-long-message-id-that-spans-multiple-lines-for-sure-because-longer-than-75-characters-abcde123456@longdomain.example.com>',
'encode',
),
],
)
def test_header_wrapping(
sampledir: str, hval: str, verify: str, tr: Literal['encode', 'decode', 'preserve']
) -> None:
if ':' in hval:
chunks = hval.split(':', maxsplit=1)
hname = chunks[0].strip()
hval = chunks[1].strip()
else:
hname = 'To' if '@' in hval else 'X-Header'
wrapped = b4.LoreMessage.wrap_header((hname, hval), transform=tr)
assert wrapped.decode() == f'{hname}: {verify}'
_wname, wval = wrapped.split(b':', maxsplit=1)
if tr != 'decode':
cval = b4.LoreMessage.clean_header(wval.decode())
assert cval == hval
@pytest.mark.parametrize(
'pairs,verify,clean',
[
(
[('', 'foo@example.com'), ('Foo Bar', 'bar@example.com')],
'foo@example.com, Foo Bar <bar@example.com>',
True,
),
(
[('', 'foo@example.com'), ('Foo, Bar', 'bar@example.com')],
'foo@example.com, "Foo, Bar" <bar@example.com>',
True,
),
(
[('', 'foo@example.com'), ('Fôo, Bar', 'bar@example.com')],
'foo@example.com, "Fôo, Bar" <bar@example.com>',
True,
),
(
[
('', 'foo@example.com'),
('=?utf-8?q?Qu=C3=BBx_Foo?=', 'quux@example.com'),
],
'foo@example.com, Quûx Foo <quux@example.com>',
True,
),
(
[
('', 'foo@example.com'),
('=?utf-8?q?Qu=C3=BBx=2C_Foo?=', 'quux@example.com'),
],
'foo@example.com, "Quûx, Foo" <quux@example.com>',
True,
),
(
[
('', 'foo@example.com'),
('=?utf-8?q?Qu=C3=BBx=2C_Foo?=', 'quux@example.com'),
],
'foo@example.com, =?utf-8?q?Qu=C3=BBx=2C_Foo?= <quux@example.com>',
False,
),
# Pre-quoted display name with special chars must not be double-quoted
(
[('', 'foo@example.com'), ('"Example.org Tools"', 'tools@example.org')],
'foo@example.com, "Example.org Tools" <tools@example.org>',
True,
),
(
[('', 'foo@example.com'), ('"Doe, Jane"', 'jane@example.com')],
'foo@example.com, "Doe, Jane" <jane@example.com>',
True,
),
# Unquoted name with internal quotes
(
[('', 'foo@example.com'), ('Jane "JD" Doe', 'jd@example.com')],
'foo@example.com, "Jane \\"JD\\" Doe" <jd@example.com>',
True,
),
# Name starting with quote but not fully quoted
(
[('', 'foo@example.com'), ('"JD" Doe', 'jd@example.com')],
'foo@example.com, "\\"JD\\" Doe" <jd@example.com>',
True,
),
# Pre-quoted name with internal quotes
(
[('', 'foo@example.com'), ('"Jane "JD" Doe"', 'jd@example.com')],
'foo@example.com, "Jane \\"JD\\" Doe" <jd@example.com>',
True,
),
],
)
def test_format_addrs(pairs: List[Tuple[str, str]], verify: str, clean: bool) -> None:
formatted = b4.format_addrs(pairs, clean)
assert formatted == verify
@pytest.mark.parametrize(
'intrange,upper,expected',
[
('1-3', 5, [1, 2, 3]),
('-1', 5, [5]),
('1,3-5', 5, [1, 3, 4, 5]),
('1', 5, [1]),
('3', 5, [3]),
('5', 5, [5]),
('1,3,4-', 6, [1, 3, 4, 5, 6]),
('1-3,5,-1', 7, [1, 2, 3, 5, 7]),
('-7', 5, []),
('1-8', 3, [1, 2, 3]),
],
)
def test_parse_int_range(intrange: str, upper: int, expected: List[int]) -> None:
assert list(b4.parse_int_range(intrange, upper)) == expected
@pytest.mark.parametrize(
'body_link,extra_link,expect_count',
[
# Exact same URL — should dedup to one
(
'https://patch.msgid.link/20240101-test-v1-1-abc123@example.com',
'https://patch.msgid.link/20240101-test-v1-1-abc123@example.com',
1,
),
# Same URL, different case — should still dedup
(
'https://patch.msgid.link/20240101-TEST-V1-1-ABC123@example.com',
'https://patch.msgid.link/20240101-test-v1-1-abc123@example.com',
1,
),
# Different domains, same message-id — should dedup to one
(
'https://lore.kernel.org/r/20240101-test-v1-1-abc123@example.com',
'https://patch.msgid.link/20240101-test-v1-1-abc123@example.com',
1,
),
# URL-encoded message-id — should match decoded form
(
'https://lore.kernel.org/r/20240101-test-v1-1-abc123%40example.com',
'https://patch.msgid.link/20240101-test-v1-1-abc123@example.com',
1,
),
# Different message-ids — both should survive
(
'https://lore.kernel.org/r/20240101-foo-v1-1-aaa@example.com',
'https://patch.msgid.link/20240101-bar-v1-1-bbb@example.com',
2,
),
],
)
def test_link_trailer_dedup(body_link: str, extra_link: str, expect_count: int) -> None:
"""Link: trailers already in the body should not be duplicated by extras."""
raw = (
f'From: Test Author <test@example.com>\n'
f'Subject: [PATCH] test link dedup\n'
f'Date: Mon, 1 Jan 2024 00:00:00 +0000\n'
f'Message-Id: <20240101-test-v1-1-abc123@example.com>\n'
f'\n'
f'Commit body here.\n'
f'\n'
f'Signed-off-by: Test Author <test@example.com>\n'
f'Link: {body_link}\n'
)
msg = email.message_from_string(raw, policy=email.policy.EmailPolicy(utf8=True))
lmsg = b4.LoreMessage(msg)
extra = b4.LoreTrailer(name='Link', value=extra_link)
lmsg.fix_trailers(extras=[extra])
# Count Link: trailers in the result
_, _, trailers, _, _ = b4.LoreMessage.get_body_parts(lmsg.body)
link_trailers = [t for t in trailers if t.lname == 'link']
assert len(link_trailers) == expect_count
class TestTakeFlow:
"""Simulate the 'take' flow using the actual code path: build email
messages (as if fetched from lore), feed through LoreMailbox →
LoreSeries → get_am_ready(addlink=True) → git am.
No network access — messages are constructed in-memory.
"""
@staticmethod
def _make_patch_msg(
msgid: str,
subject: str,
body: str,
diff: str,
counter: int = 1,
expected: int = 1,
in_reply_to: Optional[str] = None,
) -> email.message.EmailMessage:
"""Build a realistic patch email like what lore returns.
The *body* should contain the full commit message including
trailers (Signed-off-by, Link, etc.) — just like a real patch
email from a mailing list.
"""
if expected > 1:
prefix = f'[PATCH {counter}/{expected}]'
else:
prefix = '[PATCH]'
raw = (
f'From: Test Author <test@example.com>\n'
f'Subject: {prefix} {subject}\n'
f'Date: Mon, 1 Jan 2024 00:00:00 +0000\n'
f'Message-Id: <{msgid}>\n'
)
if in_reply_to:
raw += f'In-Reply-To: <{in_reply_to}>\n'
raw += f'References: <{in_reply_to}>\n'
raw += f'\n{body}\n---\n{diff}\n'
return email.message_from_string(
raw, policy=email.policy.EmailPolicy(utf8=True)
)
@staticmethod
def _make_reply_msg(
msgid: str,
in_reply_to: str,
from_name: str,
from_email: str,
trailer_lines: List[str],
) -> email.message.EmailMessage:
"""Build a followup reply with trailers."""
trailers = '\n'.join(trailer_lines)
raw = (
f'From: {from_name} <{from_email}>\n'
f'Subject: Re: [PATCH] test\n'
f'Date: Mon, 1 Jan 2024 01:00:00 +0000\n'
f'Message-Id: <{msgid}>\n'
f'In-Reply-To: <{in_reply_to}>\n'
f'References: <{in_reply_to}>\n'
f'\n'
f'> Some quoted text\n'
f'\n'
f'{trailers}\n'
)
return email.message_from_string(
raw, policy=email.policy.EmailPolicy(utf8=True)
)
def test_link_dedup_with_followups(self, gitdir: str) -> None:
"""Patch already has Link: in body, get_am_ready(addlink=True)
should not duplicate it. Followup trailers should be added."""
patch_msgid = '20240101-widget-v1-1-abc123@example.com'
link_url = f'https://patch.msgid.link/{patch_msgid}'
patch_msg = self._make_patch_msg(
msgid=patch_msgid,
subject='Add widget support',
body=(
'This adds a fancy widget.\n'
'\n'
'Signed-off-by: Test Author <test@example.com>\n'
f'Link: {link_url}\n'
),
diff=(
' file1.txt | 1 +\n'
' 1 file changed, 1 insertion(+)\n'
'\n'
'diff --git a/file1.txt b/file1.txt\n'
'index b352682..6713e9f 100644\n'
'--- a/file1.txt\n'
'+++ b/file1.txt\n'
'@@ -1,3 +1,4 @@\n'
' This is file 1.\n'
' It has a single line.\n'
' This is a second line I added.\n'
'+widget\n'
),
)
reply_msg = self._make_reply_msg(
msgid='reply-1@example.com',
in_reply_to=patch_msgid,
from_name='Reviewer One',
from_email='reviewer@example.com',
trailer_lines=[
'Reviewed-by: Reviewer One <reviewer@example.com>',
],
)
reply_msg2 = self._make_reply_msg(
msgid='reply-2@example.com',
in_reply_to=patch_msgid,
from_name='Acker Two',
from_email='acker@example.com',
trailer_lines=[
'Acked-by: Acker Two <acker@example.com>',
],
)
# Feed through LoreMailbox → LoreSeries (actual take code path)
lmbx = b4.LoreMailbox()
for msg in [patch_msg, reply_msg, reply_msg2]:
lmbx.add_message(msg)
lser = lmbx.get_series()
assert lser is not None
am_msgs = lser.get_am_ready(addlink=True)
assert len(am_msgs) == 1
# Apply to master via git am
ifh = io.BytesIO()
b4.save_git_am_mbox(am_msgs, ifh)
ecode, out = b4.git_run_command(gitdir, ['am'], stdin=ifh.getvalue())
assert ecode == 0, f'git am failed: {out}'
ecode, result = b4.git_run_command(gitdir, ['log', '-1', '--format=%B'])
assert ecode == 0
# Exactly one Link: trailer, not two
assert result.count(f'Link: {link_url}') == 1, (
f'Duplicate Link: found:\n{result}'
)
# Followup trailers applied
assert 'Reviewed-by: Reviewer One <reviewer@example.com>' in result
assert 'Acked-by: Acker Two <acker@example.com>' in result
def test_link_added_when_not_present(self, gitdir: str) -> None:
"""Patch without Link: should get one added by addlink=True."""
patch_msgid = '20240101-cursor-v1-1-def456@example.com'
patch_msg = self._make_patch_msg(
msgid=patch_msgid,
subject='Fix cursor rendering',
body=(
'This fixes a cursor bug.\n'
'\n'
'Signed-off-by: Test Author <test@example.com>\n'
),
diff=(
' file1.txt | 1 +\n'
' 1 file changed, 1 insertion(+)\n'
'\n'
'diff --git a/file1.txt b/file1.txt\n'
'index b352682..e147dad 100644\n'
'--- a/file1.txt\n'
'+++ b/file1.txt\n'
'@@ -1,3 +1,4 @@\n'
' This is file 1.\n'
' It has a single line.\n'
' This is a second line I added.\n'
'+cursor fix\n'
),
)
lmbx = b4.LoreMailbox()
lmbx.add_message(patch_msg)
lser = lmbx.get_series()
assert lser is not None
am_msgs = lser.get_am_ready(addlink=True)
assert len(am_msgs) == 1
ifh = io.BytesIO()
b4.save_git_am_mbox(am_msgs, ifh)
ecode, out = b4.git_run_command(gitdir, ['am'], stdin=ifh.getvalue())
assert ecode == 0, f'git am failed: {out}'
ecode, result = b4.git_run_command(gitdir, ['log', '-1', '--format=%B'])
assert ecode == 0
expected_link = f'https://patch.msgid.link/{patch_msgid}'
assert f'Link: {expected_link}' in result
assert result.count('Link:') == 1
def test_followup_trailers_without_addlink(self, gitdir: str) -> None:
"""Followups should be applied even with addlink=False."""
patch_msgid = '20240101-verifier-v1-1-789abc@example.com'
patch_msg = self._make_patch_msg(
msgid=patch_msgid,
subject='Refactor verifier',
body=(
'Clean up the verifier logic.\n'
'\n'
'Signed-off-by: Test Author <test@example.com>\n'
),
diff=(
' file1.txt | 1 +\n'
' 1 file changed, 1 insertion(+)\n'
'\n'
'diff --git a/file1.txt b/file1.txt\n'
'index b352682..6a8b771 100644\n'
'--- a/file1.txt\n'
'+++ b/file1.txt\n'
'@@ -1,3 +1,4 @@\n'
' This is file 1.\n'
' It has a single line.\n'
' This is a second line I added.\n'
'+verifier\n'
),
)
reply_msg = self._make_reply_msg(
msgid='reply-v-1@example.com',
in_reply_to=patch_msgid,
from_name='Alice Author',
from_email='alice@example.com',
trailer_lines=[
'Reviewed-by: Alice Author <alice@example.com>',
'Tested-by: Alice Author <alice@example.com>',
],
)
lmbx = b4.LoreMailbox()
for msg in [patch_msg, reply_msg]:
lmbx.add_message(msg)
lser = lmbx.get_series()
assert lser is not None
am_msgs = lser.get_am_ready(addlink=False)
assert len(am_msgs) == 1
ifh = io.BytesIO()
b4.save_git_am_mbox(am_msgs, ifh)
ecode, out = b4.git_run_command(gitdir, ['am'], stdin=ifh.getvalue())
assert ecode == 0, f'git am failed: {out}'
ecode, result = b4.git_run_command(gitdir, ['log', '-1', '--format=%B'])
assert ecode == 0
assert 'Reviewed-by: Alice Author <alice@example.com>' in result
assert 'Tested-by: Alice Author <alice@example.com>' in result
assert 'Link:' not in result
def test_different_link_domains_same_msgid_deduped(self, gitdir: str) -> None:
"""If the patch body has a lore.kernel.org Link: and addlink
generates a patch.msgid.link one for the same message-id,
only the original should survive (dedup by message-id)."""
patch_msgid = '20240101-drm-v1-1-aabbcc@example.com'
lore_link = f'https://lore.kernel.org/r/{patch_msgid}'
patch_msg = self._make_patch_msg(
msgid=patch_msgid,
subject='Fix DRM issue',
body=(
'Fix the DRM subsystem.\n'
'\n'
'Signed-off-by: Test Author <test@example.com>\n'
f'Link: {lore_link}\n'
),
diff=(
' file1.txt | 1 +\n'
' 1 file changed, 1 insertion(+)\n'
'\n'
'diff --git a/file1.txt b/file1.txt\n'
'index b352682..4a2161b 100644\n'
'--- a/file1.txt\n'
'+++ b/file1.txt\n'
'@@ -1,3 +1,4 @@\n'
' This is file 1.\n'
' It has a single line.\n'
' This is a second line I added.\n'
'+drm fix\n'
),
)
lmbx = b4.LoreMailbox()
lmbx.add_message(patch_msg)
lser = lmbx.get_series()
assert lser is not None
am_msgs = lser.get_am_ready(addlink=True)
assert len(am_msgs) == 1
ifh = io.BytesIO()
b4.save_git_am_mbox(am_msgs, ifh)
ecode, out = b4.git_run_command(gitdir, ['am'], stdin=ifh.getvalue())
assert ecode == 0, f'git am failed: {out}'
ecode, result = b4.git_run_command(gitdir, ['log', '-1', '--format=%B'])
assert ecode == 0
# Same message-id in both URLs, so deduped to one Link:
assert lore_link in result
assert result.count('Link:') == 1
@pytest.mark.parametrize(
'subject,extras,expected',
[
('[PATCH] This is a patch', None, '[PATCH] This is a patch'),
('[PATCH v3] This is a patch', None, '[PATCH v3] This is a patch'),
('[PATCH RFC v3] This is a patch', None, '[PATCH RFC v3] This is a patch'),
(
'[RFC PATCH v3 1/3] This is a patch',
None,
'[RFC PATCH v3 1/3] This is a patch',
),
(
'[RESEND PATCH v3 1/3] This is a patch',
None,
'[RESEND PATCH v3 1/3] This is a patch',
),
(
'[PATCH RFC v3 2/3] This is a patch',
['RFC'],
'[PATCH RFC v3 2/3] This is a patch',
),
(
'[PATCH RFC v3 3/12] This is a patch',
None,
'[PATCH RFC v3 03/12] This is a patch',
),
(
'[PATCH RFC v3] This is a [patch]',
['RFC'],
'[PATCH RFC v3] This is a [patch]',
),
(
'[PATCH RFC v3 2/3] This is a patch',
['netdev', 'bpf'],
'[PATCH RFC netdev bpf v3 2/3] This is a patch',
),
],
)
def test_lore_subject_prefixes(
subject: str, extras: Optional[List[str]], expected: str
) -> None:
lsubj = b4.LoreSubject(subject)
assert lsubj.get_rebuilt_subject(eprefixes=extras) == expected
class TestGetLoreNode:
"""Tests for get_lore_node() liblore integration."""
def setup_method(self) -> None:
b4.LORENODE = None
def test_uses_from_git_config(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""get_lore_node() constructs via LoreNode.from_git_config()."""
from unittest.mock import MagicMock
import liblore
mock_node = MagicMock()
mock_from_gc = MagicMock(return_value=mock_node)
monkeypatch.setattr(liblore.LoreNode, 'from_git_config', mock_from_gc)
node = b4.get_lore_node()
mock_from_gc.assert_called_once()
assert node is mock_node
def test_sets_user_agent(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""get_lore_node() calls set_user_agent with b4's identity."""
from unittest.mock import MagicMock
import liblore
mock_node = MagicMock()
monkeypatch.setattr(
liblore.LoreNode, 'from_git_config', MagicMock(return_value=mock_node)
)
b4.get_lore_node()
mock_node.set_user_agent.assert_called_once_with('b4', b4.__VERSION__)
def test_does_not_inject_session(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""get_lore_node() lets liblore own its session."""
from unittest.mock import MagicMock
import liblore
mock_node = MagicMock()
monkeypatch.setattr(
liblore.LoreNode, 'from_git_config', MagicMock(return_value=mock_node)
)
b4.get_lore_node()
mock_node.set_requests_session.assert_not_called()
def test_passes_cache_settings(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""cache_dir and cache_ttl from b4 config are passed through."""
from unittest.mock import MagicMock
import liblore
b4.MAIN_CONFIG['cache-expire'] = '5'
mock_node = MagicMock()
mock_from_gc = MagicMock(return_value=mock_node)
monkeypatch.setattr(liblore.LoreNode, 'from_git_config', mock_from_gc)
b4.get_lore_node()
call_kwargs = mock_from_gc.call_args.kwargs
assert call_kwargs['cache_ttl'] == 300
assert 'lore' in call_kwargs['cache_dir']
def test_singleton(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Repeated calls return the same LoreNode instance."""
from unittest.mock import MagicMock
import liblore
mock_node = MagicMock()
mock_from_gc = MagicMock(return_value=mock_node)
monkeypatch.setattr(liblore.LoreNode, 'from_git_config', mock_from_gc)
n1 = b4.get_lore_node()
n2 = b4.get_lore_node()
assert n1 is n2
assert mock_from_gc.call_count == 1