summaryrefslogtreecommitdiff
path: root/git_crecord/crecord_core.py
diff options
context:
space:
mode:
Diffstat (limited to 'git_crecord/crecord_core.py')
-rw-r--r--git_crecord/crecord_core.py93
1 files changed, 50 insertions, 43 deletions
diff --git a/git_crecord/crecord_core.py b/git_crecord/crecord_core.py
index 0b02a43..ba4cee7 100644
--- a/git_crecord/crecord_core.py
+++ b/git_crecord/crecord_core.py
@@ -9,19 +9,22 @@
'''text-gui based change selection during commit or qrefresh'''
from gettext import gettext as _
-from . import encoding
-from . import util
+
import io
import errno
import os
import tempfile
import subprocess
+from typing import IO, cast
+
+from .crpatch import Header, parsepatch, filterpatch
+from .chunk_selector import chunkselector
+from .gitrepo import GitRepo
+from .util import Abort, system, closefds, copyfile
-from . import crpatch
-from . import chunk_selector
def dorecord(ui, repo, commitfunc, *pats, **opts):
- def recordfunc(ui, repo, message, match, opts):
+ def recordfunc(ui, repo: GitRepo, message, match, opts):
"""This is generic record driver.
Its job is to interactively filter local changes, and accordingly
@@ -51,21 +54,21 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
if not opts['index'] and repo.head():
git_base.append("HEAD")
- p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=util.closefds)
- fp = p.stdout
+ p = subprocess.Popen(git_args + git_base, stdout=subprocess.PIPE, close_fds=closefds)
+ fp = cast(IO[bytes], p.stdout)
# 0. parse patch
fromfiles = set()
tofiles = set()
- chunks = crpatch.parsepatch(fp)
+ chunks = parsepatch(fp)
for c in chunks:
- if isinstance(c, crpatch.Header):
+ if isinstance(c, Header):
fromfile, tofile = c.files()
if fromfile is not None:
- fromfiles.add(fromfile)
+ fromfiles.add(os.fsdecode(fromfile))
if tofile is not None:
- tofiles.add(tofile)
+ tofiles.add(os.fsdecode(tofile))
added = tofiles - fromfiles
removed = fromfiles - tofiles
@@ -73,31 +76,31 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
changes = [modified, added, removed]
# 1. filter patch, so we have intending-to apply subset of it
- chunks = crpatch.filterpatch(opts,
- chunks,
- chunk_selector.chunkselector, ui)
+ chunks = filterpatch(opts,
+ chunks,
+ chunkselector, ui)
p.wait()
del fp
contenders = set()
for h in chunks:
- try:
- contenders.update(set(h.files()))
- except AttributeError:
- pass
+ fromfile, tofile = h.files()
+ if fromfile is not None:
+ contenders.add(os.fsdecode(fromfile))
+ if tofile is not None:
+ contenders.add(os.fsdecode(tofile))
changed = changes[0] | changes[1] | changes[2]
- newfiles = [f for f in changed if f in contenders]
+ newfiles: list = [f for f in changed if f in contenders]
if not newfiles:
- ui.status(_('no changes to record\n'))
+ ui.status(_('no changes to record'))
return 0
-
# 2. backup changed files, so we can restore them in the end
backups = {}
newly_added_backups = {}
- backupdir = os.path.join(repo.controldir(), 'record-backups')
+ backupdir = repo.controldir / 'record-backups'
try:
os.mkdir(backupdir)
except OSError as err:
@@ -112,19 +115,20 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
for f in newfiles:
if f not in (modified | added):
continue
- fd, tmpname = tempfile.mkstemp(prefix=f.replace('/', '_')+'.',
+ prefix = os.fsdecode(f).replace('/', '_') + '.'
+ fd, tmpname = tempfile.mkstemp(prefix=prefix,
dir=backupdir)
os.close(fd)
- ui.debug('backup %r as %r\n' % (f, tmpname))
+ ui.debug('backup %r as %r' % (f, tmpname))
pathname = os.path.join(repo.path, f)
if os.path.isfile(pathname):
- util.copyfile(pathname, tmpname)
+ copyfile(pathname, tmpname)
if f in modified:
backups[f] = tmpname
elif f in added:
newly_added_backups[f] = tmpname
- fp = io.StringIO()
+ fp = io.BytesIO()
all_backups = {}
all_backups.update(backups)
all_backups.update(newly_added_backups)
@@ -143,29 +147,32 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# 3a. apply filtered patch to clean repo (clean)
if backups or any((f in contenders for f in removed)):
- util.system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added],
- onerr=util.Abort, errprefix=_("checkout failed"))
+ system(['git', 'checkout', '-f'] + git_base + ['--'] + [f for f in newfiles if f not in added],
+ onerr=Abort, errprefix=_("checkout failed"))
# remove newly added files from 'clean' repo (so patch can apply)
for f in newly_added_backups:
- pathname = os.path.join(repo.path, f)
- if os.path.isfile(pathname):
- os.unlink(pathname)
+ pathname = repo.path / f
+ pathname.unlink(missing_ok=True)
# 3b. (apply)
if dopatch:
try:
- ui.debug('applying patch\n')
- ui.debug(fp.getvalue())
- p = subprocess.Popen(["git", "apply", "--whitespace=nowarn"], stdin=subprocess.PIPE, close_fds=util.closefds)
- p.stdin.write(fp.read().encode(encoding.encoding))
+ ui.debug('applying patch')
+ ui.debug(fp.getvalue().decode("UTF-8", "hexreplace"))
+ p = subprocess.Popen(
+ ["git", "apply", "--whitespace=nowarn"],
+ stdin=subprocess.PIPE,
+ close_fds=closefds
+ )
+ p.stdin.write(fp.getvalue())
p.stdin.close()
p.wait()
except Exception as err:
s = str(err)
if s:
- raise util.Abort(s)
+ raise Abort(s)
else:
- raise util.Abort(_('patch failed to apply'))
+ raise Abort(_('patch failed to apply'))
del fp
# 4. We prepared working directory according to filtered patch.
@@ -173,12 +180,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# it is important to first chdir to repo root -- we'll call a
# highlevel command with list of pathnames relative to repo root
- newfiles = [os.path.join(repo.path, n) for n in newfiles]
+ newfiles = [repo.path / n for n in newfiles]
if opts['operation'] == 'crecord':
ui.commit(*newfiles, **opts)
else:
ui.stage(*newfiles, **opts)
- ui.debug('previous staging contents backed up as tree %r\n' % index_backup.indextree)
+ ui.debug('previous staging contents backed up as tree %r' % index_backup.indextree)
index_backup = None
return 0
@@ -186,12 +193,12 @@ def dorecord(ui, repo, commitfunc, *pats, **opts):
# 5. finally restore backed-up files
try:
for realname, tmpname in backups.items():
- ui.debug('restoring %r to %r\n' % (tmpname, realname))
- util.copyfile(tmpname, os.path.join(repo.path, realname))
+ ui.debug('restoring %r to %r' % (tmpname, realname))
+ copyfile(tmpname, os.path.join(repo.path, realname))
os.unlink(tmpname)
for realname, tmpname in newly_added_backups.items():
- ui.debug('restoring %r to %r\n' % (tmpname, realname))
- util.copyfile(tmpname, os.path.join(repo.path, realname))
+ ui.debug('restoring %r to %r' % (tmpname, realname))
+ copyfile(tmpname, os.path.join(repo.path, realname))
os.unlink(tmpname)
os.rmdir(backupdir)
if index_backup: