| # Copyright (C) 2020 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """REPL test cases""" |
| # pylint: disable=missing-docstring,bad-whitespace,redefined-outer-name |
| |
| import logging |
| import pytest |
| import numpy as np |
| |
| from .repl import ( |
| LineDrawing, |
| ReplSession, |
| TableFormatter, |
| TableStudy, |
| Ui, |
| ) |
| from .util import ( |
| final, |
| override, |
| ) |
| from .query import QuerySchema |
| from ._native import StringTable |
| |
log = logging.getLogger(__name__)

# Deliberately ugly line-drawing set for the formatting tests.  Every
# drawing character is a lowercase letter tagged with the dummy "U"
# style; _munge_dummy_output upper-cases "U"-styled chunks, so an
# upper-case letter in the expected output shows the style was applied.
DUMMY_LINE_DRAWING = LineDrawing(
    header_style="U",
    header_sep_by_default=True,
    **{
        part: ("U", glyph)
        for part, glyph in (
            ("colsep", "q"),
            ("major_cross_first", "a"),
            ("major_cross_both", "s"),
            ("major_cross_last", "d"),
            ("minor_cross_first", "z"),
            ("minor_cross_both", "x"),
            ("minor_cross_last", "c"),
            ("rowsep_major", "y"),
            ("rowsep_minor", "n"),
            ("hanging_indent", "h"),
            ("truncation_indicator", "t"),
        )
    }
)

# Unstyled (style, text) chunk holding a single newline.
FORMATTED_NEWLINE = ("", "\n")
| |
def DEFAULT_COLUMN_FORMATTER(n, _s):  # pylint: disable=invalid-name
    """Return the column name `n` unchanged; the schema `_s` is ignored.

    Written as a ``def`` rather than a lambda bound to a name so that
    it has a real ``__name__`` in tracebacks and can carry a docstring.
    """
    return n
| |
@pytest.fixture(scope="module", params=["1", "2", "auto"])
def repl(request):
    """Module-scoped REPL session backed by a recording TestReplUi.

    Parametrized over "1", "2" and "auto".  A numeric parameter is
    applied with the ".set block_size" command and the confirmation
    message is checked; "auto" leaves the session as constructed.
    """
    ui = TestReplUi()
    session = ReplSession(ui)
    if request.param == "auto":
        return session
    block_size = int(request.param)
    session.handle_input_line(".set block_size {}".format(block_size))
    reply = ui.slurp_output()
    assert reply == "Set block_size to {}".format(block_size)
    return session
| |
@final
class TestReplUi(Ui):
    """Ui implementation that records REPL output instead of showing it.

    Text passed to print() is collected in `prints`; the arguments of
    at most one print_table() call are collected in `tables`.  The
    slurp_* methods hand the recorded output to the test and clear the
    recorder.
    """

    # The class name matches pytest's default "Test*" collection
    # pattern, but this is a helper and not a test case.  Opt out of
    # collection so pytest does not warn about a test class that
    # defines __init__.
    __test__ = False

    @override
    def __init__(self):
        super().__init__()
        self.reset()

    def reset(self):
        """Forget all recorded text and tables."""
        self.prints = []
        self.tables = []

    def slurp_output(self):
        """Return the recorded text joined with newlines, then reset.

        Asserts that no table has been recorded.
        """
        assert not self.tables
        output = "\n".join(self.prints)
        self.reset()
        return output

    def slurp_table(self):
        """Return the recorded (row_chunk_generator, st) pair, then reset.

        Asserts that no text has been recorded.  Raises IndexError if
        no table has been recorded.
        """
        assert not self.prints
        row_chunk_generator, st = self.tables[0]
        self.reset()
        return row_chunk_generator, st

    @override
    def print(self, text):
        """Record `text` for a later slurp_output()."""
        self.prints.append(text)

    @override
    def print_table(self, row_chunk_generator, st, table_schema,
                    column_header_formatter):
        """Record a table for a later slurp_table().

        Checks the argument types and that no table is already
        pending.  `table_schema` is accepted but not recorded.
        """
        assert isinstance(st, StringTable)
        assert callable(row_chunk_generator)
        assert not self.tables
        assert callable(column_header_formatter)
        self.tables.append((row_chunk_generator, st))
| |
| def _table_to_dict(table): |
| row_chunk_generator, _st = table |
| data = None |
| for row_chunk in row_chunk_generator(): |
| if data is None: |
| data = {column: [array] |
| for column, (array, _schema) |
| in row_chunk.items()} |
| else: |
| assert tuple(data) == tuple(row_chunk) |
| for column, (array, schema) in row_chunk.items(): |
| assert isinstance(schema, QuerySchema) |
| data[column].append(array) # pylint: disable=unsubscriptable-object |
| assert data is not None |
| return {column: np.concatenate(arrays).tolist() |
| for column, arrays in data.items()} |
| |
| def _munge_dummy_output(output): |
| text_parts = [] |
| for chunk_format, chunk_contents in output: |
| assert isinstance(chunk_contents, str) |
| assert chunk_format in ("", "U") |
| if chunk_format == "U": |
| chunk_contents = chunk_contents.upper() |
| text_parts.append(chunk_contents) |
| return "".join(text_parts) |
| |
def _format_table(repl, sql, *, desired_width=40,
                  column_header_formatter=DEFAULT_COLUMN_FORMATTER):
    """Run `sql` in `repl` and render the resulting table as text.

    The session is first switched to block_size 1.  The captured table
    is studied and formatted with DUMMY_LINE_DRAWING at
    `desired_width`: column headers, a newline, then each row chunk.
    The rendering goes through _munge_dummy_output and every space is
    replaced with "_" before it is returned.

    NOTE(review): the block_size change is made on the session passed
    in and is not undone here -- presumably it stays in effect for
    later tests sharing the fixture; confirm against ReplSession.
    """
    ui = repl.ui
    repl.handle_input_line(".set block_size 1")
    assert ui.slurp_output() == "Set block_size to 1"
    repl.handle_input_line(sql)
    row_chunk_generator, st = ui.slurp_table()
    formatter = TableFormatter(
        study=TableStudy(row_chunk_generator, st, column_header_formatter),
        desired_width=desired_width,
        line_drawing=DUMMY_LINE_DRAWING)
    chunks = list(formatter.format_column_headers())
    chunks.append(FORMATTED_NEWLINE)
    for row_chunk in row_chunk_generator():
        chunks += formatter.format_row_chunk(row_chunk)
    rendered = _munge_dummy_output(chunks)
    return rendered.replace(" ", "_")
| |
def test_help(repl):
    """The .help command prints text that mentions exiting the REPL."""
    repl.handle_input_line(".help")
    assert "Exit the REPL" in repl.ui.slurp_output()
| |
def test_print_table(repl):
    """A VALUES query reaches the UI as a table with default column names."""
    repl.handle_input_line("(VALUES (1, 2), (3, 4))")
    table = repl.ui.slurp_table()
    expected = dict(col0=[1, 3], col1=[2, 4])
    assert _table_to_dict(table) == expected
| |
def test_study_table(repl):
    """TableStudy reports column names and the expected width per column."""
    repl.handle_input_line("(VALUES (12345, 2), (3, 4))")
    row_chunk_generator, st = repl.ui.slurp_table()
    study = TableStudy(row_chunk_generator, st, DEFAULT_COLUMN_FORMATTER)
    assert study.column_names == ("col0", "col1")
    # col0 is as wide as its widest value, col1 as wide as its header.
    expected = (("col0", len("12345")), ("col1", len("col1")))
    columns = study.columns
    for index, (name, width) in enumerate(expected):
        assert columns[index].name == name
        assert columns[index].width == width
| |
| # We represent every space in the formatted output with _ so that editors |
| # that strip trailing whitespace automatically don't break tests. |
| |
def test_format_table_basic(repl):
    """Two integer columns rendered with a header and a separator row."""
    expected = """\
COL0_QCOL1
YYYYYDYYYY
12345Q2___
3____Q4___
"""
    rendered = _format_table(repl, "(VALUES (12345, 2), (3, 4))")
    assert rendered == expected
| |
def test_format_table_column_formatter(repl):
    """A custom header formatter is used for the rendered column headers."""
    def _dummy_column_formatter(name, schema):
        # The formatter must receive the column's schema with its name.
        assert isinstance(schema, QuerySchema)
        return name + "x"

    expected = """\
COL0XQCOL1X
YYYYYDYYYYY
12345Q2____
3____Q4____
"""
    rendered = _format_table(
        repl,
        "(VALUES (12345, 2), (3, 4))",
        column_header_formatter=_dummy_column_formatter)
    assert rendered == expected
| |
def test_format_table_strings(repl):
    """String values are rendered alongside an integer column."""
    expected = """\
COL0QCOL1
YYYYDYYYY
foo_Q2___
b___Q4___
"""
    rendered = _format_table(repl, "(VALUES ('foo', 2), ('b', 4))")
    assert rendered == expected
| |
def test_format_table_pad_to_header(repl):
    """Values narrower than their header are padded to the header width."""
    expected = """\
COL0QCOL1
YYYYDYYYY
-1__Q2___
3___Q4___
"""
    rendered = _format_table(repl, "(VALUES (-1, 2), (3, 4))")
    assert rendered == expected
| |
def test_format_table_no_rows(repl):
    """A query with no rows still renders its header and separator."""
    expected = """\
FOO
YYY
"""
    rendered = _format_table(repl, "SELECT 0 AS foo FROM dctv.filled(0)")
    assert rendered == expected
| |
def test_format_table_no_columns(repl):
    """A table without columns renders as a lone newline."""
    queries = (
        "SELECT * FROM dctv.filled(0)",
        "SELECT * FROM dctv.filled(10)",
    )
    for sql in queries:
        assert _format_table(repl, sql) == "\n"
| |
def test_format_table_truncate_header(repl):
    """A header wider than the table is cut and marked as truncated."""
    expected = """\
ABCDEFGHIT
YYYYYYYYYY
0_________
"""
    rendered = _format_table(
        repl,
        "SELECT 0 AS `abcdefghijklmop`",
        desired_width=10)
    assert rendered == expected
| |
def test_format_table_truncate_value(repl):
    """A value wider than the table is cut and marked as truncated."""
    expected = """\
COL0______
YYYYYYYYYY
foo_______
abcdefghiT
qux_______
"""
    rendered = _format_table(
        repl,
        "(VALUES ('foo'), ('abcdefghijklmop'), ('qux'))",
        desired_width=10)
    assert rendered == expected
| |
def test_format_table_truncate_header_wrap(repl):
    """Columns that don't fit side by side wrap onto indented lines."""
    expected = """\
FOO_______
NNNNNNNNNN
HABCDEFGHT
YYYYYYYYYY
1_________
NNNNNNNNNN
H0________
YYYYYYYYYY
"""
    rendered = _format_table(
        repl,
        "SELECT 1 AS `foo`, 0 AS `abcdefghijklmop`",
        desired_width=10)
    assert rendered == expected
| |
def test_format_table_3row(repl):
    """Three over-wide columns wrap onto three progressively indented lines."""
    # NOTE(review): the select list ends with a trailing comma --
    # presumably the SQL dialect accepts that; confirm.
    sql = ("SELECT "
           "0 AS `abcdefghijklmop`,"
           "1 AS `qrstuvwxyzabcde`,"
           "2 AS `fghijklmnopqrst`,")
    expected = """\
ABCDEFGHIT
NNNNNNNNNN
HQRSTUVWXT
NNNNNNNNNN
HHFGHIJKLT
YYYYYYYYYY
0_________
NNNNNNNNNN
H1________
NNNNNNNNNN
HH2_______
YYYYYYYYYY
"""
    rendered = _format_table(repl, sql, desired_width=10)
    assert rendered == expected