blob: 60a32eadae4dd734f3fbd32d3202d7bc63892679 [file] [edit]
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
"""Tests for b4._rewrite.rewrite_commit_messages() and notes migration."""
from __future__ import annotations
from contextlib import AbstractContextManager
from pathlib import Path
from typing import Any, List, Optional, Tuple, Union, cast
from unittest.mock import patch
import pygit2
import pytest
import b4
import b4._rewrite
def _commit(repo: pygit2.Repository, oid: Union[pygit2.Oid, str]) -> pygit2.Commit:
"""Helper: look up a commit and return it typed as Commit for mypy."""
return cast(pygit2.Commit, repo[oid])
SIG = pygit2.Signature('Test Author', 'test@example.com', 1700000000, 0)
BRANCH = 'refs/heads/master'
# -- Fixtures ----------------------------------------------------------------
@pytest.fixture()
def bare_repo(tmp_path: Path) -> pygit2.Repository:
"""A bare repo with HEAD symbolically pointing at refs/heads/master.
``core.logAllRefUpdates = always`` is enabled so ref updates write
reflog entries, matching the default behavior of non-bare user repos
where b4 is actually run.
"""
repo_dir = tmp_path / 'repo'
pygit2.init_repository(str(repo_dir), bare=True)
repo = pygit2.Repository(str(repo_dir))
repo.config['core.logAllRefUpdates'] = 'always'
return repo
def _mkcommit(
repo: pygit2.Repository,
message: str,
parent_oid: Optional[pygit2.Oid] = None,
file_content: bytes = b'hello\n',
) -> pygit2.Oid:
"""Create a one-file commit and return its OID.
Tree is a single blob at path ``file``; callers override ``file_content``
when they want different trees across a chain.
"""
blob_oid = repo.create_blob(file_content)
tb = repo.TreeBuilder()
tb.insert('file', blob_oid, pygit2.GIT_FILEMODE_BLOB)
tree_oid = tb.write()
parents = [parent_oid] if parent_oid is not None else []
return repo.create_commit(None, SIG, SIG, message, tree_oid, parents)
def _seed(repo: pygit2.Repository, messages: List[str]) -> List[pygit2.Oid]:
"""Seed a linear chain of commits with distinct trees; return OIDs."""
oids: List[pygit2.Oid] = []
parent: Optional[pygit2.Oid] = None
for i, msg in enumerate(messages):
oid = _mkcommit(repo, msg, parent_oid=parent, file_content=f'v{i}\n'.encode())
oids.append(oid)
parent = oid
repo.references.create(BRANCH, oids[-1])
repo.references['HEAD'].set_target(BRANCH) if 'HEAD' in repo.references else None
# A fresh bare repo already has HEAD -> refs/heads/master symbolically;
# creating refs/heads/master above makes it resolvable.
return oids
def _patch_gitdir(repo: pygit2.Repository) -> AbstractContextManager[Any]:
"""Patch b4.git_get_gitdir to return this repo's path."""
return patch('b4.git_get_gitdir', return_value=repo.path.rstrip('/'))
# -- rewrite_commit_messages core tests --------------------------------------
class TestRewriteCore:
def test_empty_edit_map_shortcircuits(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n', 'c\n'])
tip_before = bare_repo.references[BRANCH].target
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook') as mock_hook:
result = b4._rewrite.rewrite_commit_messages(
edit_map={},
start=str(oids[0]),
end='HEAD',
)
assert result == {}
assert bare_repo.references[BRANCH].target == tip_before
mock_hook.assert_not_called()
def test_rewrite_single_commit_preserves_tree_and_sigs(
self, bare_repo: pygit2.Repository
) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n', 'c\n'])
middle_hex = str(oids[1])
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={middle_hex: 'b (edited)\n'},
start=str(oids[0]),
end='HEAD',
)
new_middle = _commit(bare_repo, result[middle_hex])
old_middle = _commit(bare_repo, oids[1])
assert new_middle.tree_id == old_middle.tree_id
assert new_middle.author.name == old_middle.author.name
assert new_middle.author.email == old_middle.author.email
assert new_middle.author.time == old_middle.author.time
assert new_middle.committer.name == old_middle.committer.name
assert new_middle.committer.email == old_middle.committer.email
assert new_middle.committer.time == old_middle.committer.time
assert new_middle.message == 'b (edited)\n'
def test_rewrite_descendants_reparented(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n', 'c\n', 'd\n'])
second_hex = str(oids[1])
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={second_hex: 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
# Every commit after the first should have been re-emitted.
for old in oids[1:]:
assert str(old) in result
# Chain should be walk-able from the new tip.
new_tip = _commit(bare_repo, result[str(oids[3])])
assert str(new_tip.parents[0].id) == result[str(oids[2])]
assert str(new_tip.parents[0].parents[0].id) == result[str(oids[1])]
def test_branch_backup_created(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
pre_tip = bare_repo.references[BRANCH].target
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
assert 'refs/original/master' in bare_repo.references
assert bare_repo.references['refs/original/master'].target == pre_tip
def test_reflog_entry_written(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
reflog_msg = 'b4: custom reflog message'
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
reflog_msg=reflog_msg,
)
entries = list(bare_repo.references[BRANCH].log())
assert any(e.message == reflog_msg for e in entries)
def test_commit_not_in_edit_map_passes_message_through(
self, bare_repo: pygit2.Repository
) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n', 'c\n'])
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
# Third commit had its parent remapped but its message untouched.
new_third = _commit(bare_repo, result[str(oids[2])])
old_third = _commit(bare_repo, oids[2])
assert new_third.message == old_third.message
def test_trailing_newline_normalized(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'no trailing newline'},
start=str(oids[0]),
end='HEAD',
)
assert (
_commit(bare_repo, result[str(oids[1])]).message == 'no trailing newline\n'
)
# -- Notes migration ---------------------------------------------------------
class TestNotesMigration:
def test_notes_migrated_default_ref(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n', 'c\n'])
bare_repo.create_note('the note body', SIG, SIG, str(oids[1]))
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b (edited)\n'},
start=str(oids[0]),
end='HEAD',
)
new_hex = result[str(oids[1])]
migrated = bare_repo.lookup_note(new_hex)
assert migrated.message == 'the note body'
# Old-OID note is still reachable (we don't delete).
original = bare_repo.lookup_note(str(oids[1]))
assert original.message == 'the note body'
def test_notes_migrated_custom_ref(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
bare_repo.create_note(
'review note',
SIG,
SIG,
str(oids[1]),
'refs/notes/review',
)
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
new_hex = result[str(oids[1])]
migrated = bare_repo.lookup_note(new_hex, 'refs/notes/review')
assert migrated.message == 'review note'
def test_notes_migrated_multiple_refs(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
bare_repo.create_note('default', SIG, SIG, str(oids[1]))
bare_repo.create_note(
'review',
SIG,
SIG,
str(oids[1]),
'refs/notes/review',
)
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
new_hex = result[str(oids[1])]
assert bare_repo.lookup_note(new_hex).message == 'default'
assert bare_repo.lookup_note(new_hex, 'refs/notes/review').message == 'review'
def test_no_notes_no_work(self, bare_repo: pygit2.Repository) -> None:
oids = _seed(bare_repo, ['a\n', 'b\n'])
# No notes created.
with _patch_gitdir(bare_repo), patch('b4.ez.run_rewrite_hook'):
result = b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
# Rewrite still succeeds and no refs/notes/* were created.
new_hex = result[str(oids[1])]
assert new_hex # sanity
notes_refs = [r for r in bare_repo.references if r.startswith('refs/notes/')]
assert notes_refs == []
# -- Hook integration (moved from test_ez.py) -------------------------------
class TestHookIntegration:
def test_pre_hook_blocks_rewrite(self, bare_repo: pygit2.Repository) -> None:
"""A failing pre-hook prevents any rewrite work from happening."""
oids = _seed(bare_repo, ['a\n', 'b\n'])
pre_tip = bare_repo.references[BRANCH].target
b4.MAIN_CONFIG['prep-pre-rewrite-hook'] = 'false'
try:
with (
patch('b4.ez.b4._run_command', return_value=(1, b'', b'hook failed\n')),
patch('b4.ez.b4.git_get_toplevel', return_value='/tmp'),
_patch_gitdir(bare_repo),
):
with pytest.raises(RuntimeError, match='Pre-rewrite hook'):
b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
# Branch must not have moved.
assert bare_repo.references[BRANCH].target == pre_tip
# No backup ref should have been created.
assert 'refs/original/master' not in bare_repo.references
finally:
b4.MAIN_CONFIG.pop('prep-pre-rewrite-hook', None)
def test_hooks_bracket_rewrite(self, bare_repo: pygit2.Repository) -> None:
"""Both hooks run around a successful rewrite, in the right order."""
oids = _seed(bare_repo, ['a\n', 'b\n'])
b4.MAIN_CONFIG['prep-pre-rewrite-hook'] = 'pre-cmd'
b4.MAIN_CONFIG['prep-post-rewrite-hook'] = 'post-cmd'
try:
call_order: List[str] = []
def _track_run(
cmdargs: List[str], **kwargs: Any
) -> Tuple[int, bytes, bytes]:
call_order.append(cmdargs[0])
return (0, b'', b'')
with (
patch('b4.ez.b4._run_command', side_effect=_track_run),
patch('b4.ez.b4.git_get_toplevel', return_value='/tmp'),
_patch_gitdir(bare_repo),
):
b4._rewrite.rewrite_commit_messages(
edit_map={str(oids[1]): 'b!\n'},
start=str(oids[0]),
end='HEAD',
)
# pre-cmd must run before the rewrite; post-cmd after. We don't
# have a direct "rewrite happened" marker in call_order, but if
# both are present in order and the branch moved, that's the
# guarantee we care about.
assert call_order == ['pre-cmd', 'post-cmd']
assert bare_repo.references[BRANCH].target != oids[1]
finally:
b4.MAIN_CONFIG.pop('prep-pre-rewrite-hook', None)
b4.MAIN_CONFIG.pop('prep-post-rewrite-hook', None)