blob: d390e66c5670ede402d77630cfe23e21a8ae6563 [file] [log] [blame]
# Copyright 2009 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable-msg=W0612,W0613,C6409
"""A fake filesystem implementation for unit testing.
Includes:
FakeFile: Provides the appearance of a real file.
FakeDirectory: Provides the appearance of a real dir.
FakeFilesystem: Provides the appearance of a real directory hierarchy.
FakeOsModule: Uses FakeFilesystem to provide a fake os module replacement.
FakePathModule: Faked os.path module replacement.
FakeFileOpen: Faked file() and open() function replacements.
Usage:
>>> import fake_filesystem
>>> filesystem = fake_filesystem.FakeFilesystem()
>>> os_module = fake_filesystem.FakeOsModule(filesystem)
>>> pathname = '/a/new/dir/new-file'
Create a new file object, creating parent directory objects as needed:
>>> os_module.path.exists(pathname)
False
>>> new_file = filesystem.CreateFile(pathname)
File objects can't be overwritten:
>>> os_module.path.exists(pathname)
True
>>> try:
... filesystem.CreateFile(pathname)
... except IOError as e:
... assert e.errno == errno.EEXIST, 'unexpected errno: %d' % e.errno
... assert e.strerror == 'File already exists in fake filesystem'
Remove a file object:
>>> filesystem.RemoveObject(pathname)
>>> os_module.path.exists(pathname)
False
Create a new file object at the previous path:
>>> beatles_file = filesystem.CreateFile(pathname,
... contents='Dear Prudence\\nWon\\'t you come out to play?\\n')
>>> os_module.path.exists(pathname)
True
Use the FakeFileOpen class to read fake file objects:
>>> file_module = fake_filesystem.FakeFileOpen(filesystem)
>>> for line in file_module(pathname):
... print line.rstrip()
...
Dear Prudence
Won't you come out to play?
File objects cannot be treated like directory objects:
>>> os_module.listdir(pathname) #doctest: +NORMALIZE_WHITESPACE
Traceback (most recent call last):
File "fake_filesystem.py", line 291, in listdir
raise OSError(errno.ENOTDIR,
OSError: [Errno 20] Fake os module: not a directory: '/a/new/dir/new-file'
The FakeOsModule can list fake directory objects:
>>> os_module.listdir(os_module.path.dirname(pathname))
['new-file']
The FakeOsModule also supports stat operations:
>>> import stat
>>> stat.S_ISREG(os_module.stat(pathname).st_mode)
True
>>> stat.S_ISDIR(os_module.stat(os_module.path.dirname(pathname)).st_mode)
True
"""
import errno
import heapq
import os
import stat
import sys
import time
import warnings
try:
import cStringIO as io # pylint: disable-msg=C6204
except ImportError:
import io # pylint: disable-msg=C6204
__pychecker__ = 'no-reimportself'
__version__ = '2.5'
PERM_READ = 0o400 # Read permission bit.
PERM_WRITE = 0o200 # Write permission bit.
PERM_EXE = 0o100 # Write permission bit.
PERM_DEF = 0o777 # Default permission bits.
PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
PERM_ALL = 0o7777 # All permission bits.
_OPEN_MODE_MAP = {
# mode name:(file must exist, need read, need write,
# truncate [implies need write], append)
'r': (True, True, False, False, False),
'w': (False, False, True, True, False),
'a': (False, False, True, False, True),
'r+': (True, True, True, False, False),
'w+': (False, True, True, True, False),
'a+': (False, True, True, False, True),
}
_MAX_LINK_DEPTH = 20
FAKE_PATH_MODULE_DEPRECATION = ('Do not instantiate a FakePathModule directly; '
'let FakeOsModule instantiate it. See the '
'FakeOsModule docstring for details.')
class Error(Exception):
pass
_is_windows = sys.platform.startswith('win')
_is_cygwin = sys.platform == 'cygwin'
if _is_windows:
# On Windows, raise WindowsError instead of OSError if available
OSError = WindowsError # pylint: disable-msg=E0602,W0622
class FakeLargeFileIoException(Error):
def __init__(self, file_path):
Error.__init__(self,
'Read and write operations not supported for '
'fake large file: %s' % file_path)
def CopyModule(old):
"""Recompiles and creates new module object."""
saved = sys.modules.pop(old.__name__, None)
new = __import__(old.__name__)
sys.modules[old.__name__] = saved
return new
class FakeFile(object):
"""Provides the appearance of a real file.
Attributes currently faked out:
st_mode: user-specified, otherwise S_IFREG
st_ctime: the time.time() timestamp when the file is created.
st_size: the size of the file
Other attributes needed by os.stat are assigned default value of None
these include: st_ino, st_dev, st_nlink, st_uid, st_gid, st_atime,
st_mtime
"""
def __init__(self, name, st_mode=stat.S_IFREG | PERM_DEF_FILE,
contents=None):
"""init.
Args:
name: name of the file/directory, without parent path information
st_mode: the stat.S_IF* constant representing the file type (i.e.
stat.S_IFREG, stat.SIFDIR)
contents: the contents of the filesystem object; should be a string for
regular files, and a list of other FakeFile or FakeDirectory objects
for FakeDirectory objects
"""
self.name = name
self.st_mode = st_mode
self.contents = contents
self.epoch = 0
self.st_ctime = int(time.time())
self.st_atime = self.st_ctime
self.st_mtime = self.st_ctime
if contents:
self.st_size = len(contents)
else:
self.st_size = 0
# Non faked features, write setter methods for fakeing them
self.st_ino = None
self.st_dev = None
self.st_nlink = None
self.st_uid = None
self.st_gid = None
def SetLargeFileSize(self, st_size):
"""Sets the self.st_size attribute and replaces self.content with None.
Provided specifically to simulate very large files without regards
to their content (which wouldn't fit in memory)
Args:
st_size: The desired file size
Raises:
IOError: if the st_size is not a non-negative integer
"""
# the st_size should be an positive integer value
if not isinstance(st_size, int) or st_size < 0:
raise IOError(errno.ENOSPC,
'Fake file object: can not create non negative integer '
'size=%r fake file' % st_size,
self.name)
self.st_size = st_size
self.contents = None
def IsLargeFile(self):
"""Return True if this file was initialized with size but no contents."""
return self.contents is None
def SetContents(self, contents):
"""Sets the file contents and size.
Args:
contents: string, new content of file.
"""
# convert a byte array to a string
if sys.version_info >= (3, 0) and isinstance(contents, bytes):
contents = ''.join(chr(i) for i in contents)
self.contents = contents
self.st_size = len(contents)
self.epoch += 1
def SetSize(self, st_size):
"""Resizes file content, padding with nulls if new size exceeds the old.
Args:
st_size: The desired size for the file.
Raises:
IOError: if the st_size arg is not a non-negative integer
"""
if not isinstance(st_size, int) or st_size < 0:
raise IOError(errno.ENOSPC,
'Fake file object: can not create non negative integer '
'size=%r fake file' % st_size,
self.name)
current_size = len(self.contents)
if st_size < current_size:
self.contents = self.contents[:st_size]
else:
self.contents = '%s%s' % (self.contents, '\0' * (st_size - current_size))
self.st_size = len(self.contents)
self.epoch += 1
def SetATime(self, st_atime):
"""Set the self.st_atime attribute.
Args:
st_atime: The desired atime.
"""
self.st_atime = st_atime
def SetMTime(self, st_mtime):
"""Set the self.st_mtime attribute.
Args:
st_mtime: The desired mtime.
"""
self.st_mtime = st_mtime
def __str__(self):
return '%s(%o)' % (self.name, self.st_mode)
def SetIno(self, st_ino):
"""Set the self.st_ino attribute.
Args:
st_ino: The desired inode.
"""
self.st_ino = st_ino
class FakeDirectory(FakeFile):
"""Provides the appearance of a real dir."""
def __init__(self, name, perm_bits=PERM_DEF):
"""init.
Args:
name: name of the file/directory, without parent path information
perm_bits: permission bits. defaults to 0o777.
"""
FakeFile.__init__(self, name, stat.S_IFDIR | perm_bits, {})
def AddEntry(self, pathname):
"""Adds a child FakeFile to this directory.
Args:
pathname: FakeFile instance to add as a child of this directory
"""
self.contents[pathname.name] = pathname
def GetEntry(self, pathname_name):
"""Retrieves the specified child file or directory.
Args:
pathname_name: basename of the child object to retrieve
Returns:
string, file contents
Raises:
KeyError: if no child exists by the specified name
"""
return self.contents[pathname_name]
def RemoveEntry(self, pathname_name):
"""Removes the specified child file or directory.
Args:
pathname_name: basename of the child object to remove
Raises:
KeyError: if no child exists by the specified name
"""
del self.contents[pathname_name]
def __str__(self):
rc = super(FakeDirectory, self).__str__() + ':\n'
for item in self.contents:
item_desc = self.contents[item].__str__()
for line in item_desc.split('\n'):
if line:
rc = rc + ' ' + line + '\n'
return rc
class FakeFilesystem(object):
"""Provides the appearance of a real directory tree for unit testing."""
def __init__(self, path_separator=os.path.sep):
"""init.
Args:
path_separator: optional substitute for os.path.sep
"""
self.path_separator = path_separator
self.root = FakeDirectory(self.path_separator)
self.cwd = self.root.name
# We can't query the current value without changing it:
self.umask = os.umask(0o22)
os.umask(self.umask)
# A list of open file objects. Their position in the list is their
# file descriptor number
self.open_files = []
# A heap containing all free positions in self.open_files list
self.free_fd_heap = []
def SetIno(self, path, st_ino):
"""Set the self.st_ino attribute of file at 'path'.
Args:
path: Path to file.
st_ino: The desired inode.
"""
self.GetObject(path).SetIno(st_ino)
def AddOpenFile(self, file_obj):
"""Adds file_obj to the list of open files on the filesystem.
The position in the self.open_files array is the file descriptor number
Args:
file_obj: file object to be added to open files list.
Returns:
File descriptor number for the file object.
"""
if self.free_fd_heap:
open_fd = heapq.heappop(self.free_fd_heap)
self.open_files[open_fd] = file_obj
return open_fd
self.open_files.append(file_obj)
return len(self.open_files) - 1
def CloseOpenFile(self, file_obj):
"""Removes file_obj from the list of open files on the filesystem.
Sets the entry in open_files to None.
Args:
file_obj: file object to be removed to open files list.
"""
self.open_files[file_obj.filedes] = None
heapq.heappush(self.free_fd_heap, file_obj.filedes)
def GetOpenFile(self, file_des):
"""Returns an open file.
Args:
file_des: file descriptor of the open file.
Raises:
OSError: an invalid file descriptor.
TypeError: filedes is not an integer.
Returns:
Open file object.
"""
if not isinstance(file_des, int):
raise TypeError('an integer is required')
if (file_des >= len(self.open_files) or
self.open_files[file_des] is None):
raise OSError(errno.EBADF, 'Bad file descriptor', file_des)
return self.open_files[file_des]
def CollapsePath(self, path):
"""Mimics os.path.normpath using the specified path_separator.
Mimics os.path.normpath using the path_separator that was specified
for this FakeFilesystem. Normalizes the path, but unlike the method
NormalizePath, does not make it absolute. Eliminates dot components
(. and ..) and combines repeated path separators (//). Initial ..
components are left in place for relative paths. If the result is an empty
path, '.' is returned instead. Unlike the real os.path.normpath, this does
not replace '/' with '\\' on Windows.
Args:
path: (str) The path to normalize.
Returns:
(str) A copy of path with empty components and dot components removed.
"""
is_absolute_path = path.startswith(self.path_separator)
path_components = path.split(self.path_separator)
collapsed_path_components = []
for component in path_components:
if (not component) or (component == '.'):
continue
if component == '..':
if collapsed_path_components and (
collapsed_path_components[-1] != '..'):
# Remove an up-reference: directory/..
collapsed_path_components.pop()
continue
elif is_absolute_path:
# Ignore leading .. components if starting from the root directory.
continue
collapsed_path_components.append(component)
collapsed_path = self.path_separator.join(collapsed_path_components)
if is_absolute_path:
collapsed_path = self.path_separator + collapsed_path
return collapsed_path or '.'
def NormalizePath(self, path):
"""Absolutize and minimalize the given path.
Forces all relative paths to be absolute, and normalizes the path to
eliminate dot and empty components.
Args:
path: path to normalize
Returns:
The normalized path relative to the current working directory, or the root
directory if path is empty.
"""
if not path:
path = self.path_separator
elif not path.startswith(self.path_separator):
# Prefix relative paths with cwd, if cwd is not root.
path = self.path_separator.join(
(self.cwd != self.root.name and self.cwd or '',
path))
if path == '.':
path = self.cwd
return self.CollapsePath(path)
def SplitPath(self, path):
"""Mimics os.path.split using the specified path_separator.
Mimics os.path.split using the path_separator that was specified
for this FakeFilesystem.
Args:
path: (str) The path to split.
Returns:
(str) A duple (pathname, basename) for which pathname does not
end with a slash, and basename does not contain a slash.
"""
path_components = path.split(self.path_separator)
if not path_components:
return ('', '')
basename = path_components.pop()
if not path_components:
return ('', basename)
for component in path_components:
if component:
# The path is not the root; it contains a non-separator component.
# Strip all trailing separators.
while not path_components[-1]:
path_components.pop()
return (self.path_separator.join(path_components), basename)
# Root path. Collapse all leading separators.
return (self.path_separator, basename)
def JoinPaths(self, *paths):
"""Mimics os.path.join using the specified path_separator.
Mimics os.path.join using the path_separator that was specified
for this FakeFilesystem.
Args:
*paths: (str) Zero or more paths to join.
Returns:
(str) The paths joined by the path separator, starting with the last
absolute path in paths.
"""
if len(paths) == 1:
return paths[0]
joined_path_segments = []
for path_segment in paths:
if path_segment.startswith(self.path_separator):
# An absolute path
joined_path_segments = [path_segment]
else:
if (joined_path_segments and
not joined_path_segments[-1].endswith(self.path_separator)):
joined_path_segments.append(self.path_separator)
if path_segment:
joined_path_segments.append(path_segment)
return ''.join(joined_path_segments)
def GetPathComponents(self, path):
"""Breaks the path into a list of component names.
Does not include the root directory as a component, as all paths
are considered relative to the root directory for the FakeFilesystem.
Callers should basically follow this pattern:
file_path = self.NormalizePath(file_path)
path_components = self.GetPathComponents(file_path)
current_dir = self.root
for component in path_components:
if component not in current_dir.contents:
raise IOError
DoStuffWithComponent(curent_dir, component)
current_dir = current_dir.GetEntry(component)
Args:
path: path to tokenize
Returns:
The list of names split from path
"""
if not path or path == self.root.name:
return []
path_components = path.split(self.path_separator)
assert path_components
if not path_components[0]:
# This is an absolute path.
path_components = path_components[1:]
return path_components
def Exists(self, file_path):
"""True if a path points to an existing file system object.
Args:
file_path: path to examine
Returns:
bool(if object exists)
Raises:
TypeError: if file_path is None
"""
if file_path is None:
raise TypeError
if not file_path:
return False
try:
file_path = self.ResolvePath(file_path)
except IOError:
return False
if file_path == self.root.name:
return True
path_components = self.GetPathComponents(file_path)
current_dir = self.root
for component in path_components:
if component not in current_dir.contents:
return False
current_dir = current_dir.contents[component]
return True
def ResolvePath(self, file_path):
"""Follow a path, resolving symlinks.
ResolvePath traverses the filesystem along the specified file path,
resolving file names and symbolic links until all elements of the path are
exhausted, or we reach a file which does not exist. If all the elements
are not consumed, they just get appended to the path resolved so far.
This gives us the path which is as resolved as it can be, even if the file
does not exist.
This behavior mimics Unix semantics, and is best shown by example. Given a
file system that looks like this:
/a/b/
/a/b/c -> /a/b2 c is a symlink to /a/b2
/a/b2/x
/a/c -> ../d
/a/x -> y
Then:
/a/b/x => /a/b/x
/a/c => /a/d
/a/x => /a/y
/a/b/c/d/e => /a/b2/d/e
Args:
file_path: path to examine
Returns:
resolved_path (string) or None
Raises:
TypeError: if file_path is None
IOError: if file_path is '' or a part of the path doesn't exist
"""
def _ComponentsToPath(component_folders):
return '%s%s' % (self.path_separator,
self.path_separator.join(component_folders))
def _ValidRelativePath(file_path):
while file_path and '/..' in file_path:
file_path = file_path[:file_path.rfind('/..')]
if not self.Exists(self.NormalizePath(file_path)):
return False
return True
def _FollowLink(link_path_components, link):
"""Follow a link w.r.t. a path resolved so far.
The component is either a real file, which is a no-op, or a symlink.
In the case of a symlink, we have to modify the path as built up so far
/a/b => ../c should yield /a/../c (which will normalize to /a/c)
/a/b => x should yield /a/x
/a/b => /x/y/z should yield /x/y/z
The modified path may land us in a new spot which is itself a
link, so we may repeat the process.
Args:
link_path_components: The resolved path built up to the link so far.
link: The link object itself.
Returns:
(string) the updated path resolved after following the link.
Raises:
IOError: if there are too many levels of symbolic link
"""
link_path = link.contents
# For links to absolute paths, we want to throw out everything in the
# path built so far and replace with the link. For relative links, we
# have to append the link to what we have so far,
if not link_path.startswith(self.path_separator):
# Relative path. Append remainder of path to what we have processed
# so far, excluding the name of the link itself.
# /a/b => ../c should yield /a/../c (which will normalize to /c)
# /a/b => d should yield a/d
components = link_path_components[:-1]
components.append(link_path)
link_path = self.path_separator.join(components)
# Don't call self.NormalizePath(), as we don't want to prepend self.cwd.
return self.CollapsePath(link_path)
if file_path is None:
# file.open(None) raises TypeError, so mimic that.
raise TypeError('Expected file system path string, received None')
if not file_path or not _ValidRelativePath(file_path):
# file.open('') raises IOError, so mimic that, and validate that all
# parts of a relative path exist.
raise IOError(errno.ENOENT,
'No such file or directory: \'%s\'' % file_path)
file_path = self.NormalizePath(file_path)
if file_path == self.root.name:
return file_path
current_dir = self.root
path_components = self.GetPathComponents(file_path)
resolved_components = []
link_depth = 0
while path_components:
component = path_components.pop(0)
resolved_components.append(component)
if component not in current_dir.contents:
# The component of the path at this point does not actually exist in
# the folder. We can't resolve the path any more. It is legal to link
# to a file that does not yet exist, so rather than raise an error, we
# just append the remaining components to what return path we have built
# so far and return that.
resolved_components.extend(path_components)
break
current_dir = current_dir.contents[component]
# Resolve any possible symlinks in the current path component.
if stat.S_ISLNK(current_dir.st_mode):
# This link_depth check is not really meant to be an accurate check.
# It is just a quick hack to prevent us from looping forever on
# cycles.
link_depth += 1
if link_depth > _MAX_LINK_DEPTH:
raise IOError(errno.EMLINK,
'Too many levels of symbolic links: \'%s\'' %
_ComponentsToPath(resolved_components))
link_path = _FollowLink(resolved_components, current_dir)
# Following the link might result in the complete replacement of the
# current_dir, so we evaluate the entire resulting path.
target_components = self.GetPathComponents(link_path)
path_components = target_components + path_components
resolved_components = []
current_dir = self.root
return _ComponentsToPath(resolved_components)
def GetObjectFromNormalizedPath(self, file_path):
"""Searches for the specified filesystem object within the fake filesystem.
Args:
file_path: specifies target FakeFile object to retrieve, with a
path that has already been normalized/resolved
Returns:
the FakeFile object corresponding to file_path
Raises:
IOError: if the object is not found
"""
if file_path == self.root.name:
return self.root
path_components = self.GetPathComponents(file_path)
target_object = self.root
try:
for component in path_components:
if not isinstance(target_object, FakeDirectory):
raise IOError(errno.ENOENT,
'No such file or directory in fake filesystem',
file_path)
target_object = target_object.GetEntry(component)
except KeyError:
raise IOError(errno.ENOENT,
'No such file or directory in fake filesystem',
file_path)
return target_object
def GetObject(self, file_path):
"""Searches for the specified filesystem object within the fake filesystem.
Args:
file_path: specifies target FakeFile object to retrieve
Returns:
the FakeFile object corresponding to file_path
Raises:
IOError: if the object is not found
"""
file_path = self.NormalizePath(file_path)
return self.GetObjectFromNormalizedPath(file_path)
def ResolveObject(self, file_path):
"""Searches for the specified filesystem object, resolving all links.
Args:
file_path: specifies target FakeFile object to retrieve
Returns:
the FakeFile object corresponding to file_path
Raises:
IOError: if the object is not found
"""
return self.GetObjectFromNormalizedPath(self.ResolvePath(file_path))
def LResolveObject(self, path):
"""Searches for the specified object, resolving only parent links.
This is analogous to the stat/lstat difference. This resolves links *to*
the object but not of the final object itself.
Args:
path: specifies target FakeFile object to retrieve
Returns:
the FakeFile object corresponding to path
Raises:
IOError: if the object is not found
"""
if path == self.root.name:
# The root directory will never be a link
return self.root
parent_directory, child_name = self.SplitPath(path)
if not parent_directory:
parent_directory = self.cwd
try:
parent_obj = self.ResolveObject(parent_directory)
assert parent_obj
if not isinstance(parent_obj, FakeDirectory):
raise IOError(errno.ENOENT,
'No such file or directory in fake filesystem',
path)
return parent_obj.GetEntry(child_name)
except KeyError:
raise IOError(errno.ENOENT,
'No such file or directory in the fake filesystem',
path)
def AddObject(self, file_path, file_object):
"""Add a fake file or directory into the filesystem at file_path.
Args:
file_path: the path to the file to be added relative to self
file_object: file or directory to add
Raises:
IOError: if file_path does not correspond to a directory
"""
try:
target_directory = self.GetObject(file_path)
target_directory.AddEntry(file_object)
except AttributeError:
raise IOError(errno.ENOTDIR,
'Not a directory in the fake filesystem',
file_path)
def RemoveObject(self, file_path):
"""Remove an existing file or directory.
Args:
file_path: the path to the file relative to self
Raises:
IOError: if file_path does not correspond to an existing file, or if part
of the path refers to something other than a directory
OSError: if the directory is in use (eg, if it is '/')
"""
if file_path == self.root.name:
raise OSError(errno.EBUSY, 'Fake device or resource busy',
file_path)
try:
dirname, basename = self.SplitPath(file_path)
target_directory = self.GetObject(dirname)
target_directory.RemoveEntry(basename)
except KeyError:
raise IOError(errno.ENOENT,
'No such file or directory in the fake filesystem',
file_path)
except AttributeError:
raise IOError(errno.ENOTDIR,
'Not a directory in the fake filesystem',
file_path)
def CreateDirectory(self, directory_path, perm_bits=PERM_DEF, inode=None):
"""Creates directory_path, and all the parent directories.
Helper method to set up your test faster
Args:
directory_path: directory to create
perm_bits: permission bits
inode: inode of directory
Returns:
the newly created FakeDirectory object
Raises:
OSError: if the directory already exists
"""
directory_path = self.NormalizePath(directory_path)
if self.Exists(directory_path):
raise OSError(errno.EEXIST,
'Directory exists in fake filesystem',
directory_path)
path_components = self.GetPathComponents(directory_path)
current_dir = self.root
for component in path_components:
if component not in current_dir.contents:
new_dir = FakeDirectory(component, perm_bits)
current_dir.AddEntry(new_dir)
current_dir = new_dir
else:
current_dir = current_dir.contents[component]
current_dir.SetIno(inode)
return current_dir
def CreateFile(self, file_path, st_mode=stat.S_IFREG | PERM_DEF_FILE,
contents='', st_size=None, create_missing_dirs=True,
apply_umask=False, inode=None):
"""Creates file_path, including all the parent directories along the way.
Helper method to set up your test faster.
Args:
file_path: path to the file to create
st_mode: the stat.S_IF constant representing the file type
contents: the contents of the file
st_size: file size; only valid if contents=None
create_missing_dirs: if True, auto create missing directories
apply_umask: whether or not the current umask must be applied on st_mode
inode: inode of the file
Returns:
the newly created FakeFile object
Raises:
IOError: if the file already exists
IOError: if the containing directory is required and missing
"""
file_path = self.NormalizePath(file_path)
if self.Exists(file_path):
raise IOError(errno.EEXIST,
'File already exists in fake filesystem',
file_path)
parent_directory, new_file = self.SplitPath(file_path)
if not parent_directory:
parent_directory = self.cwd
if not self.Exists(parent_directory):
if not create_missing_dirs:
raise IOError(errno.ENOENT, 'No such fake directory', parent_directory)
self.CreateDirectory(parent_directory)
if apply_umask:
st_mode &= ~self.umask
file_object = FakeFile(new_file, st_mode, contents)
file_object.SetIno(inode)
self.AddObject(parent_directory, file_object)
# set the size if st_size is given
if not contents and st_size is not None:
try:
file_object.SetLargeFileSize(st_size)
except IOError:
self.RemoveObject(file_path)
raise
return file_object
def CreateLink(self, file_path, link_target):
"""Creates the specified symlink, pointed at the specified link target.
Args:
file_path: path to the symlink to create
link_target: the target of the symlink
Returns:
the newly created FakeFile object
Raises:
IOError: if the file already exists
"""
resolved_file_path = self.ResolvePath(file_path)
return self.CreateFile(resolved_file_path, st_mode=stat.S_IFLNK | PERM_DEF,
contents=link_target)
def __str__(self):
return str(self.root)
class FakePathModule(object):
"""Faked os.path module replacement.
FakePathModule should *only* be instantiated by FakeOsModule. See the
FakeOsModule docstring for details.
"""
_OS_PATH_COPY = CopyModule(os.path)
def __init__(self, filesystem, os_module=None):
"""Init.
Args:
filesystem: FakeFilesystem used to provide file system information
os_module: (deprecated) FakeOsModule to assign to self.os
"""
self.filesystem = filesystem
self._os_path = self._OS_PATH_COPY
if os_module is None:
warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
stacklevel=2)
self._os_path.os = self.os = os_module
self.sep = self.filesystem.path_separator
def exists(self, path):
"""Determines whether the file object exists within the fake filesystem.
Args:
path: path to the file object
Returns:
bool (if file exists)
"""
return self.filesystem.Exists(path)
def lexists(self, path):
"""Test whether a path exists. Returns True for broken symbolic links.
Args:
path: path to the symlnk object
Returns:
bool (if file exists)
"""
return self.exists(path) or self.islink(path)
def getsize(self, path):
"""Return the file object size in bytes.
Args:
path: path to the file object
Returns:
file size in bytes
"""
file_obj = self.filesystem.GetObject(path)
return file_obj.st_size
def _istype(self, path, st_flag):
"""Helper function to implement isdir(), islink(), etc.
See the stat(2) man page for valid stat.S_I* flag values
Args:
path: path to file to stat and test
st_flag: the stat.S_I* flag checked for the file's st_mode
Returns:
boolean (the st_flag is set in path's st_mode)
Raises:
TypeError: if path is None
"""
if path is None:
raise TypeError
try:
obj = self.filesystem.ResolveObject(path)
if obj:
return stat.S_IFMT(obj.st_mode) == st_flag
except IOError:
return False
return False
def isabs(self, path):
if self.filesystem.path_separator == os.path.sep:
# Pass through to os.path.isabs, which on Windows has special
# handling for a leading drive letter.
return self._os_path.isabs(path)
else:
return path.startswith(self.filesystem.path_separator)
def isdir(self, path):
"""Determines if path identifies a directory."""
return self._istype(path, stat.S_IFDIR)
def isfile(self, path):
"""Determines if path identifies a regular file."""
return self._istype(path, stat.S_IFREG)
def islink(self, path):
"""Determines if path identifies a symbolic link.
Args:
path: path to filesystem object.
Returns:
boolean (the st_flag is set in path's st_mode)
Raises:
TypeError: if path is None
"""
if path is None:
raise TypeError
try:
link_obj = self.filesystem.LResolveObject(path)
return stat.S_IFMT(link_obj.st_mode) == stat.S_IFLNK
except IOError:
return False
except KeyError:
return False
return False
def getmtime(self, path):
"""Returns the mtime of the file."""
try:
file_obj = self.filesystem.GetObject(path)
except IOError as e:
raise OSError(errno.ENOENT, str(e))
return file_obj.st_mtime
def abspath(self, path):
"""Return the absolute version of a path."""
if not self.isabs(path):
if sys.version_info < (3, 0) and isinstance(path, unicode):
cwd = self.os.getcwdu()
else:
cwd = self.os.getcwd()
path = self.join(cwd, path)
return self.normpath(path)
def join(self, *p):
"""Returns the completed path with a separator of the parts."""
return self.filesystem.JoinPaths(*p)
def normpath(self, path):
"""Normalize path, eliminating double slashes, etc."""
return self.filesystem.CollapsePath(path)
if _is_windows:
def relpath(self, path, start=None):
"""ntpath.relpath() needs the cwd passed in the start argument."""
if start is None:
start = self.filesystem.cwd
path = self._os_path.relpath(path, start)
return path.replace(self._os_path.sep, self.filesystem.path_separator)
realpath = abspath
def __getattr__(self, name):
"""Forwards any non-faked calls to os.path."""
return self._os_path.__dict__[name]
class FakeOsModule(object):
"""Uses FakeFilesystem to provide a fake os module replacement.
Do not create os.path separately from os, as there is a necessary circular
dependency between os and os.path to replicate the behavior of the standard
Python modules. What you want to do is to just let FakeOsModule take care of
os.path setup itself.
# You always want to do this.
filesystem = fake_filesystem.FakeFilesystem()
my_os_module = fake_filesystem.FakeOsModule(filesystem)
"""
def __init__(self, filesystem, os_path_module=None):
"""Also exposes self.path (to fake os.path).
Args:
filesystem: FakeFilesystem used to provide file system information
os_path_module: (deprecated) optional FakePathModule instance
"""
self.filesystem = filesystem
self.sep = filesystem.path_separator
self._os_module = os
if os_path_module is None:
self.path = FakePathModule(self.filesystem, self)
else:
warnings.warn(FAKE_PATH_MODULE_DEPRECATION, DeprecationWarning,
stacklevel=2)
self.path = os_path_module
if sys.version_info < (3, 0):
self.fdopen = self._fdopen_ver2
else:
self.fdopen = self._fdopen
def _fdopen(self, *args, **kwargs):
"""Redirector to open() builtin function.
Args:
*args: pass through args
**kwargs: pass through kwargs
Returns:
File object corresponding to file_des.
Raises:
TypeError: if file descriptor is not an integer.
"""
if not isinstance(args[0], int):
raise TypeError('an integer is required')
return FakeFileOpen(self.filesystem)(*args, **kwargs)
def _fdopen_ver2(self, file_des, mode='r', bufsize=None):
"""Returns an open file object connected to the file descriptor file_des.
Args:
file_des: An integer file descriptor for the file object requested.
mode: additional file flags. Currently checks to see if the mode matches
the mode of the requested file object.
bufsize: ignored. (Used for signature compliance with __builtin__.fdopen)
Returns:
File object corresponding to file_des.
Raises:
OSError: if bad file descriptor or incompatible mode is given.
TypeError: if file descriptor is not an integer.
"""
if not isinstance(file_des, int):
raise TypeError('an integer is required')
try:
return FakeFileOpen(self.filesystem).Call(file_des, mode=mode)
except IOError as e:
raise OSError(e)
def open(self, file_path, flags, mode=None):
"""Returns the file descriptor for a FakeFile.
WARNING: This implementation only implements creating a file. Please fill
out the remainder for your needs.
Args:
file_path: the path to the file
flags: low-level bits to indicate io operation
mode: bits to define default permissions
Returns:
A file descriptor.
Raises:
OSError: if the path cannot be found
ValueError: if invalid mode is given
NotImplementedError: if an unsupported flag is passed in
"""
if flags & os.O_CREAT:
fake_file = FakeFileOpen(self.filesystem)(file_path, 'w')
if mode:
self.chmod(file_path, mode)
return fake_file.fileno()
else:
raise NotImplementedError('FakeOsModule.open')
def close(self, file_des):
"""Closes a file descriptor.
Args:
file_des: An integer file descriptor for the file object requested.
Raises:
OSError: bad file descriptor.
TypeError: if file descriptor is not an integer.
"""
fh = self.filesystem.GetOpenFile(file_des)
fh.close()
def read(self, file_des, num_bytes):
"""Reads number of bytes from a file descriptor, returns bytes read.
Args:
file_des: An integer file descriptor for the file object requested.
num_bytes: Number of bytes to read from file.
Returns:
Bytes read from file.
Raises:
OSError: bad file descriptor.
TypeError: if file descriptor is not an integer.
"""
fh = self.filesystem.GetOpenFile(file_des)
return fh.read(num_bytes)
def write(self, file_des, contents):
"""Writes string to file descriptor, returns number of bytes written.
Args:
file_des: An integer file descriptor for the file object requested.
contents: String of bytes to write to file.
Returns:
Number of bytes written.
Raises:
OSError: bad file descriptor.
TypeError: if file descriptor is not an integer.
"""
fh = self.filesystem.GetOpenFile(file_des)
fh.write(contents)
fh.flush()
return len(contents)
def fstat(self, file_des):
"""Returns the os.stat-like tuple for the FakeFile object of file_des.
Args:
file_des: file descriptor of filesystem object to retrieve
Returns:
the os.stat_result object corresponding to entry_path
Raises:
OSError: if the filesystem object doesn't exist.
"""
# stat should return the tuple representing return value of os.stat
stats = self.filesystem.GetOpenFile(file_des).GetObject()
st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
stats.st_nlink, stats.st_uid, stats.st_gid,
stats.st_size, stats.st_atime,
stats.st_mtime, stats.st_ctime))
return st_obj
def _ConfirmDir(self, target_directory):
"""Tests that the target is actually a directory, raising OSError if not.
Args:
target_directory: path to the target directory within the fake
filesystem
Returns:
the FakeFile object corresponding to target_directory
Raises:
OSError: if the target is not a directory
"""
try:
directory = self.filesystem.GetObject(target_directory)
except IOError as e:
raise OSError(e.errno, e.strerror, target_directory)
if not directory.st_mode & stat.S_IFDIR:
raise OSError(errno.ENOTDIR,
'Fake os module: not a directory',
target_directory)
return directory
def umask(self, new_mask):
"""Change the current umask.
Args:
new_mask: An integer.
Returns:
The old mask.
Raises:
TypeError: new_mask is of an invalid type.
"""
if not isinstance(new_mask, int):
raise TypeError('an integer is required')
old_umask = self.filesystem.umask
self.filesystem.umask = new_mask
return old_umask
def chdir(self, target_directory):
"""Change current working directory to target directory.
Args:
target_directory: path to new current working directory
Raises:
OSError: if user lacks permission to enter the argument directory or if
the target is not a directory
"""
target_directory = self.filesystem.ResolvePath(target_directory)
self._ConfirmDir(target_directory)
directory = self.filesystem.GetObject(target_directory)
# A full implementation would check permissions all the way up the tree.
if not directory.st_mode | PERM_EXE:
raise OSError(errno.EACCES, 'Fake os module: permission denied',
directory)
self.filesystem.cwd = target_directory
def getcwd(self):
"""Return current working directory."""
return self.filesystem.cwd
def getcwdu(self):
"""Return current working directory. Deprecated in Python 3."""
if sys.version_info >= (3, 0):
raise AttributeError('no attribute getcwdu')
return unicode(self.filesystem.cwd)
def listdir(self, target_directory):
"""Returns a sorted list of filenames in target_directory.
Args:
target_directory: path to the target directory within the fake
filesystem
Returns:
a sorted list of file names within the target directory
Raises:
OSError: if the target is not a directory
"""
target_directory = self.filesystem.ResolvePath(target_directory)
directory = self._ConfirmDir(target_directory)
return sorted(directory.contents)
def _ClassifyDirectoryContents(self, root):
"""Classify contents of a directory as files/directories.
Args:
root: (str) Directory to examine.
Returns:
(tuple) A tuple consisting of three values: the directory examined, a
list containing all of the directory entries, and a list containing all
of the non-directory entries. (This is the same format as returned by
the os.walk generator.)
Raises:
Nothing on its own, but be ready to catch exceptions generated by
underlying mechanisms like os.listdir.
"""
dirs = []
files = []
for entry in self.listdir(root):
if self.path.isdir(self.path.join(root, entry)):
dirs.append(entry)
else:
files.append(entry)
return (root, dirs, files)
def walk(self, top, topdown=True, onerror=None):
"""Performs an os.walk operation over the fake filesystem.
Args:
top: root directory from which to begin walk
topdown: determines whether to return the tuples with the root as the
first entry (True) or as the last, after all the child directory
tuples (False)
onerror: if not None, function which will be called to handle the
os.error instance provided when os.listdir() fails
Yields:
(path, directories, nondirectories) for top and each of its
subdirectories. See the documentation for the builtin os module for
further details.
"""
top = self.path.normpath(top)
try:
top_contents = self._ClassifyDirectoryContents(top)
except OSError as e:
top_contents = None
if onerror is not None:
onerror(e)
if top_contents is not None:
if topdown:
yield top_contents
for directory in top_contents[1]:
for contents in self.walk(self.path.join(top, directory),
topdown=topdown, onerror=onerror):
yield contents
if not topdown:
yield top_contents
def readlink(self, path):
"""Reads the target of a symlink.
Args:
path: symlink to read the target of
Returns:
the string representing the path to which the symbolic link points.
Raises:
TypeError: if path is None
OSError: (with errno=ENOENT) if path is not a valid path, or
(with errno=EINVAL) if path is valid, but is not a symlink
"""
if path is None:
raise TypeError
try:
link_obj = self.filesystem.LResolveObject(path)
except IOError:
raise OSError(errno.ENOENT, 'Fake os module: path does not exist', path)
if stat.S_IFMT(link_obj.st_mode) != stat.S_IFLNK:
raise OSError(errno.EINVAL, 'Fake os module: not a symlink', path)
return link_obj.contents
def stat(self, entry_path):
"""Returns the os.stat-like tuple for the FakeFile object of entry_path.
Args:
entry_path: path to filesystem object to retrieve
Returns:
the os.stat_result object corresponding to entry_path
Raises:
OSError: if the filesystem object doesn't exist.
"""
# stat should return the tuple representing return value of os.stat
try:
stats = self.filesystem.ResolveObject(entry_path)
st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
stats.st_nlink, stats.st_uid, stats.st_gid,
stats.st_size, stats.st_atime,
stats.st_mtime, stats.st_ctime))
return st_obj
except IOError as io_error:
raise OSError(io_error.errno, io_error.strerror, entry_path)
def lstat(self, entry_path):
"""Returns the os.stat-like tuple for entry_path, not following symlinks.
Args:
entry_path: path to filesystem object to retrieve
Returns:
the os.stat_result object corresponding to entry_path
Raises:
OSError: if the filesystem object doesn't exist.
"""
# stat should return the tuple representing return value of os.stat
try:
stats = self.filesystem.LResolveObject(entry_path)
st_obj = os.stat_result((stats.st_mode, stats.st_ino, stats.st_dev,
stats.st_nlink, stats.st_uid, stats.st_gid,
stats.st_size, stats.st_atime,
stats.st_mtime, stats.st_ctime))
return st_obj
except IOError as io_error:
raise OSError(io_error.errno, io_error.strerror, entry_path)
def remove(self, path):
"""Removes the FakeFile object representing the specified file."""
path = self.filesystem.NormalizePath(path)
if self.path.isdir(path) and not self.path.islink(path):
raise OSError(errno.EISDIR, "Is a directory: '%s'" % path)
try:
self.filesystem.RemoveObject(path)
except IOError as e:
raise OSError(e.errno, e.strerror, e.filename)
# As per the documentation unlink = remove.
unlink = remove
def rename(self, old_file, new_file):
"""Adds a FakeFile object at new_file containing contents of old_file.
Also removes the FakeFile object for old_file, and replaces existing
new_file object, if one existed.
Args:
old_file: path to filesystem object to rename
new_file: path to where the filesystem object will live after this call
Raises:
OSError: if old_file does not exist.
IOError: if dirname(new_file) does not exist
"""
old_file = self.filesystem.NormalizePath(old_file)
new_file = self.filesystem.NormalizePath(new_file)
if not self.filesystem.Exists(old_file):
raise OSError(errno.ENOENT,
'Fake os object: can not rename nonexistent file '
'with name',
old_file)
if self.filesystem.Exists(new_file):
if old_file == new_file:
return None # Nothing to do here.
else:
self.remove(new_file)
old_dir, old_name = self.path.split(old_file)
new_dir, new_name = self.path.split(new_file)
if not self.filesystem.Exists(new_dir):
raise IOError(errno.ENOENT, 'No such fake directory', new_dir)
old_dir_object = self.filesystem.ResolveObject(old_dir)
old_object = old_dir_object.GetEntry(old_name)
old_object_mtime = old_object.st_mtime
new_dir_object = self.filesystem.ResolveObject(new_dir)
if old_object.st_mode & stat.S_IFDIR:
old_object.name = new_name
new_dir_object.AddEntry(old_object)
old_dir_object.RemoveEntry(old_name)
else:
self.filesystem.CreateFile(new_file,
st_mode=old_object.st_mode,
contents=old_object.contents,
create_missing_dirs=False)
self.remove(old_file)
new_object = self.filesystem.GetObject(new_file)
new_object.SetMTime(old_object_mtime)
self.chown(new_file, old_object.st_uid, old_object.st_gid)
def rmdir(self, target_directory):
"""Remove a leaf Fake directory.
Args:
target_directory: (str) Name of directory to remove.
Raises:
OSError: if target_directory does not exist or is not a directory,
or as per FakeFilesystem.RemoveObject. Cannot remove '.'.
"""
if target_directory == '.':
raise OSError(errno.EINVAL, 'Invalid argument: \'.\'')
target_directory = self.filesystem.NormalizePath(target_directory)
if self._ConfirmDir(target_directory):
if self.listdir(target_directory):
raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
target_directory)
try:
self.filesystem.RemoveObject(target_directory)
except IOError as e:
raise OSError(e.errno, e.strerror, e.filename)
def removedirs(self, target_directory):
"""Remove a leaf Fake directory and all empty intermediate ones."""
target_directory = self.filesystem.NormalizePath(target_directory)
directory = self._ConfirmDir(target_directory)
if directory.contents:
raise OSError(errno.ENOTEMPTY, 'Fake Directory not empty',
self.path.basename(target_directory))
else:
self.rmdir(target_directory)
head, tail = self.path.split(target_directory)
if not tail:
head, tail = self.path.split(head)
while head and tail:
head_dir = self._ConfirmDir(head)
if head_dir.contents:
break
self.rmdir(head)
head, tail = self.path.split(head)
def mkdir(self, dir_name, mode=PERM_DEF):
"""Create a leaf Fake directory.
Args:
dir_name: (str) Name of directory to create. Relative paths are assumed
to be relative to '/'.
mode: (int) Mode to create directory with. This argument defaults to
0o777. The umask is applied to this mode.
Raises:
OSError: if the directory name is invalid or parent directory is read only
or as per FakeFilesystem.AddObject.
"""
if dir_name.endswith(self.sep):
dir_name = dir_name[:-1]
parent_dir, _ = self.path.split(dir_name)
if parent_dir:
base_dir = self.path.normpath(parent_dir)
if parent_dir.endswith(self.sep + '..'):
base_dir, unused_dotdot, _ = parent_dir.partition(self.sep + '..')
if not self.filesystem.Exists(base_dir):
raise OSError(errno.ENOENT, 'No such fake directory', base_dir)
dir_name = self.filesystem.NormalizePath(dir_name)
if self.filesystem.Exists(dir_name):
raise OSError(errno.EEXIST, 'Fake object already exists', dir_name)
head, tail = self.path.split(dir_name)
directory_object = self.filesystem.GetObject(head)
if not directory_object.st_mode & PERM_WRITE:
raise OSError(errno.EACCES, 'Permission Denied', dir_name)
self.filesystem.AddObject(
head, FakeDirectory(tail, mode & ~self.filesystem.umask))
def makedirs(self, dir_name, mode=PERM_DEF):
"""Create a leaf Fake directory + create any non-existent parent dirs.
Args:
dir_name: (str) Name of directory to create.
mode: (int) Mode to create directory (and any necessary parent
directories) with. This argument defaults to 0o777. The umask is
applied to this mode.
Raises:
OSError: if the directory already exists or as per
FakeFilesystem.CreateDirectory
"""
dir_name = self.filesystem.NormalizePath(dir_name)
path_components = self.filesystem.GetPathComponents(dir_name)
# Raise a permission denied error if the first existing directory is not
# writeable.
current_dir = self.filesystem.root
for component in path_components:
if component not in current_dir.contents:
if not current_dir.st_mode & PERM_WRITE:
raise OSError(errno.EACCES, 'Permission Denied', dir_name)
else:
break
else:
current_dir = current_dir.contents[component]
self.filesystem.CreateDirectory(dir_name, mode & ~self.filesystem.umask)
def access(self, path, mode):
"""Check if a file exists and has the specified permissions.
Args:
path: (str) Path to the file.
mode: (int) Permissions represented as a bitwise-OR combination of
os.F_OK, os.R_OK, os.W_OK, and os.X_OK.
Returns:
boolean, True if file is accessible, False otherwise
"""
try:
st = self.stat(path)
except OSError as os_error:
if os_error.errno == errno.ENOENT:
return False
raise
return (mode & ((st.st_mode >> 6) & 7)) == mode
def chmod(self, path, mode):
"""Change the permissions of a file as encoded in integer mode.
Args:
path: (str) Path to the file.
mode: (int) Permissions
"""
try:
file_object = self.filesystem.GetObject(path)
except IOError as io_error:
if io_error.errno == errno.ENOENT:
raise OSError(errno.ENOENT,
'No such file or directory in fake filesystem',
path)
raise
file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) |
(mode & PERM_ALL))
file_object.st_ctime = int(time.time())
def utime(self, path, times):
"""Change the access and modified times of a file.
Args:
path: (str) Path to the file.
times: 2-tuple of numbers, of the form (atime, mtime) which is used to set
the access and modified times, respectively. If None, file's access
and modified times are set to the current time.
Raises:
TypeError: If anything other than integers is specified in passed tuple or
number of elements in the tuple is not equal to 2.
"""
try:
file_object = self.filesystem.GetObject(path)
except IOError as io_error:
if io_error.errno == errno.ENOENT:
raise OSError(errno.ENOENT,
'No such file or directory in fake filesystem',
path)
raise
if times is None:
file_object.st_atime = int(time.time())
file_object.st_mtime = int(time.time())
else:
if len(times) != 2:
raise TypeError('utime() arg 2 must be a tuple (atime, mtime)')
for t in times:
if not isinstance(t, (int, float)):
raise TypeError('an integer is required')
file_object.st_atime = times[0]
file_object.st_mtime = times[1]
def chown(self, path, uid, gid):
"""Set ownership of a faked file.
Args:
path: (str) Path to the file or directory.
uid: (int) Numeric uid to set the file or directory to.
gid: (int) Numeric gid to set the file or directory to.
"""
try:
file_object = self.filesystem.GetObject(path)
except IOError as io_error:
if io_error.errno == errno.ENOENT:
raise OSError(errno.ENOENT,
'No such file or directory in fake filesystem',
path)
raise
if uid != -1:
file_object.st_uid = uid
if gid != -1:
file_object.st_gid = gid
def mknod(self, filename, mode=None, device=None):
"""Create a filesystem node named 'filename'.
Does not support device special files or named pipes as the real os
module does.
Args:
filename: (str) Name of the file to create
mode: (int) permissions to use and type of file to be created.
Default permissions are 0o666. Only the stat.S_IFREG file type
is supported by the fake implementation. The umask is applied
to this mode.
device: not supported in fake implementation
Raises:
OSError: if called with unsupported options or the file can not be
created.
"""
if mode is None:
mode = stat.S_IFREG | PERM_DEF_FILE
if device or not mode & stat.S_IFREG:
raise OSError(errno.EINVAL,
'Fake os mknod implementation only supports '
'regular files.')
head, tail = self.path.split(filename)
if not tail:
if self.filesystem.Exists(head):
raise OSError(errno.EEXIST, 'Fake filesystem: %s: %s' % (
os.strerror(errno.EEXIST), filename))
raise OSError(errno.ENOENT, 'Fake filesystem: %s: %s' % (
os.strerror(errno.ENOENT), filename))
if tail == '.' or tail == '..' or self.filesystem.Exists(filename):
raise OSError(errno.EEXIST, 'Fake fileystem: %s: %s' % (
os.strerror(errno.EEXIST), filename))
try:
self.filesystem.AddObject(head, FakeFile(tail,
mode & ~self.filesystem.umask))
except IOError:
raise OSError(errno.ENOTDIR, 'Fake filesystem: %s: %s' % (
os.strerror(errno.ENOTDIR), filename))
def symlink(self, link_target, path):
"""Creates the specified symlink, pointed at the specified link target.
Args:
link_target: the target of the symlink
path: path to the symlink to create
Returns:
None
Raises:
IOError: if the file already exists
"""
self.filesystem.CreateLink(path, link_target)
# pylint: disable-msg=C6002
# TODO: Link doesn't behave like os.link, this needs to be fixed properly.
link = symlink
def __getattr__(self, name):
"""Forwards any unfaked calls to the standard os module."""
return getattr(self._os_module, name)
class FakeFileOpen(object):
"""Faked file() and open() function replacements.
Returns FakeFile objects in a FakeFilesystem in place of the file()
or open() function.
"""
def __init__(self, filesystem, delete_on_close=False):
"""init.
Args:
filesystem: FakeFilesystem used to provide file system information
delete_on_close: optional boolean, deletes file on close()
"""
self.filesystem = filesystem
self._delete_on_close = delete_on_close
def __call__(self, *args, **kwargs):
"""Redirects calls to file() or open() to appropriate method."""
if sys.version_info < (3, 0):
return self._call_ver2(*args, **kwargs)
else:
return self.Call(*args, **kwargs)
def _call_ver2(self, file_path, mode='r', buffering=-1, flags=None):
"""Limits args of open() or file() for Python 2.x versions."""
# Backwards compatibility, mode arg used to be named flags
mode = flags or mode
return self.Call(file_path, mode, buffering)
def Call(self, file_, mode='r', buffering=-1, encoding=None,
errors=None, newline=None, closefd=True, opener=None):
"""Returns a StringIO object with the contents of the target file object.
Args:
file_: path to target file or a file descriptor
mode: additional file modes. All r/w/a r+/w+/a+ modes are supported.
't', and 'U' are ignored, e.g., 'wU' is treated as 'w'. 'b' sets
binary mode, no end of line translations in StringIO.
buffering: ignored. (Used for signature compliance with __builtin__.open)
encoding: ignored, strings have no encoding
errors: ignored, this relates to encoding
newline: controls universal newlines, passed to StringIO object
closefd: if a file descriptor rather than file name is passed, and set
to false, then the file descriptor is kept open when file is closed
opener: not supported
Returns:
a StringIO object containing the contents of the target file
Raises:
IOError: if the target object is a directory, the path is invalid or
permission is denied.
"""
orig_modes = mode # Save original mdoes for error messages.
# Binary mode for non 3.x or set by mode
binary = sys.version_info < (3, 0) or 'b' in mode
# Normalize modes. Ignore 't' and 'U'.
mode = mode.replace('t', '').replace('b', '')
mode = mode.replace('rU', 'r').replace('U', 'r')
if mode not in _OPEN_MODE_MAP:
raise IOError('Invalid mode: %r' % orig_modes)
must_exist, need_read, need_write, truncate, append = _OPEN_MODE_MAP[mode]
file_object = None
filedes = None
# opening a file descriptor
if isinstance(file_, int):
filedes = file_
file_object = self.filesystem.GetOpenFile(filedes).GetObject()
file_path = file_object.name
else:
file_path = file_
real_path = self.filesystem.ResolvePath(file_path)
if self.filesystem.Exists(file_path):
file_object = self.filesystem.GetObjectFromNormalizedPath(real_path)
closefd = True
if file_object:
if ((need_read and not file_object.st_mode & PERM_READ) or
(need_write and not file_object.st_mode & PERM_WRITE)):
raise IOError(errno.EACCES, 'Permission denied', file_path)
if need_write:
file_object.st_ctime = int(time.time())
if truncate:
file_object.SetContents('')
else:
if must_exist:
raise IOError(errno.ENOENT, 'No such file or directory', file_path)
file_object = self.filesystem.CreateFile(
real_path, create_missing_dirs=False, apply_umask=True)
if file_object.st_mode & stat.S_IFDIR:
raise IOError(errno.EISDIR, 'Fake file object: is a directory', file_path)
class FakeFileWrapper(object):
"""Wrapper for a StringIO object for use by a FakeFile object.
If the wrapper has any data written to it, it will propagate to
the FakeFile object on close() or flush().
"""
if sys.version_info < (3, 0):
_OPERATION_ERROR = IOError
else:
_OPERATION_ERROR = io.UnsupportedOperation
def __init__(self, file_object, update=False, read=False, append=False,
delete_on_close=False, filesystem=None, newline=None,
binary=True, closefd=True):
self._file_object = file_object
self._append = append
self._read = read
self._update = update
self._closefd = closefd
self._file_epoch = file_object.epoch
contents = file_object.contents
newline_arg = {} if binary else {'newline': newline}
io_class = io.StringIO
# For Python 3, files opened as binary only read/write byte contents.
if sys.version_info >= (3, 0) and binary:
io_class = io.BytesIO
if contents and isinstance(contents, str):
contents = bytes(contents, 'ascii')
if contents:
if update:
self._io = io_class(**newline_arg)
self._io.write(contents)
if not append:
self._io.seek(0)
else:
self._read_whence = 0
if read:
self._read_seek = 0
else:
self._read_seek = self._io.tell()
else:
self._io = io_class(contents, **newline_arg)
else:
self._io = io_class(**newline_arg)
self._read_whence = 0
self._read_seek = 0
if delete_on_close:
assert filesystem, 'delete_on_close=True requires filesystem='
self._filesystem = filesystem
self._delete_on_close = delete_on_close
# override, don't modify FakeFile.name, as FakeFilesystem expects
# it to be the file name only, no directories.
self.name = file_object.opened_as
def __enter__(self):
"""To support usage of this fake file with the 'with' statement."""
return self
def __exit__(self, type, value, traceback): # pylint: disable-msg=W0622
"""To support usage of this fake file with the 'with' statement."""
self.close()
def GetObject(self):
"""Returns FakeFile object that is wrapped by current class."""
return self._file_object
def fileno(self):
"""Returns file descriptor of file object."""
return self.filedes
def close(self):
"""File close."""
if self._update:
self._file_object.SetContents(self._io.getvalue())
if self._closefd:
self._filesystem.CloseOpenFile(self)
if self._delete_on_close:
self._filesystem.RemoveObject(self.name)
def flush(self):
"""Flush file contents to 'disk'."""
if self._update:
self._file_object.SetContents(self._io.getvalue())
self._file_epoch = self._file_object.epoch
def seek(self, offset, whence=0):
"""Move read/write pointer in 'file'."""
if not self._append:
self._io.seek(offset, whence)
else:
self._read_seek = offset
self._read_whence = whence
def tell(self):
"""Return the file's current position.
Returns:
int, file's current position in bytes.
"""
if not self._append:
return self._io.tell()
if self._read_whence:
write_seek = self._io.tell()
self._io.seek(self._read_seek, self._read_whence)
self._read_seek = self._io.tell()
self._read_whence = 0
self._io.seek(write_seek)
return self._read_seek
def _UpdateStringIO(self):
"""Updates the StringIO with changes to the file object contents."""
if self._file_epoch == self._file_object.epoch:
return
whence = self._io.tell()
self._io.seek(0)
self._io.truncate()
self._io.write(self._file_object.contents)
self._io.seek(whence)
self._file_epoch = self._file_object.epoch
def _ReadWrappers(self, name):
"""Wrap a StringIO attribute in a read wrapper.
Returns a read_wrapper which tracks our own read pointer since the
StringIO object has no concept of a different read and write pointer.
Args:
name: the name StringIO attribute to wrap. Should be a read call.
Returns:
either a read_error or read_wrapper function.
"""
io_attr = getattr(self._io, name)
def read_wrapper(*args, **kwargs):
"""Wrap all read calls to the StringIO Object.
We do this to track the read pointer separate from the write
pointer. Anything that wants to read from the StringIO object
while we're in append mode goes through this.
Args:
*args: pass through args
**kwargs: pass through kwargs
Returns:
Wrapped StringIO object method
"""
self._io.seek(self._read_seek, self._read_whence)
ret_value = io_attr(*args, **kwargs)
self._read_seek = self._io.tell()
self._read_whence = 0
self._io.seek(0, 2)
return ret_value
return read_wrapper
def _OtherWrapper(self, name):
"""Wrap a StringIO attribute in an other_wrapper.
Args:
name: the name of the StringIO attribute to wrap.
Returns:
other_wrapper which is described below.
"""
io_attr = getattr(self._io, name)
def other_wrapper(*args, **kwargs):
"""Wrap all other calls to the StringIO Object.
We do this to track changes to the write pointer. Anything that
moves the write pointer in a file open for appending should move
the read pointer as well.
Args:
*args: pass through args
**kwargs: pass through kwargs
Returns:
Wrapped StringIO object method
"""
write_seek = self._io.tell()
ret_value = io_attr(*args, **kwargs)
if write_seek != self._io.tell():
self._read_seek = self._io.tell()
self._read_whence = 0
self._file_object.st_size += (self._read_seek - write_seek)
return ret_value
return other_wrapper
def Size(self):
return self._file_object.st_size
def __getattr__(self, name):
if self._file_object.IsLargeFile():
raise FakeLargeFileIoException(file_path)
# errors on called method vs. open mode
if not self._read and name.startswith('read'):
def read_error(*args, **kwargs):
"""Throw an error unless the argument is zero."""
if args and args[0] == 0:
return ''
raise self._OPERATION_ERROR('File is not open for reading.')
return read_error
if not self._update and (name.startswith('write')
or name == 'truncate'):
def write_error(*args, **kwargs):
"""Throw an error."""
raise self._OPERATION_ERROR('File is not open for writing.')
return write_error
if name.startswith('read'):
self._UpdateStringIO()
if self._append:
if name.startswith('read'):
return self._ReadWrappers(name)
else:
return self._OtherWrapper(name)
return getattr(self._io, name)
def __iter__(self):
if not self._read:
raise self._OPERATION_ERROR('File is not open for reading')
return self._io.__iter__()
# if you print obj.name, the argument to open() must be printed. Not the
# abspath, not the filename, but the actual argument.
file_object.opened_as = file_path
fakefile = FakeFileWrapper(file_object,
update=need_write,
read=need_read,
append=append,
delete_on_close=self._delete_on_close,
filesystem=self.filesystem,
newline=newline,
binary=binary,
closefd=closefd)
if filedes is not None:
fakefile.filedes = filedes
else:
fakefile.filedes = self.filesystem.AddOpenFile(fakefile)
return fakefile
def _RunDoctest():
# pylint: disable-msg=C6204
import doctest
import fake_filesystem # pylint: disable-msg=W0406
return doctest.testmod(fake_filesystem)
if __name__ == '__main__':
_RunDoctest()