Thoroughly refactor the cases generator (#107151)
This mostly moves code out of generate_cases.py into separate files (the first four items below), along with a few other changes (the remaining items).
- analysis.py: `Analyzer` etc.
- instructions.py: `Instruction` etc.
- flags.py: `InstructionFlags`, `variable_used`, `variable_used_unspecialized`
- formatting.py: `Formatter` etc.
- Rename parser.py to parsing.py, to avoid conflict with stdlib parser.py
- Blackify most things
- Fix most mypy errors
- Remove output filenames from Generator state, add them to `write_instructions()` etc.
- Fix unit tests
diff --git a/Lib/test/test_generated_cases.py b/Lib/test/test_generated_cases.py
index ba0e5e8..9c23c2e 100644
--- a/Lib/test/test_generated_cases.py
+++ b/Lib/test/test_generated_cases.py
@@ -6,8 +6,10 @@
test_tools.skip_if_missing('cases_generator')
with test_tools.imports_under_tool('cases_generator'):
+ import analysis
+ import formatting
import generate_cases
- from parser import StackEffect
+ from parsing import StackEffect
class TestEffects(unittest.TestCase):
@@ -27,37 +29,37 @@ def test_effect_sizes(self):
StackEffect("q", "", "", ""),
StackEffect("r", "", "", ""),
]
- self.assertEqual(generate_cases.effect_size(x), (1, ""))
- self.assertEqual(generate_cases.effect_size(y), (0, "oparg"))
- self.assertEqual(generate_cases.effect_size(z), (0, "oparg*2"))
+ self.assertEqual(formatting.effect_size(x), (1, ""))
+ self.assertEqual(formatting.effect_size(y), (0, "oparg"))
+ self.assertEqual(formatting.effect_size(z), (0, "oparg*2"))
self.assertEqual(
- generate_cases.list_effect_size(input_effects),
+ formatting.list_effect_size(input_effects),
(1, "oparg + oparg*2"),
)
self.assertEqual(
- generate_cases.list_effect_size(output_effects),
+ formatting.list_effect_size(output_effects),
(2, "oparg*4"),
)
self.assertEqual(
- generate_cases.list_effect_size(other_effects),
+ formatting.list_effect_size(other_effects),
(2, "(oparg<<1)"),
)
self.assertEqual(
- generate_cases.string_effect_size(
- generate_cases.list_effect_size(input_effects),
+ formatting.string_effect_size(
+ formatting.list_effect_size(input_effects),
), "1 + oparg + oparg*2",
)
self.assertEqual(
- generate_cases.string_effect_size(
- generate_cases.list_effect_size(output_effects),
+ formatting.string_effect_size(
+ formatting.list_effect_size(output_effects),
),
"2 + oparg*4",
)
self.assertEqual(
- generate_cases.string_effect_size(
- generate_cases.list_effect_size(other_effects),
+ formatting.string_effect_size(
+ formatting.list_effect_size(other_effects),
),
"2 + (oparg<<1)",
)
@@ -90,23 +92,17 @@ def tearDown(self) -> None:
def run_cases_test(self, input: str, expected: str):
with open(self.temp_input_filename, "w+") as temp_input:
- temp_input.write(generate_cases.BEGIN_MARKER)
+ temp_input.write(analysis.BEGIN_MARKER)
temp_input.write(input)
- temp_input.write(generate_cases.END_MARKER)
+ temp_input.write(analysis.END_MARKER)
temp_input.flush()
- a = generate_cases.Analyzer(
- [self.temp_input_filename],
- self.temp_output_filename,
- self.temp_metadata_filename,
- self.temp_pymetadata_filename,
- self.temp_executor_filename,
- )
+ a = generate_cases.Generator([self.temp_input_filename])
a.parse()
a.analyze()
if a.errors:
raise RuntimeError(f"Found {a.errors} errors")
- a.write_instructions()
+ a.write_instructions(self.temp_output_filename, False)
with open(self.temp_output_filename) as temp_output:
lines = temp_output.readlines()
diff --git a/Tools/cases_generator/analysis.py b/Tools/cases_generator/analysis.py
new file mode 100644
index 0000000..53ae736
--- /dev/null
+++ b/Tools/cases_generator/analysis.py
@@ -0,0 +1,412 @@
+import re
+import sys
+import typing
+
+from flags import InstructionFlags, variable_used
+from formatting import prettify_filename, UNUSED
+from instructions import (
+ ActiveCacheEffect,
+ Component,
+ Instruction,
+ InstructionOrCacheEffect,
+ MacroInstruction,
+ MacroParts,
+ OverriddenInstructionPlaceHolder,
+ PseudoInstruction,
+ StackEffectMapping,
+)
+import parsing
+from parsing import StackEffect
+
+BEGIN_MARKER = "// BEGIN BYTECODES //"
+END_MARKER = "// END BYTECODES //"
+
+RESERVED_WORDS = {
+ "co_consts": "Use FRAME_CO_CONSTS.",
+ "co_names": "Use FRAME_CO_NAMES.",
+}
+
+RE_PREDICTED = r"^\s*(?:GO_TO_INSTRUCTION\(|DEOPT_IF\(.*?,\s*)(\w+)\);\s*(?://.*)?$"
+
+
+class Analyzer:
+ """Parse input, analyze it, and write to output."""
+
+ input_filenames: list[str]
+ errors: int = 0
+
+ def __init__(self, input_filenames: list[str]):
+ self.input_filenames = input_filenames
+
+ def error(self, msg: str, node: parsing.Node) -> None:
+ lineno = 0
+ filename = "<unknown file>"
+ if context := node.context:
+ filename = context.owner.filename
+ # Use line number of first non-comment in the node
+ for token in context.owner.tokens[context.begin : context.end]:
+ lineno = token.line
+ if token.kind != "COMMENT":
+ break
+ print(f"{filename}:{lineno}: {msg}", file=sys.stderr)
+ self.errors += 1
+
+ everything: list[
+ parsing.InstDef
+ | parsing.Macro
+ | parsing.Pseudo
+ | OverriddenInstructionPlaceHolder
+ ]
+ instrs: dict[str, Instruction] # Includes ops
+ macros: dict[str, parsing.Macro]
+ macro_instrs: dict[str, MacroInstruction]
+ families: dict[str, parsing.Family]
+ pseudos: dict[str, parsing.Pseudo]
+ pseudo_instrs: dict[str, PseudoInstruction]
+
+ def parse(self) -> None:
+ """Parse the source text.
+
+ We only want the parser to see the stuff between the
+ begin and end markers.
+ """
+
+ self.everything = []
+ self.instrs = {}
+ self.macros = {}
+ self.families = {}
+ self.pseudos = {}
+
+ instrs_idx: dict[str, int] = dict()
+
+ for filename in self.input_filenames:
+ self.parse_file(filename, instrs_idx)
+
+ files = " + ".join(self.input_filenames)
+ print(
+ f"Read {len(self.instrs)} instructions/ops, "
+ f"{len(self.macros)} macros, {len(self.pseudos)} pseudos, "
+ f"and {len(self.families)} families from {files}",
+ file=sys.stderr,
+ )
+
+ def parse_file(self, filename: str, instrs_idx: dict[str, int]) -> None:
+ with open(filename) as file:
+ src = file.read()
+
+ psr = parsing.Parser(src, filename=prettify_filename(filename))
+
+ # Skip until begin marker
+ while tkn := psr.next(raw=True):
+ if tkn.text == BEGIN_MARKER:
+ break
+ else:
+ raise psr.make_syntax_error(
+ f"Couldn't find {BEGIN_MARKER!r} in {psr.filename}"
+ )
+ start = psr.getpos()
+
+ # Find end marker, then delete everything after it
+ while tkn := psr.next(raw=True):
+ if tkn.text == END_MARKER:
+ break
+ del psr.tokens[psr.getpos() - 1 :]
+
+ # Parse from start
+ psr.setpos(start)
+ thing: parsing.Node | None
+ thing_first_token = psr.peek()
+ while thing := psr.definition():
+ thing = typing.cast(
+ parsing.InstDef | parsing.Macro | parsing.Pseudo | parsing.Family, thing
+ )
+ if ws := [w for w in RESERVED_WORDS if variable_used(thing, w)]:
+ self.error(
+ f"'{ws[0]}' is a reserved word. {RESERVED_WORDS[ws[0]]}", thing
+ )
+
+ match thing:
+ case parsing.InstDef(name=name):
+ if name in self.instrs:
+ if not thing.override:
+ raise psr.make_syntax_error(
+ f"Duplicate definition of '{name}' @ {thing.context} "
+ f"previous definition @ {self.instrs[name].inst.context}",
+ thing_first_token,
+ )
+ self.everything[
+ instrs_idx[name]
+ ] = OverriddenInstructionPlaceHolder(name=name)
+ if name not in self.instrs and thing.override:
+ raise psr.make_syntax_error(
+ f"Definition of '{name}' @ {thing.context} is supposed to be "
+ "an override but no previous definition exists.",
+ thing_first_token,
+ )
+ self.instrs[name] = Instruction(thing)
+ instrs_idx[name] = len(self.everything)
+ self.everything.append(thing)
+ case parsing.Macro(name):
+ self.macros[name] = thing
+ self.everything.append(thing)
+ case parsing.Family(name):
+ self.families[name] = thing
+ case parsing.Pseudo(name):
+ self.pseudos[name] = thing
+ self.everything.append(thing)
+ case _:
+ typing.assert_never(thing)
+ if not psr.eof():
+ raise psr.make_syntax_error(f"Extra stuff at the end of {filename}")
+
+ def analyze(self) -> None:
+ """Analyze the inputs.
+
+ Runs the analysis passes in order: macros and pseudos, predictions,
+ family mapping, and family checks.  Problems found by these passes
+ are reported through self.error(), which prints a message and
+ increments self.errors; callers should check self.errors afterwards.
+ """
+ self.analyze_macros_and_pseudos()
+ self.find_predictions()
+ self.map_families()
+ self.check_families()
+
+ def find_predictions(self) -> None:
+ """Find the instructions that need PREDICTED() labels."""
+ for instr in self.instrs.values():
+ targets: set[str] = set()
+ for line in instr.block_text:
+ if m := re.match(RE_PREDICTED, line):
+ targets.add(m.group(1))
+ for target in targets:
+ if target_instr := self.instrs.get(target):
+ target_instr.predicted = True
+ elif target_macro := self.macro_instrs.get(target):
+ target_macro.predicted = True
+ else:
+ self.error(
+ f"Unknown instruction {target!r} predicted in {instr.name!r}",
+ instr.inst, # TODO: Use better location
+ )
+
+ def map_families(self) -> None:
+ """Link instruction names back to their family, if they have one."""
+ for family in self.families.values():
+ for member in [family.name] + family.members:
+ if member_instr := self.instrs.get(member):
+ if (
+ member_instr.family is not family
+ and member_instr.family is not None
+ ):
+ self.error(
+ f"Instruction {member} is a member of multiple families "
+ f"({member_instr.family.name}, {family.name}).",
+ family,
+ )
+ else:
+ member_instr.family = family
+ elif not self.macro_instrs.get(member):
+ self.error(
+ f"Unknown instruction {member!r} referenced in family {family.name!r}",
+ family,
+ )
+
+ def check_families(self) -> None:
+ """Check each family:
+
+ - Must have at least 2 members (including head)
+ - Head and all members must be known instructions
+ - Head and all members must have the same cache, input and output effects
+ """
+ for family in self.families.values():
+ if family.name not in self.macro_instrs and family.name not in self.instrs:
+ self.error(
+ f"Family {family.name!r} has unknown instruction {family.name!r}",
+ family,
+ )
+ members = [
+ member
+ for member in family.members
+ if member in self.instrs or member in self.macro_instrs
+ ]
+ if members != family.members:
+ unknown = set(family.members) - set(members)
+ self.error(
+ f"Family {family.name!r} has unknown members: {unknown}", family
+ )
+ expected_effects = self.effect_counts(family.name)
+ for member in members:
+ member_effects = self.effect_counts(member)
+ if member_effects != expected_effects:
+ self.error(
+ f"Family {family.name!r} has inconsistent "
+ f"(cache, input, output) effects:\n"
+ f" {family.name} = {expected_effects}; "
+ f"{member} = {member_effects}",
+ family,
+ )
+
+ def effect_counts(self, name: str) -> tuple[int, int, int]:
+ if instr := self.instrs.get(name):
+ cache = instr.cache_offset
+ input = len(instr.input_effects)
+ output = len(instr.output_effects)
+ elif mac := self.macro_instrs.get(name):
+ cache = mac.cache_offset
+ input, output = 0, 0
+ for part in mac.parts:
+ if isinstance(part, Component):
+ # A component may pop what the previous component pushed,
+ # so we offset the input/output counts by that.
+ delta_i = len(part.instr.input_effects)
+ delta_o = len(part.instr.output_effects)
+ offset = min(delta_i, output)
+ input += delta_i - offset
+ output += delta_o - offset
+ else:
+ assert False, f"Unknown instruction {name!r}"
+ return cache, input, output
+
+ def analyze_macros_and_pseudos(self) -> None:
+ """Analyze each macro and pseudo instruction."""
+ self.macro_instrs = {}
+ self.pseudo_instrs = {}
+ for name, macro in self.macros.items():
+ self.macro_instrs[name] = self.analyze_macro(macro)
+ for name, pseudo in self.pseudos.items():
+ self.pseudo_instrs[name] = self.analyze_pseudo(pseudo)
+
+ def analyze_macro(self, macro: parsing.Macro) -> MacroInstruction:
+ components = self.check_macro_components(macro)
+ stack, initial_sp = self.stack_analysis(components)
+ sp = initial_sp
+ parts: MacroParts = []
+ flags = InstructionFlags.newEmpty()
+ offset = 0
+ for component in components:
+ match component:
+ case parsing.CacheEffect() as ceffect:
+ parts.append(ceffect)
+ offset += ceffect.size
+ case Instruction() as instr:
+ part, sp, offset = self.analyze_instruction(
+ instr, stack, sp, offset
+ )
+ parts.append(part)
+ flags.add(instr.instr_flags)
+ case _:
+ typing.assert_never(component)
+ final_sp = sp
+ format = "IB"
+ if offset:
+ format += "C" + "0" * (offset - 1)
+ return MacroInstruction(
+ macro.name, stack, initial_sp, final_sp, format, flags, macro, parts, offset
+ )
+
+ def analyze_pseudo(self, pseudo: parsing.Pseudo) -> PseudoInstruction:
+ targets = [self.instrs[target] for target in pseudo.targets]
+ assert targets
+ # Make sure the targets have the same fmt
+ fmts = list(set([t.instr_fmt for t in targets]))
+ assert len(fmts) == 1
+ assert len(list(set([t.instr_flags.bitmap() for t in targets]))) == 1
+ return PseudoInstruction(pseudo.name, targets, fmts[0], targets[0].instr_flags)
+
+ def analyze_instruction(
+ self, instr: Instruction, stack: list[StackEffect], sp: int, offset: int
+ ) -> tuple[Component, int, int]:
+ input_mapping: StackEffectMapping = []
+ for ieffect in reversed(instr.input_effects):
+ sp -= 1
+ input_mapping.append((stack[sp], ieffect))
+ output_mapping: StackEffectMapping = []
+ for oeffect in instr.output_effects:
+ output_mapping.append((stack[sp], oeffect))
+ sp += 1
+ active_effects: list[ActiveCacheEffect] = []
+ for ceffect in instr.cache_effects:
+ if ceffect.name != UNUSED:
+ active_effects.append(ActiveCacheEffect(ceffect, offset))
+ offset += ceffect.size
+ return (
+ Component(instr, input_mapping, output_mapping, active_effects),
+ sp,
+ offset,
+ )
+
+    def check_macro_components(
+        self, macro: parsing.Macro
+    ) -> list[InstructionOrCacheEffect]:
+        """Resolve the uops of a macro into instructions and cache effects.
+
+        Each op name is looked up in self.instrs; cache effects are passed
+        through unchanged.  An op name that is not a known instruction is
+        reported through self.error() and left out of the result, so the
+        analysis can go on and report further problems instead of dying
+        with a KeyError right after printing the error.
+        """
+        components: list[InstructionOrCacheEffect] = []
+        for uop in macro.uops:
+            match uop:
+                case parsing.OpName(name):
+                    if name not in self.instrs:
+                        self.error(f"Unknown instruction {name!r}", macro)
+                    else:
+                        components.append(self.instrs[name])
+                case parsing.CacheEffect():
+                    components.append(uop)
+                case _:
+                    typing.assert_never(uop)
+        return components
+
+ def stack_analysis(
+ self, components: typing.Iterable[InstructionOrCacheEffect]
+ ) -> tuple[list[StackEffect], int]:
+ """Analyze a macro.
+
+ Ignore cache effects.
+
+ Return the list of variables (as StackEffects) and the initial stack pointer.
+ """
+ lowest = current = highest = 0
+ conditions: dict[int, str] = {} # Indexed by 'current'.
+ last_instr: Instruction | None = None
+ for thing in components:
+ if isinstance(thing, Instruction):
+ last_instr = thing
+ for thing in components:
+ match thing:
+ case Instruction() as instr:
+ if any(
+ eff.size for eff in instr.input_effects + instr.output_effects
+ ):
+ # TODO: Eventually this will be needed, at least for macros.
+ self.error(
+ f"Instruction {instr.name!r} has variable-sized stack effect, "
+ "which are not supported in macro instructions",
+ instr.inst, # TODO: Pass name+location of macro
+ )
+ if any(eff.cond for eff in instr.input_effects):
+ self.error(
+ f"Instruction {instr.name!r} has conditional input stack effect, "
+ "which are not supported in macro instructions",
+ instr.inst, # TODO: Pass name+location of macro
+ )
+ if (
+ any(eff.cond for eff in instr.output_effects)
+ and instr is not last_instr
+ ):
+ self.error(
+ f"Instruction {instr.name!r} has conditional output stack effect, "
+ "but is not the last instruction in a macro",
+ instr.inst, # TODO: Pass name+location of macro
+ )
+ current -= len(instr.input_effects)
+ lowest = min(lowest, current)
+ for eff in instr.output_effects:
+ if eff.cond:
+ conditions[current] = eff.cond
+ current += 1
+ highest = max(highest, current)
+ case parsing.CacheEffect():
+ pass
+ case _:
+ typing.assert_never(thing)
+ # At this point, 'current' is the net stack effect,
+ # and 'lowest' and 'highest' are the extremes.
+ # Note that 'lowest' may be negative.
+ stack = [
+ StackEffect(f"_tmp_{i}", "", conditions.get(highest - i, ""))
+ for i in reversed(range(1, highest - lowest + 1))
+ ]
+ return stack, -lowest
diff --git a/Tools/cases_generator/flags.py b/Tools/cases_generator/flags.py
new file mode 100644
index 0000000..78acd93
--- /dev/null
+++ b/Tools/cases_generator/flags.py
@@ -0,0 +1,102 @@
+import dataclasses
+
+from formatting import Formatter
+import lexer as lx
+import parsing
+
+
+@dataclasses.dataclass
+class InstructionFlags:
+ """Construct and manipulate instruction flags"""
+
+ HAS_ARG_FLAG: bool
+ HAS_CONST_FLAG: bool
+ HAS_NAME_FLAG: bool
+ HAS_JUMP_FLAG: bool
+ HAS_FREE_FLAG: bool
+ HAS_LOCAL_FLAG: bool
+
+ def __post_init__(self):
+ self.bitmask = {name: (1 << i) for i, name in enumerate(self.names())}
+
+ @staticmethod
+ def fromInstruction(instr: parsing.Node):
+
+ has_free = (
+ variable_used(instr, "PyCell_New")
+ or variable_used(instr, "PyCell_GET")
+ or variable_used(instr, "PyCell_SET")
+ )
+
+ return InstructionFlags(
+ HAS_ARG_FLAG=variable_used(instr, "oparg"),
+ HAS_CONST_FLAG=variable_used(instr, "FRAME_CO_CONSTS"),
+ HAS_NAME_FLAG=variable_used(instr, "FRAME_CO_NAMES"),
+ HAS_JUMP_FLAG=variable_used(instr, "JUMPBY"),
+ HAS_FREE_FLAG=has_free,
+ HAS_LOCAL_FLAG=(
+ variable_used(instr, "GETLOCAL") or variable_used(instr, "SETLOCAL")
+ )
+ and not has_free,
+ )
+
+ @staticmethod
+ def newEmpty():
+ return InstructionFlags(False, False, False, False, False, False)
+
+ def add(self, other: "InstructionFlags") -> None:
+ for name, value in dataclasses.asdict(other).items():
+ if value:
+ setattr(self, name, value)
+
+ def names(self, value=None):
+ if value is None:
+ return dataclasses.asdict(self).keys()
+ return [n for n, v in dataclasses.asdict(self).items() if v == value]
+
+ def bitmap(self) -> int:
+ flags = 0
+ for name in self.names():
+ if getattr(self, name):
+ flags |= self.bitmask[name]
+ return flags
+
+ @classmethod
+ def emit_macros(cls, out: Formatter):
+ flags = cls.newEmpty()
+ for name, value in flags.bitmask.items():
+ out.emit(f"#define {name} ({value})")
+
+ for name, value in flags.bitmask.items():
+ out.emit(
+ f"#define OPCODE_{name[:-len('_FLAG')]}(OP) "
+ f"(_PyOpcode_opcode_metadata[OP].flags & ({name}))"
+ )
+
+
+def variable_used(node: parsing.Node, name: str) -> bool:
+ """Determine whether a variable with a given name is used in a node."""
+ return any(
+ token.kind == "IDENTIFIER" and token.text == name for token in node.tokens
+ )
+
+
+def variable_used_unspecialized(node: parsing.Node, name: str) -> bool:
+ """Like variable_used(), but skips #if ENABLE_SPECIALIZATION blocks."""
+ tokens: list[lx.Token] = []
+ skipping = False
+ for i, token in enumerate(node.tokens):
+ if token.kind == "MACRO":
+ text = "".join(token.text.split())
+ # TODO: Handle nested #if
+ if text == "#if":
+ if (
+ i + 1 < len(node.tokens)
+ and node.tokens[i + 1].text == "ENABLE_SPECIALIZATION"
+ ):
+ skipping = True
+ elif text in ("#else", "#endif"):
+ skipping = False
+ if not skipping:
+ tokens.append(token)
+ return any(token.kind == "IDENTIFIER" and token.text == name for token in tokens)
diff --git a/Tools/cases_generator/formatting.py b/Tools/cases_generator/formatting.py
new file mode 100644
index 0000000..6b5a375
--- /dev/null
+++ b/Tools/cases_generator/formatting.py
@@ -0,0 +1,188 @@
+import contextlib
+import re
+import typing
+
+from parsing import StackEffect
+
+UNUSED = "unused"
+
+
+class Formatter:
+ """Wraps an output stream with the ability to indent etc."""
+
+ stream: typing.TextIO
+ prefix: str
+ emit_line_directives: bool = False
+ lineno: int # Next line number, 1-based
+ filename: str # Slightly improved stream.filename
+ nominal_lineno: int
+ nominal_filename: str
+
+ def __init__(
+ self, stream: typing.TextIO, indent: int,
+ emit_line_directives: bool = False, comment: str = "//",
+ ) -> None:
+ self.stream = stream
+ self.prefix = " " * indent
+ self.emit_line_directives = emit_line_directives
+ self.comment = comment
+ self.lineno = 1
+ self.filename = prettify_filename(self.stream.name)
+ self.nominal_lineno = 1
+ self.nominal_filename = self.filename
+
+ def write_raw(self, s: str) -> None:
+ self.stream.write(s)
+ newlines = s.count("\n")
+ self.lineno += newlines
+ self.nominal_lineno += newlines
+
+ def emit(self, arg: str) -> None:
+ if arg:
+ self.write_raw(f"{self.prefix}{arg}\n")
+ else:
+ self.write_raw("\n")
+
+ def set_lineno(self, lineno: int, filename: str) -> None:
+ if self.emit_line_directives:
+ if lineno != self.nominal_lineno or filename != self.nominal_filename:
+ self.emit(f'#line {lineno} "{filename}"')
+ self.nominal_lineno = lineno
+ self.nominal_filename = filename
+
+ def reset_lineno(self) -> None:
+ if self.lineno != self.nominal_lineno or self.filename != self.nominal_filename:
+ self.set_lineno(self.lineno + 1, self.filename)
+
+    @contextlib.contextmanager
+    def indent(self):
+        """Context manager that indents emitted lines by one level.
+
+        One level is four spaces, matching the four characters stripped
+        from the prefix on exit.  The previous prefix is restored even
+        when the body of the ``with`` statement raises.
+        """
+        self.prefix += "    "
+        try:
+            yield
+        finally:
+            self.prefix = self.prefix[:-4]
+
+ @contextlib.contextmanager
+ def block(self, head: str, tail: str = ""):
+ if head:
+ self.emit(head + " {")
+ else:
+ self.emit("{")
+ with self.indent():
+ yield
+ self.emit("}" + tail)
+
+ def stack_adjust(
+ self,
+ input_effects: list[StackEffect],
+ output_effects: list[StackEffect],
+ ):
+ shrink, isym = list_effect_size(input_effects)
+ grow, osym = list_effect_size(output_effects)
+ diff = grow - shrink
+ if isym and isym != osym:
+ self.emit(f"STACK_SHRINK({isym});")
+ if diff < 0:
+ self.emit(f"STACK_SHRINK({-diff});")
+ if diff > 0:
+ self.emit(f"STACK_GROW({diff});")
+ if osym and osym != isym:
+ self.emit(f"STACK_GROW({osym});")
+
+ def declare(self, dst: StackEffect, src: StackEffect | None):
+ if dst.name == UNUSED or dst.cond == "0":
+ return
+ typ = f"{dst.type}" if dst.type else "PyObject *"
+ if src:
+ cast = self.cast(dst, src)
+ init = f" = {cast}{src.name}"
+ elif dst.cond:
+ init = " = NULL"
+ else:
+ init = ""
+ sepa = "" if typ.endswith("*") else " "
+ self.emit(f"{typ}{sepa}{dst.name}{init};")
+
+ def assign(self, dst: StackEffect, src: StackEffect):
+ if src.name == UNUSED:
+ return
+ if src.size:
+ # Don't write sized arrays -- it's up to the user code.
+ return
+ cast = self.cast(dst, src)
+ if re.match(r"^REG\(oparg(\d+)\)$", dst.name):
+ self.emit(f"Py_XSETREF({dst.name}, {cast}{src.name});")
+ else:
+ stmt = f"{dst.name} = {cast}{src.name};"
+ if src.cond and src.cond != "1":
+ if src.cond == "0":
+ # It will not be executed
+ return
+ stmt = f"if ({src.cond}) {{ {stmt} }}"
+ self.emit(stmt)
+
+ def cast(self, dst: StackEffect, src: StackEffect) -> str:
+ return f"({dst.type or 'PyObject *'})" if src.type != dst.type else ""
+
+
+def prettify_filename(filename: str) -> str:
+ # Make filename more user-friendly and less platform-specific,
+ # it is only used for error reporting at this point.
+ filename = filename.replace("\\", "/")
+ if filename.startswith("./"):
+ filename = filename[2:]
+ if filename.endswith(".new"):
+ filename = filename[:-4]
+ return filename
+
+
+def list_effect_size(effects: list[StackEffect]) -> tuple[int, str]:
+ numeric = 0
+ symbolic: list[str] = []
+ for effect in effects:
+ diff, sym = effect_size(effect)
+ numeric += diff
+ if sym:
+ symbolic.append(maybe_parenthesize(sym))
+ return numeric, " + ".join(symbolic)
+
+
+def effect_size(effect: StackEffect) -> tuple[int, str]:
+ """Return the 'size' impact of a stack effect.
+
+ Returns a tuple (numeric, symbolic) where:
+
+ - numeric is an int giving the statically analyzable size of the effect
+ - symbolic is a string representing a variable effect (e.g. 'oparg*2')
+
+ At most one of these will be non-zero / non-empty.
+ """
+ if effect.size:
+ assert not effect.cond, "Array effects cannot have a condition"
+ return 0, effect.size
+ elif effect.cond:
+ if effect.cond in ("0", "1"):
+ return int(effect.cond), ""
+ return 0, f"{maybe_parenthesize(effect.cond)} ? 1 : 0"
+ else:
+ return 1, ""
+
+
+def maybe_parenthesize(sym: str) -> str:
+ """Add parentheses around a string if it contains an operator.
+
+ An exception is made for '*' which is common and harmless
+ in the context where the symbolic size is used.
+ """
+ if re.match(r"^[\s\w*]+$", sym):
+ return sym
+ else:
+ return f"({sym})"
+
+
+def string_effect_size(arg: tuple[int, str]) -> str:
+ numeric, symbolic = arg
+ if numeric and symbolic:
+ return f"{numeric} + {symbolic}"
+ elif symbolic:
+ return symbolic
+ else:
+ return str(numeric)
diff --git a/Tools/cases_generator/generate_cases.py b/Tools/cases_generator/generate_cases.py
index 3a679b2..967e1e2 100644
--- a/Tools/cases_generator/generate_cases.py
+++ b/Tools/cases_generator/generate_cases.py
@@ -1,21 +1,31 @@
"""Generate the main interpreter switch.
-
Reads the instruction definitions from bytecodes.c.
Writes the cases to generated_cases.c.h, which is #included in ceval.c.
"""
import argparse
import contextlib
-import dataclasses
import os
import posixpath
-import re
import sys
import typing
-import lexer as lx
-import parser
-from parser import StackEffect
+from analysis import Analyzer
+from formatting import Formatter, list_effect_size, maybe_parenthesize
+from flags import InstructionFlags, variable_used
+from instructions import (
+ AnyInstruction,
+ Component,
+ Instruction,
+ MacroInstruction,
+ MacroParts,
+ PseudoInstruction,
+ StackEffect,
+ OverriddenInstructionPlaceHolder,
+ TIER_TWO,
+)
+import parsing
+from parsing import StackEffect
HERE = os.path.dirname(__file__)
@@ -33,13 +43,6 @@
DEFAULT_EXECUTOR_OUTPUT = os.path.relpath(
os.path.join(ROOT, "Python/executor_cases.c.h")
)
-BEGIN_MARKER = "// BEGIN BYTECODES //"
-END_MARKER = "// END BYTECODES //"
-RE_PREDICTED = (
- r"^\s*(?:GO_TO_INSTRUCTION\(|DEOPT_IF\(.*?,\s*)(\w+)\);\s*(?://.*)?$"
-)
-UNUSED = "unused"
-BITS_PER_CODE_UNIT = 16
# Constants used instead of size for macro expansions.
# Note: 1, 2, 4 must match actual cache entry sizes.
@@ -52,10 +55,7 @@
"OPARG_BOTTOM": 6,
}
-RESERVED_WORDS = {
- "co_consts" : "Use FRAME_CO_CONSTS.",
- "co_names": "Use FRAME_CO_NAMES.",
-}
+INSTR_FMT_PREFIX = "INSTR_FMT_"
arg_parser = argparse.ArgumentParser(
description="Generate the code for the interpreter switch.",
@@ -65,10 +65,18 @@
"-o", "--output", type=str, help="Generated code", default=DEFAULT_OUTPUT
)
arg_parser.add_argument(
- "-m", "--metadata", type=str, help="Generated C metadata", default=DEFAULT_METADATA_OUTPUT
+ "-m",
+ "--metadata",
+ type=str,
+ help="Generated C metadata",
+ default=DEFAULT_METADATA_OUTPUT,
)
arg_parser.add_argument(
- "-p", "--pymetadata", type=str, help="Generated Python metadata", default=DEFAULT_PYMETADATA_OUTPUT
+ "-p",
+ "--pymetadata",
+ type=str,
+ help="Generated Python metadata",
+ default=DEFAULT_PYMETADATA_OUTPUT,
)
arg_parser.add_argument(
"-l", "--emit-line-directives", help="Emit #line directives", action="store_true"
@@ -85,966 +93,9 @@
)
-def effect_size(effect: StackEffect) -> tuple[int, str]:
- """Return the 'size' impact of a stack effect.
-
- Returns a tuple (numeric, symbolic) where:
-
- - numeric is an int giving the statically analyzable size of the effect
- - symbolic is a string representing a variable effect (e.g. 'oparg*2')
-
- At most one of these will be non-zero / non-empty.
- """
- if effect.size:
- assert not effect.cond, "Array effects cannot have a condition"
- return 0, effect.size
- elif effect.cond:
- if effect.cond in ("0", "1"):
- return int(effect.cond), ""
- return 0, f"{maybe_parenthesize(effect.cond)} ? 1 : 0"
- else:
- return 1, ""
-
-
-def maybe_parenthesize(sym: str) -> str:
- """Add parentheses around a string if it contains an operator.
-
- An exception is made for '*' which is common and harmless
- in the context where the symbolic size is used.
- """
- if re.match(r"^[\s\w*]+$", sym):
- return sym
- else:
- return f"({sym})"
-
-
-def list_effect_size(effects: list[StackEffect]) -> tuple[int, str]:
- numeric = 0
- symbolic: list[str] = []
- for effect in effects:
- diff, sym = effect_size(effect)
- numeric += diff
- if sym:
- symbolic.append(maybe_parenthesize(sym))
- return numeric, " + ".join(symbolic)
-
-
-def string_effect_size(arg: tuple[int, str]) -> str:
- numeric, symbolic = arg
- if numeric and symbolic:
- return f"{numeric} + {symbolic}"
- elif symbolic:
- return symbolic
- else:
- return str(numeric)
-
-
-class Formatter:
- """Wraps an output stream with the ability to indent etc."""
-
- stream: typing.TextIO
- prefix: str
- emit_line_directives: bool = False
- lineno: int # Next line number, 1-based
- filename: str # Slightly improved stream.filename
- nominal_lineno: int
- nominal_filename: str
-
- def __init__(
- self, stream: typing.TextIO, indent: int,
- emit_line_directives: bool = False, comment: str = "//",
- ) -> None:
- self.stream = stream
- self.prefix = " " * indent
- self.emit_line_directives = emit_line_directives
- self.comment = comment
- self.lineno = 1
- self.filename = prettify_filename(self.stream.name)
- self.nominal_lineno = 1
- self.nominal_filename = self.filename
-
- def write_raw(self, s: str) -> None:
- self.stream.write(s)
- newlines = s.count("\n")
- self.lineno += newlines
- self.nominal_lineno += newlines
-
- def emit(self, arg: str) -> None:
- if arg:
- self.write_raw(f"{self.prefix}{arg}\n")
- else:
- self.write_raw("\n")
-
- def set_lineno(self, lineno: int, filename: str) -> None:
- if self.emit_line_directives:
- if lineno != self.nominal_lineno or filename != self.nominal_filename:
- self.emit(f'#line {lineno} "{filename}"')
- self.nominal_lineno = lineno
- self.nominal_filename = filename
-
- def reset_lineno(self) -> None:
- if self.lineno != self.nominal_lineno or self.filename != self.nominal_filename:
- self.set_lineno(self.lineno + 1, self.filename)
-
- @contextlib.contextmanager
- def indent(self):
- self.prefix += " "
- yield
- self.prefix = self.prefix[:-4]
-
- @contextlib.contextmanager
- def block(self, head: str, tail: str = ""):
- if head:
- self.emit(head + " {")
- else:
- self.emit("{")
- with self.indent():
- yield
- self.emit("}" + tail)
-
- def stack_adjust(
- self,
- input_effects: list[StackEffect],
- output_effects: list[StackEffect],
- ):
- shrink, isym = list_effect_size(input_effects)
- grow, osym = list_effect_size(output_effects)
- diff = grow - shrink
- if isym and isym != osym:
- self.emit(f"STACK_SHRINK({isym});")
- if diff < 0:
- self.emit(f"STACK_SHRINK({-diff});")
- if diff > 0:
- self.emit(f"STACK_GROW({diff});")
- if osym and osym != isym:
- self.emit(f"STACK_GROW({osym});")
-
- def declare(self, dst: StackEffect, src: StackEffect | None):
- if dst.name == UNUSED or dst.cond == "0":
- return
- typ = f"{dst.type}" if dst.type else "PyObject *"
- if src:
- cast = self.cast(dst, src)
- init = f" = {cast}{src.name}"
- elif dst.cond:
- init = " = NULL"
- else:
- init = ""
- sepa = "" if typ.endswith("*") else " "
- self.emit(f"{typ}{sepa}{dst.name}{init};")
-
- def assign(self, dst: StackEffect, src: StackEffect):
- if src.name == UNUSED:
- return
- if src.size:
- # Don't write sized arrays -- it's up to the user code.
- return
- cast = self.cast(dst, src)
- if re.match(r"^REG\(oparg(\d+)\)$", dst.name):
- self.emit(f"Py_XSETREF({dst.name}, {cast}{src.name});")
- else:
- stmt = f"{dst.name} = {cast}{src.name};"
- if src.cond and src.cond != "1":
- if src.cond == "0":
- # It will not be executed
- return
- stmt = f"if ({src.cond}) {{ {stmt} }}"
- self.emit(stmt)
-
- def cast(self, dst: StackEffect, src: StackEffect) -> str:
- return f"({dst.type or 'PyObject *'})" if src.type != dst.type else ""
-
-@dataclasses.dataclass
-class InstructionFlags:
- """Construct and manipulate instruction flags"""
-
- HAS_ARG_FLAG: bool
- HAS_CONST_FLAG: bool
- HAS_NAME_FLAG: bool
- HAS_JUMP_FLAG: bool
- HAS_FREE_FLAG: bool
- HAS_LOCAL_FLAG: bool
-
- def __post_init__(self):
- self.bitmask = {
- name : (1 << i) for i, name in enumerate(self.names())
- }
-
- @staticmethod
- def fromInstruction(instr: "AnyInstruction"):
-
- has_free = (variable_used(instr, "PyCell_New") or
- variable_used(instr, "PyCell_GET") or
- variable_used(instr, "PyCell_SET"))
-
- return InstructionFlags(
- HAS_ARG_FLAG=variable_used(instr, "oparg"),
- HAS_CONST_FLAG=variable_used(instr, "FRAME_CO_CONSTS"),
- HAS_NAME_FLAG=variable_used(instr, "FRAME_CO_NAMES"),
- HAS_JUMP_FLAG=variable_used(instr, "JUMPBY"),
- HAS_FREE_FLAG=has_free,
- HAS_LOCAL_FLAG=(variable_used(instr, "GETLOCAL") or
- variable_used(instr, "SETLOCAL")) and
- not has_free,
- )
-
- @staticmethod
- def newEmpty():
- return InstructionFlags(False, False, False, False, False, False)
-
- def add(self, other: "InstructionFlags") -> None:
- for name, value in dataclasses.asdict(other).items():
- if value:
- setattr(self, name, value)
-
- def names(self, value=None):
- if value is None:
- return dataclasses.asdict(self).keys()
- return [n for n, v in dataclasses.asdict(self).items() if v == value]
-
- def bitmap(self) -> int:
- flags = 0
- for name in self.names():
- if getattr(self, name):
- flags |= self.bitmask[name]
- return flags
-
- @classmethod
- def emit_macros(cls, out: Formatter):
- flags = cls.newEmpty()
- for name, value in flags.bitmask.items():
- out.emit(f"#define {name} ({value})");
-
- for name, value in flags.bitmask.items():
- out.emit(
- f"#define OPCODE_{name[:-len('_FLAG')]}(OP) "
- f"(_PyOpcode_opcode_metadata[OP].flags & ({name}))")
-
-
-@dataclasses.dataclass
-class ActiveCacheEffect:
- """Wraps a CacheEffect that is actually used, in context."""
- effect: parser.CacheEffect
- offset: int
-
-
-FORBIDDEN_NAMES_IN_UOPS = (
- "resume_with_error",
- "kwnames",
- "next_instr",
- "oparg1", # Proxy for super-instructions like LOAD_FAST_LOAD_FAST
- "JUMPBY",
- "DISPATCH",
- "INSTRUMENTED_JUMP",
- "throwflag",
- "exception_unwind",
- "import_from",
- "import_name",
- "_PyObject_CallNoArgs", # Proxy for BEFORE_WITH
-)
-
-
-# Interpreter tiers
-TIER_ONE = 1 # Specializing adaptive interpreter (PEP 659)
-TIER_TWO = 2 # Experimental tracing interpreter
-Tiers: typing.TypeAlias = typing.Literal[1, 2]
-
-
-@dataclasses.dataclass
-class Instruction:
- """An instruction with additional data and code."""
-
- # Parts of the underlying instruction definition
- inst: parser.InstDef
- kind: typing.Literal["inst", "op"]
- name: str
- block: parser.Block
- block_text: list[str] # Block.text, less curlies, less PREDICT() calls
- block_line: int # First line of block in original code
-
- # Computed by constructor
- always_exits: bool
- cache_offset: int
- cache_effects: list[parser.CacheEffect]
- input_effects: list[StackEffect]
- output_effects: list[StackEffect]
- unmoved_names: frozenset[str]
- instr_fmt: str
- instr_flags: InstructionFlags
- active_caches: list[ActiveCacheEffect]
-
- # Set later
- family: parser.Family | None = None
- predicted: bool = False
-
- def __init__(self, inst: parser.InstDef):
- self.inst = inst
- self.kind = inst.kind
- self.name = inst.name
- self.block = inst.block
- self.block_text, self.check_eval_breaker, self.block_line = \
- extract_block_text(self.block)
- self.always_exits = always_exits(self.block_text)
- self.cache_effects = [
- effect for effect in inst.inputs if isinstance(effect, parser.CacheEffect)
- ]
- self.cache_offset = sum(c.size for c in self.cache_effects)
- self.input_effects = [
- effect for effect in inst.inputs if isinstance(effect, StackEffect)
- ]
- self.output_effects = inst.outputs # For consistency/completeness
- unmoved_names: set[str] = set()
- for ieffect, oeffect in zip(self.input_effects, self.output_effects):
- if ieffect.name == oeffect.name:
- unmoved_names.add(ieffect.name)
- else:
- break
- self.unmoved_names = frozenset(unmoved_names)
-
- self.instr_flags = InstructionFlags.fromInstruction(inst)
-
- self.active_caches = []
- offset = 0
- for effect in self.cache_effects:
- if effect.name != UNUSED:
- self.active_caches.append(ActiveCacheEffect(effect, offset))
- offset += effect.size
-
- if self.instr_flags.HAS_ARG_FLAG:
- fmt = "IB"
- else:
- fmt = "IX"
- if offset:
- fmt += "C" + "0"*(offset-1)
- self.instr_fmt = fmt
-
- def is_viable_uop(self) -> bool:
- """Whether this instruction is viable as a uop."""
- dprint: typing.Callable[..., None] = lambda *args, **kwargs: None
- # if self.name.startswith("CALL"):
- # dprint = print
-
- if self.name == "EXIT_TRACE":
- return True # This has 'return frame' but it's okay
- if self.always_exits:
- dprint(f"Skipping {self.name} because it always exits")
- return False
- if len(self.active_caches) > 1:
- # print(f"Skipping {self.name} because it has >1 cache entries")
- return False
- res = True
- for forbidden in FORBIDDEN_NAMES_IN_UOPS:
- # NOTE: To disallow unspecialized uops, use
- # if variable_used(self.inst, forbidden):
- if variable_used_unspecialized(self.inst, forbidden):
- dprint(f"Skipping {self.name} because it uses {forbidden}")
- res = False
- return res
-
- def write(self, out: Formatter, tier: Tiers = TIER_ONE) -> None:
- """Write one instruction, sans prologue and epilogue."""
- # Write a static assertion that a family's cache size is correct
- if family := self.family:
- if self.name == family.name:
- if cache_size := family.size:
- out.emit(
- f"static_assert({cache_size} == "
- f'{self.cache_offset}, "incorrect cache size");'
- )
-
- # Write input stack effect variable declarations and initializations
- ieffects = list(reversed(self.input_effects))
- for i, ieffect in enumerate(ieffects):
- isize = string_effect_size(
- list_effect_size([ieff for ieff in ieffects[: i + 1]])
- )
- if ieffect.size:
- src = StackEffect(f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **")
- elif ieffect.cond:
- src = StackEffect(f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL", "")
- else:
- src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "")
- out.declare(ieffect, src)
-
- # Write output stack effect variable declarations
- isize = string_effect_size(list_effect_size(self.input_effects))
- input_names = {ieffect.name for ieffect in self.input_effects}
- for i, oeffect in enumerate(self.output_effects):
- if oeffect.name not in input_names:
- if oeffect.size:
- osize = string_effect_size(
- list_effect_size([oeff for oeff in self.output_effects[:i]])
- )
- offset = "stack_pointer"
- if isize != osize:
- if isize != "0":
- offset += f" - ({isize})"
- if osize != "0":
- offset += f" + {osize}"
- src = StackEffect(offset, "PyObject **")
- out.declare(oeffect, src)
- else:
- out.declare(oeffect, None)
-
- # out.emit(f"next_instr += OPSIZE({self.inst.name}) - 1;")
-
- self.write_body(out, 0, self.active_caches, tier=tier)
-
- # Skip the rest if the block always exits
- if self.always_exits:
- return
-
- # Write net stack growth/shrinkage
- out.stack_adjust(
- [ieff for ieff in self.input_effects],
- [oeff for oeff in self.output_effects],
- )
-
- # Write output stack effect assignments
- oeffects = list(reversed(self.output_effects))
- for i, oeffect in enumerate(oeffects):
- if oeffect.name in self.unmoved_names:
- continue
- osize = string_effect_size(
- list_effect_size([oeff for oeff in oeffects[: i + 1]])
- )
- if oeffect.size:
- dst = StackEffect(f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **")
- else:
- dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "")
- out.assign(dst, oeffect)
-
- # Write cache effect
- if tier == TIER_ONE and self.cache_offset:
- out.emit(f"next_instr += {self.cache_offset};")
-
- def write_body(
- self,
- out: Formatter,
- dedent: int,
- active_caches: list[ActiveCacheEffect],
- tier: Tiers = TIER_ONE,
- ) -> None:
- """Write the instruction body."""
- # Write cache effect variable declarations and initializations
- for active in active_caches:
- ceffect = active.effect
- bits = ceffect.size * BITS_PER_CODE_UNIT
- if bits == 64:
- # NOTE: We assume that 64-bit data in the cache
- # is always an object pointer.
- # If this becomes false, we need a way to specify
- # syntactically what type the cache data is.
- typ = "PyObject *"
- func = "read_obj"
- else:
- typ = f"uint{bits}_t "
- func = f"read_u{bits}"
- if tier == TIER_ONE:
- out.emit(
- f"{typ}{ceffect.name} = {func}(&next_instr[{active.offset}].cache);"
- )
- else:
- out.emit(f"{typ}{ceffect.name} = ({typ.strip()})operand;")
-
- # Write the body, substituting a goto for ERROR_IF() and other stuff
- assert dedent <= 0
- extra = " " * -dedent
- names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
- offset = 0
- context = self.block.context
- assert context is not None and context.owner is not None
- filename = context.owner.filename
- for line in self.block_text:
- out.set_lineno(self.block_line + offset, filename)
- offset += 1
- if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
- space, cond, label = m.groups()
- space = extra + space
- # ERROR_IF() must pop the inputs from the stack.
- # The code block is responsible for DECREF()ing them.
- # NOTE: If the label doesn't exist, just add it to ceval.c.
-
- # Don't pop common input/output effects at the bottom!
- # These aren't DECREF'ed so they can stay.
- ieffs = list(self.input_effects)
- oeffs = list(self.output_effects)
- while ieffs and oeffs and ieffs[0] == oeffs[0]:
- ieffs.pop(0)
- oeffs.pop(0)
- ninputs, symbolic = list_effect_size(ieffs)
- if ninputs:
- label = f"pop_{ninputs}_{label}"
- if symbolic:
- out.write_raw(
- f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
- )
- else:
- out.write_raw(f"{space}if ({cond}) goto {label};\n")
- elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
- out.reset_lineno()
- space = extra + m.group(1)
- for ieff in self.input_effects:
- if ieff.name in names_to_skip:
- continue
- if ieff.size:
- out.write_raw(
- f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
- )
- out.write_raw(f"{space} Py_DECREF({ieff.name}[_i]);\n")
- out.write_raw(f"{space}}}\n")
- else:
- decref = "XDECREF" if ieff.cond else "DECREF"
- out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
- else:
- out.write_raw(extra + line)
- out.reset_lineno()
-
-
-InstructionOrCacheEffect = Instruction | parser.CacheEffect
-StackEffectMapping = list[tuple[StackEffect, StackEffect]]
-
-
-@dataclasses.dataclass
-class Component:
- instr: Instruction
- input_mapping: StackEffectMapping
- output_mapping: StackEffectMapping
- active_caches: list[ActiveCacheEffect]
-
- def write_body(self, out: Formatter) -> None:
- with out.block(""):
- input_names = {ieffect.name for _, ieffect in self.input_mapping}
- for var, ieffect in self.input_mapping:
- out.declare(ieffect, var)
- for _, oeffect in self.output_mapping:
- if oeffect.name not in input_names:
- out.declare(oeffect, None)
-
- self.instr.write_body(out, -4, self.active_caches)
-
- for var, oeffect in self.output_mapping:
- out.assign(var, oeffect)
-
-
-MacroParts = list[Component | parser.CacheEffect]
-
-
-@dataclasses.dataclass
-class MacroInstruction:
- """A macro instruction."""
-
- name: str
- stack: list[StackEffect]
- initial_sp: int
- final_sp: int
- instr_fmt: str
- instr_flags: InstructionFlags
- macro: parser.Macro
- parts: MacroParts
- cache_offset: int
- predicted: bool = False
-
-
-@dataclasses.dataclass
-class PseudoInstruction:
- """A pseudo instruction."""
-
- name: str
- targets: list[Instruction]
- instr_fmt: str
- instr_flags: InstructionFlags
-
-
-@dataclasses.dataclass
-class OverriddenInstructionPlaceHolder:
- name: str
-
-
-AnyInstruction = Instruction | MacroInstruction | PseudoInstruction
-INSTR_FMT_PREFIX = "INSTR_FMT_"
-
-
-class Analyzer:
- """Parse input, analyze it, and write to output."""
-
- input_filenames: list[str]
- output_filename: str
- metadata_filename: str
- pymetadata_filename: str
- executor_filename: str
- errors: int = 0
- emit_line_directives: bool = False
-
- def __init__(
- self,
- input_filenames: list[str],
- output_filename: str,
- metadata_filename: str,
- pymetadata_filename: str,
- executor_filename: str,
- ):
- """Read the input file."""
- self.input_filenames = input_filenames
- self.output_filename = output_filename
- self.metadata_filename = metadata_filename
- self.pymetadata_filename = pymetadata_filename
- self.executor_filename = executor_filename
-
- def error(self, msg: str, node: parser.Node) -> None:
- lineno = 0
- filename = "<unknown file>"
- if context := node.context:
- filename = context.owner.filename
- # Use line number of first non-comment in the node
- for token in context.owner.tokens[context.begin : context.end]:
- lineno = token.line
- if token.kind != "COMMENT":
- break
- print(f"{filename}:{lineno}: {msg}", file=sys.stderr)
- self.errors += 1
-
- everything: list[
- parser.InstDef | parser.Macro | parser.Pseudo | OverriddenInstructionPlaceHolder
- ]
- instrs: dict[str, Instruction] # Includes ops
- macros: dict[str, parser.Macro]
- macro_instrs: dict[str, MacroInstruction]
- families: dict[str, parser.Family]
- pseudos: dict[str, parser.Pseudo]
- pseudo_instrs: dict[str, PseudoInstruction]
-
- def parse(self) -> None:
- """Parse the source text.
-
- We only want the parser to see the stuff between the
- begin and end markers.
- """
-
- self.everything = []
- self.instrs = {}
- self.macros = {}
- self.families = {}
- self.pseudos = {}
-
- instrs_idx: dict[str, int] = dict()
-
- for filename in self.input_filenames:
- self.parse_file(filename, instrs_idx)
-
- files = " + ".join(self.input_filenames)
- print(
- f"Read {len(self.instrs)} instructions/ops, "
- f"{len(self.macros)} macros, {len(self.pseudos)} pseudos, "
- f"and {len(self.families)} families from {files}",
- file=sys.stderr,
- )
-
- def parse_file(self, filename: str, instrs_idx: dict[str, int]) -> None:
- with open(filename) as file:
- src = file.read()
-
-
- psr = parser.Parser(src, filename=prettify_filename(filename))
-
- # Skip until begin marker
- while tkn := psr.next(raw=True):
- if tkn.text == BEGIN_MARKER:
- break
- else:
- raise psr.make_syntax_error(
- f"Couldn't find {BEGIN_MARKER!r} in {psr.filename}"
- )
- start = psr.getpos()
-
- # Find end marker, then delete everything after it
- while tkn := psr.next(raw=True):
- if tkn.text == END_MARKER:
- break
- del psr.tokens[psr.getpos() - 1 :]
-
- # Parse from start
- psr.setpos(start)
- thing: parser.InstDef | parser.Macro | parser.Pseudo | parser.Family | None
- thing_first_token = psr.peek()
- while thing := psr.definition():
- if ws := [w for w in RESERVED_WORDS if variable_used(thing, w)]:
- self.error(f"'{ws[0]}' is a reserved word. {RESERVED_WORDS[ws[0]]}", thing)
-
- match thing:
- case parser.InstDef(name=name):
- if name in self.instrs:
- if not thing.override:
- raise psr.make_syntax_error(
- f"Duplicate definition of '{name}' @ {thing.context} "
- f"previous definition @ {self.instrs[name].inst.context}",
- thing_first_token,
- )
- self.everything[instrs_idx[name]] = OverriddenInstructionPlaceHolder(name=name)
- if name not in self.instrs and thing.override:
- raise psr.make_syntax_error(
- f"Definition of '{name}' @ {thing.context} is supposed to be "
- "an override but no previous definition exists.",
- thing_first_token,
- )
- self.instrs[name] = Instruction(thing)
- instrs_idx[name] = len(self.everything)
- self.everything.append(thing)
- case parser.Macro(name):
- self.macros[name] = thing
- self.everything.append(thing)
- case parser.Family(name):
- self.families[name] = thing
- case parser.Pseudo(name):
- self.pseudos[name] = thing
- self.everything.append(thing)
- case _:
- typing.assert_never(thing)
- if not psr.eof():
- raise psr.make_syntax_error(f"Extra stuff at the end of {filename}")
-
- def analyze(self) -> None:
- """Analyze the inputs.
-
- Raises SystemExit if there is an error.
- """
- self.analyze_macros_and_pseudos()
- self.find_predictions()
- self.map_families()
- self.check_families()
-
- def find_predictions(self) -> None:
- """Find the instructions that need PREDICTED() labels."""
- for instr in self.instrs.values():
- targets: set[str] = set()
- for line in instr.block_text:
- if m := re.match(RE_PREDICTED, line):
- targets.add(m.group(1))
- for target in targets:
- if target_instr := self.instrs.get(target):
- target_instr.predicted = True
- elif target_macro := self.macro_instrs.get(target):
- target_macro.predicted = True
- else:
- self.error(
- f"Unknown instruction {target!r} predicted in {instr.name!r}",
- instr.inst, # TODO: Use better location
- )
-
- def map_families(self) -> None:
- """Link instruction names back to their family, if they have one."""
- for family in self.families.values():
- for member in [family.name] + family.members:
- if member_instr := self.instrs.get(member):
- if member_instr.family not in (family, None):
- self.error(
- f"Instruction {member} is a member of multiple families "
- f"({member_instr.family.name}, {family.name}).",
- family,
- )
- else:
- member_instr.family = family
- elif not self.macro_instrs.get(member):
- self.error(
- f"Unknown instruction {member!r} referenced in family {family.name!r}",
- family,
- )
-
- def check_families(self) -> None:
- """Check each family:
-
- - Must have at least 2 members (including head)
- - Head and all members must be known instructions
- - Head and all members must have the same cache, input and output effects
- """
- for family in self.families.values():
- if family.name not in self.macro_instrs and family.name not in self.instrs:
- self.error(
- f"Family {family.name!r} has unknown instruction {family.name!r}",
- family,
- )
- members = [
- member
- for member in family.members
- if member in self.instrs or member in self.macro_instrs
- ]
- if members != family.members:
- unknown = set(family.members) - set(members)
- self.error(
- f"Family {family.name!r} has unknown members: {unknown}", family
- )
- expected_effects = self.effect_counts(family.name)
- for member in members:
- member_effects = self.effect_counts(member)
- if member_effects != expected_effects:
- self.error(
- f"Family {family.name!r} has inconsistent "
- f"(cache, input, output) effects:\n"
- f" {family.name} = {expected_effects}; "
- f"{member} = {member_effects}",
- family,
- )
-
- def effect_counts(self, name: str) -> tuple[int, int, int]:
- if instr := self.instrs.get(name):
- cache = instr.cache_offset
- input = len(instr.input_effects)
- output = len(instr.output_effects)
- elif mac := self.macro_instrs.get(name):
- cache = mac.cache_offset
- input, output = 0, 0
- for part in mac.parts:
- if isinstance(part, Component):
- # A component may pop what the previous component pushed,
- # so we offset the input/output counts by that.
- delta_i = len(part.instr.input_effects)
- delta_o = len(part.instr.output_effects)
- offset = min(delta_i, output)
- input += delta_i - offset
- output += delta_o - offset
- else:
- assert False, f"Unknown instruction {name!r}"
- return cache, input, output
-
- def analyze_macros_and_pseudos(self) -> None:
- """Analyze each macro and pseudo instruction."""
- self.macro_instrs = {}
- self.pseudo_instrs = {}
- for name, macro in self.macros.items():
- self.macro_instrs[name] = self.analyze_macro(macro)
- for name, pseudo in self.pseudos.items():
- self.pseudo_instrs[name] = self.analyze_pseudo(pseudo)
-
- def analyze_macro(self, macro: parser.Macro) -> MacroInstruction:
- components = self.check_macro_components(macro)
- stack, initial_sp = self.stack_analysis(components)
- sp = initial_sp
- parts: MacroParts = []
- flags = InstructionFlags.newEmpty()
- offset = 0
- for component in components:
- match component:
- case parser.CacheEffect() as ceffect:
- parts.append(ceffect)
- offset += ceffect.size
- case Instruction() as instr:
- part, sp, offset = self.analyze_instruction(instr, stack, sp, offset)
- parts.append(part)
- flags.add(instr.instr_flags)
- case _:
- typing.assert_never(component)
- final_sp = sp
- format = "IB"
- if offset:
- format += "C" + "0"*(offset-1)
- return MacroInstruction(
- macro.name, stack, initial_sp, final_sp, format, flags, macro, parts, offset
- )
-
- def analyze_pseudo(self, pseudo: parser.Pseudo) -> PseudoInstruction:
- targets = [self.instrs[target] for target in pseudo.targets]
- assert targets
- # Make sure the targets have the same fmt
- fmts = list(set([t.instr_fmt for t in targets]))
- assert(len(fmts) == 1)
- assert(len(list(set([t.instr_flags.bitmap() for t in targets]))) == 1)
- return PseudoInstruction(pseudo.name, targets, fmts[0], targets[0].instr_flags)
-
- def analyze_instruction(
- self, instr: Instruction, stack: list[StackEffect], sp: int, offset: int
- ) -> tuple[Component, int, int]:
- input_mapping: StackEffectMapping = []
- for ieffect in reversed(instr.input_effects):
- sp -= 1
- input_mapping.append((stack[sp], ieffect))
- output_mapping: StackEffectMapping = []
- for oeffect in instr.output_effects:
- output_mapping.append((stack[sp], oeffect))
- sp += 1
- active_effects: list[ActiveCacheEffect] = []
- for ceffect in instr.cache_effects:
- if ceffect.name != UNUSED:
- active_effects.append(ActiveCacheEffect(ceffect, offset))
- offset += ceffect.size
- return Component(instr, input_mapping, output_mapping, active_effects), sp, offset
-
- def check_macro_components(
- self, macro: parser.Macro
- ) -> list[InstructionOrCacheEffect]:
- components: list[InstructionOrCacheEffect] = []
- for uop in macro.uops:
- match uop:
- case parser.OpName(name):
- if name not in self.instrs:
- self.error(f"Unknown instruction {name!r}", macro)
- components.append(self.instrs[name])
- case parser.CacheEffect():
- components.append(uop)
- case _:
- typing.assert_never(uop)
- return components
-
- def stack_analysis(
- self, components: typing.Iterable[InstructionOrCacheEffect]
- ) -> tuple[list[StackEffect], int]:
- """Analyze a macro.
-
- Ignore cache effects.
-
- Return the list of variables (as StackEffects) and the initial stack pointer.
- """
- lowest = current = highest = 0
- conditions: dict[int, str] = {} # Indexed by 'current'.
- last_instr: Instruction | None = None
- for thing in components:
- if isinstance(thing, Instruction):
- last_instr = thing
- for thing in components:
- match thing:
- case Instruction() as instr:
- if any(
- eff.size for eff in instr.input_effects + instr.output_effects
- ):
- # TODO: Eventually this will be needed, at least for macros.
- self.error(
- f"Instruction {instr.name!r} has variable-sized stack effect, "
- "which are not supported in macro instructions",
- instr.inst, # TODO: Pass name+location of macro
- )
- if any(eff.cond for eff in instr.input_effects):
- self.error(
- f"Instruction {instr.name!r} has conditional input stack effect, "
- "which are not supported in macro instructions",
- instr.inst, # TODO: Pass name+location of macro
- )
- if any(eff.cond for eff in instr.output_effects) and instr is not last_instr:
- self.error(
- f"Instruction {instr.name!r} has conditional output stack effect, "
- "but is not the last instruction in a macro",
- instr.inst, # TODO: Pass name+location of macro
- )
- current -= len(instr.input_effects)
- lowest = min(lowest, current)
- for eff in instr.output_effects:
- if eff.cond:
- conditions[current] = eff.cond
- current += 1
- highest = max(highest, current)
- case parser.CacheEffect():
- pass
- case _:
- typing.assert_never(thing)
- # At this point, 'current' is the net stack effect,
- # and 'lowest' and 'highest' are the extremes.
- # Note that 'lowest' may be negative.
- stack = [
- StackEffect(f"_tmp_{i}", "", conditions.get(highest - i, ""))
- for i in reversed(range(1, highest - lowest + 1))
- ]
- return stack, -lowest
-
+class Generator(Analyzer):
def get_stack_effect_info(
- self, thing: parser.InstDef | parser.Macro | parser.Pseudo
+ self, thing: parsing.InstDef | parsing.Macro | parsing.Pseudo
) -> tuple[AnyInstruction | None, str | None, str | None]:
def effect_str(effects: list[StackEffect]) -> str:
n_effect, sym_effect = list_effect_size(effects)
@@ -1053,8 +104,10 @@ def effect_str(effects: list[StackEffect]) -> str:
return str(n_effect)
instr: AnyInstruction | None
+ popped: str | None
+ pushed: str | None
match thing:
- case parser.InstDef():
+ case parsing.InstDef():
if thing.kind != "op":
instr = self.instrs[thing.name]
popped = effect_str(instr.input_effects)
@@ -1063,7 +116,7 @@ def effect_str(effects: list[StackEffect]) -> str:
instr = None
popped = ""
pushed = ""
- case parser.Macro():
+ case parsing.Macro():
instr = self.macro_instrs[thing.name]
parts = [comp for comp in instr.parts if isinstance(comp, Component)]
# Note: stack_analysis() already verifies that macro components
@@ -1084,7 +137,11 @@ def effect_str(effects: list[StackEffect]) -> str:
if effect.cond in ("0", "1"):
pushed_symbolic.append(effect.cond)
else:
- pushed_symbolic.append(maybe_parenthesize(f"{maybe_parenthesize(effect.cond)} ? 1 : 0"))
+ pushed_symbolic.append(
+ maybe_parenthesize(
+ f"{maybe_parenthesize(effect.cond)} ? 1 : 0"
+ )
+ )
sp += 1
high = max(sp, high)
if high != max(0, sp):
@@ -1096,7 +153,7 @@ def effect_str(effects: list[StackEffect]) -> str:
popped = str(-low)
pushed_symbolic.append(str(sp - low - len(pushed_symbolic)))
pushed = " + ".join(pushed_symbolic)
- case parser.Pseudo():
+ case parsing.Pseudo():
instr = self.pseudo_instrs[thing.name]
popped = pushed = None
# Calculate stack effect, and check that it's the the same
@@ -1135,10 +192,14 @@ def write_function(
) -> None:
self.out.emit("")
self.out.emit("#ifndef NEED_OPCODE_METADATA")
- self.out.emit(f"extern int _PyOpcode_num_{direction}(int opcode, int oparg, bool jump);")
+ self.out.emit(
+ f"extern int _PyOpcode_num_{direction}(int opcode, int oparg, bool jump);"
+ )
self.out.emit("#else")
self.out.emit("int")
- self.out.emit(f"_PyOpcode_num_{direction}(int opcode, int oparg, bool jump) {{")
+ self.out.emit(
+ f"_PyOpcode_num_{direction}(int opcode, int oparg, bool jump) {{"
+ )
self.out.emit(" switch(opcode) {")
for instr, effect in data:
self.out.emit(f" case {instr.name}:")
@@ -1159,7 +220,7 @@ def from_source_files(self) -> str:
try:
filename = os.path.relpath(filename, ROOT)
except ValueError:
- # May happen on Windows if root and temp on different volumes
+ # May happen on Windows if root and temp on different volumes
pass
filenames.append(filename)
paths = f"\n{self.out.comment} ".join(filenames)
@@ -1170,20 +231,21 @@ def write_provenance_header(self):
self.out.write_raw(self.from_source_files())
self.out.write_raw(f"{self.out.comment} Do not edit!\n")
- def write_metadata(self) -> None:
+ def write_metadata(self, metadata_filename: str, pymetadata_filename: str) -> None:
"""Write instruction metadata to output file."""
# Compute the set of all instruction formats.
all_formats: set[str] = set()
for thing in self.everything:
+ format: str | None
match thing:
case OverriddenInstructionPlaceHolder():
continue
- case parser.InstDef():
+ case parsing.InstDef():
format = self.instrs[thing.name].instr_fmt
- case parser.Macro():
+ case parsing.Macro():
format = self.macro_instrs[thing.name].instr_fmt
- case parser.Pseudo():
+ case parsing.Pseudo():
format = None
for target in self.pseudos[thing.name].targets:
target_instr = self.instrs.get(target)
@@ -1192,13 +254,14 @@ def write_metadata(self) -> None:
format = target_instr.instr_fmt
else:
assert format == target_instr.instr_fmt
+ assert format is not None
case _:
typing.assert_never(thing)
all_formats.add(format)
# Turn it into a list of enum definitions.
format_enums = [INSTR_FMT_PREFIX + format for format in sorted(all_formats)]
- with open(self.metadata_filename, "w") as f:
+ with open(metadata_filename, "w") as f:
# Create formatter
self.out = Formatter(f, 0)
@@ -1220,7 +283,8 @@ def write_metadata(self) -> None:
self.out.emit(
"#define IS_VALID_OPCODE(OP) \\\n"
" (((OP) >= 0) && ((OP) < OPCODE_METADATA_SIZE) && \\\n"
- " (_PyOpcode_opcode_metadata[(OP)].valid_entry))")
+ " (_PyOpcode_opcode_metadata[(OP)].valid_entry))"
+ )
self.out.emit("")
InstructionFlags.emit_macros(self.out)
@@ -1234,17 +298,23 @@ def write_metadata(self) -> None:
with self.out.block("struct opcode_macro_expansion", ";"):
self.out.emit("int nuops;")
- self.out.emit("struct { int16_t uop; int8_t size; int8_t offset; } uops[8];")
+ self.out.emit(
+ "struct { int16_t uop; int8_t size; int8_t offset; } uops[8];"
+ )
self.out.emit("")
for key, value in OPARG_SIZES.items():
self.out.emit(f"#define {key} {value}")
self.out.emit("")
- self.out.emit("#define OPCODE_METADATA_FMT(OP) "
- "(_PyOpcode_opcode_metadata[(OP)].instr_format)")
+ self.out.emit(
+ "#define OPCODE_METADATA_FMT(OP) "
+ "(_PyOpcode_opcode_metadata[(OP)].instr_format)"
+ )
self.out.emit("#define SAME_OPCODE_METADATA(OP1, OP2) \\")
- self.out.emit(" (OPCODE_METADATA_FMT(OP1) == OPCODE_METADATA_FMT(OP2))")
+ self.out.emit(
+ " (OPCODE_METADATA_FMT(OP1) == OPCODE_METADATA_FMT(OP2))"
+ )
self.out.emit("")
# Write metadata array declaration
@@ -1253,27 +323,35 @@ def write_metadata(self) -> None:
self.out.emit("#define OPCODE_MACRO_EXPANSION_SIZE 256")
self.out.emit("")
self.out.emit("#ifndef NEED_OPCODE_METADATA")
- self.out.emit("extern const struct opcode_metadata "
- "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE];")
- self.out.emit("extern const struct opcode_macro_expansion "
- "_PyOpcode_macro_expansion[OPCODE_MACRO_EXPANSION_SIZE];")
- self.out.emit("extern const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE];")
+ self.out.emit(
+ "extern const struct opcode_metadata "
+ "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE];"
+ )
+ self.out.emit(
+ "extern const struct opcode_macro_expansion "
+ "_PyOpcode_macro_expansion[OPCODE_MACRO_EXPANSION_SIZE];"
+ )
+ self.out.emit(
+ "extern const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE];"
+ )
self.out.emit("#else // if NEED_OPCODE_METADATA")
- self.out.emit("const struct opcode_metadata "
- "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE] = {")
+ self.out.emit(
+ "const struct opcode_metadata "
+ "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE] = {"
+ )
# Write metadata for each instruction
for thing in self.everything:
match thing:
case OverriddenInstructionPlaceHolder():
continue
- case parser.InstDef():
+ case parsing.InstDef():
if thing.kind != "op":
self.write_metadata_for_inst(self.instrs[thing.name])
- case parser.Macro():
+ case parsing.Macro():
self.write_metadata_for_macro(self.macro_instrs[thing.name])
- case parser.Pseudo():
+ case parsing.Pseudo():
self.write_metadata_for_pseudo(self.pseudo_instrs[thing.name])
case _:
typing.assert_never(thing)
@@ -1291,32 +369,38 @@ def write_metadata(self) -> None:
match thing:
case OverriddenInstructionPlaceHolder():
pass
- case parser.InstDef(name=name):
+ case parsing.InstDef(name=name):
instr = self.instrs[name]
# Since an 'op' is not a bytecode, it has no expansion; but 'inst' is
if instr.kind == "inst" and instr.is_viable_uop():
# Construct a dummy Component -- input/output mappings are not used
part = Component(instr, [], [], instr.active_caches)
self.write_macro_expansions(instr.name, [part])
- elif instr.kind == "inst" and variable_used(instr.inst, "oparg1"):
- assert variable_used(instr.inst, "oparg2"), "Half super-instr?"
+ elif instr.kind == "inst" and variable_used(
+ instr.inst, "oparg1"
+ ):
+ assert variable_used(
+ instr.inst, "oparg2"
+ ), "Half super-instr?"
self.write_super_expansions(instr.name)
- case parser.Macro():
+ case parsing.Macro():
mac = self.macro_instrs[thing.name]
self.write_macro_expansions(mac.name, mac.parts)
- case parser.Pseudo():
+ case parsing.Pseudo():
pass
case _:
typing.assert_never(thing)
- with self.out.block("const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE] =", ";"):
- self.write_uop_items(lambda name, counter: f"[{name}] = \"{name}\",")
+ with self.out.block(
+ "const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE] =", ";"
+ ):
+ self.write_uop_items(lambda name, counter: f'[{name}] = "{name}",')
self.out.emit("#endif // NEED_OPCODE_METADATA")
- with open(self.pymetadata_filename, "w") as f:
+ with open(pymetadata_filename, "w") as f:
# Create formatter
- self.out = Formatter(f, 0, comment = "#")
+ self.out = Formatter(f, 0, comment="#")
self.write_provenance_header()
@@ -1324,10 +408,10 @@ def write_metadata(self) -> None:
self.out.emit("_specializations = {")
for name, family in self.families.items():
with self.out.indent():
- self.out.emit(f"\"{family.name}\": [")
+ self.out.emit(f'"{family.name}": [')
with self.out.indent():
for m in family.members:
- self.out.emit(f"\"{m}\",")
+ self.out.emit(f'"{m}",')
self.out.emit(f"],")
self.out.emit("}")
@@ -1335,15 +419,17 @@ def write_metadata(self) -> None:
self.out.emit("")
self.out.emit("# An irregular case:")
self.out.emit(
- "_specializations[\"BINARY_OP\"].append("
- "\"BINARY_OP_INPLACE_ADD_UNICODE\")")
+ '_specializations["BINARY_OP"].append('
+ '"BINARY_OP_INPLACE_ADD_UNICODE")'
+ )
# Make list of specialized instructions
self.out.emit("")
self.out.emit(
"_specialized_instructions = ["
- "opcode for family in _specializations.values() for opcode in family"
- "]")
+ "opcode for family in _specializations.values() for opcode in family"
+ "]"
+ )
def write_pseudo_instrs(self) -> None:
"""Write the IS_PSEUDO_INSTR macro"""
@@ -1432,16 +518,18 @@ def write_super_expansions(self, name: str) -> None:
]
self.write_expansions(name, expansions)
- def write_expansions(self, name: str, expansions: list[tuple[str, int, int]]) -> None:
- pieces = [f"{{ {name}, {size}, {offset} }}" for name, size, offset in expansions]
+ def write_expansions(
+ self, name: str, expansions: list[tuple[str, int, int]]
+ ) -> None:
+ pieces = [
+ f"{{ {name}, {size}, {offset} }}" for name, size, offset in expansions
+ ]
self.out.emit(
f"[{name}] = "
f"{{ .nuops = {len(pieces)}, .uops = {{ {', '.join(pieces)} }} }},"
)
- def emit_metadata_entry(
- self, name: str, fmt: str, flags: InstructionFlags
- ) -> None:
+ def emit_metadata_entry(self, name: str, fmt: str, flags: InstructionFlags) -> None:
flag_names = flags.names(value=True)
if not flag_names:
flag_names.append("0")
@@ -1462,11 +550,13 @@ def write_metadata_for_pseudo(self, ps: PseudoInstruction) -> None:
"""Write metadata for a macro-instruction."""
self.emit_metadata_entry(ps.name, ps.instr_fmt, ps.instr_flags)
- def write_instructions(self) -> None:
+ def write_instructions(
+ self, output_filename: str, emit_line_directives: bool
+ ) -> None:
"""Write instructions to output file."""
- with open(self.output_filename, "w") as f:
+ with open(output_filename, "w") as f:
# Create formatter
- self.out = Formatter(f, 8, self.emit_line_directives)
+ self.out = Formatter(f, 8, emit_line_directives)
self.write_provenance_header()
@@ -1478,35 +568,37 @@ def write_instructions(self) -> None:
match thing:
case OverriddenInstructionPlaceHolder():
self.write_overridden_instr_place_holder(thing)
- case parser.InstDef():
+ case parsing.InstDef():
if thing.kind != "op":
n_instrs += 1
self.write_instr(self.instrs[thing.name])
- case parser.Macro():
+ case parsing.Macro():
n_macros += 1
self.write_macro(self.macro_instrs[thing.name])
- case parser.Pseudo():
+ case parsing.Pseudo():
n_pseudos += 1
case _:
typing.assert_never(thing)
print(
f"Wrote {n_instrs} instructions, {n_macros} macros, "
- f"and {n_pseudos} pseudos to {self.output_filename}",
+ f"and {n_pseudos} pseudos to {output_filename}",
file=sys.stderr,
)
- def write_executor_instructions(self) -> None:
+ def write_executor_instructions(
+ self, executor_filename: str, emit_line_directives: bool
+ ) -> None:
"""Generate cases for the Tier 2 interpreter."""
- with open(self.executor_filename, "w") as f:
- self.out = Formatter(f, 8, self.emit_line_directives)
+ with open(executor_filename, "w") as f:
+ self.out = Formatter(f, 8, emit_line_directives)
self.write_provenance_header()
for thing in self.everything:
match thing:
case OverriddenInstructionPlaceHolder():
# TODO: Is this helpful?
self.write_overridden_instr_place_holder(thing)
- case parser.InstDef():
+ case parsing.InstDef():
instr = self.instrs[thing.name]
if instr.is_viable_uop():
self.out.emit("")
@@ -1517,22 +609,24 @@ def write_executor_instructions(self) -> None:
self.out.emit("break;")
# elif instr.kind != "op":
# print(f"NOTE: {thing.name} is not a viable uop")
- case parser.Macro():
+ case parsing.Macro():
pass
- case parser.Pseudo():
+ case parsing.Pseudo():
pass
case _:
typing.assert_never(thing)
print(
- f"Wrote some stuff to {self.executor_filename}",
+ f"Wrote some stuff to {executor_filename}",
file=sys.stderr,
)
- def write_overridden_instr_place_holder(self,
- place_holder: OverriddenInstructionPlaceHolder) -> None:
+ def write_overridden_instr_place_holder(
+ self, place_holder: OverriddenInstructionPlaceHolder
+ ) -> None:
self.out.emit("")
self.out.emit(
- f"{self.out.comment} TARGET({place_holder.name}) overridden by later definition")
+ f"{self.out.comment} TARGET({place_holder.name}) overridden by later definition"
+ )
def write_instr(self, instr: Instruction) -> None:
name = instr.name
@@ -1555,7 +649,7 @@ def write_macro(self, mac: MacroInstruction) -> None:
cache_adjust = 0
for part in mac.parts:
match part:
- case parser.CacheEffect(size=size):
+ case parsing.CacheEffect(size=size):
cache_adjust += size
case Component() as comp:
last_instr = comp.instr
@@ -1603,7 +697,7 @@ def wrap_macro(self, mac: MacroInstruction):
yield
- self.out.stack_adjust(ieffects[:mac.initial_sp], mac.stack[:mac.final_sp])
+ self.out.stack_adjust(ieffects[: mac.initial_sp], mac.stack[: mac.final_sp])
for i, var in enumerate(reversed(mac.stack[: mac.final_sp]), 1):
dst = StackEffect(f"stack_pointer[-{i}]", "")
@@ -1612,99 +706,6 @@ def wrap_macro(self, mac: MacroInstruction):
self.out.emit(f"DISPATCH();")
-def prettify_filename(filename: str) -> str:
- # Make filename more user-friendly and less platform-specific,
- # it is only used for error reporting at this point.
- filename = filename.replace("\\", "/")
- if filename.startswith("./"):
- filename = filename[2:]
- if filename.endswith(".new"):
- filename = filename[:-4]
- return filename
-
-
-def extract_block_text(block: parser.Block) -> tuple[list[str], bool, int]:
- # Get lines of text with proper dedent
- blocklines = block.text.splitlines(True)
- first_token: lx.Token = block.tokens[0] # IndexError means the context is broken
- block_line = first_token.begin[0]
-
- # Remove blank lines from both ends
- while blocklines and not blocklines[0].strip():
- blocklines.pop(0)
- block_line += 1
- while blocklines and not blocklines[-1].strip():
- blocklines.pop()
-
- # Remove leading and trailing braces
- assert blocklines and blocklines[0].strip() == "{"
- assert blocklines and blocklines[-1].strip() == "}"
- blocklines.pop()
- blocklines.pop(0)
- block_line += 1
-
- # Remove trailing blank lines
- while blocklines and not blocklines[-1].strip():
- blocklines.pop()
-
- # Separate CHECK_EVAL_BREAKER() macro from end
- check_eval_breaker = \
- blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();"
- if check_eval_breaker:
- del blocklines[-1]
-
- return blocklines, check_eval_breaker, block_line
-
-
-def always_exits(lines: list[str]) -> bool:
- """Determine whether a block always ends in a return/goto/etc."""
- if not lines:
- return False
- line = lines[-1].rstrip()
- # Indent must match exactly (TODO: Do something better)
- if line[:12] != " " * 12:
- return False
- line = line[12:]
- return line.startswith(
- (
- "goto ",
- "return ",
- "DISPATCH",
- "GO_TO_",
- "Py_UNREACHABLE()",
- "ERROR_IF(true, ",
- )
- )
-
-
-def variable_used(node: parser.Node, name: str) -> bool:
- """Determine whether a variable with a given name is used in a node."""
- return any(
- token.kind == "IDENTIFIER" and token.text == name for token in node.tokens
- )
-
-
-def variable_used_unspecialized(node: parser.Node, name: str) -> bool:
- """Like variable_used(), but skips #if ENABLE_SPECIALIZATION blocks."""
- tokens: list[lx.Token] = []
- skipping = False
- for i, token in enumerate(node.tokens):
- if token.kind == "MACRO":
- text = "".join(token.text.split())
- # TODO: Handle nested #if
- if text == "#if":
- if (
- i + 1 < len(node.tokens)
- and node.tokens[i + 1].text == "ENABLE_SPECIALIZATION"
- ):
- skipping = True
- elif text in ("#else", "#endif"):
- skipping = False
- if not skipping:
- tokens.append(token)
- return any(token.kind == "IDENTIFIER" and token.text == name for token in tokens)
-
-
def main():
"""Parse command line, parse input, analyze, write output."""
args = arg_parser.parse_args() # Prints message and sys.exit(2) on error
@@ -1712,17 +713,17 @@ def main():
args.input.append(DEFAULT_INPUT)
# Raises OSError if input unreadable
- a = Analyzer(args.input, args.output, args.metadata, args.pymetadata, args.executor_cases)
+ a = Generator(args.input)
- if args.emit_line_directives:
- a.emit_line_directives = True
a.parse() # Raises SyntaxError on failure
a.analyze() # Prints messages and sets a.errors on failure
if a.errors:
sys.exit(f"Found {a.errors} errors")
- a.write_instructions() # Raises OSError if output can't be written
- a.write_metadata()
- a.write_executor_instructions()
+
+ # These raise OSError if output can't be written
+ a.write_instructions(args.output, args.emit_line_directives)
+ a.write_metadata(args.metadata, args.pymetadata)
+ a.write_executor_instructions(args.executor_cases, args.emit_line_directives)
if __name__ == "__main__":
diff --git a/Tools/cases_generator/instructions.py b/Tools/cases_generator/instructions.py
new file mode 100644
index 0000000..6f42699
--- /dev/null
+++ b/Tools/cases_generator/instructions.py
@@ -0,0 +1,424 @@
+import dataclasses
+import re
+import typing
+
+from flags import InstructionFlags, variable_used_unspecialized
+from formatting import (
+ Formatter,
+ UNUSED,
+ string_effect_size,
+ list_effect_size,
+ maybe_parenthesize,
+)
+import lexer as lx
+import parsing
+from parsing import StackEffect
+
+BITS_PER_CODE_UNIT = 16
+
+
+@dataclasses.dataclass
+class ActiveCacheEffect:
+    """Wraps a CacheEffect that is actually used, in context."""
+
+    effect: parsing.CacheEffect  # Instruction.__init__ only wraps effects not named UNUSED
+    offset: int  # Code units from the start of the cache to this effect
+
+
+FORBIDDEN_NAMES_IN_UOPS = (
+ "resume_with_error",
+ "kwnames",
+ "next_instr",
+ "oparg1", # Proxy for super-instructions like LOAD_FAST_LOAD_FAST
+ "JUMPBY",
+ "DISPATCH",
+ "INSTRUMENTED_JUMP",
+ "throwflag",
+ "exception_unwind",
+ "import_from",
+ "import_name",
+ "_PyObject_CallNoArgs", # Proxy for BEFORE_WITH
+)
+
+
+# Interpreter tiers
+TIER_ONE: typing.Final = 1 # Specializing adaptive interpreter (PEP 659)
+TIER_TWO: typing.Final = 2 # Experimental tracing interpreter
+Tiers: typing.TypeAlias = typing.Literal[1, 2]
+
+
+@dataclasses.dataclass
+class Instruction:
+ """An instruction with additional data and code."""
+
+ # Parts of the underlying instruction definition
+ inst: parsing.InstDef
+ kind: typing.Literal["inst", "op"]
+ name: str
+ block: parsing.Block
+ block_text: list[str] # Block.text, less curlies, less PREDICT() calls
+ block_line: int # First line of block in original code
+
+ # Computed by constructor
+ always_exits: bool
+ cache_offset: int
+ cache_effects: list[parsing.CacheEffect]
+ input_effects: list[StackEffect]
+ output_effects: list[StackEffect]
+ unmoved_names: frozenset[str]
+ instr_fmt: str
+ instr_flags: InstructionFlags
+ active_caches: list[ActiveCacheEffect]
+
+ # Set later
+ family: parsing.Family | None = None
+ predicted: bool = False
+
+ def __init__(self, inst: parsing.InstDef) -> None:
+     """Derive every computed field from the parsed definition *inst*."""
+     self.inst = inst
+     self.kind = inst.kind
+     self.name = inst.name
+     self.block = inst.block
+     self.block_text, self.check_eval_breaker, self.block_line = extract_block_text(
+         self.block
+     )
+     self.always_exits = always_exits(self.block_text)
+     self.cache_effects = [
+         effect for effect in inst.inputs if isinstance(effect, parsing.CacheEffect)
+     ]
+     self.cache_offset = sum(c.size for c in self.cache_effects)  # Total cache size
+     self.input_effects = [
+         effect for effect in inst.inputs if isinstance(effect, StackEffect)
+     ]
+     self.output_effects = inst.outputs  # For consistency/completeness
+     unmoved_names: set[str] = set()  # Leading names shared by inputs and outputs
+     for ieffect, oeffect in zip(self.input_effects, self.output_effects):
+         if ieffect.name == oeffect.name:
+             unmoved_names.add(ieffect.name)
+         else:
+             break  # Stop at the first position where the names differ
+     self.unmoved_names = frozenset(unmoved_names)
+     self.instr_flags = InstructionFlags.fromInstruction(inst)
+     # Pair each named cache effect with its offset; UNUSED ones only advance it
+     self.active_caches = []
+     offset = 0
+     for effect in self.cache_effects:
+         if effect.name != UNUSED:
+             self.active_caches.append(ActiveCacheEffect(effect, offset))
+         offset += effect.size
+
+     if self.instr_flags.HAS_ARG_FLAG:
+         fmt = "IB"
+     else:
+         fmt = "IX"
+     if offset:
+         fmt += "C" + "0" * (offset - 1)  # "C", then one "0" per further cache unit
+     self.instr_fmt = fmt
+
+ def is_viable_uop(self) -> bool:
+     """Whether this instruction is viable as a uop.
+
+     Each rejection reason goes to dprint(), a no-op unless rebound to print.
+     """
+     dprint: typing.Callable[..., None] = lambda *args, **kwargs: None
+     if self.name == "EXIT_TRACE":
+         return True  # This has 'return frame' but it's okay
+     if self.always_exits:
+         dprint(f"Skipping {self.name} because it always exits")
+         return False
+     if len(self.active_caches) > 1:
+         dprint(f"Skipping {self.name} because it has >1 cache entries")
+         return False
+     res = True
+     for forbidden in FORBIDDEN_NAMES_IN_UOPS:
+         # NOTE: To disallow unspecialized uops, use
+         # if variable_used(self.inst, forbidden):
+         if variable_used_unspecialized(self.inst, forbidden):
+             dprint(f"Skipping {self.name} because it uses {forbidden}")
+             res = False  # Keep looping so every forbidden name is reported
+     return res
+
+ def write(self, out: Formatter, tier: Tiers = TIER_ONE) -> None:
+     """Write one instruction, sans prologue and epilogue.
+
+     Emits the family cache-size static_assert, the input and output
+     variable declarations and the body; unless the block always exits,
+     also the stack adjustment, the output assignments and (for
+     TIER_ONE only) the next_instr bump past the cache.
+
+     out receives the generated code; tier is forwarded to write_body().
+     """
+     # Write a static assertion that a family's cache size is correct
+     if family := self.family:
+         if self.name == family.name:
+             if cache_size := family.size:
+                 out.emit(
+                     f"static_assert({cache_size} == "
+                     f'{self.cache_offset}, "incorrect cache size");'
+                 )
+
+     # Write input stack effect variable declarations and initializations
+     ieffects = list(reversed(self.input_effects))
+     for i, ieffect in enumerate(ieffects):
+         # Size of everything from the top of the stack down to this input
+         isize = string_effect_size(list_effect_size(ieffects[: i + 1]))
+         if ieffect.size:
+             src = StackEffect(
+                 f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **"
+             )
+         elif ieffect.cond:
+             src = StackEffect(
+                 f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL",
+                 "",
+             )
+         else:
+             src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "")
+         out.declare(ieffect, src)
+
+     # Write output stack effect variable declarations
+     isize = string_effect_size(list_effect_size(self.input_effects))
+     input_names = {ieffect.name for ieffect in self.input_effects}
+     for i, oeffect in enumerate(self.output_effects):
+         if oeffect.name not in input_names:
+             if oeffect.size:
+                 osize = string_effect_size(
+                     list_effect_size(self.output_effects[:i])
+                 )
+                 offset = "stack_pointer"
+                 if isize != osize:
+                     if isize != "0":
+                         offset += f" - ({isize})"
+                     if osize != "0":
+                         offset += f" + {osize}"
+                 src = StackEffect(offset, "PyObject **")
+                 out.declare(oeffect, src)
+             else:
+                 out.declare(oeffect, None)
+
+     self.write_body(out, 0, self.active_caches, tier=tier)
+
+     # Skip the rest if the block always exits
+     if self.always_exits:
+         return
+
+     # Write net stack growth/shrinkage
+     out.stack_adjust(list(self.input_effects), list(self.output_effects))
+
+     # Write output stack effect assignments
+     oeffects = list(reversed(self.output_effects))
+     for i, oeffect in enumerate(oeffects):
+         if oeffect.name in self.unmoved_names:
+             continue
+         osize = string_effect_size(list_effect_size(oeffects[: i + 1]))
+         if oeffect.size:
+             dst = StackEffect(
+                 f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **"
+             )
+         else:
+             dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "")
+         out.assign(dst, oeffect)
+
+     # Write cache effect
+     if tier == TIER_ONE and self.cache_offset:
+         out.emit(f"next_instr += {self.cache_offset};")
+
+ def write_body(
+     self,
+     out: Formatter,
+     dedent: int,
+     active_caches: list[ActiveCacheEffect],
+     tier: Tiers = TIER_ONE,
+ ) -> None:
+     """Write the cache variable declarations, then the body with ERROR_IF() etc. expanded."""
+     # Write cache effect variable declarations and initializations
+     for active in active_caches:
+         ceffect = active.effect
+         bits = ceffect.size * BITS_PER_CODE_UNIT
+         if bits == 64:
+             # NOTE: We assume that 64-bit data in the cache
+             # is always an object pointer.
+             # If this becomes false, we need a way to specify
+             # syntactically what type the cache data is.
+             typ = "PyObject *"
+             func = "read_obj"
+         else:
+             typ = f"uint{bits}_t "
+             func = f"read_u{bits}"
+         if tier == TIER_ONE:  # Read the value out of the inline cache
+             out.emit(
+                 f"{typ}{ceffect.name} = {func}(&next_instr[{active.offset}].cache);"
+             )
+         else:  # Any other tier casts 'operand' instead
+             out.emit(f"{typ}{ceffect.name} = ({typ.strip()})operand;")
+
+     # Write the body, substituting a goto for ERROR_IF() and other stuff
+     assert dedent <= 0  # write() passes 0, Component.write_body() passes -4
+     extra = " " * -dedent  # Prefix for every emitted body line
+     names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
+     offset = 0  # Index of the current line within block_text
+     context = self.block.context
+     assert context is not None and context.owner is not None
+     filename = context.owner.filename
+     for line in self.block_text:
+         out.set_lineno(self.block_line + offset, filename)
+         offset += 1
+         if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
+             space, cond, label = m.groups()
+             space = extra + space
+             # ERROR_IF() must pop the inputs from the stack.
+             # The code block is responsible for DECREF()ing them.
+             # NOTE: If the label doesn't exist, just add it to ceval.c.
+
+             # Don't pop common input/output effects at the bottom!
+             # These aren't DECREF'ed so they can stay.
+             ieffs = list(self.input_effects)
+             oeffs = list(self.output_effects)
+             while ieffs and oeffs and ieffs[0] == oeffs[0]:
+                 ieffs.pop(0)
+                 oeffs.pop(0)
+             ninputs, symbolic = list_effect_size(ieffs)
+             if ninputs:  # Target the label variant named for the input count
+                 label = f"pop_{ninputs}_{label}"
+             if symbolic:
+                 out.write_raw(
+                     f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
+                 )
+             else:
+                 out.write_raw(f"{space}if ({cond}) goto {label};\n")
+         elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
+             out.reset_lineno()
+             space = extra + m.group(1)
+             for ieff in self.input_effects:
+                 if ieff.name in names_to_skip:
+                     continue
+                 if ieff.size:  # Sized input: decref each element in a loop
+                     out.write_raw(
+                         f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
+                     )
+                     out.write_raw(f"{space} Py_DECREF({ieff.name}[_i]);\n")
+                     out.write_raw(f"{space}}}\n")
+                 else:
+                     decref = "XDECREF" if ieff.cond else "DECREF"
+                     out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
+         else:
+             out.write_raw(extra + line)  # Any other line is copied through
+     out.reset_lineno()
+
+
+InstructionOrCacheEffect = Instruction | parsing.CacheEffect
+StackEffectMapping = list[tuple[StackEffect, StackEffect]]
+
+
+@dataclasses.dataclass
+class Component:  # One part of a macro: an instruction plus its stack mappings
+    instr: Instruction  # Instruction whose body write_body() emits
+    input_mapping: StackEffectMapping  # (source, input effect) pairs declared up front
+    output_mapping: StackEffectMapping  # (destination, output effect) pairs assigned last
+    active_caches: list[ActiveCacheEffect]  # Passed through to Instruction.write_body()
+
+    def write_body(self, out: Formatter) -> None:  # Everything goes in one out.block("")
+        with out.block(""):
+            input_names = {ieffect.name for _, ieffect in self.input_mapping}
+            for var, ieffect in self.input_mapping:
+                out.declare(ieffect, var)
+            for _, oeffect in self.output_mapping:
+                if oeffect.name not in input_names:
+                    out.declare(oeffect, None)
+            # A dedent of -4 makes Instruction.write_body() prefix each body line
+            self.instr.write_body(out, -4, self.active_caches)
+            # Hand every output effect to its mapped destination
+            for var, oeffect in self.output_mapping:
+                out.assign(var, oeffect)
+
+
+MacroParts = list[Component | parsing.CacheEffect]
+
+
+@dataclasses.dataclass
+class MacroInstruction:
+    """A macro instruction."""
+
+    name: str
+    stack: list[StackEffect]  # wrap_macro() writes back its first final_sp entries
+    initial_sp: int  # wrap_macro() hands this many input effects to stack_adjust()
+    final_sp: int  # Number of leading `stack` entries left on the stack afterwards
+    instr_fmt: str  # Format name that write_metadata() collects into all_formats
+    instr_flags: InstructionFlags
+    macro: parsing.Macro  # NOTE(review): presumably the parsed source definition -- confirm
+    parts: MacroParts  # Components and cache effects that write_macro() iterates over
+    cache_offset: int
+    predicted: bool = False
+
+
+@dataclasses.dataclass
+class PseudoInstruction:
+    """A pseudo instruction."""
+
+    name: str
+    targets: list[Instruction]  # NOTE(review): presumably what the pseudo maps to -- confirm
+    instr_fmt: str  # Passed to emit_metadata_entry() by write_metadata_for_pseudo()
+    instr_flags: InstructionFlags  # Passed to emit_metadata_entry() alongside instr_fmt
+
+
+@dataclasses.dataclass
+class OverriddenInstructionPlaceHolder:  # Stands in for a definition that was overridden
+    name: str  # Emitted as "TARGET(name) overridden by later definition"
+
+
+AnyInstruction = Instruction | MacroInstruction | PseudoInstruction
+
+
+def extract_block_text(block: parsing.Block) -> tuple[list[str], bool, int]:
+    """Return (body lines, trailing CHECK_EVAL_BREAKER() flag, first body line number)."""
+    body = block.text.splitlines(True)
+    opening: lx.Token = block.tokens[0]  # IndexError means the context is broken
+    lineno = opening.begin[0]
+
+    # Trim blank lines outside the braces; leading ones advance the line number
+    while body and not body[0].strip():
+        del body[0]
+        lineno += 1
+    while body and not body[-1].strip():
+        del body[-1]
+
+    # Peel off the braces, which must sit alone on the first and last lines
+    assert body and body[0].strip() == "{"
+    assert body and body[-1].strip() == "}"
+    del body[-1]
+    del body[0]
+    lineno += 1
+
+    # Trim blank lines left just before the closing brace
+    while body and not body[-1].strip():
+        del body[-1]
+
+    # A final CHECK_EVAL_BREAKER(); is reported through the flag, not as body text
+    has_breaker = False
+    if body and body[-1].strip() == "CHECK_EVAL_BREAKER();":
+        has_breaker = True
+        del body[-1]
+
+    return body, has_breaker, lineno
+
+
+def always_exits(lines: list[str]) -> bool:
+    """Report whether the final line of a block is an unconditional exit."""
+    exit_prefixes = (
+        "goto ",
+        "return ",
+        "DISPATCH",
+        "GO_TO_",
+        "Py_UNREACHABLE()",
+        "ERROR_IF(true, ",
+    )
+    if not lines:
+        return False
+    # The statement must sit at exactly 12 columns of indentation
+    # (TODO: Do something better)
+    indent = " " * 12
+    last = lines[-1].rstrip()
+    if not last.startswith(indent):
+        return False
+    return last.removeprefix(indent).startswith(exit_prefixes)
diff --git a/Tools/cases_generator/parser.py b/Tools/cases_generator/parsing.py
similarity index 96%
rename from Tools/cases_generator/parser.py
rename to Tools/cases_generator/parsing.py
index ac77e7e..290285d 100644
--- a/Tools/cases_generator/parser.py
+++ b/Tools/cases_generator/parsing.py
@@ -1,7 +1,7 @@
"""Parser for bytecodes.inst."""
from dataclasses import dataclass, field
-from typing import NamedTuple, Callable, TypeVar, Literal
+from typing import NamedTuple, Callable, TypeVar, Literal, cast
import lexer as lx
from plexer import PLexer
@@ -19,7 +19,7 @@ def contextual_wrapper(self: P) -> N | None:
res = func(self)
if res is None:
self.setpos(begin)
- return
+ return None
end = self.getpos()
res.context = Context(begin, end, self)
return res
@@ -147,6 +147,7 @@ def definition(self) -> InstDef | Macro | Pseudo | Family | None:
return family
if pseudo := self.pseudo_def():
return pseudo
+ return None
@contextual
def inst_def(self) -> InstDef | None:
@@ -166,7 +167,8 @@ def inst_header(self) -> InstHeader | None:
# TODO: Make INST a keyword in the lexer.
override = bool(self.expect(lx.OVERRIDE))
register = bool(self.expect(lx.REGISTER))
- if (tkn := self.expect(lx.IDENTIFIER)) and (kind := tkn.text) in ("inst", "op"):
+ if (tkn := self.expect(lx.IDENTIFIER)) and tkn.text in ("inst", "op"):
+ kind = cast(Literal["inst", "op"], tkn.text)
if self.expect(lx.LPAREN) and (tkn := self.expect(lx.IDENTIFIER)):
name = tkn.text
if self.expect(lx.COMMA):
@@ -190,6 +192,7 @@ def inputs(self) -> list[InputEffect] | None:
# input (',' input)*
here = self.getpos()
if inp := self.input():
+ inp = cast(InputEffect, inp)
near = self.getpos()
if self.expect(lx.COMMA):
if rest := self.inputs():
@@ -232,6 +235,7 @@ def cache_effect(self) -> CacheEffect | None:
raise self.make_syntax_error(f"Expected integer, got {num!r}")
else:
return CacheEffect(tkn.text, size)
+ return None
@contextual
def stack_effect(self) -> StackEffect | None:
@@ -258,6 +262,7 @@ def stack_effect(self) -> StackEffect | None:
type_text = "PyObject **"
size_text = size.text.strip()
return StackEffect(tkn.text, type_text, cond_text, size_text)
+ return None
@contextual
def expression(self) -> Expression | None:
@@ -288,6 +293,7 @@ def expression(self) -> Expression | None:
def op(self) -> OpName | None:
if tkn := self.expect(lx.IDENTIFIER):
return OpName(tkn.text)
+ return None
@contextual
def macro_def(self) -> Macro | None:
@@ -300,16 +306,20 @@ def macro_def(self) -> Macro | None:
self.require(lx.SEMI)
res = Macro(tkn.text, uops)
return res
+ return None
def uops(self) -> list[UOp] | None:
if uop := self.uop():
+ uop = cast(UOp, uop)
uops = [uop]
while self.expect(lx.PLUS):
if uop := self.uop():
+ uop = cast(UOp, uop)
uops.append(uop)
else:
raise self.make_syntax_error("Expected op name or cache effect")
return uops
+ return None
@contextual
def uop(self) -> UOp | None:
@@ -327,6 +337,7 @@ def uop(self) -> UOp | None:
raise self.make_syntax_error("Expected integer")
else:
return OpName(tkn.text)
+ return None
@contextual
def family_def(self) -> Family | None:
@@ -385,6 +396,7 @@ def members(self) -> list[str] | None:
def block(self) -> Block | None:
if self.c_blob():
return Block()
+ return None
def c_blob(self) -> list[lx.Token]:
tokens: list[lx.Token] = []