blob: 4b03c138b589e6fb03c4a096f9f76c810b3269c2 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipelines for mapreduce library."""
from __future__ import with_statement
__all__ = [
"CleanupPipeline",
"MapPipeline",
"MapperPipeline",
"MapreducePipeline",
"ReducePipeline",
"ShufflePipeline",
]
import pipeline
from pipeline import common as pipeline_common
from google.appengine.api import app_identity
from mapreduce import errors
from mapreduce import input_readers
from mapreduce import mapper_pipeline
from mapreduce import model
from mapreduce import output_writers
from mapreduce import pipeline_base
from mapreduce import shuffler
from mapreduce import util
# pylint: disable=g-bad-name
# pylint: disable=protected-access
# Mapper pipeline is extracted only to remove dependency cycle with shuffler.py
# Reimport it back.
MapperPipeline = mapper_pipeline.MapperPipeline
ShufflePipeline = shuffler.ShufflePipeline
CleanupPipeline = shuffler._GCSCleanupPipeline
# For backward compatibility.
_ReducerReader = input_readers._ReducerReader
class MapPipeline(pipeline_base._OutputSlotsMixin,
pipeline_base.PipelineBase):
"""Runs the map stage of MapReduce.
Iterates over input reader and outputs data into key/value format
for shuffler consumption.
Args:
job_name: mapreduce job name as string.
mapper_spec: specification of map handler function as string.
input_reader_spec: input reader specification as string.
params: mapper and input reader parameters as dict.
shards: number of shards to start as int.
Returns:
list of filenames written to by this mapper, one for each shard.
"""
def run(self,
job_name,
mapper_spec,
input_reader_spec,
params,
shards=None):
new_params = dict(params or {})
# Although we are using all the default settings (and inherited bucket_name)
# we still need to define an output_writer dict in order to pass validation.
new_params.update({"output_writer": {}})
yield MapperPipeline(
job_name + "-map",
mapper_spec,
input_reader_spec,
output_writer_spec=(output_writers.__name__ +
"._GoogleCloudStorageKeyValueOutputWriter"),
params=new_params,
shards=shards)
class ReducePipeline(pipeline_base._OutputSlotsMixin,
pipeline_base.PipelineBase):
"""Runs the reduce stage of MapReduce.
Merge-reads input files and runs reducer function on them.
Args:
job_name: mapreduce job name as string.
reader_spec: specification of reduce function.
output_writer_spec: specification of output write to use with reduce
function.
params: mapper parameters to use as dict.
bucket_name: The name of the Google Cloud Storage bucket.
filenames: list of filenames to reduce.
combiner_spec: Optional. Specification of a combine function. If not
supplied, no combine step will take place. The combine function takes a
key, list of values and list of previously combined results. It yields
combined values that might be processed by another combiner call, but will
eventually end up in reducer. The combiner output key is assumed to be the
same as the input key.
shards: Optional. Number of output shards. Defaults to the number of
input files.
Returns:
filenames from output writer.
"""
output_names = mapper_pipeline.MapperPipeline.output_names
def run(self,
job_name,
reducer_spec,
output_writer_spec,
params,
bucket_name,
filenames,
combiner_spec=None,
shards=None):
filenames_only = (
util.strip_prefix_from_items("/%s/" % bucket_name, filenames))
new_params = dict(params or {})
new_params.update({
"input_reader": {
"bucket_name": bucket_name,
"objects": filenames_only,
}})
if combiner_spec:
new_params.update({
"combiner_spec": combiner_spec,
})
# TODO(user): Test this
if shards is None:
shards = len(filenames)
yield mapper_pipeline.MapperPipeline(
job_name + "-reduce",
reducer_spec,
__name__ + "._ReducerReader",
output_writer_spec,
new_params,
shards=shards)
class MapreducePipeline(pipeline_base._OutputSlotsMixin,
pipeline_base.PipelineBase):
"""Pipeline to execute MapReduce jobs.
The Shuffle stage uses Google Cloud Storage (GCS). For newly created projects,
GCS is activated automatically. To activate GCS follow these instructions:
https://cloud.google.com/storage/docs/signup#activate
Args:
job_name: job name as string.
mapper_spec: specification of mapper to use.
reducer_spec: specification of reducer to use.
input_reader_spec: specification of input reader to read data from.
output_writer_spec: specification of output writer to save reduce output to.
mapper_params: parameters to use for mapper phase.
reducer_params: parameters to use for reduce phase.
shards: number of shards to use as int.
combiner_spec: Optional. Specification of a combine function. If not
supplied, no combine step will take place. The combine function takes a
key, list of values and list of previously combined results. It yields
combined values that might be processed by another combiner call, but will
eventually end up in reducer. The combiner output key is assumed to be the
same as the input key.
Returns:
result_status: one of model.MapreduceState._RESULTS. Check this to see
if the job is successful.
default: a list of filenames if the mapreduce was successful and
was outputting files. An empty list otherwise.
"""
output_names = mapper_pipeline.MapperPipeline.output_names
def run(self,
job_name,
mapper_spec,
reducer_spec,
input_reader_spec,
output_writer_spec=None,
mapper_params=None,
reducer_params=None,
shards=None,
combiner_spec=None):
# Check that you have a bucket_name set in the mapper_params and set it
# to the default if not.
if mapper_params.get("bucket_name") is None:
try:
mapper_params["bucket_name"] = (
app_identity.get_default_gcs_bucket_name())
except Exception, e:
raise errors.Error("Unable to get the GCS default bucket name. "
"Check to see that GCS is properly activated. "
+ str(e))
if mapper_params["bucket_name"] is None:
raise errors.Error("There is no GCS default bucket name. "
"Check to see that GCS is properly activated.")
# TODO(user): Check that the bucket is indeed writable.
map_pipeline = yield MapPipeline(job_name,
mapper_spec,
input_reader_spec,
params=mapper_params,
shards=shards)
shuffler_pipeline = yield ShufflePipeline(
job_name, mapper_params, map_pipeline)
reducer_pipeline = yield ReducePipeline(
job_name,
reducer_spec,
output_writer_spec,
reducer_params,
mapper_params["bucket_name"],
shuffler_pipeline,
combiner_spec=combiner_spec)
with pipeline.After(reducer_pipeline):
all_temp_files = yield pipeline_common.Extend(
map_pipeline, shuffler_pipeline)
yield CleanupPipeline(all_temp_files)
yield _ReturnPipeline(map_pipeline.result_status,
reducer_pipeline.result_status,
reducer_pipeline.counters,
reducer_pipeline.job_id,
reducer_pipeline)
class _ReturnPipeline(pipeline_base._OutputSlotsMixin,
pipeline_base.PipelineBase):
"""Returns Mapreduce result.
Fills outputs for MapreducePipeline. See MapreducePipeline.
"""
output_names = mapper_pipeline.MapperPipeline.output_names
def run(self,
map_result_status,
reduce_result_status,
reduce_counters,
job_id,
reduce_outputs):
if (map_result_status == model.MapreduceState.RESULT_ABORTED or
reduce_result_status == model.MapreduceState.RESULT_ABORTED):
result_status = model.MapreduceState.RESULT_ABORTED
elif (map_result_status == model.MapreduceState.RESULT_FAILED or
reduce_result_status == model.MapreduceState.RESULT_FAILED):
result_status = model.MapreduceState.RESULT_FAILED
else:
result_status = model.MapreduceState.RESULT_SUCCESS
self.fill(self.outputs.result_status, result_status)
self.fill(self.outputs.counters, reduce_counters)
self.fill(self.outputs.job_id, job_id)
if result_status == model.MapreduceState.RESULT_SUCCESS:
yield pipeline_common.Return(reduce_outputs)
else:
yield pipeline_common.Return([])