blob: a5c5235059c9b8ca1dfc3eef51c4a36aaedc6662 [file] [log] [blame]
#!/usr/bin/env python
"""Input Reader interface for map job."""
from mapreduce import errors
from mapreduce import json_util
from mapreduce import shard_life_cycle
# pylint: disable=protected-access
# pylint: disable=invalid-name
class InputReader(shard_life_cycle._ShardLifeCycle, json_util.JsonMixin):
  """Abstract base class for input readers.

  InputReader's lifecycle:
    1. validate() is called to validate JobConfig.
    2. split_input is called to split inputs based on map_job.JobConfig.
       The class method creates a set of InputReader instances.
    3. begin_shard/end_shard/begin_slice/end_slice are called at the time
       implied by the names.
    4. next() is called by each shard on each instance. The output of next()
       is fed into JobConfig.mapper instance.
    5. to_json()/from_json() are used to persist reader's state across multiple
       slices.
  """

  # Counters.
  # Bytes read.
  COUNTER_IO_READ_BYTE = "io-read-byte"
  # Milliseconds spent reading data.
  COUNTER_IO_READ_MSEC = "io-read-msec"

  def __init__(self):
    # Populated by begin_slice() and reset to None by end_slice().
    self._slice_ctx = None

  def __iter__(self):
    """Returns self, so the reader can be used directly in a for loop."""
    return self

  def next(self):
    """Returns the next input from this input reader.

    Subclasses must override this method.

    Returns:
      The next input read by this input reader. The return value is
      fed into mapper.

    Raises:
      StopIteration when no more item is left.
    """
    raise NotImplementedError("next() not implemented in %s" % self.__class__)

  def __next__(self):
    """Python 3 iterator protocol entry point.

    Delegates to next() so that subclasses overriding only next() remain
    iterable under both the Python 2 and the Python 3 iterator protocols.

    Returns:
      Whatever next() returns.

    Raises:
      StopIteration when no more item is left.
    """
    return self.next()

  @classmethod
  def from_json(cls, state):
    """Creates an instance of the InputReader for the given state.

    Subclasses must override this method.

    Args:
      state: The InputReader state as returned by to_json.

    Returns:
      An instance of the InputReader that can resume iteration.
    """
    raise NotImplementedError("from_json() not implemented in %s" % cls)

  def to_json(self):
    """Returns input reader state for the remaining inputs.

    Subclasses must override this method.

    Returns:
      A json-serializable state for the InputReader.
    """
    raise NotImplementedError("to_json() not implemented in %s" %
                              self.__class__)

  @classmethod
  def split_input(cls, job_config):
    """Returns an iterable of input readers.

    This method returns a container of input readers,
    one for each shard. The container must have __iter__ defined.
    http://docs.python.org/2/reference/datamodel.html#object.__iter__

    This method should try to split inputs among readers evenly.
    Subclasses must override this method.

    Args:
      job_config: an instance of map_job.JobConfig.

    Returns:
      An iterable of input readers.
    """
    raise NotImplementedError("split_input() not implemented in %s" % cls)

  @classmethod
  def validate(cls, job_config):
    """Validates relevant parameters.

    This method can validate fields which it deems relevant. The base
    implementation only checks that the job is configured with exactly
    this reader class.

    Args:
      job_config: an instance of map_job.JobConfig.

    Raises:
      errors.BadReaderParamsError: required parameters are missing or invalid.
    """
    if job_config.input_reader_cls != cls:
      raise errors.BadReaderParamsError(
          "Expect input reader class %r, got %r." %
          (cls, job_config.input_reader_cls))

  def begin_slice(self, slice_ctx):
    """Keeps an internal reference to slice_ctx.

    Args:
      slice_ctx: SliceContext singleton instance for this slice.
    """
    self._slice_ctx = slice_ctx

  def end_slice(self, slice_ctx):
    """Drops the internal reference to slice_ctx.

    Args:
      slice_ctx: SliceContext singleton instance for this slice. Unused by
        the base implementation.
    """
    self._slice_ctx = None

  @classmethod
  def params_to_json(cls, params):
    """Translates JobConfig.input_reader_params to json serializable format.

    For most readers, this may be an identity transformation, which is what
    the base implementation does.

    Args:
      params: JobConfig.input_reader_params.

    Returns:
      The json serializable format of params.
    """
    return params

  @classmethod
  def params_from_json(cls, json_params):
    """Reverse function of params_to_json.

    Args:
      json_params: the value previously produced by params_to_json.

    Returns:
      The reader params; the base implementation returns json_params as is.
    """
    return json_params