summary | refs | log | tree | commit | diff
path: root/git_crecord
diff options
context:
space:
mode:
author Andrej Shadura <andrew@shadura.me> 2022-03-06 10:43:21 +0100
committer Andrej Shadura <andrew@shadura.me> 2022-03-06 21:37:38 +0100
commit 1a73baa7952af30b2e9cb71a51d0d926bb59a515 (patch)
tree 6c930e4698708e89d56d8c013556634836c2227b /git_crecord
parent a2c602e7d63201511839718ea3ba9a3b5c3205ec (diff)
Process patches and file names without decoding them as UTF-8
Diffstat (limited to 'git_crecord')
-rw-r--r-- git_crecord/chunk_selector.py 17
-rw-r--r-- git_crecord/crecord_core.py 93
-rw-r--r-- git_crecord/crpatch.py 352
-rw-r--r-- git_crecord/gitrepo.py 57
-rw-r--r-- git_crecord/main.py 42
-rw-r--r-- git_crecord/util.py 48
6 files changed, 383 insertions, 226 deletions
diff --git a/git_crecord/chunk_selector.py b/git_crecord/chunk_selector.py
index d27679f..9685a6e 100644
--- a/git_crecord/chunk_selector.py
+++ b/git_crecord/chunk_selector.py
@@ -686,7 +686,7 @@ class CursesChunkSelector:
return checkbox
- def printheader(self, header, selected=False, towin=True,
+ def printheader(self, header: Header, selected=False, towin=True,
ignorefolding=False):
"""
Print the header to the pad. If countLines is True, don't print
@@ -726,7 +726,7 @@ class CursesChunkSelector:
return outstr
- def printhunklinesbefore(self, hunk, selected=False, towin=True,
+ def printhunklinesbefore(self, hunk: Hunk, selected=False, towin=True,
ignorefolding=False):
"includes start/end line indicator"
outstr = ""
@@ -745,11 +745,10 @@ class CursesChunkSelector:
checkbox = self.getstatusprefixstring(hunk)
lineprefix = " "*self.hunkindentnumchars + checkbox
- frtoline = " " + hunk.getfromtoline().strip("\n")
-
+ frtoline = " " + hunk.getfromtoline().decode("UTF-8", errors="hexreplace").strip("\n")
outstr += self.printstring(self.chunkpad, lineprefix, towin=towin,
- align=False) # add uncolored checkbox/indent
+ align=False) # add uncolored checkbox/indent
outstr += self.printstring(self.chunkpad, frtoline, pair=colorpair,
towin=towin)
@@ -759,12 +758,12 @@ class CursesChunkSelector:
# print out lines of the chunk preceding changed-lines
for line in hunk.before:
- linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line
+ linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line.decode("UTF-8", errors="hexreplace")
outstr += self.printstring(self.chunkpad, linestr, towin=towin)
return outstr
- def printhunklinesafter(self, hunk, towin=True, ignorefolding=False):
+ def printhunklinesafter(self, hunk: Hunk, towin=True, ignorefolding=False):
outstr = ""
if hunk.folded and not ignorefolding:
return outstr
@@ -772,12 +771,12 @@ class CursesChunkSelector:
# a bit superfluous, but to avoid hard-coding indent amount
checkbox = self.getstatusprefixstring(hunk)
for line in hunk.after:
- linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line
+ linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line.decode("UTF-8", errors="hexreplace")
outstr += self.printstring(self.chunkpad, linestr, towin=towin)
return outstr
- def printhunkchangedline(self, hunkline, selected=False, towin=True):
+ def printhunkchangedline(self, hunkline: HunkLine, selected=False, towin=True):
outstr = ""
checkbox = self.getstatusprefixstring(hunkline)
diff --git a/git_crecord/crecord_core.py b/git_crecord/crecord_core.py
index 0b02a43..ba4cee7 100644
--- a/git_crecord/crecord_core.py
+++ b/git_crecord/crecord_core.py
@@ -9,19 +9,22 @@
'''text-gui based change selection during commit or qrefresh'''
from gettext import gettext as _
-from . import encoding
-from . import util
+
import io
import errno
import os
import tempfile
import subprocess
+from typing import IO, cast
+
+from .crpatch import Header, parsepatch, filterpatch
+from .chunk_selector import chunkselector
+from .gitrepo import GitRepo
+from .util import Abort, system, closefds, copyfile
-from . import crpatch
-from . import chunk_selector
def dorecord(ui, repo, commitfunc, *pats, **opts):
- def recordfunc(ui, repo, message, match, opts):
+ def recordfunc(ui, repo: GitRepo, message, match, opts):
"""This is generic record driver.
Its job is to interactively filter local changes, and accordingly
@@ -51,21 +54,21 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
if not opts['index'] and repo.head():
git_base.append("HEAD")
- p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=util.closefds)
- fp = p.stdout
+ p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=closefds)
+ fp = cast(IO[bytes], p.stdout)
# 0. parse patch
fromfiles = set()
tofiles = set()
- chunks = crpatch.parsepatch(fp)
+ chunks = parsepatch(fp)
for c in chunks:
- if isinstance(c, crpatch.Header):
+ if isinstance(c, Header):
fromfile, tofile = c.files()
if fromfile is not None:
- fromfiles.add(fromfile)
+ fromfiles.add(os.fsdecode(fromfile))
if tofile is not None:
- tofiles.add(tofile)
+ tofiles.add(os.fsdecode(tofile))
added = tofiles - fromfiles
removed = fromfiles - tofiles
@@ -73,31 +76,31 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
changes = [modified, added, removed]
# 1. filter patch, so we have intending-to apply subset of it
- chunks = crpatch.filterpatch(opts,
- chunks,
- chunk_selector.chunkselector, ui)
+ chunks = filterpatch(opts,
+ chunks,
+ chunkselector, ui)
p.wait()
del fp
contenders = set()
for h in chunks:
- try:
- contenders.update(set(h.files()))
- except AttributeError:
- pass
+ fromfile, tofile = h.files()
+ if fromfile is not None:
+ contenders.add(os.fsdecode(fromfile))
+ if tofile is not None:
+ contenders.add(os.fsdecode(tofile))
changed = changes[0] | changes[1] | changes[2]
- newfiles = [f for f in changed if f in contenders]
+ newfiles: list = [f for f in changed if f in contenders]
if not newfiles:
- ui.status(_('no changes to record\n'))
+ ui.status(_('no changes to record'))
return 0
-
# 2. backup changed files, so we can restore them in the end
backups = {}
newly_added_backups = {}
- backupdir = os.path.join(repo.controldir(), 'record-backups')
+ backupdir = repo.controldir / 'record-backups'
try:
os.mkdir(backupdir)
except OSError as err:
@@ -112,19 +115,20 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
for f in newfiles:
if f not in (modified | added):
continue
- fd, tmpname = tempfile.mkstemp(prefix=f.replace('/', '_')+'.',
+ prefix = os.fsdecode(f).replace('/', '_') + '.'
+ fd, tmpname = tempfile.mkstemp(prefix=prefix,
dir=backupdir)
os.close(fd)
- ui.debug('backup %r as %r\n' % (f, tmpname))
+ ui.debug('backup %r as %r' % (f, tmpname))
pathname = os.path.join(repo.path, f)
if os.path.isfile(pathname):
- util.copyfile(pathname, tmpname)
+ copyfile(pathname, tmpname)
if f in modified:
backups[f] = tmpname
elif f in added:
newly_added_backups[f] = tmpname
- fp = io.StringIO()
+ fp = io.BytesIO()
all_backups = {}
all_backups.update(backups)
all_backups.update(newly_added_backups)
@@ -143,29 +147,32 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# 3a. apply filtered patch to clean repo (clean)
if backups or any((f in contenders for f in removed)):
- util.system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added],
- onerr=util.Abort, errprefix=_("checkout failed"))
+ system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added],
+ onerr=Abort, errprefix=_("checkout failed"))
# remove newly added files from 'clean' repo (so patch can apply)
for f in newly_added_backups:
- pathname = os.path.join(repo.path, f)
- if os.path.isfile(pathname):
- os.unlink(pathname)
+ pathname = repo.path / f
+ pathname.unlink(missing_ok=True)
# 3b. (apply)
if dopatch:
try:
- ui.debug('applying patch\n')
- ui.debug(fp.getvalue())
- p = subprocess.Popen(["git", "apply", "--whitespace=nowarn"], stdin=subprocess.PIPE, close_fds=util.closefds)
- p.stdin.write(fp.read().encode(encoding.encoding))
+ ui.debug('applying patch')
+ ui.debug(fp.getvalue().decode("UTF-8", "hexreplace"))
+ p = subprocess.Popen(
+ ["git", "apply", "--whitespace=nowarn"],
+ stdin=subprocess.PIPE,
+ close_fds=closefds
+ )
+ p.stdin.write(fp.getvalue())
p.stdin.close()
p.wait()
except Exception as err:
s = str(err)
if s:
- raise util.Abort(s)
+ raise Abort(s)
else:
- raise util.Abort(_('patch failed to apply'))
+ raise Abort(_('patch failed to apply'))
del fp
# 4. We prepared working directory according to filtered patch.
@@ -173,12 +180,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# it is important to first chdir to repo root -- we'll call a
# highlevel command with list of pathnames relative to repo root
- newfiles = [os.path.join(repo.path, n) for n in newfiles]
+ newfiles = [repo.path / n for n in newfiles]
if opts['operation'] == 'crecord':
ui.commit(*newfiles, **opts)
else:
ui.stage(*newfiles, **opts)
- ui.debug('previous staging contents backed up as tree %r\n' % index_backup.indextree)
+ ui.debug('previous staging contents backed up as tree %r' % index_backup.indextree)
index_backup = None
return 0
@@ -186,12 +193,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# 5. finally restore backed-up files
try:
for realname, tmpname in backups.items():
- ui.debug('restoring %r to %r\n' % (tmpname, realname))
- util.copyfile(tmpname, os.path.join(repo.path, realname))
+ ui.debug('restoring %r to %r' % (tmpname, realname))
+ copyfile(tmpname, os.path.join(repo.path, realname))
os.unlink(tmpname)
for realname, tmpname in newly_added_backups.items():
- ui.debug('restoring %r to %r\n' % (tmpname, realname))
- util.copyfile(tmpname, os.path.join(repo.path, realname))
+ ui.debug('restoring %r to %r' % (tmpname, realname))
+ copyfile(tmpname, os.path.join(repo.path, realname))
os.unlink(tmpname)
os.rmdir(backupdir)
if index_backup:
diff --git a/git_crecord/crpatch.py b/git_crecord/crpatch.py
index d7b3555..bd97f72 100644
--- a/git_crecord/crpatch.py
+++ b/git_crecord/crpatch.py
@@ -5,35 +5,47 @@ from gettext import gettext as _
import io
import re
+from codecs import register_error
-from typing import IO, Iterator, Optional
+from typing import IO, Iterator, Optional, Sequence, Union
-lines_re = re.compile(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@\s*(.*)')
+lines_re = re.compile(b'@@ -(\\d+)(?:,(\\d+))? \\+(\\d+)(?:,(\\d+))? @@\\s*(.*)')
class PatchError(Exception):
pass
+def hexreplace(err: UnicodeError) -> tuple[str, int]:
+ if not isinstance(err, UnicodeDecodeError):
+ raise NotImplementedError("only decoding is supported")
+ return "".join(
+ "<%X>" % x for x in err.object[err.start:err.end]
+ ), err.end
+
+
+register_error("hexreplace", hexreplace)
+
+
class LineReader:
# simple class to allow pushing lines back into the input stream
def __init__(self, fp: IO[bytes]):
self.fp = fp
- self.buf: list[str] = []
+ self.buf: list[bytes] = []
- def push(self, line: str) -> None:
+ def push(self, line: bytes) -> None:
if line is not None:
self.buf.append(line)
- def readline(self) -> str:
+ def readline(self) -> bytes:
if self.buf:
line = self.buf[0]
del self.buf[0]
return line
- return self.fp.readline().decode('UTF-8')
+ return self.fp.readline()
- def __iter__(self) -> Iterator[str]:
- return iter(self.readline, '')
+ def __iter__(self) -> Iterator[bytes]:
+ return iter(self.readline, b'')
def scanpatch(fp: IO[bytes]):
@@ -47,7 +59,7 @@ def scanpatch(fp: IO[bytes]):
>>> rawpatch = b'''diff --git a/folder1/g b/folder1/g
... --- a/folder1/g
... +++ b/folder1/g
- ... @@ -1,8 +1,10 @@
+ ... @@ -1,8 +1,10 @@ some context
... 1
... 2
... -3
@@ -62,30 +74,30 @@ def scanpatch(fp: IO[bytes]):
>>> fp = io.BytesIO(rawpatch)
>>> list(scanpatch(fp))
[('file',
- ['diff --git a/folder1/g b/folder1/g\n',
- '--- a/folder1/g\n',
- '+++ b/folder1/g\n']),
+ [b'diff --git a/folder1/g b/folder1/g\n',
+ b'--- a/folder1/g\n',
+ b'+++ b/folder1/g\n']),
('range',
- ('1', '8', '1', '10', '')),
+ (b'1', b'8', b'1', b'10', b'some context')),
('context',
- [' 1\n', ' 2\n']),
+ [b' 1\n', b' 2\n']),
('hunk',
- ['-3\n']),
+ [b'-3\n']),
('context',
- [' 4\n', ' 5\n', ' 6\n']),
+ [b' 4\n', b' 5\n', b' 6\n']),
('hunk',
- ['+6.1\n', '+6.2\n']),
+ [b'+6.1\n', b'+6.2\n']),
('context',
- [' 7\n', ' 8\n']),
+ [b' 7\n', b' 8\n']),
('hunk',
- ['+9'])]
+ [b'+9'])]
"""
lr = LineReader(fp)
- def scanwhile(first, p) -> list[str]:
+ def scanwhile(first: bytes, p) -> list[bytes]:
"""scan lr while predicate holds"""
lines = [first]
- for line in iter(lr.readline, ''):
+ for line in iter(lr.readline, b''):
if p(line):
lines.append(line)
else:
@@ -93,24 +105,24 @@ def scanpatch(fp: IO[bytes]):
break
return lines
- for line in iter(lr.readline, ''):
- if line.startswith('diff --git a/'):
- def notheader(line: str) -> bool:
+ for line in iter(lr.readline, b''):
+ if line.startswith(b'diff --git a/'):
+ def notheader(line: bytes) -> bool:
s = line.split(None, 1)
- return not s or s[0] not in ('---', 'diff')
+ return not s or s[0] not in (b'---', b'diff')
header = scanwhile(line, notheader)
fromfile = lr.readline()
- if fromfile.startswith('---'):
+ if fromfile.startswith(b'---'):
tofile = lr.readline()
header += [fromfile, tofile]
else:
lr.push(fromfile)
yield 'file', header
- elif line.startswith(' '):
- yield 'context', scanwhile(line, lambda l: l[0] in ' \\')
- elif line[0] in '-+':
- yield 'hunk', scanwhile(line, lambda l: l[0] in '-+\\')
+ elif line.startswith(b' '):
+ yield 'context', scanwhile(line, lambda l: l[0] in b' \\')
+ elif line[0] in b'-+':
+ yield 'hunk', scanwhile(line, lambda l: l[0] in b'-+\\')
else:
m = lines_re.match(line)
if m:
@@ -124,7 +136,8 @@ class PatchNode:
(i.e. PatchRoot, header, hunk, HunkLine)
"""
- folded: bool = False
+ folded: bool
+ # a patch this node belongs to
patch: 'PatchRoot'
def firstchild(self):
@@ -133,7 +146,7 @@ class PatchNode:
def lastchild(self):
raise NotImplementedError("method must be implemented by subclass")
- def allchildren(self) -> list['PatchNode']:
+ def allchildren(self) -> Sequence['PatchNode']:
"""Return a list of all direct children of this node"""
raise NotImplementedError("method must be implemented by subclass")
@@ -235,6 +248,18 @@ class PatchNode:
# try parent (or None)
return self.parentitem()
+ def write(self, fp: IO[bytes]) -> None:
+ """Write the unified diff-formatter representation of the
+ patch node into the binary stream"""
+ raise NotImplementedError("method must be implemented by subclass")
+
+ def __bytes__(self) -> bytes:
+ """Return the unified diff-formatter representation of the
+ patch node as bytes"""
+ with io.BytesIO() as b:
+ self.write(b)
+ return b.getvalue()
+
class PatchRoot(PatchNode, list):
"""List of header objects representing the patch."""
@@ -249,10 +274,10 @@ class PatchRoot(PatchNode, list):
class Header(PatchNode):
"""Patch header"""
- diff_re = re.compile('diff --git a/(.*) b/(.*)$')
- allhunks_re = re.compile('(?:GIT binary patch|new file|deleted file) ')
- pretty_re = re.compile('(?:new file|deleted file) ')
- special_re = re.compile('(?:GIT binary patch|new|deleted|copy|rename) ')
+ diff_re = re.compile(b'diff --git a/(.*) b/(.*)$')
+ allhunks_re = re.compile(b'(?:GIT binary patch|new file|deleted file) ')
+ pretty_re = re.compile(b'(?:new file|deleted file) ')
+ special_re = re.compile(b'(?:GIT binary patch|new|deleted|copy|rename) ')
def __init__(self, header):
self.header = header
@@ -266,9 +291,6 @@ class Header(PatchNode):
# flag to indicate whether to display as folded/unfolded to user
self.folded = True
- # list of all headers in patch
- self.patch = None
-
# flag is False if this header was ever unfolded from initial state
self.neverunfolded = True
@@ -281,34 +303,37 @@ class Header(PatchNode):
Otherwise return False.
"""
- return any(h.startswith('GIT binary patch') for h in self.header)
+ return any(h.startswith(b'GIT binary patch') for h in self.header)
- def pretty(self, fp):
+ def pretty(self, fp: IO[str]):
for h in self.header:
- if h.startswith('GIT binary patch'):
+ if h.startswith(b'GIT binary patch'):
fp.write(_('this modifies a binary file (all or nothing)\n'))
break
if self.pretty_re.match(h):
- fp.write(h)
+ fp.write(h.decode("UTF-8", errors="hexreplace"))
if self.binary():
fp.write(_('this is a binary file\n'))
break
- if h.startswith('---'):
+ if h.startswith(b'---'):
fp.write(_('%d hunks, %d lines changed\n') %
(len(self.hunks),
sum([max(h.added, h.removed) for h in self.hunks])))
break
- fp.write(h)
+ fp.write(h.decode("UTF-8", errors="hexreplace"))
- def prettystr(self):
- x = io.StringIO()
- self.pretty(x)
- return x.getvalue()
+ def prettystr(self) -> str:
+ return str(self)
- def write(self, fp):
- fp.write(''.join(self.header))
+ def __str__(self) -> str:
+ with io.StringIO() as s:
+ self.pretty(s)
+ return s.getvalue()
- def allhunks(self):
+ def write(self, fp: IO[bytes]) -> None:
+ fp.write(b''.join(self.header))
+
+ def allhunks(self) -> bool:
"""
Return True if the file which the header represents was changed
completely (i.e. there is no possibility of applying a hunk of changes
@@ -324,33 +349,35 @@ class Header(PatchNode):
fromfile = None
return [fromfile, tofile]
- def filename(self):
+ def filename(self) -> str:
files = self.files()
- return files[1] or files[0]
+ return (files[1] or files[0]).decode("UTF-8", errors="hexreplace")
def __repr__(self) -> str:
- return '<header %s>' % (' '.join(map(repr, self.files())))
+ return '<header %s>' % (' '.join(
+ repr(x) for x in self.files()
+ ))
- def special(self):
+ def special(self) -> bool:
return any(self.special_re.match(h) for h in self.header)
@property
- def changetype(self):
+ def changetype(self) -> str:
if self._changetype is None:
self._changetype = "M"
for h in self.header:
- if h.startswith('new file'):
+ if h.startswith(b'new file'):
self._changetype = "A"
- elif h.startswith('deleted file'):
+ elif h.startswith(b'deleted file'):
self._changetype = "D"
- elif h.startswith('copy from'):
+ elif h.startswith(b'copy from'):
self._changetype = "C"
- elif h.startswith('rename from'):
+ elif h.startswith(b'rename from'):
self._changetype = "R"
return self._changetype
- def nextsibling(self):
+ def nextsibling(self) -> Optional['Header']:
numheadersinpatch = len(self.patch)
indexofthisheader = self.patch.index(self)
@@ -360,7 +387,7 @@ class Header(PatchNode):
else:
return None
- def prevsibling(self):
+ def prevsibling(self) -> Optional['Header']:
indexofthisheader = self.patch.index(self)
if indexofthisheader > 0:
previousheader = self.patch[indexofthisheader - 1]
@@ -368,7 +395,7 @@ class Header(PatchNode):
else:
return None
- def parentitem(self):
+ def parentitem(self) -> None:
"""
There is no 'real' parent item of a header that can be selected,
so return None.
@@ -389,7 +416,7 @@ class Header(PatchNode):
else:
return None
- def allchildren(self):
+ def allchildren(self) -> Sequence['Hunk']:
"""Return a list of all direct children of this node"""
return self.hunks
@@ -397,7 +424,7 @@ class Header(PatchNode):
class HunkLine(PatchNode):
"""Represents a changed line in a hunk"""
- def __init__(self, linetext, hunk):
+ def __init__(self, linetext: bytes, hunk):
self.linetext = linetext
self.applied = True
# the parent hunk to which this line belongs
@@ -406,8 +433,21 @@ class HunkLine(PatchNode):
# in the prevItem method.
self.folded = False
- def prettystr(self):
- return self.linetext
+ def __bytes__(self):
+ if self.applied:
+ return self.linetext
+ else:
+ return b' ' + self.linetext[1:]
+
+ @property
+ def diffop(self):
+ return self.linetext[0:1]
+
+ def __str__(self) -> str:
+ return self.prettystr()
+
+ def prettystr(self) -> str:
+ return self.linetext.decode("UTF-8", errors="hexreplace")
def nextsibling(self):
numlinesinhunk = len(self.hunk.changedlines)
@@ -446,8 +486,24 @@ class HunkLine(PatchNode):
class Hunk(PatchNode):
"""ui patch hunk, wraps a hunk and keeps track of ui behavior """
maxcontext = 3
-
- def __init__(self, header: Header, fromline, toline, proc, before, hunklines, after):
+ header: Header
+ fromline: int
+ toline: int
+ proc: bytes
+ after: Sequence[bytes]
+ before: Sequence[bytes]
+ changedlines: Sequence[HunkLine]
+
+ def __init__(
+ self,
+ header: Header,
+ fromline: int,
+ toline: int,
+ proc: bytes,
+ before: Sequence[bytes],
+ hunklines: Sequence[bytes],
+ after: Sequence[bytes]
+ ):
def trimcontext(number, lines):
delta = len(lines) - self.maxcontext
if False and delta > 0:
@@ -509,25 +565,25 @@ class Hunk(PatchNode):
else:
return None
- def allchildren(self) -> list[HunkLine]:
+ def allchildren(self) -> Sequence[PatchNode]:
"""Return a list of all direct children of this node"""
return self.changedlines
def countchanges(self) -> tuple[int, int]:
"""changedlines -> (n+,n-)"""
add = len([line for line in self.changedlines if line.applied
- and line.prettystr().startswith('+')])
+ and line.diffop == b'+'])
rem = len([line for line in self.changedlines if line.applied
- and line.prettystr().startswith('-')])
+ and line.diffop == b'-'])
return add, rem
def getfromtoline(self):
- # calculate the number of removed lines converted to context lines
+ """Calculate the number of removed lines converted to context lines"""
removedconvertedtocontext = self.originalremoved - self.removed
contextlen = (len(self.before) + len(self.after) +
removedconvertedtocontext)
- if self.after and self.after[-1] == '\\ No newline at end of file\n':
+ if self.after and self.after[-1] == b'\\ No newline at end of file\n':
contextlen -= 1
fromlen = contextlen + self.removed
tolen = contextlen + self.added
@@ -539,75 +595,75 @@ class Hunk(PatchNode):
# So, if either of hunks is empty, decrease its line start. --immerrr
# But only do this if fromline > 0, to avoid having, e.g fromline=-1.
fromline, toline = self.fromline, self.toline
- if fromline != 0:
- if fromlen == 0:
- fromline -= 1
+ if fromlen == 0 and fromline > 0:
+ fromline -= 1
if tolen == 0 and toline > 0:
toline -= 1
- fromtoline = '@@ -%d,%d +%d,%d @@%s\n' % (
+ fromtoline = b'@@ -%d,%d +%d,%d @@%b\n' % (
fromline, fromlen, toline, tolen,
- self.proc and (' ' + self.proc))
+ self.proc and (b' ' + self.proc))
+
return fromtoline
- def write(self, fp) -> None:
+ def write(self, fp: IO[bytes]) -> None:
# updated self.added/removed, which are used by getfromtoline()
self.added, self.removed = self.countchanges()
fp.write(self.getfromtoline())
+ fp.write(b''.join(self.before))
- hunklinelist = []
# add the following to the list: (1) all applied lines, and
# (2) all unapplied removal lines (convert these to context lines)
for changedline in self.changedlines:
- changedlinestr = changedline.prettystr()
- if changedline.applied:
- hunklinelist.append(changedlinestr)
- elif changedlinestr.startswith("-"):
- hunklinelist.append(" " + changedlinestr[1:])
+ fp.write(bytes(changedline))
- fp.write(''.join(self.before + hunklinelist + self.after))
+ fp.write(b''.join(self.after))
def reversehunks(self) -> 'Hunk':
- """Make the hunk apply in the other direction."""
- m = {'+': '-', '-': '+', '\\': '\\'}
- hunklines = ['%s%s' % (m[line.prettystr()[0:1]], line.prettystr()[1:])
+ r"""Make the hunk apply in the other direction.
+
+ >>> header = Header([b'diff --git a/file b/file\n'])
+ >>> print(Hunk(
+ ... header,
+ ... fromline=1,
+ ... toline=2,
+ ... proc=b'context',
+ ... before=[b' 1\n', b' 2\n'],
+ ... hunklines=[b'-3\n'],
+ ... after=[b' 4\n', b' 5\n'],
+ ... ).reversehunks().prettystr())
+ @@ -1,4 +2,5 @@ context
+ 1
+ 2
+ +3
+ 4
+ 5
+ """
+ m = {b'+': b'-', b'-': b'+', b'\\': b'\\'}
+ hunklines = [b'%s%s' % (m[line.linetext[0:1]], line.linetext[1:])
for line in self.changedlines if line.applied]
return Hunk(self.header, self.fromline, self.toline, self.proc, self.before, hunklines, self.after)
- def unapplyhunks(self) -> 'Hunk':
- """Unapply the hunk.
-
- If the hunk is not applied, then the hunk is returned as it appears in the patch file.
- If the hunk is applied, then the hunk is returned with the '+' lines changed to ' ' lines
-
- :return: A new Hunk object with the changes applied.
- """
- m = {'+': '-', '-': '+', '\\': '\\'}
- hunklinelist = []
- for changedline in self.changedlines:
- changedlinestr = changedline.prettystr()
- if not changedline.applied:
- hunklinelist.append('%s%s' % (m[changedlinestr[0]], changedlinestr[1:]))
- elif changedlinestr.startswith("+"):
- hunklinelist.append(" " + changedlinestr[1:])
- return Hunk(self.header, self.fromline, self.toline, self.proc, self.before, hunklinelist, self.after)
-
- pretty = write
+ def files(self) -> list[Optional[bytes]]:
+ return self.header.files()
- def filename(self):
+ def filename(self) -> str:
return self.header.filename()
+ def __str__(self) -> str:
+ return self.prettystr()
+
def prettystr(self) -> str:
- x = io.StringIO()
- self.pretty(x)
- return x.getvalue()
+ x = io.BytesIO()
+ self.write(x)
+ return x.getvalue().decode("UTF-8", errors="hexreplace")
def __repr__(self) -> str:
- return '<hunk %r@%d>' % (self.filename(), self.fromline)
+ return '<hunk %r@%d>' % (self.files()[1] or self.files()[0], self.fromline)
def parsepatch(fp: IO[bytes]):
- """Parse a patch, returning a list of header and hunk objects.
+ r"""Parse a patch, returning a list of header and hunk objects.
>>> rawpatch = b'''diff --git a/folder1/g b/folder1/g
... --- a/folder1/g
@@ -630,20 +686,23 @@ def parsepatch(fp: IO[bytes]):
Headers and hunks are interspersed in the list returned from
the function:
>>> headers
- [<header 'folder1/g' 'folder1/g'>,
- <hunk 'folder1/g'@1>,
- <hunk 'folder1/g'@7>,
- <hunk 'folder1/g'@9>]
+ [<header b'folder1/g' b'folder1/g'>,
+ <hunk b'folder1/g'@1>,
+ <hunk b'folder1/g'@7>,
+ <hunk b'folder1/g'@9>]
+
+ >>> headers[0].filename()
+ 'folder1/g'
Each header also provides a list of hunks belonging to it:
>>> headers[0].hunks
- [<hunk 'folder1/g'@1>,
- <hunk 'folder1/g'@7>,
- <hunk 'folder1/g'@9>]
- >>> out = io.StringIO()
+ [<hunk b'folder1/g'@1>,
+ <hunk b'folder1/g'@7>,
+ <hunk b'folder1/g'@9>]
+ >>> out = io.BytesIO()
>>> for header in headers:
... header.write(out)
- >>> print(out.getvalue())
+ >>> print(out.getvalue().decode("ascii"))
diff --git a/folder1/g b/folder1/g
--- a/folder1/g
+++ b/folder1/g
@@ -661,23 +720,54 @@ def parsepatch(fp: IO[bytes]):
8
@@ -8,0 +10,1 @@
+9
+
+ It is possible to handle non-UTF-8 patches:
+ >>> rawpatch = b'''diff --git a/test b/test
+ ... --- /dev/null
+ ... +++ b/test
+ ... @@ -0,0 +1,2 @@
+ ... +\xCD\xCE\xCD-\xD3\xD2\xD4-8 \xF2\xE5\xF1\xF2
+ ... +test'''
+ >>> fp = io.BytesIO(rawpatch)
+ >>> headers = parsepatch(fp)
+ >>> out = io.BytesIO()
+ >>> for header in headers:
+ ... header.write(out)
+
+ Non-UTF-8 characters survive the roundtrip:
+ >>> print(out.getvalue().decode("cp1251"))
+ diff --git a/test b/test
+ --- /dev/null
+ +++ b/test
+ @@ -0,0 +1,2 @@
+ +НОН-УТФ-8 тест
+ +test
+
+ When pretty-printing the hunk, they get replaced with their
+ hexadecimal codes:
+ >>> print(headers[0].hunks[0])
+ @@ -0,0 +1,2 @@
+ +<CD><CE><CD>-<D3><D2><D4>-8 <F2><E5><F1><F2>
+ +test
"""
class Parser:
"""patch parsing state machine"""
+ header: Header
+ headers: Sequence[Union[Header, Hunk]]
+
def __init__(self):
self.fromline = 0
self.toline = 0
- self.proc = ''
- self.header = None
+ self.proc = b''
self.context = []
self.before = []
self.hunk = []
self.headers = []
def addrange(self, limits):
- "Store range line info to associated instance variables."
+ """Store range line info to associated instance variables."""
fromstart, fromend, tostart, toend, proc = limits
self.fromline = int(fromstart)
self.toline = int(tostart)
@@ -707,7 +797,7 @@ def parsepatch(fp: IO[bytes]):
self.before = []
self.hunk = []
self.context = []
- self.proc = ''
+ self.proc = b''
def addcontext(self, context):
"""
@@ -738,7 +828,7 @@ def parsepatch(fp: IO[bytes]):
self.before = self.context
self.context = []
- def newfile(self, hdr):
+ def newfile(self, header):
"""
Create a header object containing the header lines, and the
filename the header applies to. Add the header to self.headers.
@@ -750,7 +840,7 @@ def parsepatch(fp: IO[bytes]):
self.add_new_hunk()
# create a new header and add it to self.header
- h = Header(hdr)
+ h = Header(header)
self.headers.append(h)
self.header = h
diff --git a/git_crecord/gitrepo.py b/git_crecord/gitrepo.py
index c5f1843..0a3a59a 100644
--- a/git_crecord/gitrepo.py
+++ b/git_crecord/gitrepo.py
@@ -1,9 +1,14 @@
import os
import sys
+from pathlib import Path
+from typing import Optional
+
from . import util
INDEX_FILENAME = "index"
+ObjectHash = str
+
class GitTree:
def __init__(self, tree):
self._tree = tree
@@ -23,41 +28,59 @@ class GitIndex:
def __repr__(self):
return "%s(%r, %r)" % (self.__class__.__name__, self._filename, self.indextree)
- def commit(self):
- return util.systemcall(['git', 'write-tree'], onerr=RuntimeError).rstrip('\n')
+ def commit(self) -> ObjectHash:
+ return util.systemcall(
+ ['git', 'write-tree'],
+ onerr=RuntimeError,
+ encoding="ascii",
+ ).rstrip('\n')
def write(self):
GitTree(self.indextree).read()
- def backup_tree(self):
+ def backup_tree(self) -> ObjectHash:
try:
self.indextree = self.commit()
except RuntimeError as inst:
raise util.Abort('failed to read the index: %s' % inst)
return self.indextree
+
class GitRepo:
- def __init__(self, path):
+ def __init__(self, path: Optional[os.PathLike]):
try:
- self.path = util.systemcall(['git', 'rev-parse', '--show-toplevel'],
- onerr=util.Abort).rstrip('\n')
- self._controldir = util.systemcall(['git', 'rev-parse', '--git-dir']).rstrip('\n')
- if not os.path.isdir(self._controldir):
+ self.path = Path(util.systemcall(
+ ['git', 'rev-parse', '--show-toplevel'],
+ dir=path,
+ encoding="fs",
+ onerr=util.Abort
+ ).rstrip('\n'))
+ self._controldir = Path(util.systemcall(
+ ['git', 'rev-parse', '--git-dir'],
+ dir=path,
+ encoding="fs",
+ ).rstrip('\n'))
+ if not self._controldir.is_dir():
raise util.Abort
except util.Abort:
sys.exit(1)
def __repr__(self):
- return "%s(%r)" % (self.__class__.__name__, self.path)
+ return "%s(%s)" % (self.__class__.__name__, self.path)
- def controldir(self):
- return os.path.abspath(self._controldir)
+ @property
+ def controldir(self) -> Path:
+ return self._controldir.resolve()
- def index_path(self):
- return os.path.join(self.controldir(), INDEX_FILENAME)
+ @property
+ def index_path(self) -> Path:
+ return self.controldir / INDEX_FILENAME
- def open_index(self):
- return GitIndex(self.index_path())
+ def open_index(self) -> GitIndex:
+ return GitIndex(self.index_path)
- def head(self):
- return util.systemcall(['git', 'rev-parse', '--verify', '-q', 'HEAD']).rstrip('\n')
+ def head(self) -> ObjectHash:
+ return util.systemcall(
+ ['git', 'rev-parse', '--verify', '-q', 'HEAD'],
+ encoding="ascii",
+ ).rstrip('\n')
diff --git a/git_crecord/main.py b/git_crecord/main.py
index 0e3228c..bad5f8f 100644
--- a/git_crecord/main.py
+++ b/git_crecord/main.py
@@ -10,7 +10,11 @@ import argparse
class Config:
def get(self, section, item, default=None):
try:
- return util.systemcall(['git', 'config', '--get', '%s.%s' % (section, item)], onerr=KeyError).rstrip('\n')
+ return util.systemcall(
+ ['git', 'config', '--get', '%s.%s' % (section, item)],
+ onerr=KeyError,
+ encoding="UTF-8",
+ ).rstrip('\n')
except KeyError:
return default
@@ -27,29 +31,25 @@ class Ui:
except KeyError:
self._username = None
- def debug(self, *msg, **opts):
- if self.debuglevel < 2:
+ def print_message(self, *msg, debuglevel: int, **opts):
+ if self.debuglevel < debuglevel:
return
- for m in msg:
- sys.stdout.write(m)
- def info(self, *msg, **opts):
- if self.debuglevel < 1:
- return
sys.stdout.flush()
- for m in msg:
- sys.stderr.write(m)
+ print(*msg, **opts, file=sys.stderr)
sys.stderr.flush()
- def status(self, *msg, **opts):
- for m in msg:
- sys.stdout.write(m)
+ def debug(self, *msg, **opts):
+ self.print_message(*msg, debuglevel=2, **opts)
+
+ def info(self, *msg, **opts):
+ self.print_message(*msg, debuglevel=1, **opts)
def warn(self, *msg, **opts):
- sys.stdout.flush()
- for m in msg:
- sys.stderr.write(m)
- sys.stderr.flush()
+ self.print_message(*msg, debuglevel=0, **opts)
+
+ def status(self, *msg, **opts):
+ print(*msg, **opts)
def setdebuglevel(self, level):
self.debuglevel = level
@@ -69,16 +69,16 @@ class Ui:
os.environ.get("VISUAL") or
os.environ.get("EDITOR", editor))
- def edit(self, text, user, extra=None, name=None):
+ def edit(self, text: bytes, user, extra=None, name=None) -> bytes:
fd = None
if name is None:
(fd, name) = tempfile.mkstemp(prefix='git-crecord-',
suffix=".txt", text=True)
try:
if fd is not None:
- f = os.fdopen(fd, "w")
+ f = os.fdopen(fd, "wb")
else:
- f = open(name, "w")
+ f = open(name, "wb")
f.write(text)
f.close()
@@ -87,7 +87,7 @@ class Ui:
util.system("%s \"%s\"" % (editor, name),
onerr=util.Abort, errprefix=_("edit failed"))
- f = open(name)
+ f = open(name, "rb")
t = f.read()
f.close()
finally:
diff --git a/git_crecord/util.py b/git_crecord/util.py
index 3481b18..f148415 100644
--- a/git_crecord/util.py
+++ b/git_crecord/util.py
@@ -15,10 +15,14 @@ import os
import subprocess
import shutil
import sys
+from typing import AnyStr, overload, Sequence, Optional
+
from . import encoding
+
closefds = os.name == 'posix'
+
def explainexit(code):
"""return a 2-tuple (desc, code) describing a subprocess status
(codes from kill are negative - not os.system/wait encoding)"""
@@ -27,9 +31,11 @@ def explainexit(code):
else:
return _("exited with status %d") % code, code
+
class Abort(Exception):
pass
+
def system(cmd, cwd=None, onerr=None, errprefix=None):
try:
sys.stdout.flush()
@@ -53,16 +59,38 @@ def system(cmd, cwd=None, onerr=None, errprefix=None):
raise onerr(errmsg)
return rc
-def systemcall(cmd, onerr=None, errprefix=None):
+
+@overload
+def systemcall(
+ cmd: Sequence[AnyStr],
+ encoding: str,
+ dir: Optional[os.PathLike] = None,
+ onerr=None,
+ errprefix=None
+) -> str:
+ ...
+
+
+@overload
+def systemcall(
+ cmd: Sequence[AnyStr],
+ dir: Optional[os.PathLike] = None,
+ onerr=None,
+ errprefix=None
+) -> bytes:
+ ...
+
+
+def systemcall(cmd, encoding=None, dir=None, onerr=None, errprefix=None):
try:
sys.stdout.flush()
except Exception:
pass
- p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=closefds)
- out = ''
+ p = subprocess.Popen(cmd, cwd=dir, stdout=subprocess.PIPE, close_fds=closefds)
+ out = b''
for line in iter(p.stdout.readline, b''):
- out = out + line.decode(encoding.encoding)
+ out = out + line
p.wait()
rc = p.returncode
@@ -73,7 +101,13 @@ def systemcall(cmd, onerr=None, errprefix=None):
errmsg = '%s: %s' % (errprefix, errmsg)
raise onerr(errmsg)
- return out
+ if encoding == "fs":
+ return os.fsdecode(out)
+ elif encoding:
+ return out.decode(encoding)
+ else:
+ return out
+
def copyfile(src, dest, hardlink=False, copystat=False):
'''copy a file, preserving mode and optionally other stat info like
@@ -103,10 +137,14 @@ def copyfile(src, dest, hardlink=False, copystat=False):
except shutil.Error as inst:
raise Abort(str(inst))
+
def ellipsis(text, maxlength=400):
"""Trim string to at most maxlength (default: 400) columns in display."""
return encoding.trim(text, maxlength, ellipsis='...')
+
_notset = object()
+
+
def safehasattr(thing, attr):
return getattr(thing, attr, _notset) is not _notset