#!/usr/bin/env python
"""GCS related input readers."""
__all__ = [
"GCSInputReader",
"GCSRecordInputReader",
"PathFilter",
]
# pylint: disable=g-bad-name
# pylint: disable=protected-access
import logging
import pickle
import time
from mapreduce import errors
from mapreduce import records
from mapreduce.api import map_job
# pylint: disable=g-import-not-at-top
# TODO(user): Cleanup imports if/when cloudstorage becomes part of runtime.
try:
# Check if the full cloudstorage package exists. The stub part is in runtime.
import cloudstorage
if hasattr(cloudstorage, "_STUB"):
cloudstorage = None
except ImportError:
pass # CloudStorage library not available
class PathFilter(object):
"""Path filter for GCSInputReader."""
def accept(self, slice_ctx, path):
"""Accepts a path.
    Only accepted paths will be opened for reading.
Args:
slice_ctx: the instance of SliceContext for current slice.
path: a GCS filename of form '/bucket/filename'
Returns:
True if this file should be read. False otherwise.
"""
raise NotImplementedError()
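# Illustrative sketch (not part of the original module): a minimal concrete
# PathFilter that accepts only paths ending with a given suffix. The class
# name and behavior are assumptions for demonstration purposes.
class _SuffixPathFilter(PathFilter):
  """Example filter accepting only GCS paths that end with a suffix."""
  def __init__(self, suffix):
    self._suffix = suffix
  def accept(self, slice_ctx, path):
    # slice_ctx is unused here; a real filter could use it to update
    # counters or inspect slice state.
    return path.endswith(self._suffix)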
class GCSInputReader(map_job.InputReader):
"""Input reader from Google Cloud Storage using the cloudstorage library.
Required configuration in the mapper_spec.input_reader dictionary.
BUCKET_NAME_PARAM: name of the bucket to use. No "/" prefix or suffix.
OBJECT_NAMES_PARAM: a list of object names or prefixes. All objects must be
in the BUCKET_NAME_PARAM bucket. If the name ends with a * it will be
      treated as a prefix and all objects with matching names will be read.
Entries should not start with a slash unless that is part of the object's
name. An example list could be:
["my-1st-input-file", "directory/my-2nd-file", "some/other/dir/input-*"]
      To retrieve all files, "*" will match every object in the bucket. If a
      file is listed twice or is covered by multiple prefixes, it will be read
      twice; there is no de-duplication.
  Optional configuration in the mapper_spec.input_reader dictionary.
BUFFER_SIZE_PARAM: the size of the read buffer for each file handle.
PATH_FILTER_PARAM: an instance of PathFilter. PathFilter is a predicate
on which filenames to read.
DELIMITER_PARAM: str. The delimiter that signifies directory.
If you have too many files to shard on the granularity of individual
files, you can specify this to enable shallow splitting. In this mode,
the reader only goes one level deep during "*" expansion and stops when
the delimiter is encountered.
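  Example input_reader dictionary (illustrative values; the bucket and object
  names are hypothetical):
    {
        "bucket_name": "my_bucket",
        "objects": ["my-1st-input-file", "some/other/dir/input-*"],
        "buffer_size": 1048576,
        "delimiter": "/",
    }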
"""
# Counters.
COUNTER_FILE_READ = "file-read"
COUNTER_FILE_MISSING = "file-missing"
# Supported parameters
BUCKET_NAME_PARAM = "bucket_name"
OBJECT_NAMES_PARAM = "objects"
BUFFER_SIZE_PARAM = "buffer_size"
DELIMITER_PARAM = "delimiter"
PATH_FILTER_PARAM = "path_filter"
# Internal parameters
_ACCOUNT_ID_PARAM = "account_id"
# Other internal configuration constants
_JSON_PICKLE = "pickle"
_STRING_MAX_FILES_LISTED = 10 # Max files shown in the str representation
  # The input reader could instead take start and end filenames and call
  # listbucket. This saves space but has two drawbacks:
  # 1. The files to read are less well defined: files can be added or removed
  #    over the lifetime of the MR job.
  # 2. A shard has to process files from a contiguous namespace, which may
  #    introduce straggler shards.
def __init__(self, filenames, index=0, buffer_size=None, _account_id=None,
delimiter=None, path_filter=None):
"""Initialize a GoogleCloudStorageInputReader instance.
Args:
filenames: A list of Google Cloud Storage filenames of the form
'/bucket/objectname'.
index: Index of the next filename to read.
buffer_size: The size of the read buffer, None to use default.
_account_id: Internal use only. See cloudstorage documentation.
delimiter: Delimiter used as path separator. See class doc.
path_filter: An instance of PathFilter.
"""
super(GCSInputReader, self).__init__()
self._filenames = filenames
self._index = index
self._buffer_size = buffer_size
self._account_id = _account_id
self._delimiter = delimiter
self._bucket = None
self._bucket_iter = None
self._path_filter = path_filter
self._slice_ctx = None
def _next_file(self):
"""Find next filename.
self._filenames may need to be expanded via listbucket.
Returns:
      None if no more files are left. Otherwise, the next filename.
"""
while True:
if self._bucket_iter:
try:
return self._bucket_iter.next().filename
except StopIteration:
self._bucket_iter = None
self._bucket = None
if self._index >= len(self._filenames):
return
filename = self._filenames[self._index]
self._index += 1
if self._delimiter is None or not filename.endswith(self._delimiter):
return filename
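      # The filename ends with the delimiter, so treat it as a "directory":
      # expand it one level with a shallow listbucket (see DELIMITER_PARAM in
      # the class docstring).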
self._bucket = cloudstorage.listbucket(filename,
delimiter=self._delimiter)
self._bucket_iter = iter(self._bucket)
@classmethod
def validate(cls, job_config):
"""Validate mapper specification.
Args:
job_config: map_job.JobConfig.
Raises:
BadReaderParamsError: if the specification is invalid for any reason such
as missing the bucket name or providing an invalid bucket name.
"""
reader_params = job_config.input_reader_params
# Bucket Name is required
if cls.BUCKET_NAME_PARAM not in reader_params:
raise errors.BadReaderParamsError(
"%s is required for Google Cloud Storage" %
cls.BUCKET_NAME_PARAM)
try:
cloudstorage.validate_bucket_name(
reader_params[cls.BUCKET_NAME_PARAM])
    except ValueError as error:
raise errors.BadReaderParamsError("Bad bucket name, %s" % (error))
# Object Name(s) are required
if cls.OBJECT_NAMES_PARAM not in reader_params:
raise errors.BadReaderParamsError(
"%s is required for Google Cloud Storage" %
cls.OBJECT_NAMES_PARAM)
filenames = reader_params[cls.OBJECT_NAMES_PARAM]
if not isinstance(filenames, list):
raise errors.BadReaderParamsError(
"Object name list is not a list but a %s" %
filenames.__class__.__name__)
for filename in filenames:
if not isinstance(filename, basestring):
raise errors.BadReaderParamsError(
"Object name is not a string but a %s" %
filename.__class__.__name__)
# Delimiter.
if cls.DELIMITER_PARAM in reader_params:
delimiter = reader_params[cls.DELIMITER_PARAM]
if not isinstance(delimiter, basestring):
raise errors.BadReaderParamsError(
"%s is not a string but a %s" %
(cls.DELIMITER_PARAM, type(delimiter)))
# Buffer size.
if cls.BUFFER_SIZE_PARAM in reader_params:
buffer_size = reader_params[cls.BUFFER_SIZE_PARAM]
if not isinstance(buffer_size, int):
raise errors.BadReaderParamsError(
"%s is not an int but a %s" %
(cls.BUFFER_SIZE_PARAM, type(buffer_size)))
# Path filter.
if cls.PATH_FILTER_PARAM in reader_params:
path_filter = reader_params[cls.PATH_FILTER_PARAM]
if not isinstance(path_filter, PathFilter):
raise errors.BadReaderParamsError(
"%s is not an instance of PathFilter but %s." %
(cls.PATH_FILTER_PARAM, type(path_filter)))
@classmethod
def split_input(cls, job_config):
"""Returns a list of input readers.
An equal number of input files are assigned to each shard (+/- 1). If there
are fewer files than shards, fewer than the requested number of shards will
    be used. Input files are currently never split (although for some formats
    they could be, and they may be split in a future implementation).
Args:
job_config: map_job.JobConfig
Returns:
      A list of InputReaders. The list is empty when no input data can be
      found.
"""
reader_params = job_config.input_reader_params
bucket = reader_params[cls.BUCKET_NAME_PARAM]
filenames = reader_params[cls.OBJECT_NAMES_PARAM]
delimiter = reader_params.get(cls.DELIMITER_PARAM)
account_id = reader_params.get(cls._ACCOUNT_ID_PARAM)
buffer_size = reader_params.get(cls.BUFFER_SIZE_PARAM)
path_filter = reader_params.get(cls.PATH_FILTER_PARAM)
# Gather the complete list of files (expanding wildcards)
all_filenames = []
for filename in filenames:
if filename.endswith("*"):
all_filenames.extend(
[file_stat.filename for file_stat in cloudstorage.listbucket(
"/" + bucket + "/" + filename[:-1], delimiter=delimiter,
_account_id=account_id)])
else:
all_filenames.append("/%s/%s" % (bucket, filename))
# Split into shards
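    # Files are assigned round-robin via list striding: shard i receives
    # all_filenames[i::shard_count]. For example (hypothetical), five files
    # split across two shards yields [f0, f2, f4] and [f1, f3].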
readers = []
for shard in range(0, job_config.shard_count):
shard_filenames = all_filenames[shard::job_config.shard_count]
if shard_filenames:
readers.append(cls(
shard_filenames, buffer_size=buffer_size, _account_id=account_id,
delimiter=delimiter, path_filter=path_filter))
return readers
@classmethod
def from_json(cls, state):
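    """Restores a reader from the state dict produced by to_json()."""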
obj = pickle.loads(state[cls._JSON_PICKLE])
if obj._bucket:
obj._bucket_iter = iter(obj._bucket)
return obj
def to_json(self):
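    """Serializes the reader by pickling the whole instance.
    The live bucket iterator and the slice context are detached before
    pickling (they cannot be pickled) and reattached afterwards; from_json()
    recreates the iterator from the pickled bucket object.
    """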
before_iter = self._bucket_iter
before_slice_ctx = self._slice_ctx
self._bucket_iter = None
self._slice_ctx = None
try:
return {self._JSON_PICKLE: pickle.dumps(self)}
finally:
      self._bucket_iter = before_iter
self._slice_ctx = before_slice_ctx
def next(self):
"""Returns a handler to the next file.
Non existent files will be logged and skipped. The file might have been
removed after input splitting.
Returns:
The next input from this input reader in the form of a cloudstorage
ReadBuffer that supports a File-like interface (read, readline, seek,
tell, and close). An error may be raised if the file can not be opened.
Raises:
StopIteration: The list of files has been exhausted.
"""
options = {}
if self._buffer_size:
options["read_buffer_size"] = self._buffer_size
if self._account_id:
options["_account_id"] = self._account_id
while True:
filename = self._next_file()
if filename is None:
raise StopIteration()
if (self._path_filter and
not self._path_filter.accept(self._slice_ctx, filename)):
continue
try:
start_time = time.time()
handle = cloudstorage.open(filename, **options)
        self._slice_ctx.incr(self.COUNTER_IO_READ_MSEC,
                             int((time.time() - start_time) * 1000))
self._slice_ctx.incr(self.COUNTER_FILE_READ)
return handle
except cloudstorage.NotFoundError:
logging.warning("File %s may have been removed. Skipping file.",
filename)
self._slice_ctx.incr(self.COUNTER_FILE_MISSING)
def __str__(self):
# Only show a limited number of files individually for readability
num_files = len(self._filenames)
if num_files > self._STRING_MAX_FILES_LISTED:
names = "%s...%s + %d not shown" % (
",".join(self._filenames[0:self._STRING_MAX_FILES_LISTED-1]),
self._filenames[-1],
num_files - self._STRING_MAX_FILES_LISTED)
else:
names = ",".join(self._filenames)
    if self._index >= num_files:
status = "EOF"
else:
status = "Next %s (%d of %d)" % (
self._filenames[self._index],
self._index + 1, # +1 for human 1-indexing
num_files)
return "CloudStorage [%s, %s]" % (status, names)
@classmethod
def params_to_json(cls, params):
"""Inherit docs."""
params_cp = dict(params)
if cls.PATH_FILTER_PARAM in params_cp:
path_filter = params_cp[cls.PATH_FILTER_PARAM]
params_cp[cls.PATH_FILTER_PARAM] = pickle.dumps(path_filter)
return params_cp
@classmethod
def params_from_json(cls, json_params):
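    """Inherit docs."""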
if cls.PATH_FILTER_PARAM in json_params:
path_filter = pickle.loads(json_params[cls.PATH_FILTER_PARAM])
json_params[cls.PATH_FILTER_PARAM] = path_filter
return json_params
class GCSRecordInputReader(GCSInputReader):
"""Read data from a Google Cloud Storage file using LevelDB format.
See the GCSInputReader for additional configuration options.
"""
def __getstate__(self):
result = self.__dict__.copy()
# record reader may not exist if reader has not been used
if "_record_reader" in result:
      # RecordsReader has no buffering, so it can safely be reconstructed
      # after deserialization.
result.pop("_record_reader")
return result
def next(self):
"""Returns the next input from this input reader, a record.
Returns:
The next input from this input reader in the form of a record read from
      a LevelDB file.
    Raises:
      StopIteration: The ordered set of records has been exhausted.
"""
while True:
if not hasattr(self, "_cur_handle") or self._cur_handle is None:
# If there are no more files, StopIteration is raised here
self._cur_handle = super(GCSRecordInputReader, self).next()
if not hasattr(self, "_record_reader") or self._record_reader is None:
self._record_reader = records.RecordsReader(self._cur_handle)
try:
start_time = time.time()
content = self._record_reader.read()
self._slice_ctx.incr(self.COUNTER_IO_READ_BYTE, len(content))
        self._slice_ctx.incr(self.COUNTER_IO_READ_MSEC,
                             int((time.time() - start_time) * 1000))
return content
except EOFError:
self._cur_handle = None
self._record_reader = None
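# Illustrative sketch of producing a file that GCSRecordInputReader can
# consume (hypothetical object name; in practice the library's GCS record
# output writer creates such files). Assumes records.RecordsWriter, the
# counterpart of the RecordsReader used above:
#   with cloudstorage.open("/my_bucket/my-records", "w") as gcs_file:
#     writer = records.RecordsWriter(gcs_file)
#     writer.write("first record")
#     writer.write("second record")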