| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """A fake implementation for the `scandir` function working with |
| FakeFilesystem. |
| Works with both the function integrated into the `os` module since Python 3.5 |
| and the standalone function available in the standalone `scandir` python |
| package. |
| """ |
| import os |
| import sys |
| |
| from pyfakefs.extra_packages import use_scandir_package |
| from pyfakefs.helpers import to_string |
| |
| if sys.version_info >= (3, 6): |
| BaseClass = os.PathLike |
| else: |
| BaseClass = object |
| |
| |
| class DirEntry(BaseClass): |
| """Emulates os.DirEntry. Note that we did not enforce keyword only |
| arguments.""" |
| |
| def __init__(self, filesystem): |
| """Initialize the dir entry with unset values. |
| |
| Args: |
| filesystem: the fake filesystem used for implementation. |
| """ |
| self._filesystem = filesystem |
| self.name = '' |
| self.path = '' |
| self._abspath = '' |
| self._inode = None |
| self._islink = False |
| self._isdir = False |
| self._statresult = None |
| self._statresult_symlink = None |
| |
| def inode(self): |
| """Return the inode number of the entry.""" |
| if self._inode is None: |
| self.stat(follow_symlinks=False) |
| return self._inode |
| |
| def is_dir(self, follow_symlinks=True): |
| """Return True if this entry is a directory entry. |
| |
| Args: |
| follow_symlinks: If True, also return True if this entry is a |
| symlink pointing to a directory. |
| |
| Returns: |
| True if this entry is an existing directory entry, or if |
| follow_symlinks is set, and this entry points to an existing |
| directory entry. |
| """ |
| return self._isdir and (follow_symlinks or not self._islink) |
| |
| def is_file(self, follow_symlinks=True): |
| """Return True if this entry is a regular file entry. |
| |
| Args: |
| follow_symlinks: If True, also return True if this entry is a |
| symlink pointing to a regular file. |
| |
| Returns: |
| True if this entry is an existing file entry, or if |
| follow_symlinks is set, and this entry points to an existing |
| file entry. |
| """ |
| return not self._isdir and (follow_symlinks or not self._islink) |
| |
| def is_symlink(self): |
| """Return True if this entry is a symbolic link (even if broken).""" |
| return self._islink |
| |
| def stat(self, follow_symlinks=True): |
| """Return a stat_result object for this entry. |
| |
| Args: |
| follow_symlinks: If False and the entry is a symlink, return the |
| result for the symlink, otherwise for the object it points to. |
| """ |
| if follow_symlinks: |
| if self._statresult_symlink is None: |
| file_object = self._filesystem.resolve(self._abspath) |
| self._statresult_symlink = file_object.stat_result.copy() |
| if self._filesystem.is_windows_fs: |
| self._statresult_symlink.st_nlink = 0 |
| return self._statresult_symlink |
| |
| if self._statresult is None: |
| file_object = self._filesystem.lresolve(self._abspath) |
| self._inode = file_object.st_ino |
| self._statresult = file_object.stat_result.copy() |
| if self._filesystem.is_windows_fs: |
| self._statresult.st_nlink = 0 |
| return self._statresult |
| |
| if sys.version_info >= (3, 6): |
| def __fspath__(self): |
| return self.path |
| |
| |
| class ScanDirIter: |
| """Iterator for DirEntry objects returned from `scandir()` |
| function.""" |
| |
| def __init__(self, filesystem, path): |
| self.filesystem = filesystem |
| if isinstance(path, int): |
| if not use_scandir_package and ( |
| sys.version_info < (3, 7) or |
| self.filesystem.is_windows_fs): |
| raise NotImplementedError( |
| 'scandir does not support file descriptor ' |
| 'path argument') |
| self.abspath = self.filesystem.absnormpath( |
| self.filesystem.get_open_file(path).get_object().path) |
| self.path = '' |
| else: |
| self.abspath = self.filesystem.absnormpath(path) |
| self.path = to_string(path) |
| contents = self.filesystem.confirmdir(self.abspath).contents |
| self.contents_iter = iter(contents) |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| entry = self.contents_iter.__next__() |
| dir_entry = DirEntry(self.filesystem) |
| dir_entry.name = entry |
| dir_entry.path = self.filesystem.joinpaths(self.path, |
| dir_entry.name) |
| dir_entry._abspath = self.filesystem.joinpaths(self.abspath, |
| dir_entry.name) |
| dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath) |
| dir_entry._islink = self.filesystem.islink(dir_entry._abspath) |
| return dir_entry |
| |
| if sys.version_info >= (3, 6): |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.close() |
| |
| def close(self): |
| pass |
| |
| |
| def scandir(filesystem, path=''): |
| """Return an iterator of DirEntry objects corresponding to the entries |
| in the directory given by path. |
| |
| Args: |
| filesystem: The fake filesystem used for implementation |
| path: Path to the target directory within the fake filesystem. |
| |
| Returns: |
| an iterator to an unsorted list of os.DirEntry objects for |
| each entry in path. |
| |
| Raises: |
| OSError: if the target is not a directory. |
| """ |
| return ScanDirIter(filesystem, path) |
| |
| |
| def _classify_directory_contents(filesystem, root): |
| """Classify contents of a directory as files/directories. |
| |
| Args: |
| filesystem: The fake filesystem used for implementation |
| root: (str) Directory to examine. |
| |
| Returns: |
| (tuple) A tuple consisting of three values: the directory examined, |
| a list containing all of the directory entries, and a list |
| containing all of the non-directory entries. |
| (This is the same format as returned by the `os.walk` generator.) |
| |
| Raises: |
| Nothing on its own, but be ready to catch exceptions generated by |
| underlying mechanisms like `os.listdir`. |
| """ |
| dirs = [] |
| files = [] |
| for entry in filesystem.listdir(root): |
| if filesystem.isdir(filesystem.joinpaths(root, entry)): |
| dirs.append(entry) |
| else: |
| files.append(entry) |
| return root, dirs, files |
| |
| |
| def walk(filesystem, top, topdown=True, onerror=None, followlinks=False): |
| """Perform an os.walk operation over the fake filesystem. |
| |
| Args: |
| filesystem: The fake filesystem used for implementation |
| top: The root directory from which to begin walk. |
| topdown: Determines whether to return the tuples with the root as |
| the first entry (`True`) or as the last, after all the child |
| directory tuples (`False`). |
| onerror: If not `None`, function which will be called to handle the |
| `os.error` instance provided when `os.listdir()` fails. |
| followlinks: If `True`, symbolic links are followed. |
| |
| Yields: |
| (path, directories, nondirectories) for top and each of its |
| subdirectories. See the documentation for the builtin os module |
| for further details. |
| """ |
| |
| def do_walk(top_dir, top_most=False): |
| if not top_most and not followlinks and filesystem.islink(top_dir): |
| return |
| try: |
| top_contents = _classify_directory_contents(filesystem, top_dir) |
| except OSError as exc: |
| top_contents = None |
| if onerror is not None: |
| onerror(exc) |
| |
| if top_contents is not None: |
| if topdown: |
| yield top_contents |
| |
| for directory in top_contents[1]: |
| if not followlinks and filesystem.islink(directory): |
| continue |
| for contents in do_walk(filesystem.joinpaths(top_dir, |
| directory)): |
| yield contents |
| |
| if not topdown: |
| yield top_contents |
| |
| return do_walk(to_string(top), top_most=True) |
| |
| |
| class FakeScanDirModule: |
| """Uses FakeFilesystem to provide a fake `scandir` module replacement. |
| |
| .. Note:: The ``scandir`` function is a part of the standard ``os`` module |
| since Python 3.5. This class handles the separate ``scandir`` module |
| that is available on pypi. |
| |
| You need a fake_filesystem to use this: |
| `filesystem = fake_filesystem.FakeFilesystem()` |
| `fake_scandir_module = fake_filesystem.FakeScanDirModule(filesystem)` |
| """ |
| |
| @staticmethod |
| def dir(): |
| """Return the list of patched function names. Used for patching |
| functions imported from the module. |
| """ |
| return 'scandir', 'walk' |
| |
| def __init__(self, filesystem): |
| self.filesystem = filesystem |
| |
| def scandir(self, path='.'): |
| """Return an iterator of DirEntry objects corresponding to the entries |
| in the directory given by path. |
| |
| Args: |
| path: Path to the target directory within the fake filesystem. |
| |
| Returns: |
| an iterator to an unsorted list of os.DirEntry objects for |
| each entry in path. |
| |
| Raises: |
| OSError: if the target is not a directory. |
| """ |
| return scandir(self.filesystem, path) |
| |
| def walk(self, top, topdown=True, onerror=None, followlinks=False): |
| """Perform a walk operation over the fake filesystem. |
| |
| Args: |
| top: The root directory from which to begin walk. |
| topdown: Determines whether to return the tuples with the root as |
| the first entry (`True`) or as the last, after all the child |
| directory tuples (`False`). |
| onerror: If not `None`, function which will be called to handle the |
| `os.error` instance provided when `os.listdir()` fails. |
| followlinks: If `True`, symbolic links are followed. |
| |
| Yields: |
| (path, directories, nondirectories) for top and each of its |
| subdirectories. See the documentation for the builtin os module |
| for further details. |
| """ |
| return walk(self.filesystem, top, topdown, onerror, followlinks) |