1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# util.py - utility functions from Mercurial
#
# Copyright 2006, 2015 Matt Mackall <mpm@selenic.com>
# Copyright 2007 Eric St-Jean <esj@wwd.ca>
# Copyright 2009, 2011 Mads Kiilerich <mads@kiilerich.com>
# Copyright 2015 Pierre-Yves David <pierre-yves.david@fb.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version
from __future__ import unicode_literals
from gettext import gettext as _
import os
import subprocess
import shutil
import sys
from typing import AnyStr, overload, Sequence, Optional
from . import encoding
closefds = os.name == 'posix'
def explainexit(code):
"""return a 2-tuple (desc, code) describing a subprocess status
(codes from kill are negative - not os.system/wait encoding)"""
if (code < 0) and (os.name == 'posix'):
return _("killed by signal %d") % -code, -code
else:
return _("exited with status %d") % code, code
class Abort(Exception):
pass
def system(cmd, cwd=None, onerr=None, errprefix=None):
try:
sys.stdout.flush()
except Exception:
pass
if isinstance(cmd, list):
shell = False
prog = os.path.basename(cmd[0])
else:
shell = True
prog = os.path.basename(cmd.split(None, 1)[0])
rc = subprocess.call(cmd, shell=shell, close_fds=closefds,
cwd=cwd)
if rc and onerr:
errmsg = '%s %s' % (prog,
explainexit(rc)[0])
if errprefix:
errmsg = '%s: %s' % (errprefix, errmsg)
raise onerr(errmsg)
return rc
@overload
def systemcall(
cmd: Sequence[AnyStr],
encoding: str,
dir: Optional[os.PathLike] = None,
onerr=None,
errprefix=None
) -> str:
...
@overload
def systemcall(
cmd: Sequence[AnyStr],
dir: Optional[os.PathLike] = None,
onerr=None,
errprefix=None
) -> bytes:
...
def systemcall(cmd, encoding=None, dir=None, onerr=None, errprefix=None):
try:
sys.stdout.flush()
except Exception:
pass
p = subprocess.Popen(cmd, cwd=dir, stdout=subprocess.PIPE, close_fds=closefds)
out = b''
for line in iter(p.stdout.readline, b''):
out = out + line
p.wait()
rc = p.returncode
if rc and onerr:
errmsg = '%s %s' % (os.path.basename(cmd[0]),
explainexit(rc)[0])
if errprefix:
errmsg = '%s: %s' % (errprefix, errmsg)
raise onerr(errmsg)
if encoding == "fs":
return os.fsdecode(out)
elif encoding:
return out.decode(encoding)
else:
return out
def copyfile(src, dest, hardlink=False, copystat=False):
'''copy a file, preserving mode and optionally other stat info like
atime/mtime'''
if os.path.lexists(dest):
os.unlink(dest)
# hardlinks are problematic on CIFS, quietly ignore this flag
# until we find a way to work around it cleanly (issue4546)
if False and hardlink:
try:
os.link(src, dest)
return
except (IOError, OSError):
pass # fall back to normal copy
if os.path.islink(src):
os.symlink(os.readlink(src), dest)
# copytime is ignored for symlinks, but in general copytime isn't needed
# for them anyway
else:
try:
shutil.copyfile(src, dest)
if copystat:
# copystat also copies mode
shutil.copystat(src, dest)
else:
shutil.copymode(src, dest)
except shutil.Error as inst:
raise Abort(str(inst))
def ellipsis(text, maxlength=400):
"""Trim string to at most maxlength (default: 400) columns in display."""
return encoding.trim(text, maxlength, ellipsis='...')
_notset = object()
def safehasattr(thing, attr):
return getattr(thing, attr, _notset) is not _notset
|