|  | import os | 
|  | import yaml | 
|  | from string import Template | 
|  | from copy import deepcopy | 
|  | from .plugins import ArgcountChecker, OptionalArguments, ArgumentReferences, \ | 
|  | BeforeAfterCall, ConstantArguments, ReturnArguments, GILRelease | 
|  | from ..shared import cwrap_common | 
|  |  | 
|  |  | 
|  | class cwrap(object): | 
|  | BASE_INDENT_SIZE = 6 | 
|  |  | 
|  | RETURN_WRAPPERS = { | 
|  | 'void': Template('Py_RETURN_NONE;'), | 
|  | 'long': Template('return PyLong_FromLong($result);'), | 
|  | 'int64_t': Template('return PyLong_FromLong($result);'), | 
|  | 'bool': Template('return PyBool_FromLong($result);'), | 
|  | 'void*': Template('return PyLong_FromVoidPtr($result);'), | 
|  | } | 
|  |  | 
|  | OPTION_TEMPLATE = Template(""" | 
|  | ${els}if ($arg_check) { | 
|  | $pre_arg_assign | 
|  | $arg_assign | 
|  | $code | 
|  | """) | 
|  |  | 
|  | ARG_ASSIGN_TEMPLATE = Template("""${type} ${name} = ${unpack};""") | 
|  |  | 
|  | OPTION_CODE_TEMPLATE = [ | 
|  | '$call', | 
|  | '$return_result', | 
|  | ] | 
|  |  | 
|  | FUNCTION_CALL_TEMPLATE = Template("$capture_result$cname($call_arg);") | 
|  |  | 
|  | DEFAULT_PLUGIN_CLASSES = [ArgcountChecker, ConstantArguments, OptionalArguments, | 
|  | ArgumentReferences, BeforeAfterCall, ReturnArguments, GILRelease] | 
|  |  | 
|  | def __init__(self, source, destination=None, plugins=None, default_plugins=True): | 
|  | if destination is None: | 
|  | destination = source.replace('.cwrap', '.cpp') | 
|  |  | 
|  | self.plugins = [] if plugins is None else plugins | 
|  | if default_plugins: | 
|  | defaults = [cls() for cls in self.DEFAULT_PLUGIN_CLASSES] | 
|  | self.plugins = defaults + self.plugins | 
|  |  | 
|  | for plugin in self.plugins: | 
|  | plugin.initialize(self) | 
|  |  | 
|  | self.base_path = os.path.dirname(os.path.abspath(source)) | 
|  | with open(source, 'r') as f: | 
|  | declarations = f.read() | 
|  |  | 
|  | # wrap all the declarations in the source .cwrap file | 
|  | wrapper = self.wrap_declarations(declarations) | 
|  |  | 
|  | # let each plugin do any post-processing of the wrapped file | 
|  | for plugin in self.plugins: | 
|  | wrapper = plugin.process_full_file(wrapper) | 
|  |  | 
|  | # See Note [Unchanging results for ninja] | 
|  | try: | 
|  | with open(destination, 'r') as f: | 
|  | old_wrapper = f.read() | 
|  | except IOError: | 
|  | old_wrapper = None | 
|  |  | 
|  | if old_wrapper != wrapper: | 
|  | with open(destination, 'w') as f: | 
|  | print("Writing {}".format(destination)) | 
|  | f.write(wrapper) | 
|  | else: | 
|  | print("Skipped writing {}".format(destination)) | 
|  |  | 
|  | def wrap_declarations(self, declarations): | 
|  | lines = declarations.split('\n') | 
|  | declaration_lines = [] | 
|  | output = [] | 
|  | in_declaration = False | 
|  | i = 0 | 
|  |  | 
|  | while i < len(lines): | 
|  | line = lines[i] | 
|  | if line == '[[': | 
|  | declaration_lines = [] | 
|  | in_declaration = True | 
|  | elif line == ']]': | 
|  | in_declaration = False | 
|  | declaration = yaml.load('\n'.join(declaration_lines)) | 
|  | cwrap_common.set_declaration_defaults(declaration) | 
|  |  | 
|  | # Pass declaration in a list - maybe some plugins want to add | 
|  | # multiple wrappers | 
|  | declarations = [declaration] | 
|  | for plugin in self.plugins: | 
|  | declarations = plugin.process_declarations(declarations) | 
|  | # Generate wrappers for all declarations and append them to | 
|  | # the output | 
|  | for declaration in declarations: | 
|  | wrapper = self.generate_wrapper(declaration) | 
|  | for plugin in self.plugins: | 
|  | wrapper = plugin.process_wrapper(wrapper, declaration) | 
|  | output.append(wrapper) | 
|  | elif in_declaration: | 
|  | declaration_lines.append(line) | 
|  | elif '!!inc ' == line[:6]: | 
|  | fname = os.path.join(self.base_path, line[6:].strip()) | 
|  | with open(fname, 'r') as f: | 
|  | included = f.read().split('\n') | 
|  | # insert it into lines at position i+1 | 
|  | lines[i + 1:i + 1] = included | 
|  | else: | 
|  | output.append(line) | 
|  | i += 1 | 
|  |  | 
|  | return '\n'.join(output) | 
|  |  | 
|  | def parse_arguments(self, args): | 
|  | new_args = [] | 
|  | for arg in args: | 
|  | # Simple arg declaration of form "<type> <name>" | 
|  | if isinstance(arg, str): | 
|  | t, _, name = arg.partition(' ') | 
|  | new_args.append({'type': t, 'name': name}) | 
|  | elif isinstance(arg, dict): | 
|  | if 'arg' in arg: | 
|  | arg['type'], _, arg['name'] = arg['arg'].partition(' ') | 
|  | del arg['arg'] | 
|  | new_args.append(arg) | 
|  | else: | 
|  | assert False | 
|  | return new_args | 
|  |  | 
|  | def search_plugins(self, fnname, args, fallback): | 
|  | """Search plugins for the given function to call with args. | 
|  |  | 
|  | If not found, call fallback with args. | 
|  | """ | 
|  | for plugin in self.plugins: | 
|  | wrapper = getattr(plugin, fnname)(*args) | 
|  | if wrapper is not None: | 
|  | return wrapper | 
|  | return fallback(*args) | 
|  |  | 
|  | def get_type_check(self, arg, option): | 
|  | return self.search_plugins('get_type_check', (arg, option), lambda arg, _: None) | 
|  |  | 
|  | def get_type_unpack(self, arg, option): | 
|  | return self.search_plugins('get_type_unpack', (arg, option), lambda arg, _: None) | 
|  |  | 
|  | def get_return_wrapper(self, option): | 
|  | return self.search_plugins('get_return_wrapper', (option,), lambda _: self.RETURN_WRAPPERS[option['return']]) | 
|  |  | 
|  | def get_wrapper_template(self, declaration): | 
|  | return self.search_plugins('get_wrapper_template', (declaration,), lambda _: None) | 
|  |  | 
|  | def get_assign_args(self, arguments): | 
|  | return self.search_plugins('get_assign_args', (arguments,), lambda _: arguments) | 
|  |  | 
|  | def get_arg_accessor(self, arg, option): | 
|  | def wrap_accessor(arg, _): | 
|  | if arg.get('idx') is None: | 
|  | raise RuntimeError("Missing accessor for '{} {}'".format( | 
|  | arg['type'], arg['name'])) | 
|  | return 'PyTuple_GET_ITEM(args, {})'.format(arg['idx']) | 
|  |  | 
|  | return self.search_plugins('get_arg_accessor', (arg, option), wrap_accessor) | 
|  |  | 
|  | def generate_wrapper(self, declaration): | 
|  | wrapper = '' | 
|  | for i, option in enumerate(declaration['options']): | 
|  | option_wrapper = self.generate_option(option, is_first=(i == 0)) | 
|  | for plugin in self.plugins: | 
|  | option_wrapper = plugin.process_option_code(option_wrapper, option) | 
|  | wrapper += option_wrapper | 
|  | return self.get_wrapper_template(declaration).substitute(name=declaration['name'], options=wrapper) | 
|  |  | 
|  | def map_selected_arguments(self, base_fn_name, plugin_fn_name, option, arguments): | 
|  | result = [] | 
|  | for arg in arguments: | 
|  | accessor = self.get_arg_accessor(arg, option) | 
|  | tmpl = getattr(self, base_fn_name)(arg, option) | 
|  | if tmpl is None: | 
|  | fn = 'check' if base_fn_name == 'get_type_check' else 'unpack' | 
|  | raise RuntimeError("Missing type {} for '{} {}'".format( | 
|  | fn, arg['type'], arg['name'])) | 
|  | res = tmpl.substitute(arg=accessor, idx=arg.get('idx')) | 
|  | for plugin in self.plugins: | 
|  | res = getattr(plugin, plugin_fn_name)(res, arg, accessor) | 
|  |  | 
|  | result.append(res) | 
|  | return result | 
|  |  | 
|  | def build_option_args(self, arguments, arg_unpack): | 
|  | assignement = [] | 
|  | call_arg = [] | 
|  | # If types or names needs to be changed | 
|  | arguments = self.get_assign_args(arguments) | 
|  | for arg, unpack in zip(arguments, arg_unpack): | 
|  | if arg['type'] == 'CONSTANT': | 
|  | call_arg.append(unpack) | 
|  | else: | 
|  | var_name = "arg_" + str(arg.get('assign_name', arg['name'])) | 
|  | res = self.ARG_ASSIGN_TEMPLATE.substitute( | 
|  | type=arg['type'], | 
|  | name=var_name, | 
|  | unpack=unpack) | 
|  |  | 
|  | if var_name not in call_arg: | 
|  | assignement.append(res) | 
|  | call_arg.append(var_name) | 
|  | return assignement, call_arg | 
|  |  | 
|  | def indent_code(self, code): | 
|  | if code == '': | 
|  | return code | 
|  | code_lines = map(lambda s: s.strip(), code.split('\n')) | 
|  | code = '\n' | 
|  | depth = self.BASE_INDENT_SIZE | 
|  | for line in code_lines: | 
|  | depth -= line.count('}') * 2 | 
|  | code += ' ' * depth + line + '\n' | 
|  | depth += line.count('{') * 2 | 
|  | depth += line.count('(') * 4 | 
|  | depth -= line.count(')') * 4 | 
|  | return code[:-1] | 
|  |  | 
|  | def generate_option(self, option, is_first): | 
|  | checked_args = list(filter( | 
|  | lambda arg: 'ignore_check' not in arg or not arg['ignore_check'], | 
|  | option['arguments'])) | 
|  | option['num_checked_args'] = len(checked_args) | 
|  | idx_args = list(filter( | 
|  | lambda arg: not arg.get('ignore_check') and not arg.get('no_idx'), | 
|  | option['arguments'])) | 
|  | for i, arg in enumerate(idx_args): | 
|  | arg['idx'] = i | 
|  |  | 
|  | # Generate checks | 
|  | arg_checks = self.map_selected_arguments('get_type_check', | 
|  | 'process_single_check', option, checked_args) | 
|  | arg_checks = ' &&\n          '.join(arg_checks) | 
|  | for plugin in self.plugins: | 
|  | arg_checks = plugin.process_all_checks(arg_checks, option) | 
|  |  | 
|  | # Generate pre_arg assign | 
|  | pre_arg_assign = [] | 
|  | for plugin in self.plugins: | 
|  | pre_arg_assign = plugin.process_pre_arg_assign(pre_arg_assign, option) | 
|  |  | 
|  | # Generate arg assignment and call arguments | 
|  | arg_unpack = self.map_selected_arguments('get_type_unpack', | 
|  | 'process_single_unpack', option, option['arguments']) | 
|  | arg_assign, call_arg = self.build_option_args(option['arguments'], arg_unpack) | 
|  |  | 
|  | call_arg = ', '.join(call_arg) | 
|  | for plugin in self.plugins: | 
|  | call_arg = plugin.process_all_call_arg(call_arg, option) | 
|  |  | 
|  | # Generate call | 
|  | try: | 
|  | return_result = self.get_return_wrapper(option).substitute() | 
|  | call = self.FUNCTION_CALL_TEMPLATE.substitute(capture_result='', | 
|  | cname=option['cname'], call_arg=call_arg) | 
|  | except KeyError: | 
|  | return_result = self.get_return_wrapper(option).substitute(result='__result') | 
|  | call = self.FUNCTION_CALL_TEMPLATE.substitute(capture_result=(option['return'] + ' __result = '), | 
|  | cname=option['cname'], call_arg=call_arg) | 
|  |  | 
|  | code_template = deepcopy(self.OPTION_CODE_TEMPLATE) | 
|  | for plugin in self.plugins: | 
|  | code_template = plugin.process_option_code_template(code_template, | 
|  | option) | 
|  | code_template = Template('\n'.join(code_template)) | 
|  | code = code_template.substitute(call=call, return_result=return_result) | 
|  | code = self.indent_code(code) | 
|  | pre_arg_assign = self.indent_code('\n'.join(pre_arg_assign)) | 
|  | arg_assign = self.indent_code('\n'.join(arg_assign)) | 
|  |  | 
|  | # Put everything together | 
|  | return self.OPTION_TEMPLATE.substitute( | 
|  | els=('} else ' if not is_first else ''), | 
|  | arg_check=arg_checks, | 
|  | pre_arg_assign=pre_arg_assign, | 
|  | arg_assign=arg_assign, | 
|  | code=code, | 
|  | ) |