summaryrefslogtreecommitdiff
path: root/tools/examples/svnshell.py
blob: b05e7ef170b5f63763f8650e4525af0cd79340db (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
#!/usr/bin/env python
#
# svnshell.py : a Python-based shell interface for cruising 'round in
#               the filesystem.
#
######################################################################
#    Licensed to the Apache Software Foundation (ASF) under one
#    or more contributor license agreements.  See the NOTICE file
#    distributed with this work for additional information
#    regarding copyright ownership.  The ASF licenses this file
#    to you under the Apache License, Version 2.0 (the
#    "License"); you may not use this file except in compliance
#    with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing,
#    software distributed under the License is distributed on an
#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#    KIND, either express or implied.  See the License for the
#    specific language governing permissions and limitations
#    under the License.
######################################################################
#

import sys
import time
import re
from cmd import Cmd
from random import randint
from svn import fs, core, repos


class SVNShell(Cmd):
  def __init__(self, path):
    """initialize an SVNShell object"""
    Cmd.__init__(self)
    path = core.svn_path_canonicalize(path)
    self.fs_ptr = repos.fs(repos.open(path))
    self.is_rev = 1
    self.rev = fs.youngest_rev(self.fs_ptr)
    self.txn = None
    self.root = fs.revision_root(self.fs_ptr, self.rev)
    self.path = "/"
    self._setup_prompt()
    self.cmdloop()

  def precmd(self, line):
    if line == "EOF":
      # Ctrl-D is a command without a newline.  Print a newline, so the next
      # shell prompt is not on the same line as the last svnshell prompt.
      print("")
      return "exit"
    return line

  def postcmd(self, stop, line):
    self._setup_prompt()

  def default(self, line):
    print "Unknown command."

  def do_cat(self, arg):
    """dump the contents of a file"""
    if not len(arg):
      print("You must supply a file path.")
      return
    catpath = self._parse_path(arg)
    kind = fs.check_path(self.root, catpath)
    if kind == core.svn_node_none:
      print("Path '%s' does not exist." % catpath)
      return
    if kind == core.svn_node_dir:
      print("Path '%s' is not a file." % catpath)
      return
    ### be nice to get some paging in here.
    stream = fs.file_contents(self.root, catpath)
    while True:
      data = core.svn_stream_read(stream, core.SVN_STREAM_CHUNK_SIZE)
      sys.stdout.write(data)
      if len(data) < core.SVN_STREAM_CHUNK_SIZE:
        break

  def do_cd(self, arg):
    """change directory"""
    newpath = self._parse_path(arg)

    # make sure that path actually exists in the filesystem as a directory
    kind = fs.check_path(self.root, newpath)
    if kind != core.svn_node_dir:
      print("Path '%s' is not a valid filesystem directory." % newpath)
      return
    self.path = newpath

  def do_ls(self, arg):
    """list the contents of the current directory or provided path"""
    parent = self.path
    if not len(arg):
      # no arg -- show a listing for the current directory.
      entries = fs.dir_entries(self.root, self.path)
    else:
      # arg?  show a listing of that path.
      newpath = self._parse_path(arg)
      kind = fs.check_path(self.root, newpath)
      if kind == core.svn_node_dir:
        parent = newpath
        entries = fs.dir_entries(self.root, parent)
      elif kind == core.svn_node_file:
        parts = self._path_to_parts(newpath)
        name = parts.pop(-1)
        parent = self._parts_to_path(parts)
        print(parent + ':' + name)
        tmpentries = fs.dir_entries(self.root, parent)
        if not tmpentries.get(name, None):
          return
        entries = {}
        entries[name] = tmpentries[name]
      else:
        print("Path '%s' not found." % newpath)
        return

    keys = sorted(entries.keys())

    print("   REV   AUTHOR  NODE-REV-ID     SIZE         DATE NAME")
    print("----------------------------------------------------------------------------")

    for entry in keys:
      fullpath = parent + '/' + entry
      size = ''
      is_dir = fs.is_dir(self.root, fullpath)
      if is_dir:
        name = entry + '/'
      else:
        size = str(fs.file_length(self.root, fullpath))
        name = entry
      node_id = fs.unparse_id(entries[entry].id)
      created_rev = fs.node_created_rev(self.root, fullpath)
      author = fs.revision_prop(self.fs_ptr, created_rev,
                                core.SVN_PROP_REVISION_AUTHOR)
      if not author:
        author = ""
      date = fs.revision_prop(self.fs_ptr, created_rev,
                              core.SVN_PROP_REVISION_DATE)
      if not date:
        date = ""
      else:
        date = self._format_date(date)

      print("%6s %8s %12s %8s %12s %s" % (created_rev, author[:8],
                                          node_id, size, date, name))

  def do_lstxns(self, arg):
    """list the transactions available for browsing"""
    txns = sorted(fs.list_transactions(self.fs_ptr))
    counter = 0
    for txn in txns:
      counter = counter + 1
      sys.stdout.write("%8s   " % txn)
      if counter == 6:
        print("")
        counter = 0
    print("")

  def do_pcat(self, arg):
    """list the properties of a path"""
    catpath = self.path
    if len(arg):
      catpath = self._parse_path(arg)
    kind = fs.check_path(self.root, catpath)
    if kind == core.svn_node_none:
      print("Path '%s' does not exist." % catpath)
      return
    plist = fs.node_proplist(self.root, catpath)
    if not plist:
      return
    for pkey, pval in plist.items():
      print('K ' + str(len(pkey)))
      print(pkey)
      print('P ' + str(len(pval)))
      print(pval)
    print('PROPS-END')

  def do_setrev(self, arg):
    """set the current revision to view"""
    try:
      if arg.lower() == 'head':
        rev = fs.youngest_rev(self.fs_ptr)
      else:
        rev = int(arg)
      newroot = fs.revision_root(self.fs_ptr, rev)
    except:
      print("Error setting the revision to '" + arg + "'.")
      return
    fs.close_root(self.root)
    self.root = newroot
    self.rev = rev
    self.is_rev = 1
    self._do_path_landing()

  def do_settxn(self, arg):
    """set the current transaction to view"""
    try:
      txnobj = fs.open_txn(self.fs_ptr, arg)
      newroot = fs.txn_root(txnobj)
    except:
      print("Error setting the transaction to '" + arg + "'.")
      return
    fs.close_root(self.root)
    self.root = newroot
    self.txn = arg
    self.is_rev = 0
    self._do_path_landing()

  def do_youngest(self, arg):
    """list the youngest revision available for browsing"""
    rev = fs.youngest_rev(self.fs_ptr)
    print(rev)

  def do_exit(self, arg):
    sys.exit(0)

  def _path_to_parts(self, path):
    return [_f for _f in path.split('/') if _f]

  def _parts_to_path(self, parts):
    return '/' + '/'.join(parts)

  def _parse_path(self, path):
    # cleanup leading, trailing, and duplicate '/' characters
    newpath = self._parts_to_path(self._path_to_parts(path))

    # if PATH is absolute, use it, else append it to the existing path.
    if path.startswith('/') or self.path == '/':
      newpath = '/' + newpath
    else:
      newpath = self.path + '/' + newpath

    # cleanup '.' and '..'
    parts = self._path_to_parts(newpath)
    finalparts = []
    for part in parts:
      if part == '.':
        pass
      elif part == '..':
        if len(finalparts) != 0:
          finalparts.pop(-1)
      else:
        finalparts.append(part)

    # finally, return the calculated path
    return self._parts_to_path(finalparts)

  def _format_date(self, date):
    date = core.svn_time_from_cstring(date)
    date = time.asctime(time.localtime(date / 1000000))
    return date[4:-8]

  def _do_path_landing(self):
    """try to land on self.path as a directory in root, failing up to '/'"""
    not_found = 1
    newpath = self.path
    while not_found:
      kind = fs.check_path(self.root, newpath)
      if kind == core.svn_node_dir:
        not_found = 0
      else:
        parts = self._path_to_parts(newpath)
        parts.pop(-1)
        newpath = self._parts_to_path(parts)
    self.path = newpath

  def _setup_prompt(self):
    """present the prompt and handle the user's input"""
    if self.is_rev:
      self.prompt = "<rev: " + str(self.rev)
    else:
      self.prompt = "<txn: " + self.txn
    self.prompt += " " + self.path + ">$ "

  def _complete(self, text, line, begidx, endidx, limit_node_kind=None):
    """Generic tab completer.  Takes the 4 standard parameters passed to a
    cmd.Cmd completer function, plus LIMIT_NODE_KIND, which should be a
    svn.core.svn_node_foo constant to restrict the returned completions to, or
    None for no limit.  Catches and displays exceptions, because otherwise
    they are silently ignored - which is quite frustrating when debugging!"""
    try:
      args = line.split()
      if len(args) > 1:
        arg = args[1]
      else:
        arg = ""
      dirs = arg.split('/')
      user_elem = dirs[-1]
      user_dir = "/".join(dirs[:-1] + [''])

      canon_dir = self._parse_path(user_dir)

      entries = fs.dir_entries(self.root, canon_dir)
      acceptable_completions = []
      for name, dirent_t in entries.items():
        if not name.startswith(user_elem):
          continue
        if limit_node_kind and dirent_t.kind != limit_node_kind:
          continue
        if dirent_t.kind == core.svn_node_dir:
          name += '/'
        acceptable_completions.append(name)
      if limit_node_kind == core.svn_node_dir or not limit_node_kind:
        if user_elem in ('.', '..'):
          for extraname in ('.', '..'):
            if extraname.startswith(user_elem):
              acceptable_completions.append(extraname + '/')
      return acceptable_completions
    except:
      ei = sys.exc_info()
      sys.stderr.write("EXCEPTION WHILST COMPLETING\n")
      import traceback
      traceback.print_tb(ei[2])
      sys.stderr.write("%s: %s\n" % (ei[0], ei[1]))
      raise

  def complete_cd(self, text, line, begidx, endidx):
    return self._complete(text, line, begidx, endidx, core.svn_node_dir)

  def complete_cat(self, text, line, begidx, endidx):
    return self._complete(text, line, begidx, endidx, core.svn_node_file)

  def complete_ls(self, text, line, begidx, endidx):
    return self._complete(text, line, begidx, endidx)

  def complete_pcat(self, text, line, begidx, endidx):
    return self._complete(text, line, begidx, endidx)


def _basename(path):
  "Return the basename for a '/'-separated path."
  idx = path.rfind('/')
  if idx == -1:
    return path
  return path[idx+1:]


def usage(exit):
  if exit:
    output = sys.stderr
  else:
    output = sys.stdout
  output.write(
    "usage: %s REPOS_PATH\n"
    "\n"
    "Once the program has started, type 'help' at the prompt for hints on\n"
    "using the shell.\n" % sys.argv[0])
  sys.exit(exit)

def main():
  if len(sys.argv) != 2:
    usage(1)

  SVNShell(sys.argv[1])

if __name__ == '__main__':
  main()