From 1a73baa7952af30b2e9cb71a51d0d926bb59a515 Mon Sep 17 00:00:00 2001 From: Andrej Shadura Date: Sun, 6 Mar 2022 10:43:21 +0100 Subject: Process patches and file names without decoding them as UTF-8 --- git_crecord/chunk_selector.py | 17 +- git_crecord/crecord_core.py | 93 +++++------ git_crecord/crpatch.py | 352 ++++++++++++++++++++++++++---------------- git_crecord/gitrepo.py | 57 +++++-- git_crecord/main.py | 42 ++--- git_crecord/util.py | 48 +++++- 6 files changed, 383 insertions(+), 226 deletions(-) (limited to 'git_crecord') diff --git a/git_crecord/chunk_selector.py b/git_crecord/chunk_selector.py index d27679f..9685a6e 100644 --- a/git_crecord/chunk_selector.py +++ b/git_crecord/chunk_selector.py @@ -686,7 +686,7 @@ class CursesChunkSelector: return checkbox - def printheader(self, header, selected=False, towin=True, + def printheader(self, header: Header, selected=False, towin=True, ignorefolding=False): """ Print the header to the pad. If countLines is True, don't print @@ -726,7 +726,7 @@ class CursesChunkSelector: return outstr - def printhunklinesbefore(self, hunk, selected=False, towin=True, + def printhunklinesbefore(self, hunk: Hunk, selected=False, towin=True, ignorefolding=False): "includes start/end line indicator" outstr = "" @@ -745,11 +745,10 @@ class CursesChunkSelector: checkbox = self.getstatusprefixstring(hunk) lineprefix = " "*self.hunkindentnumchars + checkbox - frtoline = " " + hunk.getfromtoline().strip("\n") - + frtoline = " " + hunk.getfromtoline().decode("UTF-8", errors="hexreplace").strip("\n") outstr += self.printstring(self.chunkpad, lineprefix, towin=towin, - align=False) # add uncolored checkbox/indent + align=False) # add uncolored checkbox/indent outstr += self.printstring(self.chunkpad, frtoline, pair=colorpair, towin=towin) @@ -759,12 +758,12 @@ class CursesChunkSelector: # print out lines of the chunk preceding changed-lines for line in hunk.before: - linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line + linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line.decode("UTF-8", errors="hexreplace") outstr += self.printstring(self.chunkpad, linestr, towin=towin) return outstr - def printhunklinesafter(self, hunk, towin=True, ignorefolding=False): + def printhunklinesafter(self, hunk: Hunk, towin=True, ignorefolding=False): outstr = "" if hunk.folded and not ignorefolding: return outstr @@ -772,12 +771,12 @@ class CursesChunkSelector: # a bit superfluous, but to avoid hard-coding indent amount checkbox = self.getstatusprefixstring(hunk) for line in hunk.after: - linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line + linestr = " "*(self.hunklineindentnumchars + len(checkbox)) + line.decode("UTF-8", errors="hexreplace") outstr += self.printstring(self.chunkpad, linestr, towin=towin) return outstr - def printhunkchangedline(self, hunkline, selected=False, towin=True): + def printhunkchangedline(self, hunkline: HunkLine, selected=False, towin=True): outstr = "" checkbox = self.getstatusprefixstring(hunkline) diff --git a/git_crecord/crecord_core.py b/git_crecord/crecord_core.py index 0b02a43..ba4cee7 100644 --- a/git_crecord/crecord_core.py +++ b/git_crecord/crecord_core.py @@ -9,19 +9,22 @@ '''text-gui based change selection during commit or qrefresh''' from gettext import gettext as _ -from . import encoding -from . import util + import io import errno import os import tempfile import subprocess +from typing import IO, cast + +from .crpatch import Header, parsepatch, filterpatch +from .chunk_selector import chunkselector +from .gitrepo import GitRepo +from .util import Abort, system, closefds, copyfile -from . import crpatch -from . import chunk_selector def dorecord(ui, repo, commitfunc, *pats, **opts): - def recordfunc(ui, repo, message, match, opts): + def recordfunc(ui, repo: GitRepo, message, match, opts): """This is generic record driver. Its job is to interactively filter local changes, and accordingly @@ -51,21 +54,21 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): if not opts['index'] and repo.head(): git_base.append("HEAD") - p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=util.closefds) - fp = p.stdout + p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=closefds) + fp = cast(IO[bytes], p.stdout) # 0. parse patch fromfiles = set() tofiles = set() - chunks = crpatch.parsepatch(fp) + chunks = parsepatch(fp) for c in chunks: - if isinstance(c, crpatch.Header): + if isinstance(c, Header): fromfile, tofile = c.files() if fromfile is not None: - fromfiles.add(fromfile) + fromfiles.add(os.fsdecode(fromfile)) if tofile is not None: - tofiles.add(tofile) + tofiles.add(os.fsdecode(tofile)) added = tofiles - fromfiles removed = fromfiles - tofiles @@ -73,31 +76,31 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): changes = [modified, added, removed] # 1. filter patch, so we have intending-to apply subset of it - chunks = crpatch.filterpatch(opts, - chunks, - chunk_selector.chunkselector, ui) + chunks = filterpatch(opts, + chunks, + chunkselector, ui) p.wait() del fp contenders = set() for h in chunks: - try: - contenders.update(set(h.files())) - except AttributeError: - pass + fromfile, tofile = h.files() + if fromfile is not None: + contenders.add(os.fsdecode(fromfile)) + if tofile is not None: + contenders.add(os.fsdecode(tofile)) changed = changes[0] | changes[1] | changes[2] - newfiles = [f for f in changed if f in contenders] + newfiles: list = [f for f in changed if f in contenders] if not newfiles: - ui.status(_('no changes to record\n')) + ui.status(_('no changes to record')) return 0 - # 2. backup changed files, so we can restore them in the end backups = {} newly_added_backups = {} - backupdir = os.path.join(repo.controldir(), 'record-backups') + backupdir = repo.controldir / 'record-backups' try: os.mkdir(backupdir) except OSError as err: @@ -112,19 +115,20 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): for f in newfiles: if f not in (modified | added): continue - fd, tmpname = tempfile.mkstemp(prefix=f.replace('/', '_')+'.', + prefix = os.fsdecode(f).replace('/', '_') + '.' + fd, tmpname = tempfile.mkstemp(prefix=prefix, dir=backupdir) os.close(fd) - ui.debug('backup %r as %r\n' % (f, tmpname)) + ui.debug('backup %r as %r' % (f, tmpname)) pathname = os.path.join(repo.path, f) if os.path.isfile(pathname): - util.copyfile(pathname, tmpname) + copyfile(pathname, tmpname) if f in modified: backups[f] = tmpname elif f in added: newly_added_backups[f] = tmpname - fp = io.StringIO() + fp = io.BytesIO() all_backups = {} all_backups.update(backups) all_backups.update(newly_added_backups) @@ -143,29 +147,32 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): # 3a. apply filtered patch to clean repo (clean) if backups or any((f in contenders for f in removed)): - util.system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added], - onerr=util.Abort, errprefix=_("checkout failed")) + system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added], + onerr=Abort, errprefix=_("checkout failed")) # remove newly added files from 'clean' repo (so patch can apply) for f in newly_added_backups: - pathname = os.path.join(repo.path, f) - if os.path.isfile(pathname): - os.unlink(pathname) + pathname = repo.path / f + pathname.unlink(missing_ok=True) # 3b. (apply) if dopatch: try: - ui.debug('applying patch\n') - ui.debug(fp.getvalue()) - p = subprocess.Popen(["git", "apply", "--whitespace=nowarn"], stdin=subprocess.PIPE, close_fds=util.closefds) - p.stdin.write(fp.read().encode(encoding.encoding)) + ui.debug('applying patch') + ui.debug(fp.getvalue().decode("UTF-8", "hexreplace")) + p = subprocess.Popen( + ["git", "apply", "--whitespace=nowarn"], + stdin=subprocess.PIPE, + close_fds=closefds + ) + p.stdin.write(fp.getvalue()) p.stdin.close() p.wait() except Exception as err: s = str(err) if s: - raise util.Abort(s) + raise Abort(s) else: - raise util.Abort(_('patch failed to apply')) + raise Abort(_('patch failed to apply')) del fp # 4. We prepared working directory according to filtered patch. @@ -173,12 +180,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): # it is important to first chdir to repo root -- we'll call a # highlevel command with list of pathnames relative to repo root - newfiles = [os.path.join(repo.path, n) for n in newfiles] + newfiles = [repo.path / n for n in newfiles] if opts['operation'] == 'crecord': ui.commit(*newfiles, **opts) else: ui.stage(*newfiles, **opts) - ui.debug('previous staging contents backed up as tree %r\n' % index_backup.indextree) + ui.debug('previous staging contents backed up as tree %r' % index_backup.indextree) index_backup = None return 0 @@ -186,12 +193,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts): # 5. finally restore backed-up files try: for realname, tmpname in backups.items(): - ui.debug('restoring %r to %r\n' % (tmpname, realname)) - util.copyfile(tmpname, os.path.join(repo.path, realname)) + ui.debug('restoring %r to %r' % (tmpname, realname)) + copyfile(tmpname, os.path.join(repo.path, realname)) os.unlink(tmpname) for realname, tmpname in newly_added_backups.items(): - ui.debug('restoring %r to %r\n' % (tmpname, realname)) - util.copyfile(tmpname, os.path.join(repo.path, realname)) + ui.debug('restoring %r to %r' % (tmpname, realname)) + copyfile(tmpname, os.path.join(repo.path, realname)) os.unlink(tmpname) os.rmdir(backupdir) if index_backup: diff --git a/git_crecord/crpatch.py b/git_crecord/crpatch.py index d7b3555..bd97f72 100644 --- a/git_crecord/crpatch.py +++ b/git_crecord/crpatch.py @@ -5,35 +5,47 @@ from gettext import gettext as _ import io import re +from codecs import register_error -from typing import IO, Iterator, Optional +from typing import IO, Iterator, Optional, Sequence, Union -lines_re = re.compile(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@\s*(.*)') +lines_re = re.compile(b'@@ -(\\d+)(?:,(\\d+))? \\+(\\d+)(?:,(\\d+))? @@\\s*(.*)') class PatchError(Exception): pass +def hexreplace(err: UnicodeError) -> tuple[str, int]: + if not isinstance(err, UnicodeDecodeError): + raise NotImplementedError("only decoding is supported") + return "".join( + "<%X>" % x for x in err.object[err.start:err.end] + ), err.end + + +register_error("hexreplace", hexreplace) + + class LineReader: # simple class to allow pushing lines back into the input stream def __init__(self, fp: IO[bytes]): self.fp = fp - self.buf: list[str] = [] + self.buf: list[bytes] = [] - def push(self, line: str) -> None: + def push(self, line: bytes) -> None: if line is not None: self.buf.append(line) - def readline(self) -> str: + def readline(self) -> bytes: if self.buf: line = self.buf[0] del self.buf[0] return line - return self.fp.readline().decode('UTF-8') + return self.fp.readline() - def __iter__(self) -> Iterator[str]: - return iter(self.readline, '') + def __iter__(self) -> Iterator[bytes]: + return iter(self.readline, b'') def scanpatch(fp: IO[bytes]): @@ -47,7 +59,7 @@ def scanpatch(fp: IO[bytes]): >>> rawpatch = b'''diff --git a/folder1/g b/folder1/g ... --- a/folder1/g ... +++ b/folder1/g - ... @@ -1,8 +1,10 @@ + ... @@ -1,8 +1,10 @@ some context ... 1 ... 2 ... -3 @@ -62,30 +74,30 @@ def scanpatch(fp: IO[bytes]): >>> fp = io.BytesIO(rawpatch) >>> list(scanpatch(fp)) [('file', - ['diff --git a/folder1/g b/folder1/g\n', - '--- a/folder1/g\n', - '+++ b/folder1/g\n']), + [b'diff --git a/folder1/g b/folder1/g\n', + b'--- a/folder1/g\n', + b'+++ b/folder1/g\n']), ('range', - ('1', '8', '1', '10', '')), + (b'1', b'8', b'1', b'10', b'some context')), ('context', - [' 1\n', ' 2\n']), + [b' 1\n', b' 2\n']), ('hunk', - ['-3\n']), + [b'-3\n']), ('context', - [' 4\n', ' 5\n', ' 6\n']), + [b' 4\n', b' 5\n', b' 6\n']), ('hunk', - ['+6.1\n', '+6.2\n']), + [b'+6.1\n', b'+6.2\n']), ('context', - [' 7\n', ' 8\n']), + [b' 7\n', b' 8\n']), ('hunk', - ['+9'])] + [b'+9'])] """ lr = LineReader(fp) - def scanwhile(first, p) -> list[str]: + def scanwhile(first: bytes, p) -> list[bytes]: """scan lr while predicate holds""" lines = [first] - for line in iter(lr.readline, ''): + for line in iter(lr.readline, b''): if p(line): lines.append(line) else: @@ -93,24 +105,24 @@ def scanpatch(fp: IO[bytes]): break return lines - for line in iter(lr.readline, ''): - if line.startswith('diff --git a/'): - def notheader(line: str) -> bool: + for line in iter(lr.readline, b''): + if line.startswith(b'diff --git a/'): + def notheader(line: bytes) -> bool: s = line.split(None, 1) - return not s or s[0] not in ('---', 'diff') + return not s or s[0] not in (b'---', b'diff') header = scanwhile(line, notheader) fromfile = lr.readline() - if fromfile.startswith('---'): + if fromfile.startswith(b'---'): tofile = lr.readline() header += [fromfile, tofile] else: lr.push(fromfile) yield 'file', header - elif line.startswith(' '): - yield 'context', scanwhile(line, lambda l: l[0] in ' \\') - elif line[0] in '-+': - yield 'hunk', scanwhile(line, lambda l: l[0] in '-+\\') + elif line.startswith(b' '): + yield 'context', scanwhile(line, lambda l: l[0] in b' \\') + elif line[0] in b'-+': + yield 'hunk', scanwhile(line, lambda l: l[0] in b'-+\\') else: m = lines_re.match(line) if m: @@ -124,7 +136,8 @@ class PatchNode: (i.e. PatchRoot, header, hunk, HunkLine) """ - folded: bool = False + folded: bool + # a patch this node belongs to patch: 'PatchRoot' def firstchild(self): @@ -133,7 +146,7 @@ class PatchNode: def lastchild(self): raise NotImplementedError("method must be implemented by subclass") - def allchildren(self) -> list['PatchNode']: + def allchildren(self) -> Sequence['PatchNode']: """Return a list of all direct children of this node""" raise NotImplementedError("method must be implemented by subclass") @@ -235,6 +248,18 @@ class PatchNode: # try parent (or None) return self.parentitem() + def write(self, fp: IO[bytes]) -> None: + """Write the unified diff-formatter representation of the + patch node into the binary stream""" + raise NotImplementedError("method must be implemented by subclass") + + def __bytes__(self) -> bytes: + """Return the unified diff-formatter representation of the + patch node as bytes""" + with io.BytesIO() as b: + self.write(b) + return b.getvalue() + class PatchRoot(PatchNode, list): """List of header objects representing the patch.""" @@ -249,10 +274,10 @@ class PatchRoot(PatchNode, list): class Header(PatchNode): """Patch header""" - diff_re = re.compile('diff --git a/(.*) b/(.*)$') - allhunks_re = re.compile('(?:GIT binary patch|new file|deleted file) ') - pretty_re = re.compile('(?:new file|deleted file) ') - special_re = re.compile('(?:GIT binary patch|new|deleted|copy|rename) ') + diff_re = re.compile(b'diff --git a/(.*) b/(.*)$') + allhunks_re = re.compile(b'(?:GIT binary patch|new file|deleted file) ') + pretty_re = re.compile(b'(?:new file|deleted file) ') + special_re = re.compile(b'(?:GIT binary patch|new|deleted|copy|rename) ') def __init__(self, header): self.header = header @@ -266,9 +291,6 @@ class Header(PatchNode): # flag to indicate whether to display as folded/unfolded to user self.folded = True - # list of all headers in patch - self.patch = None - # flag is False if this header was ever unfolded from initial state self.neverunfolded = True @@ -281,34 +303,37 @@ class Header(PatchNode): Otherwise return False. """ - return any(h.startswith('GIT binary patch') for h in self.header) + return any(h.startswith(b'GIT binary patch') for h in self.header) - def pretty(self, fp): + def pretty(self, fp: IO[str]): for h in self.header: - if h.startswith('GIT binary patch'): + if h.startswith(b'GIT binary patch'): fp.write(_('this modifies a binary file (all or nothing)\n')) break if self.pretty_re.match(h): - fp.write(h) + fp.write(h.decode("UTF-8", errors="hexreplace")) if self.binary(): fp.write(_('this is a binary file\n')) break - if h.startswith('---'): + if h.startswith(b'---'): fp.write(_('%d hunks, %d lines changed\n') % (len(self.hunks), sum([max(h.added, h.removed) for h in self.hunks]))) break - fp.write(h) + fp.write(h.decode("UTF-8", errors="hexreplace")) - def prettystr(self): - x = io.StringIO() - self.pretty(x) - return x.getvalue() + def prettystr(self) -> str: + return str(self) - def write(self, fp): - fp.write(''.join(self.header)) + def __str__(self) -> str: + with io.StringIO() as s: + self.pretty(s) + return s.getvalue() - def allhunks(self): + def write(self, fp: IO[bytes]) -> None: + fp.write(b''.join(self.header)) + + def allhunks(self) -> bool: """ Return True if the file which the header represents was changed completely (i.e. there is no possibility of applying a hunk of changes @@ -324,33 +349,35 @@ class Header(PatchNode): fromfile = None return [fromfile, tofile] - def filename(self): + def filename(self) -> str: files = self.files() - return files[1] or files[0] + return (files[1] or files[0]).decode("UTF-8", errors="hexreplace") def __repr__(self) -> str: - return '
' % (' '.join(map(repr, self.files()))) + return '
' % (' '.join( + repr(x) for x in self.files() + )) - def special(self): + def special(self) -> bool: return any(self.special_re.match(h) for h in self.header) @property - def changetype(self): + def changetype(self) -> str: if self._changetype is None: self._changetype = "M" for h in self.header: - if h.startswith('new file'): + if h.startswith(b'new file'): self._changetype = "A" - elif h.startswith('deleted file'): + elif h.startswith(b'deleted file'): self._changetype = "D" - elif h.startswith('copy from'): + elif h.startswith(b'copy from'): self._changetype = "C" - elif h.startswith('rename from'): + elif h.startswith(b'rename from'): self._changetype = "R" return self._changetype - def nextsibling(self): + def nextsibling(self) -> Optional['Header']: numheadersinpatch = len(self.patch) indexofthisheader = self.patch.index(self) @@ -360,7 +387,7 @@ class Header(PatchNode): else: return None - def prevsibling(self): + def prevsibling(self) -> Optional['Header']: indexofthisheader = self.patch.index(self) if indexofthisheader > 0: previousheader = self.patch[indexofthisheader - 1] @@ -368,7 +395,7 @@ class Header(PatchNode): else: return None - def parentitem(self): + def parentitem(self) -> None: """ There is no 'real' parent item of a header that can be selected, so return None. @@ -389,7 +416,7 @@ class Header(PatchNode): else: return None - def allchildren(self): + def allchildren(self) -> Sequence['Hunk']: """Return a list of all direct children of this node""" return self.hunks @@ -397,7 +424,7 @@ class Header(PatchNode): class HunkLine(PatchNode): """Represents a changed line in a hunk""" - def __init__(self, linetext, hunk): + def __init__(self, linetext: bytes, hunk): self.linetext = linetext self.applied = True # the parent hunk to which this line belongs @@ -406,8 +433,21 @@ class HunkLine(PatchNode): # in the prevItem method. self.folded = False - def prettystr(self): - return self.linetext + def __bytes__(self): + if self.applied: + return self.linetext + else: + return b' ' + self.linetext[1:] + + @property + def diffop(self): + return self.linetext[0:1] + + def __str__(self) -> str: + return self.prettystr() + + def prettystr(self) -> str: + return self.linetext.decode("UTF-8", errors="hexreplace") def nextsibling(self): numlinesinhunk = len(self.hunk.changedlines) @@ -446,8 +486,24 @@ class HunkLine(PatchNode): class Hunk(PatchNode): """ui patch hunk, wraps a hunk and keeps track of ui behavior """ maxcontext = 3 - - def __init__(self, header: Header, fromline, toline, proc, before, hunklines, after): + header: Header + fromline: int + toline: int + proc: bytes + after: Sequence[bytes] + before: Sequence[bytes] + changedlines: Sequence[HunkLine] + + def __init__( + self, + header: Header, + fromline: int, + toline: int, + proc: bytes, + before: Sequence[bytes], + hunklines: Sequence[bytes], + after: Sequence[bytes] + ): def trimcontext(number, lines): delta = len(lines) - self.maxcontext if False and delta > 0: @@ -509,25 +565,25 @@ class Hunk(PatchNode): else: return None - def allchildren(self) -> list[HunkLine]: + def allchildren(self) -> Sequence[PatchNode]: """Return a list of all direct children of this node""" return self.changedlines def countchanges(self) -> tuple[int, int]: """changedlines -> (n+,n-)""" add = len([line for line in self.changedlines if line.applied - and line.prettystr().startswith('+')]) + and line.diffop == b'+']) rem = len([line for line in self.changedlines if line.applied - and line.prettystr().startswith('-')]) + and line.diffop == b'-']) return add, rem def getfromtoline(self): - # calculate the number of removed lines converted to context lines + """Calculate the number of removed lines converted to context lines""" removedconvertedtocontext = self.originalremoved - self.removed contextlen = (len(self.before) + len(self.after) + removedconvertedtocontext) - if self.after and self.after[-1] == '\\ No newline at end of file\n': + if self.after and self.after[-1] == b'\\ No newline at end of file\n': contextlen -= 1 fromlen = contextlen + self.removed tolen = contextlen + self.added @@ -539,75 +595,75 @@ class Hunk(PatchNode): # So, if either of hunks is empty, decrease its line start. --immerrr # But only do this if fromline > 0, to avoid having, e.g fromline=-1. fromline, toline = self.fromline, self.toline - if fromline != 0: - if fromlen == 0: - fromline -= 1 + if fromlen == 0 and fromline > 0: + fromline -= 1 if tolen == 0 and toline > 0: toline -= 1 - fromtoline = '@@ -%d,%d +%d,%d @@%s\n' % ( + fromtoline = b'@@ -%d,%d +%d,%d @@%b\n' % ( fromline, fromlen, toline, tolen, - self.proc and (' ' + self.proc)) + self.proc and (b' ' + self.proc)) + return fromtoline - def write(self, fp) -> None: + def write(self, fp: IO[bytes]) -> None: # updated self.added/removed, which are used by getfromtoline() self.added, self.removed = self.countchanges() fp.write(self.getfromtoline()) + fp.write(b''.join(self.before)) - hunklinelist = [] # add the following to the list: (1) all applied lines, and # (2) all unapplied removal lines (convert these to context lines) for changedline in self.changedlines: - changedlinestr = changedline.prettystr() - if changedline.applied: - hunklinelist.append(changedlinestr) - elif changedlinestr.startswith("-"): - hunklinelist.append(" " + changedlinestr[1:]) + fp.write(bytes(changedline)) - fp.write(''.join(self.before + hunklinelist + self.after)) + fp.write(b''.join(self.after)) def reversehunks(self) -> 'Hunk': - """Make the hunk apply in the other direction.""" - m = {'+': '-', '-': '+', '\\': '\\'} - hunklines = ['%s%s' % (m[line.prettystr()[0:1]], line.prettystr()[1:]) + r"""Make the hunk apply in the other direction. + + >>> header = Header([b'diff --git a/file b/file\n']) + >>> print(Hunk( + ... header, + ... fromline=1, + ... toline=2, + ... proc=b'context', + ... before=[b' 1\n', b' 2\n'], + ... hunklines=[b'-3\n'], + ... after=[b' 4\n', b' 5\n'], + ... ).reversehunks().prettystr()) + @@ -1,4 +2,5 @@ context + 1 + 2 + +3 + 4 + 5 + """ + m = {b'+': b'-', b'-': b'+', b'\\': b'\\'} + hunklines = [b'%s%s' % (m[line.linetext[0:1]], line.linetext[1:]) for line in self.changedlines if line.applied] return Hunk(self.header, self.fromline, self.toline, self.proc, self.before, hunklines, self.after) - def unapplyhunks(self) -> 'Hunk': - """Unapply the hunk. - - If the hunk is not applied, then the hunk is returned as it appears in the patch file. - If the hunk is applied, then the hunk is returned with the '+' lines changed to ' ' lines - - :return: A new Hunk object with the changes applied. - """ - m = {'+': '-', '-': '+', '\\': '\\'} - hunklinelist = [] - for changedline in self.changedlines: - changedlinestr = changedline.prettystr() - if not changedline.applied: - hunklinelist.append('%s%s' % (m[changedlinestr[0]], changedlinestr[1:])) - elif changedlinestr.startswith("+"): - hunklinelist.append(" " + changedlinestr[1:]) - return Hunk(self.header, self.fromline, self.toline, self.proc, self.before, hunklinelist, self.after) - - pretty = write + def files(self) -> list[Optional[bytes]]: + return self.header.files() - def filename(self): + def filename(self) -> str: return self.header.filename() + def __str__(self) -> str: + return self.prettystr() + def prettystr(self) -> str: - x = io.StringIO() - self.pretty(x) - return x.getvalue() + x = io.BytesIO() + self.write(x) + return x.getvalue().decode("UTF-8", errors="hexreplace") def __repr__(self) -> str: - return '' % (self.filename(), self.fromline) + return '' % (self.files()[1] or self.files()[0], self.fromline) def parsepatch(fp: IO[bytes]): - """Parse a patch, returning a list of header and hunk objects. + r"""Parse a patch, returning a list of header and hunk objects. >>> rawpatch = b'''diff --git a/folder1/g b/folder1/g ... --- a/folder1/g @@ -630,20 +686,23 @@ def parsepatch(fp: IO[bytes]): Headers and hunks are interspersed in the list returned from the function: >>> headers - [
, - , - , - ] + [
, + , + , + ] + + >>> headers[0].filename() + 'folder1/g' Each header also provides a list of hunks belonging to it: >>> headers[0].hunks - [, - , - ] - >>> out = io.StringIO() + [, + , + ] + >>> out = io.BytesIO() >>> for header in headers: ... header.write(out) - >>> print(out.getvalue()) + >>> print(out.getvalue().decode("ascii")) diff --git a/folder1/g b/folder1/g --- a/folder1/g +++ b/folder1/g @@ -661,23 +720,54 @@ def parsepatch(fp: IO[bytes]): 8 @@ -8,0 +10,1 @@ +9 + + It is possible to handle non-UTF-8 patches: + >>> rawpatch = b'''diff --git a/test b/test + ... --- /dev/null + ... +++ b/test + ... @@ -0,0 +1,2 @@ + ... +\xCD\xCE\xCD-\xD3\xD2\xD4-8 \xF2\xE5\xF1\xF2 + ... +test''' + >>> fp = io.BytesIO(rawpatch) + >>> headers = parsepatch(fp) + >>> out = io.BytesIO() + >>> for header in headers: + ... header.write(out) + + Non-UTF-8 characters survive the roundtrip: + >>> print(out.getvalue().decode("cp1251")) + diff --git a/test b/test + --- /dev/null + +++ b/test + @@ -0,0 +1,2 @@ + +НОН-УТФ-8 тест + +test + + When pretty-printing the hunk, they get replaced with their + hexadecimal codes: + >>> print(headers[0].hunks[0]) + @@ -0,0 +1,2 @@ + +--8 + +test """ class Parser: """patch parsing state machine""" + header: Header + headers: Sequence[Union[Header, Hunk]] + def __init__(self): self.fromline = 0 self.toline = 0 - self.proc = '' - self.header = None + self.proc = b'' self.context = [] self.before = [] self.hunk = [] self.headers = [] def addrange(self, limits): - "Store range line info to associated instance variables." + """Store range line info to associated instance variables.""" fromstart, fromend, tostart, toend, proc = limits self.fromline = int(fromstart) self.toline = int(tostart) @@ -707,7 +797,7 @@ def parsepatch(fp: IO[bytes]): self.before = [] self.hunk = [] self.context = [] - self.proc = '' + self.proc = b'' def addcontext(self, context): """ @@ -738,7 +828,7 @@ def parsepatch(fp: IO[bytes]): self.before = self.context self.context = [] - def newfile(self, hdr): + def newfile(self, header): """ Create a header object containing the header lines, and the filename the header applies to. Add the header to self.headers. @@ -750,7 +840,7 @@ def parsepatch(fp: IO[bytes]): self.add_new_hunk() # create a new header and add it to self.header - h = Header(hdr) + h = Header(header) self.headers.append(h) self.header = h diff --git a/git_crecord/gitrepo.py b/git_crecord/gitrepo.py index c5f1843..0a3a59a 100644 --- a/git_crecord/gitrepo.py +++ b/git_crecord/gitrepo.py @@ -1,9 +1,14 @@ import os import sys +from pathlib import Path +from typing import Optional + from . import util INDEX_FILENAME = "index" +ObjectHash = str + class GitTree: def __init__(self, tree): self._tree = tree @@ -23,41 +28,59 @@ class GitIndex: def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self._filename, self.indextree) - def commit(self): - return util.systemcall(['git', 'write-tree'], onerr=RuntimeError).rstrip('\n') + def commit(self) -> ObjectHash: + return util.systemcall( + ['git', 'write-tree'], + onerr=RuntimeError, + encoding="ascii", + ).rstrip('\n') def write(self): GitTree(self.indextree).read() - def backup_tree(self): + def backup_tree(self) -> ObjectHash: try: self.indextree = self.commit() except RuntimeError as inst: raise util.Abort('failed to read the index: %s' % inst) return self.indextree + class GitRepo: - def __init__(self, path): + def __init__(self, path: Optional[os.PathLike]): try: - self.path = util.systemcall(['git', 'rev-parse', '--show-toplevel'], - onerr=util.Abort).rstrip('\n') - self._controldir = util.systemcall(['git', 'rev-parse', '--git-dir']).rstrip('\n') - if not os.path.isdir(self._controldir): + self.path = Path(util.systemcall( + ['git', 'rev-parse', '--show-toplevel'], + dir=path, + encoding="fs", + onerr=util.Abort + ).rstrip('\n')) + self._controldir = Path(util.systemcall( + ['git', 'rev-parse', '--git-dir'], + dir=path, + encoding="fs", + ).rstrip('\n')) + if not self._controldir.is_dir(): raise util.Abort except util.Abort: sys.exit(1) def __repr__(self): - return "%s(%r)" % (self.__class__.__name__, self.path) + return "%s(%s)" % (self.__class__.__name__, self.path) - def controldir(self): - return os.path.abspath(self._controldir) + @property + def controldir(self) -> Path: + return self._controldir.resolve() - def index_path(self): - return os.path.join(self.controldir(), INDEX_FILENAME) + @property + def index_path(self) -> Path: + return self.controldir / INDEX_FILENAME - def open_index(self): - return GitIndex(self.index_path()) + def open_index(self) -> GitIndex: + return GitIndex(self.index_path) - def head(self): - return util.systemcall(['git', 'rev-parse', '--verify', '-q', 'HEAD']).rstrip('\n') + def head(self) -> ObjectHash: + return util.systemcall( + ['git', 'rev-parse', '--verify', '-q', 'HEAD'], + encoding="ascii", + ).rstrip('\n') diff --git a/git_crecord/main.py b/git_crecord/main.py index 0e3228c..bad5f8f 100644 --- a/git_crecord/main.py +++ b/git_crecord/main.py @@ -10,7 +10,11 @@ import argparse class Config: def get(self, section, item, default=None): try: - return util.systemcall(['git', 'config', '--get', '%s.%s' % (section, item)], onerr=KeyError).rstrip('\n') + return util.systemcall( + ['git', 'config', '--get', '%s.%s' % (section, item)], + onerr=KeyError, + encoding="UTF-8", + ).rstrip('\n') except KeyError: return default @@ -27,29 +31,25 @@ class Ui: except KeyError: self._username = None - def debug(self, *msg, **opts): - if self.debuglevel < 2: + def print_message(self, *msg, debuglevel: int, **opts): + if self.debuglevel < debuglevel: return - for m in msg: - sys.stdout.write(m) - def info(self, *msg, **opts): - if self.debuglevel < 1: - return sys.stdout.flush() - for m in msg: - sys.stderr.write(m) + print(*msg, **opts, file=sys.stderr) sys.stderr.flush() - def status(self, *msg, **opts): - for m in msg: - sys.stdout.write(m) + def debug(self, *msg, **opts): + self.print_message(*msg, debuglevel=2, **opts) + + def info(self, *msg, **opts): + self.print_message(*msg, debuglevel=1, **opts) def warn(self, *msg, **opts): - sys.stdout.flush() - for m in msg: - sys.stderr.write(m) - sys.stderr.flush() + self.print_message(*msg, debuglevel=0, **opts) + + def status(self, *msg, **opts): + print(*msg, **opts) def setdebuglevel(self, level): self.debuglevel = level @@ -69,16 +69,16 @@ class Ui: os.environ.get("VISUAL") or os.environ.get("EDITOR", editor)) - def edit(self, text, user, extra=None, name=None): + def edit(self, text: bytes, user, extra=None, name=None) -> bytes: fd = None if name is None: (fd, name) = tempfile.mkstemp(prefix='git-crecord-', suffix=".txt", text=True) try: if fd is not None: - f = os.fdopen(fd, "w") + f = os.fdopen(fd, "wb") else: - f = open(name, "w") + f = open(name, "wb") f.write(text) f.close() @@ -87,7 +87,7 @@ class Ui: util.system("%s \"%s\"" % (editor, name), onerr=util.Abort, errprefix=_("edit failed")) - f = open(name) + f = open(name, "rb") t = f.read() f.close() finally: diff --git a/git_crecord/util.py b/git_crecord/util.py index 3481b18..f148415 100644 --- a/git_crecord/util.py +++ b/git_crecord/util.py @@ -15,10 +15,14 @@ import os import subprocess import shutil import sys +from typing import AnyStr, overload, Sequence, Optional + from . import encoding + closefds = os.name == 'posix' + def explainexit(code): """return a 2-tuple (desc, code) describing a subprocess status (codes from kill are negative - not os.system/wait encoding)""" @@ -27,9 +31,11 @@ def explainexit(code): else: return _("exited with status %d") % code, code + class Abort(Exception): pass + def system(cmd, cwd=None, onerr=None, errprefix=None): try: sys.stdout.flush() @@ -53,16 +59,38 @@ def system(cmd, cwd=None, onerr=None, errprefix=None): raise onerr(errmsg) return rc -def systemcall(cmd, onerr=None, errprefix=None): + +@overload +def systemcall( + cmd: Sequence[AnyStr], + encoding: str, + dir: Optional[os.PathLike] = None, + onerr=None, + errprefix=None +) -> str: + ... + + +@overload +def systemcall( + cmd: Sequence[AnyStr], + dir: Optional[os.PathLike] = None, + onerr=None, + errprefix=None +) -> bytes: + ... + + +def systemcall(cmd, encoding=None, dir=None, onerr=None, errprefix=None): try: sys.stdout.flush() except Exception: pass - p = subprocess.Popen(cmd, stdout=subprocess.PIPE, close_fds=closefds) - out = '' + p = subprocess.Popen(cmd, cwd=dir, stdout=subprocess.PIPE, close_fds=closefds) + out = b'' for line in iter(p.stdout.readline, b''): - out = out + line.decode(encoding.encoding) + out = out + line p.wait() rc = p.returncode @@ -73,7 +101,13 @@ def systemcall(cmd, onerr=None, errprefix=None): errmsg = '%s: %s' % (errprefix, errmsg) raise onerr(errmsg) - return out + if encoding == "fs": + return os.fsdecode(out) + elif encoding: + return out.decode(encoding) + else: + return out + def copyfile(src, dest, hardlink=False, copystat=False): '''copy a file, preserving mode and optionally other stat info like @@ -103,10 +137,14 @@ def copyfile(src, dest, hardlink=False, copystat=False): except shutil.Error as inst: raise Abort(str(inst)) + def ellipsis(text, maxlength=400): """Trim string to at most maxlength (default: 400) columns in display.""" return encoding.trim(text, maxlength, ellipsis='...') + _notset = object() + + def safehasattr(thing, attr): return getattr(thing, attr, _notset) is not _notset -- cgit v1.2.3