blob: 8d659ad27eddd65f118d3d828bcceb74cc400f7f [file] [log] [blame]
#!/usr/bin/env python3
import collections
import copyreg
import io
import pickle
import traceback
def serialize(obj):
f = io.BytesIO()
p = pickle.Pickler(f)
p.dispatch_table = copyreg.dispatch_table.copy()
p.dump(obj)
return f.getvalue()
def run_python_udf_internal(pickled_python_udf):
python_udf = pickle.loads(pickled_python_udf)
try:
result = python_udf.func(*python_udf.args, **python_udf.kwargs)
except Exception as e:
# except str = exception info + traceback string
except_str = "{}\n{}".format(repr(e), traceback.format_exc())
result = RemoteException(except_str)
return serialize(result)
def load_python_udf_result_internal(pickled_python_result):
result = pickle.loads(pickled_python_result)
if isinstance(result, RemoteException):
raise Exception(result.msg)
return result
PythonUDF = collections.namedtuple("PythonUDF", ["func", "args", "kwargs"])
RemoteException = collections.namedtuple("RemoteException", ["msg"])