| # Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com> |
| # Alex Gaynor |
| # Antonio Cuni |
| # Armin Rigo |
| # Holger Krekel |
| # |
| # All Rights Reserved |
| # |
| # |
| # Permission to use, copy, modify, and distribute this software and |
| # its documentation for any purpose is hereby granted without fee, |
| # provided that the above copyright notice appear in all copies and |
| # that both that copyright notice and this permission notice appear in |
| # supporting documentation. |
| # |
| # THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO |
| # THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY |
| # AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, |
| # INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER |
| # RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF |
| # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN |
| # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
| |
| """A compatibility wrapper reimplementing the 'readline' standard module |
| on top of pyrepl. Not all functionalities are supported. Contains |
| extensions for multiline input. |
| """ |
| |
| from __future__ import annotations |
| |
| import warnings |
| from dataclasses import dataclass, field |
| |
| import os |
| from site import gethistoryfile # type: ignore[attr-defined] |
| import sys |
| from rlcompleter import Completer as RLCompleter |
| |
| from . import commands, historical_reader |
| from .completing_reader import CompletingReader |
| from .console import Console as ConsoleType |
| |
# Declare the types up front so that whichever platform-specific import
# below succeeds, `Console` and `_error` have one consistent static type.
Console: type[ConsoleType]
_error: tuple[type[Exception], ...] | type[Exception]
try:
    # Try the Unix console implementation first ...
    from .unix_console import UnixConsole as Console, _error
except ImportError:
    # ... and use the Windows console when that import fails.
    from .windows_console import WindowsConsole as Console, _error

# Encoding passed to the console when a reader is created; "latin1" is used
# only if sys.getdefaultencoding() returns an empty value.
ENCODING = sys.getdefaultencoding() or "latin1"
| |
| |
# types
Command = commands.Command
from collections.abc import Callable, Collection
from .types import Callback, Completer, KeySpec, CommandName

# Hard-coded to False, so the guarded import below never runs at runtime.
# The names it would import (Any, Mapping) appear only in annotations, which
# are not evaluated thanks to `from __future__ import annotations`.
# NOTE(review): presumably written this way so static type checkers still
# treat the block as a TYPE_CHECKING guard -- confirm.
TYPE_CHECKING = False

if TYPE_CHECKING:
    from typing import Any, Mapping


# Signature of the `more_lines` predicate: it receives the text entered so
# far and returns a true value when more input lines are wanted.
MoreLinesCallable = Callable[[str], bool]
| |
| |
# Public names of this module.  The commented-out entries are deliberately
# not exported: read_init_file, redisplay and set_pre_input_hook exist in
# this module only as warning stubs generated near the end of the file.
__all__ = [
    "add_history",
    "clear_history",
    "get_begidx",
    "get_completer",
    "get_completer_delims",
    "get_current_history_length",
    "get_endidx",
    "get_history_item",
    "get_history_length",
    "get_line_buffer",
    "insert_text",
    "parse_and_bind",
    "read_history_file",
    # "read_init_file",
    # "redisplay",
    "remove_history_item",
    "replace_history_item",
    "set_auto_history",
    "set_completer",
    "set_completer_delims",
    "set_history_length",
    # "set_pre_input_hook",
    "set_startup_hook",
    "write_history_file",
    # ---- multiline extensions ----
    "multiline_input",
]
| |
| # ____________________________________________________________ |
| |
@dataclass
class ReadlineConfig:
    """Completion settings shared by the readline wrapper and its reader."""

    # Completion callback invoked as ``completer(stem, state)``; None means
    # that no completer is installed.
    readline_completer: Completer | None = None
    # Characters that delimit the word being completed.
    completer_delims: frozenset[str] = frozenset(
        " \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?"
    )
| |
| |
@dataclass(kw_only=True)
class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader):
    """Reader with history and completion that mimics readline behaviour.

    When ``more_lines`` is set the reader works in multiline mode; when it
    is None the buffer is kept to a single line (see ``after_command``).
    """

    # Class fields (intentionally unannotated so that they do not become
    # dataclass fields)
    assume_immutable_completions = False
    use_brackets = False
    sort_in_column = True

    # Instance fields
    config: ReadlineConfig
    more_lines: MoreLinesCallable | None = None
    last_used_indentation: str | None = None

    def __post_init__(self) -> None:
        """Register the multiline commands under both spellings of their names."""
        super().__post_init__()
        for command_name, command_class in (
            ("maybe_accept", maybe_accept),
            ("maybe-accept", maybe_accept),
            ("backspace_dedent", backspace_dedent),
            ("backspace-dedent", backspace_dedent),
        ):
            self.commands[command_name] = command_class

    def error(self, msg: str = "none") -> None:
        """Ignore *msg*: error messages are not shown by default."""

    def get_stem(self) -> str:
        """Return the text between the last delimiter before the cursor and the cursor."""
        delimiters = self.config.completer_delims
        chars = self.buffer
        start = self.pos
        # Walk left until the character just before ``start`` is a delimiter.
        while start > 0 and chars[start - 1] not in delimiters:
            start -= 1
        return "".join(chars[start : self.pos])

    def get_completions(self, stem: str) -> list[str]:
        """Return the sorted completions for *stem*.

        With an empty stem in multiline mode the only "completion" is the
        run of spaces that moves the cursor to the next multiple of four
        columns.  Otherwise the configured completer is called with
        increasing state numbers until it raises or returns a non-string.
        """
        if len(stem) == 0 and self.more_lines is not None:
            chars = self.buffer
            line_start = self.pos
            while line_start > 0 and chars[line_start - 1] != "\n":
                line_start -= 1
            column = self.pos - line_start
            return [" " * (4 - (column % 4))]

        matches = []
        completer = self.config.readline_completer
        if completer is None:
            return matches

        try:
            stem = str(stem)  # rlcompleter.py seems to not like unicode
        except UnicodeEncodeError:
            pass  # but feed unicode anyway if we have no choice

        state = 0
        while True:
            try:
                candidate = completer(stem, state)
            except Exception:
                break
            if not isinstance(candidate, str):
                break
            matches.append(candidate)
            state += 1

        # Standard readline sorts the completions before displaying them;
        # do the same here.
        matches.sort()
        return matches

    def get_trimmed_history(self, maxlength: int) -> list[str]:
        """Return a copy of the last *maxlength* history entries.

        A negative *maxlength* means "no limit" and returns the whole history.
        """
        first_kept = 0
        if maxlength >= 0:
            first_kept = max(len(self.history) - maxlength, 0)
        return self.history[first_kept:]

    def update_last_used_indentation(self) -> None:
        """Remember the first indentation found in the buffer, if there is one."""
        found = _get_first_indentation(self.buffer)
        if found is None:
            return
        self.last_used_indentation = found

    # --- simplified support for reading multiline Python statements ---

    def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
        """Extend the inherited keymap with the newline and backspace bindings."""
        inherited = super().collect_keymap()
        multiline_bindings = (
            (r"\n", "maybe-accept"),
            (r"\<backspace>", "backspace-dedent"),
        )
        return inherited + multiline_bindings

    def after_command(self, cmd: Command) -> None:
        """Cut the buffer at its first newline when not in multiline mode."""
        super().after_command(cmd)
        if self.more_lines is not None:
            return
        # Force single-line input if we are in raw_input() mode.
        # Although there is no direct way to add a \n in this mode,
        # multiline buffers can still show up using various
        # commands, e.g. navigating the history.
        if "\n" not in self.buffer:
            return
        first_newline = self.buffer.index("\n")
        self.buffer = self.buffer[:first_newline]
        if self.pos > len(self.buffer):
            self.pos = len(self.buffer)
| |
| |
def set_auto_history(_should_auto_add_history: bool) -> None:
    """Enable or disable automatic history.

    The argument is coerced to ``bool`` and stored in the
    ``should_auto_add_history`` flag of the ``historical_reader`` module.
    """
    enabled = bool(_should_auto_add_history)
    historical_reader.should_auto_add_history = enabled
| |
| |
| def _get_this_line_indent(buffer: list[str], pos: int) -> int: |
| indent = 0 |
| while pos > 0 and buffer[pos - 1] in " \t": |
| indent += 1 |
| pos -= 1 |
| if pos > 0 and buffer[pos - 1] == "\n": |
| return indent |
| return 0 |
| |
| |
| def _get_previous_line_indent(buffer: list[str], pos: int) -> tuple[int, int | None]: |
| prevlinestart = pos |
| while prevlinestart > 0 and buffer[prevlinestart - 1] != "\n": |
| prevlinestart -= 1 |
| prevlinetext = prevlinestart |
| while prevlinetext < pos and buffer[prevlinetext] in " \t": |
| prevlinetext += 1 |
| if prevlinetext == pos: |
| indent = None |
| else: |
| indent = prevlinetext - prevlinestart |
| return prevlinestart, indent |
| |
| |
| def _get_first_indentation(buffer: list[str]) -> str | None: |
| indented_line_start = None |
| for i in range(len(buffer)): |
| if (i < len(buffer) - 1 |
| and buffer[i] == "\n" |
| and buffer[i + 1] in " \t" |
| ): |
| indented_line_start = i + 1 |
| elif indented_line_start is not None and buffer[i] not in " \t\n": |
| return ''.join(buffer[indented_line_start : i]) |
| return None |
| |
| |
| def _should_auto_indent(buffer: list[str], pos: int) -> bool: |
| # check if last character before "pos" is a colon, ignoring |
| # whitespaces and comments. |
| last_char = None |
| while pos > 0: |
| pos -= 1 |
| if last_char is None: |
| if buffer[pos] not in " \t\n#": # ignore whitespaces and comments |
| last_char = buffer[pos] |
| else: |
| # even if we found a non-whitespace character before |
| # original pos, we keep going back until newline is reached |
| # to make sure we ignore comments |
| if buffer[pos] == "\n": |
| break |
| if buffer[pos] == "#": |
| last_char = None |
| return last_char == ":" |
| |
| |
class maybe_accept(commands.Command):
    """Command bound to newline: either finish the input or start a new line."""

    def do(self) -> None:
        """Insert a newline (possibly auto-indented) or mark the input finished.

        A newline is inserted when a newline already follows the cursor or
        when the reader's ``more_lines`` callback asks for more input;
        otherwise the command finishes, unless paste mode is active.
        """
        r: ReadlineAlikeReader
        r = self.reader  # type: ignore[assignment]
        r.dirty = True  # this is needed to hide the completion menu, if visible

        # During a bracketed paste a newline is always plain text.
        if self.reader.in_bracketed_paste:
            r.insert("\n")
            return

        # if there are already several lines and the cursor
        # is not on the last one, always insert a new \n.
        text = r.get_unicode()

        if "\n" in r.buffer[r.pos :] or (
            r.more_lines is not None and r.more_lines(text)
        ):
            def _newline_before_pos():
                # Skip backwards over the whitespace before the cursor and
                # report whether the span from there to the cursor contains
                # a newline.
                before_idx = r.pos - 1
                while before_idx > 0 and text[before_idx].isspace():
                    before_idx -= 1
                return text[before_idx : r.pos].count("\n") > 0

            # if there's already a new line before the cursor then
            # even if the cursor is followed by whitespace, we assume
            # the user is trying to terminate the block
            if _newline_before_pos() and text[r.pos:].isspace():
                self.finish = True
                return

            # auto-indent the next line like the previous line
            prevlinestart, indent = _get_previous_line_indent(r.buffer, r.pos)
            r.insert("\n")
            if not self.reader.paste_mode:
                if indent:
                    # Copy the previous line's leading whitespace verbatim.
                    for i in range(prevlinestart, prevlinestart + indent):
                        r.insert(r.buffer[i])
                r.update_last_used_indentation()
                if _should_auto_indent(r.buffer, r.pos):
                    # The code before the cursor ends with a colon: add one
                    # more indentation level.
                    if r.last_used_indentation is not None:
                        indentation = r.last_used_indentation
                    else:
                        # default
                        indentation = " " * 4
                    r.insert(indentation)
        elif not self.reader.paste_mode:
            self.finish = True
        else:
            # In paste mode the newline is inserted instead of finishing.
            r.insert("\n")
| |
| |
class backspace_dedent(commands.Command):
    """Backspace that removes a whole indentation step when inside indentation."""

    def do(self) -> None:
        """Delete one character, or dedent to a less indented earlier line.

        When the cursor sits right after the current line's indentation,
        earlier lines are searched backwards for the nearest one with a
        smaller indentation, and enough characters are deleted to match it.
        In every other case exactly one character is deleted.
        """
        reader = self.reader
        buf = reader.buffer
        if reader.pos <= 0:
            self.reader.error("can't backspace at start")
            return

        count = 1
        if buf[reader.pos - 1] != "\n":
            width = _get_this_line_indent(buf, reader.pos)
            if width > 0:
                line_start = reader.pos - width
                while line_start > 0:
                    # Step to the line above and measure its indentation.
                    line_start, earlier_indent = _get_previous_line_indent(
                        buf, line_start - 1
                    )
                    if earlier_indent is not None and earlier_indent < width:
                        count = width - earlier_indent
                        break

        reader.pos -= count
        del buf[reader.pos : reader.pos + count]
        reader.dirty = True
| |
| |
| # ____________________________________________________________ |
| |
| |
@dataclass(slots=True)
class _ReadlineWrapper:
    """State and implementation behind the module-level readline-like API.

    The reader is created lazily by ``get_reader``; completion settings
    live in ``config`` and are shared with the reader.
    """

    # File descriptors handed to the console; -1 means "duplicate the
    # process's fd 0 / fd 1" (see __post_init__).
    f_in: int = -1
    f_out: int = -1
    reader: ReadlineAlikeReader | None = field(default=None, repr=False)
    # Value managed by set_history_length()/get_history_length(); it is the
    # limit applied by write_history_file().  Negative means "no limit".
    saved_history_length: int = -1
    startup_hook: Callback | None = None
    config: ReadlineConfig = field(default_factory=ReadlineConfig, repr=False)

    def __post_init__(self) -> None:
        """Duplicate fd 0 and fd 1 for any descriptor left at -1."""
        if self.f_in == -1:
            self.f_in = os.dup(0)
        if self.f_out == -1:
            self.f_out = os.dup(1)

    def get_reader(self) -> ReadlineAlikeReader:
        """Return the reader, creating it (and its console) on first use."""
        if self.reader is None:
            console = Console(self.f_in, self.f_out, encoding=ENCODING)
            self.reader = ReadlineAlikeReader(console=console, config=self.config)
        return self.reader

    def input(self, prompt: object = "") -> str:
        """Read one line using *prompt*; replacement for ``builtins.input``.

        If obtaining the reader raises the console's ``_error``, the call is
        delegated to the saved original ``raw_input``.
        """
        try:
            reader = self.get_reader()
        except _error:
            assert raw_input is not None
            return raw_input(prompt)
        prompt_str = str(prompt)
        reader.ps1 = prompt_str
        sys.audit("builtins.input", prompt_str)
        result = reader.readline(startup_hook=self.startup_hook)
        sys.audit("builtins.input/result", result)
        return result

    def multiline_input(self, more_lines: MoreLinesCallable, ps1: str, ps2: str) -> str:
        """Read an input on possibly multiple lines, asking for more
        lines as long as 'more_lines(unicodetext)' returns an object whose
        boolean value is true.

        The reader's previous ``more_lines`` is restored afterwards and its
        paste mode is switched off, even when reading raises.
        """
        reader = self.get_reader()
        saved = reader.more_lines
        try:
            reader.more_lines = more_lines
            reader.ps1 = ps1
            reader.ps2 = ps1
            reader.ps3 = ps2
            reader.ps4 = ""
            with warnings.catch_warnings(action="ignore"):
                return reader.readline()
        finally:
            reader.more_lines = saved
            reader.paste_mode = False

    def parse_and_bind(self, string: str) -> None:
        """Do nothing: GNU-readline-style init strings are not supported."""
        pass  # XXX we don't support parsing GNU-readline-style init files

    def set_completer(self, function: Completer | None = None) -> None:
        """Install *function* as the completer; None removes it."""
        self.config.readline_completer = function

    def get_completer(self) -> Completer | None:
        """Return the installed completer, or None."""
        return self.config.readline_completer

    def set_completer_delims(self, delimiters: Collection[str]) -> None:
        """Replace the set of word delimiters used for completion."""
        self.config.completer_delims = frozenset(delimiters)

    def get_completer_delims(self) -> str:
        """Return the completion delimiters as one sorted string."""
        return "".join(sorted(self.config.completer_delims))

    def _histline(self, line: str) -> str:
        """Normalise a history entry by stripping trailing newlines."""
        return line.rstrip("\n")

    def get_history_length(self) -> int:
        """Return the value last passed to ``set_history_length`` (default -1)."""
        return self.saved_history_length

    def set_history_length(self, length: int) -> None:
        """Set the maximum number of entries ``write_history_file`` saves."""
        self.saved_history_length = length

    def get_current_history_length(self) -> int:
        """Return the number of entries currently in the reader's history."""
        return len(self.get_reader().history)

    def read_history_file(self, filename: str = gethistoryfile()) -> None:
        """Append the entries stored in *filename* to the reader's history.

        Files whose first line starts with ``_HiStOrY_V2_`` are decoded
        with "unicode-escape" (handled here as the editline format); all
        other files are decoded as UTF-8.  Undecodable bytes are replaced.
        Empty entries are skipped.
        """
        # multiline extension (really a hack) for the end of lines that
        # are actually continuations inside a single multiline_input()
        # history item: we use \r\n instead of just \n.  If the history
        # file is passed to GNU readline, the extra \r are just ignored.
        history = self.get_reader().history

        with open(os.path.expanduser(filename), 'rb') as f:
            is_editline = f.readline().startswith(b"_HiStOrY_V2_")
            if is_editline:
                encoding = "unicode-escape"
            else:
                # No marker line: the first line is data, so rewind.
                f.seek(0)
                encoding = "utf-8"

            lines = [line.decode(encoding, errors='replace') for line in f.read().split(b'\n')]
            buffer = []
            for line in lines:
                if line.endswith("\r"):
                    # Continuation line of a multiline entry: collect it.
                    buffer.append(line+'\n')
                else:
                    line = self._histline(line)
                    if buffer:
                        # Last line of a multiline entry: join the parts and
                        # drop the "\r" continuation markers.
                        line = self._histline("".join(buffer).replace("\r", "") + line)
                        del buffer[:]
                    if line:
                        history.append(line)

    def write_history_file(self, filename: str = gethistoryfile()) -> None:
        """Write the history (trimmed to the history length) to *filename*.

        Newlines inside a multiline entry are written as ``\\r\\n`` and each
        entry is terminated by a bare ``\\n``; ``read_history_file`` relies
        on exactly that distinction.
        """
        maxlength = self.saved_history_length
        history = self.get_reader().get_trimmed_history(maxlength)
        # newline="\n" disables newline translation on write.  Without it,
        # platforms whose line separator is "\r\n" would emit "\r\r\n" for
        # continuations and "\r\n" for entry terminators, so on reading
        # every line would look like a continuation.
        with open(
            os.path.expanduser(filename), "w", encoding="utf-8", newline="\n"
        ) as f:
            for entry in history:
                entry = entry.replace("\n", "\r\n")  # multiline history support
                f.write(entry + "\n")

    def clear_history(self) -> None:
        """Remove every entry from the reader's history."""
        del self.get_reader().history[:]

    def get_history_item(self, index: int) -> str | None:
        """Return the history entry at 1-based *index*, or None if out of range."""
        history = self.get_reader().history
        if 1 <= index <= len(history):
            return history[index - 1]
        else:
            return None  # like readline.c

    def remove_history_item(self, index: int) -> None:
        """Delete the history entry at 0-based *index*.

        Raises ValueError when *index* is out of range.
        """
        history = self.get_reader().history
        if 0 <= index < len(history):
            del history[index]
        else:
            raise ValueError("No history item at position %d" % index)
            # like readline.c

    def replace_history_item(self, index: int, line: str) -> None:
        """Replace the history entry at 0-based *index* with *line*.

        Raises ValueError when *index* is out of range.
        """
        history = self.get_reader().history
        if 0 <= index < len(history):
            history[index] = self._histline(line)
        else:
            raise ValueError("No history item at position %d" % index)
            # like readline.c

    def add_history(self, line: str) -> None:
        """Append *line* (without trailing newlines) to the history."""
        self.get_reader().history.append(self._histline(line))

    def set_startup_hook(self, function: Callback | None = None) -> None:
        """Set the hook passed to the reader by ``input``; None clears it."""
        self.startup_hook = function

    def get_line_buffer(self) -> str:
        """Return the reader's current buffer contents as a string."""
        return self.get_reader().get_unicode()

    def _get_idxs(self) -> tuple[int, int]:
        """Return ``(start, cursor)`` bounding the word before the cursor.

        ``start`` is the index just after the nearest delimiter to the left
        of the cursor, or 0 when there is none.
        """
        start = cursor = self.get_reader().pos
        buf = self.get_line_buffer()
        # The delimiter string does not change during the scan; build it
        # once instead of re-sorting and re-joining it for every character.
        delims = self.get_completer_delims()
        for i in range(cursor - 1, -1, -1):
            if buf[i] in delims:
                break
            start = i
        return start, cursor

    def get_begidx(self) -> int:
        """Return the start index of the word before the cursor."""
        return self._get_idxs()[0]

    def get_endidx(self) -> int:
        """Return the cursor position, i.e. the end index of that word."""
        return self._get_idxs()[1]

    def insert_text(self, text: str) -> None:
        """Insert *text* into the reader's buffer."""
        self.get_reader().insert(text)
| |
| |
# The single wrapper instance backing every module-level function below.
# Creating it with the default descriptors duplicates fd 0 and fd 1.
_wrapper = _ReadlineWrapper()

# ____________________________________________________________
# Public API

# Each public function is the corresponding bound method of `_wrapper`.
parse_and_bind = _wrapper.parse_and_bind
set_completer = _wrapper.set_completer
get_completer = _wrapper.get_completer
set_completer_delims = _wrapper.set_completer_delims
get_completer_delims = _wrapper.get_completer_delims
get_history_length = _wrapper.get_history_length
set_history_length = _wrapper.set_history_length
get_current_history_length = _wrapper.get_current_history_length
read_history_file = _wrapper.read_history_file
write_history_file = _wrapper.write_history_file
clear_history = _wrapper.clear_history
get_history_item = _wrapper.get_history_item
remove_history_item = _wrapper.remove_history_item
replace_history_item = _wrapper.replace_history_item
add_history = _wrapper.add_history
set_startup_hook = _wrapper.set_startup_hook
get_line_buffer = _wrapper.get_line_buffer
get_begidx = _wrapper.get_begidx
get_endidx = _wrapper.get_endidx
insert_text = _wrapper.insert_text

# Extension
multiline_input = _wrapper.multiline_input

# Internal hook
_get_reader = _wrapper.get_reader
| |
| # ____________________________________________________________ |
| # Stubs |
| |
| |
| def _make_stub(_name: str, _ret: object) -> None: |
| def stub(*args: object, **kwds: object) -> None: |
| import warnings |
| |
| warnings.warn("readline.%s() not implemented" % _name, stacklevel=2) |
| |
| stub.__name__ = _name |
| globals()[_name] = stub |
| |
| |
# Generate a warning stub for each readline function that is not
# implemented here.  The assert guards against replacing a name that this
# module already defines.
for _name, _ret in [
    ("read_init_file", None),
    ("redisplay", None),
    ("set_pre_input_hook", None),
]:
    assert _name not in globals(), _name
    _make_stub(_name, _ret)
| |
| # ____________________________________________________________ |
| |
| |
def _setup(namespace: Mapping[str, Any]) -> None:
    """Install the wrapper's ``input`` as ``builtins.input``.

    Nothing happens when setup already ran, when stdin/stdout have no
    usable file descriptor, or when either descriptor is not a terminal.
    Otherwise the wrapper is pointed at the real descriptors, an
    rlcompleter-based completer over *namespace* is configured, and the
    original ``builtins.input`` is saved in ``raw_input`` before being
    replaced.
    """
    global raw_input
    if raw_input is not None:
        return  # don't run _setup twice

    try:
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
    except (AttributeError, ValueError):
        return
    if not (os.isatty(stdin_fd) and os.isatty(stdout_fd)):
        return

    _wrapper.f_in = stdin_fd
    _wrapper.f_out = stdout_fd

    # set up namespace in rlcompleter, which requires it to be a bona fide dict
    completion_namespace = namespace if isinstance(namespace, dict) else dict(namespace)
    _wrapper.config.readline_completer = RLCompleter(completion_namespace).complete

    # this is not really what readline.c does.  Better than nothing I guess
    import builtins

    raw_input = builtins.input
    builtins.input = _wrapper.input
| |
| |
# The original builtins.input, saved by _setup() just before it installs the
# replacement.  None until _setup() has run to completion, which is also how
# _setup() detects that it has already run.
raw_input: Callable[[object], str] | None = None