| import errno |
| import fnmatch |
| import os |
| import os.path |
| import random |
| import shutil |
| import stat |
| import sys |
| from contextlib import contextmanager |
| from tempfile import NamedTemporaryFile |
| |
| # NOTE: retrying is not annotated in typeshed as of 2017-07-17, which is |
| # why we ignore the type on this import. |
| from pip._vendor.retrying import retry # type: ignore |
| from pip._vendor.six import PY2 |
| |
| from pip._internal.utils.compat import get_path_uid |
| from pip._internal.utils.misc import format_size |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING, cast |
| |
if MYPY_CHECK_RUNNING:
    # These names are only imported when MYPY_CHECK_RUNNING is true. In the
    # visible code they are referenced from "# type:" comments, from the
    # string passed to cast(), and by the stub class below.
    from typing import Any, BinaryIO, Iterator, List, Union

    # Typing stub describing the object yielded by adjacent_tmp_file():
    # a binary file object that additionally exposes a ``file`` attribute.
    class NamedTemporaryFileResult(BinaryIO):
        @property
        def file(self):
            # type: () -> BinaryIO
            # Placeholder body; only the declared return type is of interest.
            pass
| |
| |
def check_path_owner(path):
    # type: (str) -> bool
    """Report whether the current user is treated as owning ``path``.

    ``path`` must be absolute. If it does not exist, the nearest existing
    ancestor is examined instead.

    Returns True unconditionally on win32 or when ``os.geteuid`` is not
    available. When running with effective uid 0, the result is whether
    ``get_path_uid`` reports uid 0 for the examined path (False if that
    lookup raises OSError). Otherwise the result is whether the examined
    path is writable according to ``os.access``.
    """
    # Without a way to query the effective uid, assume ownership.
    if sys.platform == "win32" or not hasattr(os, "geteuid"):
        return True

    assert os.path.isabs(path)

    candidate = path
    visited = None
    while candidate != visited:
        if not os.path.lexists(candidate):
            # Nothing here yet; step up to the parent directory.
            visited = candidate
            candidate = os.path.dirname(candidate)
            continue

        if os.geteuid() != 0:
            # Regular user: ownership is approximated by write access.
            return os.access(candidate, os.W_OK)

        # Root gets special treatment so that running under sudo without
        # the -H flag is handled properly.
        try:
            owner_uid = get_path_uid(candidate)
        except OSError:
            return False
        return owner_uid == 0

    return False  # assume we don't own the path
| |
| |
def copy2_fixed(src, dest):
    # type: (str, str) -> None
    """Copy ``src`` to ``dest`` with shutil.copy2(), reporting socket files
    as shutil.SpecialFileError.

    If the copy fails with OSError/IOError and either path turns out to be
    a socket, SpecialFileError is raised instead; otherwise the original
    error is re-raised.

    See also https://bugs.python.org/issue37700.
    """
    try:
        shutil.copy2(src, dest)
    except (OSError, IOError):
        for candidate in (src, dest):
            try:
                socket_found = is_socket(candidate)
            except OSError:
                # We are already handling a failure; a second error while
                # probing the path is not interesting, so move on.
                continue

            if socket_found:
                raise shutil.SpecialFileError(
                    "`{f}` is a socket".format(f=candidate))

        raise
| |
| |
def is_socket(path):
    # type: (str) -> bool
    """Return whether ``path`` itself is a socket (symlinks are not
    followed, since os.lstat() is used)."""
    mode = os.lstat(path).st_mode
    return stat.S_ISSOCK(mode)
| |
| |
@contextmanager
def adjacent_tmp_file(path, **kwargs):
    # type: (str, **Any) -> Iterator[NamedTemporaryFileResult]
    """Yield a named temporary file created in the same directory as path.

    The temporary file's name starts with the basename of ``path`` and
    ends in '.tmp'. It is not deleted on close. When the context ends
    (normally or via an exception) the file is flushed and fsync'ed.

    kwargs are forwarded to tempfile.NamedTemporaryFile to control how
    the temporary file is opened.
    """
    parent_dir = os.path.dirname(path)
    name_prefix = os.path.basename(path)

    with NamedTemporaryFile(
        delete=False,
        dir=parent_dir,
        prefix=name_prefix,
        suffix='.tmp',
        **kwargs
    ) as tmp:
        wrapped = cast('NamedTemporaryFileResult', tmp)
        try:
            yield wrapped
        finally:
            # Push buffered data to the OS, then ask the OS to commit it.
            underlying = wrapped.file
            underlying.flush()
            os.fsync(underlying.fileno())
| |
| |
# Retry decorator built once and applied to whichever replace()
# implementation is selected below.
# NOTE(review): stop_max_delay / wait_fixed are presumably milliseconds
# (keep retrying for about 1s, pausing 250ms between attempts) -- confirm
# against the vendored retrying library.
_replace_retry = retry(stop_max_delay=1000, wait_fixed=250)

if PY2:
    @_replace_retry
    def replace(src, dest):
        # type: (str, str) -> None
        # Rename src to dest. If the first rename raises OSError, remove
        # dest and attempt the rename once more; errors from that second
        # attempt propagate (and are subject to the retry decorator).
        try:
            os.rename(src, dest)
        except OSError:
            os.remove(dest)
            os.rename(src, dest)

else:
    # Outside PY2, os.replace is wrapped directly with the same retry policy.
    replace = _replace_retry(os.replace)
| |
| |
| # test_writable_dir and _test_writable_dir_win are copied from Flit, |
| # with the author's agreement to also place them under pip's license. |
def test_writable_dir(path):
    # type: (str) -> bool
    """Return whether the directory ``path`` is writable.

    If ``path`` is not an existing directory, the closest ancestor that is
    one gets checked instead. POSIX systems are answered by os.access();
    elsewhere the check is done by actually creating a file.
    """
    existing = path
    while not os.path.isdir(existing):
        up = os.path.dirname(existing)
        if up == existing:
            # Reached a fixed point; should never happen, but this guards
            # against looping forever.
            break
        existing = up

    if os.name != 'posix':
        return _test_writable_dir_win(existing)

    return os.access(existing, os.W_OK)
| |
| |
def _test_writable_dir_win(path):
    # type: (str) -> bool
    """Check writability of ``path`` by creating and removing a probe file.

    Returns True when a uniquely named file could be created (it is removed
    again) and False when creation fails with EPERM or EACCES. Any other
    OSError is propagated. Up to ten random names are tried if the chosen
    name already exists.
    """
    # os.access doesn't work on Windows: http://bugs.python.org/issue2528
    # and we can't use tempfile: http://bugs.python.org/issue22107
    prefix = 'accesstest_deleteme_fishfingers_custard_'
    charset = 'abcdefghijklmnopqrstuvwxyz0123456789'
    denied = (errno.EPERM, errno.EACCES)

    for _attempt in range(10):
        suffix = ''.join(random.choice(charset) for _ in range(6))
        probe = os.path.join(path, prefix + suffix)
        try:
            handle = os.open(probe, os.O_RDWR | os.O_CREAT | os.O_EXCL)
        # Python 2 has neither FileExistsError nor PermissionError, so the
        # errno is inspected instead.
        except OSError as exc:
            if exc.errno == errno.EEXIST:
                # Name collision: try another random name.
                continue
            if exc.errno in denied:
                # This could be caused by a directory with the same name,
                # but that is highly unlikely, so assume the parent
                # directory is not writable. It may also mean the parent
                # is not readable for a non-privileged user.
                return False
            raise

        os.close(handle)
        os.unlink(probe)
        return True

    # This should never be reached
    raise EnvironmentError(
        'Unexpected condition testing for writable directory'
    )
| |
| |
def find_files(path, pattern):
    # type: (str, str) -> List[str]
    """Return the paths of files beneath path, searched recursively, whose
    filenames match the UNIX-style shell glob pattern.

    Each entry is the walked directory joined with the file name, so the
    results are absolute only when ``path`` itself is absolute.
    """
    found = []  # type: List[str]
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in fnmatch.filter(filenames, pattern):
            found.append(os.path.join(dirpath, name))
    return found
| |
| |
def file_size(path):
    # type: (str) -> Union[int, float]
    """Return the size of ``path`` as reported by os.path.getsize(), or 0
    when ``path`` is a symbolic link."""
    return 0 if os.path.islink(path) else os.path.getsize(path)
| |
| |
def format_file_size(path):
    # type: (str) -> str
    """Return the result of file_size(path) rendered by format_size()."""
    size = file_size(path)
    return format_size(size)
| |
| |
def directory_size(path):
    # type: (str) -> Union[int, float]
    """Return the sum of file_size() over every file found beneath
    ``path`` by os.walk(), as a float."""
    file_paths = (
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(path)
        for name in names
    )

    total = 0.0
    for file_path in file_paths:
        total += file_size(file_path)
    return total
| |
| |
def format_directory_size(path):
    # type: (str) -> str
    """Return the result of directory_size(path) rendered by
    format_size()."""
    total = directory_size(path)
    return format_size(total)