summaryrefslogtreecommitdiff
path: root/pwnlib/term/readline.py
blob: 72082eb897be512a68768125c6bd5e589675f4d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import six
import sys
import os

from pwnlib.term import keyconsts as kc
from pwnlib.term import keymap as km
from pwnlib.term import term
from pwnlib.term import text

# Styling function applied to the character under the cursor.
cursor = text.reverse

# The line being edited, split at the cursor: text before / from the cursor.
buffer_left, buffer_right = '', ''
# (buffer_left, buffer_right) stashed while the user browses the history.
saved_buffer = None
# Previously submitted lines; readline() inserts new entries at index 0.
history = []
# Index into `history` while browsing it, None otherwise.
history_idx = None
# Output handles for the prompt, the edited line and the suggestion list.
# readline() creates them and resets them to None when it returns.
prompt_handle = None
buffer_handle = None
suggest_handle = None
# Index into `search_results` while in history-search mode, None otherwise.
search_idx = None
# List of (history index, match start, match end) tuples.
search_results = []
# Optional callables invoked by readline() before and after editing a line.
startup_hook = None
shutdown_hook = None
# Set by control_d() and checked by readline() to report end-of-file.
# Declared here so that every `global eof` refers to an existing name.
eof = False

# Characters treated as word separators by the word-wise commands.
delims = ' /;:.\\'

# Whether redisplay() shows the inline completion after the cursor.
show_completion = True
# Whether redisplay() shows the list of suggestions (toggled by <tab>).
show_suggestions = False

# Completion callbacks, installed through set_completer().
complete_hook = None
suggest_hook = None

# Number of consecutive <tab> presses, maintained by handle_keypress().
tabs = 0

def force_to_bytes(data):
    """Return `data` as bytes.

    Bytes are returned unchanged.  Anything else is encoded as UTF-8, and
    if that fails, as latin-1.
    """
    if not isinstance(data, bytes):
        try:
            data = data.encode('utf-8')
        except Exception:
            # UTF-8 failed, so `data` is still the original object here.
            data = data.encode('latin-1')
    return data

def set_completer(completer):
    """Install or remove the completion callbacks.

    Arguments:
        completer: An object providing ``complete`` and ``suggest``
                   attributes, or ``None`` to disable both hooks.
    """
    global complete_hook, suggest_hook
    if completer is not None:
        complete_hook = completer.complete
        suggest_hook = completer.suggest
        return
    complete_hook = None
    suggest_hook = None

def fmt_suggestions(suggestions, width=None):
    """Lay out completion suggestions in left-aligned columns.

    Arguments:
        suggestions(list): Candidate strings; may be empty or ``None``.
        width(int): Total width to fill, in characters.  Defaults to
                    ``term.width``.

    Returns:
        The formatted rows, each terminated by a newline, or the
        ``(no completions)`` placeholder line when there is nothing to show.
    """
    if not suggestions:
        return '(no completions)\n'
    if width is None:
        width = term.width
    longest = max(map(len, suggestions))
    # Always use at least one column: when the longest suggestion does not
    # fit in `width` the division yields 0, which used to end in a
    # ZeroDivisionError on the next line.
    columns = max(1, width // (longest + 1))
    column_width = max(0, width // columns)
    fmt = '%%-%ds' % column_width
    rows = []
    for start in range(0, len(suggestions), columns):
        row = suggestions[start:start + columns]
        rows.append(''.join(fmt % item for item in row) + '\n')
    return ''.join(rows)

def auto_complete(*_):
    """Key handler for ``<tab>``.

    While searching, accepts the current search result.  On the first tab
    press, inserts whatever `complete_hook` returns (if anything).  On any
    other tab count, toggles the suggestion list.
    """
    global show_suggestions, tabs
    if search_idx is not None:
        commit_search()
        tabs = 0
        return
    if tabs != 1:
        show_suggestions = not show_suggestions
        redisplay()
        return
    if not complete_hook:
        return
    completion = complete_hook(buffer_left, buffer_right)
    if completion:
        # Reset the counter so the next tab completes again.
        tabs = 0
        insert_text(completion)

def handle_keypress(trace):
    """Track consecutive ``<tab>`` presses in the global `tabs` counter.

    Arguments:
        trace: Sequence of keys; only the last one is inspected.
    """
    global tabs
    tabs = tabs + 1 if trace[-1] == '<tab>' else 0

def clear():
    """Empty the edit buffer, leave history/search mode and redraw."""
    global buffer_left, buffer_right, history_idx, search_idx
    history_idx = None
    search_idx = None
    buffer_left = ''
    buffer_right = ''
    redisplay()

def redisplay():
    """Redraw the suggestion list and the edited line.

    Does nothing unless `buffer_handle` is set.  In search mode the
    line shows the matching history entry with the match highlighted, or
    the search string highlighted as an error when nothing matches.
    """
    global suggest_handle
    if not buffer_handle:
        return

    # Suggestion list, placed before the prompt (or the buffer).
    if show_suggestions and suggest_hook:
        suggestions = suggest_hook(buffer_left, buffer_right)
        if suggest_handle is None:
            anchor = prompt_handle or buffer_handle
            suggest_handle = term.output(before = anchor)
        suggest_handle.update(fmt_suggestions(suggestions))
    elif suggest_handle:
        suggest_handle.update('')

    # Search mode.
    if search_idx is not None:
        if search_results != []:
            hist_pos, start, stop = search_results[search_idx]
            entry = history[hist_pos]
            head, match, tail = entry[:start], entry[start:stop], entry[stop:]
            shown = head + text.bold_green(match) + tail
        else:
            shown = text.white_on_red(buffer_left)
        buffer_handle.update('(search) ' + shown)
        return

    # Normal editing.
    shown = None
    if buffer_right:
        shown = buffer_left + cursor(buffer_right[0]) + buffer_right[1:]
    elif show_completion and complete_hook:
        completion = complete_hook(buffer_left, buffer_right)
        if completion:
            # Inline completion preview, underlined, cursor on its first
            # character.
            shown = buffer_left + \
              text.underline(cursor(completion[0])) + \
              text.underline(completion[1:])
    if not shown:
        shown = buffer_left + cursor(' ')
    buffer_handle.update(shown)

def self_insert(trace):
    """Insert the typed character for a single unmodified unicode key.

    Arguments:
        trace: Sequence of keys; ignored unless it holds exactly one key.
    """
    if len(trace) == 1:
        key = trace[0]
        if key.type == kc.TYPE_UNICODE and key.mods == kc.MOD_NONE:
            insert_text(key.code)

def set_buffer(left, right):
    """Replace the edit buffer and redraw.

    Arguments:
        left(str): Text before the cursor.
        right(str): Text from the cursor onwards.
    """
    global buffer_left, buffer_right
    buffer_left, buffer_right = left, right
    redisplay()

def cancel_search(*_):
    """Leave history-search mode (if active) and redraw."""
    global search_idx
    if search_idx is None:
        return
    search_idx = None
    redisplay()

def commit_search():
    """Accept the selected search result as the new edit buffer.

    Does nothing when not searching or when the search has no results.
    """
    global search_idx
    if search_idx is None or not search_results:
        return
    hist_pos = search_results[search_idx][0]
    set_buffer(history[hist_pos], '')
    search_idx = None
    redisplay()

def update_search_results():
    """Recompute `search_results` for the current search string.

    Only acts in search mode.  Each result is a tuple
    ``(history index, match start, match end)`` for the first occurrence
    of `buffer_left` in a history entry.  If the previously selected
    history entry still matches it stays selected, otherwise the first
    result is selected.  Also hides the suggestion list.
    """
    global search_results, search_idx, show_suggestions
    if search_idx is None:
        return
    show_suggestions = False
    # Remember which history entry was selected before recomputing.
    selected = search_results[search_idx][0] if search_results else None
    search_results = []
    search_idx = 0
    needle = buffer_left
    if not needle:
        return
    for hist_pos, entry in enumerate(history):
        start = entry.find(needle)
        if start < 0:
            continue
        if selected is not None and hist_pos == selected:
            search_idx = len(search_results)
        search_results.append((hist_pos, start, start + len(needle)))

def search_history(*_):
    """Key handler that starts a history search or cycles its results.

    When not yet searching, the whole edit buffer becomes the search
    string.  When already searching, the next result is selected, wrapping
    around at the end.
    """
    global buffer_left, buffer_right, history_idx, search_idx
    if search_idx is not None:
        if search_results:
            search_idx = (search_idx + 1) % len(search_results)
    else:
        buffer_left = buffer_left + buffer_right
        buffer_right = ''
        history_idx = None
        search_idx = 0
        update_search_results()
    redisplay()

def history_prev(*_):
    """Key handler that steps to the next older history entry.

    The current edit buffer is saved when history browsing starts so that
    it can be restored later.  Does nothing when the history is empty or
    its last entry is already shown.
    """
    global history_idx, saved_buffer
    if history == []:
        return
    cancel_search()
    if history_idx is None:
        saved_buffer = buffer_left, buffer_right
        history_idx = -1
    last = len(history) - 1
    if history_idx >= last:
        return
    history_idx += 1
    set_buffer(history[history_idx], '')

def history_next(*_):
    """Key handler that steps to the next newer history entry.

    Stepping past the first entry restores the buffer that was saved when
    history browsing started.  Does nothing when not browsing the history.
    """
    global history_idx, saved_buffer
    if history_idx is None:
        return
    cancel_search()
    if history_idx != 0:
        history_idx -= 1
        set_buffer(history[history_idx], '')
        return
    # Back at the start: restore what the user was typing.
    set_buffer(*saved_buffer)
    history_idx = None
    saved_buffer = None

def backward_char(*_):
    """Key handler that moves the cursor one character to the left."""
    global buffer_left, buffer_right
    commit_search()
    if buffer_left:
        moved = buffer_left[-1]
        buffer_right = moved + buffer_right
        buffer_left = buffer_left[:-1]
    redisplay()

def forward_char(*_):
    """Key handler that moves the cursor one character to the right."""
    global buffer_left, buffer_right
    commit_search()
    if buffer_right:
        moved = buffer_right[0]
        buffer_left = buffer_left + moved
        buffer_right = buffer_right[1:]
    redisplay()

def insert_text(s):
    """Insert `s` at the cursor position.

    Leaves history-browsing mode, refreshes the search results (a no-op
    unless searching) and redraws.

    Arguments:
        s(str): The text to append to the part of the buffer left of the
                cursor.
    """
    global history_idx, saved_buffer, buffer_left
    if history_idx is not None:
        history_idx = saved_buffer = None
    buffer_left = buffer_left + s
    update_search_results()
    redisplay()

def submit(*_):
    """Key handler for enter.

    While searching, accepts the current search result; otherwise stops
    the keymap so that `readline` can return the line.
    """
    if search_idx is None:
        keymap.stop()
    else:
        commit_search()

def control_c(*_):
    """Key handler for ``C-c``; undoes one level of state per press.

    In order of priority: cancel an active search, leave history browsing
    (restoring the saved buffer), clear a non-empty buffer.

    Raises:
        KeyboardInterrupt: When there is nothing left to cancel.
    """
    global history_idx, saved_buffer
    if search_idx is not None:
        cancel_search()
        return
    if history_idx is not None:
        set_buffer(*saved_buffer)
        history_idx = None
        saved_buffer = None
        return
    if buffer_left or buffer_right:
        clear()
        return
    raise KeyboardInterrupt

def control_d(*_):
    """Key handler for ``C-d``.

    On an empty buffer, sets the global `eof` flag and stops the keymap.
    Ignored while the buffer holds any text.
    """
    global eof
    if not (buffer_left or buffer_right):
        eof = True
        keymap.stop()

def kill_to_end(*_):
    """Key handler that deletes everything from the cursor to end of line."""
    global buffer_right
    commit_search()
    # The buffer halves are strings everywhere else in this module; a list
    # here would make every later `buffer_left + buffer_right` raise a
    # TypeError (e.g. when the line is submitted).
    buffer_right = ''
    redisplay()

def delete_char_forward(*_):
    """Key handler that deletes the character under the cursor."""
    global buffer_right
    commit_search()
    if not buffer_right:
        return
    buffer_right = buffer_right[1:]
    redisplay()

def delete_char_backward(*_):
    """Key handler that deletes the character before the cursor.

    Unlike the other editing commands this does not commit an active
    search first; it refreshes the search results after shortening the
    buffer instead.
    """
    global buffer_left
    if not buffer_left:
        return
    buffer_left = buffer_left[:-1]
    update_search_results()
    redisplay()

def kill_word_backward(*_):
    """Key handler that deletes the word before the cursor.

    Delimiters (see `delims`) directly before the cursor are removed
    together with the word that precedes them.
    """
    global buffer_left
    commit_search()
    keep = len(buffer_left)
    in_word = False
    # Scan leftwards: skip delimiters, then the word, and stop at the
    # first delimiter found after at least one word character.
    while keep > 0:
        if buffer_left[keep - 1] in delims:
            if in_word:
                break
        else:
            in_word = True
        keep -= 1
    buffer_left = buffer_left[:keep]
    redisplay()

def backward_word(*_):
    """Key handler that moves the cursor to the start of the previous word.

    Delimiters (see `delims`) directly before the cursor are skipped
    together with the word that precedes them.
    """
    global buffer_left, buffer_right
    commit_search()
    split = len(buffer_left)
    in_word = False
    while split > 0:
        if buffer_left[split - 1] in delims:
            if in_word:
                break
        else:
            in_word = True
        split -= 1
    # Only touch the buffers when the cursor actually moved.
    if split < len(buffer_left):
        buffer_right = buffer_left[split:] + buffer_right
        buffer_left = buffer_left[:split]
    redisplay()

def forward_word(*_):
    """Key handler that moves the cursor past the end of the next word.

    Delimiters (see `delims`) directly under the cursor are skipped
    together with the word that follows them.
    """
    global buffer_left, buffer_right
    commit_search()
    split = 0
    in_word = False
    while split < len(buffer_right):
        if buffer_right[split] in delims:
            if in_word:
                break
        else:
            in_word = True
        split += 1
    # Only touch the buffers when the cursor actually moved.
    if split:
        buffer_left = buffer_left + buffer_right[:split]
        buffer_right = buffer_right[split:]
    redisplay()

def go_beginning(*_):
    """Key handler that moves the cursor to the start of the line."""
    commit_search()
    whole_line = buffer_left + buffer_right
    set_buffer('', whole_line)

def go_end(*_):
    """Key handler that moves the cursor to the end of the line."""
    commit_search()
    whole_line = buffer_left + buffer_right
    set_buffer(whole_line, '')

# Key bindings for the line editor: maps key descriptions to the handler
# functions defined above.  `readline` drives it through
# `keymap.handle_input()`, and `submit` / `control_d` end a line with
# `keymap.stop()`.
# NOTE(review): '<nomatch>' and '<any>' look like special entries (fallback
# for unbound keys / hook for every key) -- confirm against
# `pwnlib.term.keymap`.
keymap = km.Keymap({
    '<nomatch>'   : self_insert,
    '<up>'        : history_prev,
    '<down>'      : history_next,
    '<left>'      : backward_char,
    '<right>'     : forward_char,
    '<del>'       : delete_char_backward,
    '<delete>'    : delete_char_forward,
    '<enter>'     : submit,
    'C-j'         : submit,
    'C-<left>'    : backward_word,
    'C-<right>'   : forward_word,
    'M-<left>'    : backward_word,
    'M-<right>'   : forward_word,
    'C-c'         : control_c,
    'C-d'         : control_d,
    'C-k'         : kill_to_end,
    'C-w'         : kill_word_backward,
    '<backspace>' : kill_word_backward,
    'M-<del>'     : kill_word_backward,
    'C-r'         : search_history,
    '<escape>'    : cancel_search,
    'C-a'         : go_beginning,
    'C-e'         : go_end,
    '<tab>'       : auto_complete,
    '<any>'       : handle_keypress,
    })

def readline(_size=-1, prompt='', float=True, priority=10):
    """Read one line of input from the user.

    Outside of term mode the prompt is printed and a line is read directly
    from ``sys.stdin``.  In term mode the line is edited interactively
    through `keymap`.

    Arguments:
        _size(int): Passed to the underlying ``readline`` outside of term
                    mode; otherwise unused, kept for compatibility with
                    the existing readline.
        prompt(str): The prompt to show to the user.
        float(bool): Passed on to ``term.output`` for the prompt and input.
        priority(int): Passed on to ``term.output`` for the prompt and
                       input.

    Returns:
        Outside of term mode, the line read from stdin with trailing
        newlines stripped.  In term mode, the entered line as bytes followed
        by a newline, or ``b''`` on end-of-file.
    """
    global buffer_handle, prompt_handle, suggest_handle, eof, \
        show_suggestions

    # XXX circular imports
    from pwnlib.term import term_mode
    if not term_mode:
        six.print_(prompt, end='', flush=True)
        stream = getattr(sys.stdin, 'buffer', sys.stdin)
        return stream.readline(_size).rstrip(b'\n')

    show_suggestions = False
    eof = False
    prompt_handle = None
    if prompt:
        prompt_handle = term.output(prompt, float = float, priority = priority)
    buffer_handle = term.output(float = float, priority = priority)
    suggest_handle = None
    clear()
    if startup_hook:
        startup_hook()
    try:
        while True:
            try:
                try:
                    keymap.handle_input()
                except EOFError:
                    # EOF on an empty line ends the input; otherwise the
                    # text typed so far is returned below.
                    if not (buffer_left + buffer_right):
                        return b''
                if eof:
                    return b''
                entered = buffer_left + buffer_right
                if entered:
                    history.insert(0, entered)
                return force_to_bytes(entered) + b'\n'
            except KeyboardInterrupt:
                # control_c() re-raises when there is nothing to cancel;
                # otherwise keep editing.
                control_c()
    finally:
        # Leave the final line on screen and release all handles.
        buffer_handle.update(buffer_left + buffer_right + '\n')
        buffer_handle.freeze()
        buffer_handle = None
        if prompt_handle:
            prompt_handle.freeze()
            prompt_handle = None
        if suggest_handle:
            suggest_handle.freeze()
            suggest_handle = None
        if shutdown_hook:
            shutdown_hook()

def raw_input(prompt='', float=True):
    r"""raw_input(prompt='', float=True)

    Drop-in for the built-in ``raw_input`` backed by the ``pwnlib``
    readline implementation.  Returns the line as bytes with trailing
    line-separator characters removed.

    Arguments:
        prompt(str): The prompt to show to the user.
        float(bool): If set to `True`, prompt and input will float to the
                     bottom of the screen when `term.term_mode` is enabled.
    """
    line = readline(-1, prompt, float)
    return line.rstrip(os.linesep.encode())

def str_input(prompt='', float=True):
    r"""str_input(prompt='', float=True)

    Drop-in for the python3 built-in ``input`` backed by the ``pwnlib``
    readline implementation.  Returns the decoded line with trailing
    line-separator characters removed.

    Arguments:
        prompt(str): The prompt to show to the user.
        float(bool): If set to `True`, prompt and input will float to the
                     bottom of the screen when `term.term_mode` is enabled.
    """
    data = readline(-1, prompt, float)
    return data.decode().rstrip(os.linesep)

def eval_input(prompt='', float=True):
    """eval_input(prompt='', float=True)

    Replacement for the built-in python 2 - style ``input`` using
    ``pwnlib`` readline implementation, and `pwnlib.util.safeeval.const`
    instead of ``eval`` (!).

    Arguments:
        prompt(str): The prompt to show to the user.
        float(bool): If set to ``True``, prompt and input will float to the
                     bottom of the screen when `term.term_mode` is enabled.

    Example:

        >>> try:
        ...     saved = sys.stdin, pwnlib.term.term_mode
        ...     pwnlib.term.term_mode = False
        ...     sys.stdin = io.TextIOWrapper(io.BytesIO(b"{'a': 20}"))
        ...     eval_input("Favorite object? ")['a']
        ... finally:
        ...     sys.stdin, pwnlib.term.term_mode = saved
        Favorite object? 20
    """
    from pwnlib.util import safeeval
    evaluate = safeeval.const
    line = readline(-1, prompt, float)
    return evaluate(line.rstrip(os.linesep.encode()))

def init():
    """Hook this module's line editor into the interpreter.

    Replaces ``sys.stdin`` with a wrapper whose ``readline`` calls this
    module's `readline`, and replaces the built-in input functions
    (``raw_input``/``input`` on python 2, ``input`` on python 3).
    """
    global safeeval
    # defer imports until initialization
    import sys
    from six.moves import builtins
    from pwnlib.util import safeeval

    class Wrapper:
        # Proxy for the original stdin: only readline() is redirected,
        # every other attribute is forwarded to the wrapped object.
        def __init__(self, fd):
            self._fd = fd
        def __getattr__(self, name):
            return getattr(self._fd, name)
        def readline(self, size = None):
            return readline(size)
    sys.stdin = Wrapper(sys.stdin)

    if not six.PY2:
        builtins.input = str_input
    else:
        builtins.raw_input = raw_input
        builtins.input = eval_input