"""Temporary files. | |
This module provides generic, low- and high-level interfaces for | |
creating temporary files and directories. The interfaces listed | |
as "safe" just below can be used without fear of race conditions. | |
Those listed as "unsafe" cannot, and are provided for backward | |
compatibility only. | |
This module also provides some data items to the user: | |
TMP_MAX - maximum number of names that will be tried before | |
giving up. | |
template - the default prefix for all temporary names. | |
You may change this to control the default prefix. | |
tempdir - If this is set to a string before the first use of | |
any routine from this module, it will be considered as | |
another candidate location to store temporary files. | |
""" | |
__all__ = [ | |
"NamedTemporaryFile", "TemporaryFile", # high level safe interfaces | |
"SpooledTemporaryFile", | |
"mkstemp", "mkdtemp", # low level safe interfaces | |
"mktemp", # deprecated unsafe interface | |
"TMP_MAX", "gettempprefix", # constants | |
"tempdir", "gettempdir" | |
] | |
# Imports. | |
import os as _os | |
import errno as _errno | |
from random import Random as _Random | |
try: | |
from cStringIO import StringIO as _StringIO | |
except ImportError: | |
from StringIO import StringIO as _StringIO | |
try: | |
import fcntl as _fcntl | |
except ImportError: | |
def _set_cloexec(fd): | |
pass | |
else: | |
def _set_cloexec(fd): | |
try: | |
flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0) | |
except IOError: | |
pass | |
else: | |
# flags read successfully, modify | |
flags |= _fcntl.FD_CLOEXEC | |
_fcntl.fcntl(fd, _fcntl.F_SETFD, flags) | |
try: | |
import thread as _thread | |
except ImportError: | |
import dummy_thread as _thread | |
_allocate_lock = _thread.allocate_lock | |
_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL | |
if hasattr(_os, 'O_NOINHERIT'): | |
_text_openflags |= _os.O_NOINHERIT | |
if hasattr(_os, 'O_NOFOLLOW'): | |
_text_openflags |= _os.O_NOFOLLOW | |
_bin_openflags = _text_openflags | |
if hasattr(_os, 'O_BINARY'): | |
_bin_openflags |= _os.O_BINARY | |
if hasattr(_os, 'TMP_MAX'): | |
TMP_MAX = _os.TMP_MAX | |
else: | |
TMP_MAX = 10000 | |
template = "tmp" | |
# Internal routines. | |
_once_lock = _allocate_lock() | |
if hasattr(_os, "lstat"): | |
_stat = _os.lstat | |
elif hasattr(_os, "stat"): | |
_stat = _os.stat | |
else: | |
# Fallback. All we need is something that raises os.error if the | |
# file doesn't exist. | |
def _stat(fn): | |
try: | |
f = open(fn) | |
except IOError: | |
raise _os.error | |
f.close() | |
def _exists(fn): | |
try: | |
_stat(fn) | |
except _os.error: | |
return False | |
else: | |
return True | |
class _RandomNameSequence: | |
"""An instance of _RandomNameSequence generates an endless | |
sequence of unpredictable strings which can safely be incorporated | |
into file names. Each string is six characters long. Multiple | |
threads can safely use the same instance at the same time. | |
_RandomNameSequence is an iterator.""" | |
characters = ("abcdefghijklmnopqrstuvwxyz" + | |
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" + | |
"0123456789_") | |
def __init__(self): | |
self.mutex = _allocate_lock() | |
self.rng = _Random() | |
self.normcase = _os.path.normcase | |
def __iter__(self): | |
return self | |
def next(self): | |
m = self.mutex | |
c = self.characters | |
choose = self.rng.choice | |
m.acquire() | |
try: | |
letters = [choose(c) for dummy in "123456"] | |
finally: | |
m.release() | |
return self.normcase(''.join(letters)) | |
def _candidate_tempdir_list(): | |
"""Generate a list of candidate temporary directories which | |
_get_default_tempdir will try.""" | |
dirlist = [] | |
# First, try the environment. | |
for envname in 'TMPDIR', 'TEMP', 'TMP': | |
dirname = _os.getenv(envname) | |
if dirname: dirlist.append(dirname) | |
# Failing that, try OS-specific locations. | |
if _os.name == 'riscos': | |
dirname = _os.getenv('Wimp$ScrapDir') | |
if dirname: dirlist.append(dirname) | |
elif _os.name == 'nt': | |
dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ]) | |
else: | |
dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ]) | |
# As a last resort, the current directory. | |
try: | |
dirlist.append(_os.getcwd()) | |
except (AttributeError, _os.error): | |
dirlist.append(_os.curdir) | |
return dirlist | |
def _get_default_tempdir(): | |
"""Calculate the default directory to use for temporary files. | |
This routine should be called exactly once. | |
We determine whether or not a candidate temp dir is usable by | |
trying to create and write to a file in that directory. If this | |
is successful, the test file is deleted. To prevent denial of | |
service, the name of the test file must be randomized.""" | |
namer = _RandomNameSequence() | |
dirlist = _candidate_tempdir_list() | |
flags = _text_openflags | |
for dir in dirlist: | |
if dir != _os.curdir: | |
dir = _os.path.normcase(_os.path.abspath(dir)) | |
# Try only a few names per directory. | |
for seq in xrange(100): | |
name = namer.next() | |
filename = _os.path.join(dir, name) | |
try: | |
fd = _os.open(filename, flags, 0600) | |
fp = _os.fdopen(fd, 'w') | |
fp.write('blat') | |
fp.close() | |
_os.unlink(filename) | |
del fp, fd | |
return dir | |
except (OSError, IOError), e: | |
if e[0] != _errno.EEXIST: | |
break # no point trying more names in this directory | |
pass | |
raise IOError, (_errno.ENOENT, | |
("No usable temporary directory found in %s" % dirlist)) | |
_name_sequence = None | |
def _get_candidate_names(): | |
"""Common setup sequence for all user-callable interfaces.""" | |
global _name_sequence | |
if _name_sequence is None: | |
_once_lock.acquire() | |
try: | |
if _name_sequence is None: | |
_name_sequence = _RandomNameSequence() | |
finally: | |
_once_lock.release() | |
return _name_sequence | |
def _mkstemp_inner(dir, pre, suf, flags): | |
"""Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.""" | |
names = _get_candidate_names() | |
for seq in xrange(TMP_MAX): | |
name = names.next() | |
file = _os.path.join(dir, pre + name + suf) | |
try: | |
fd = _os.open(file, flags, 0600) | |
_set_cloexec(fd) | |
return (fd, _os.path.abspath(file)) | |
except OSError, e: | |
if e.errno == _errno.EEXIST: | |
continue # try again | |
raise | |
raise IOError, (_errno.EEXIST, "No usable temporary file name found") | |
# User visible interfaces. | |
def gettempprefix(): | |
"""Accessor for tempdir.template.""" | |
return template | |
tempdir = None | |
def gettempdir(): | |
"""Accessor for tempfile.tempdir.""" | |
global tempdir | |
if tempdir is None: | |
_once_lock.acquire() | |
try: | |
if tempdir is None: | |
tempdir = _get_default_tempdir() | |
finally: | |
_once_lock.release() | |
return tempdir | |
def mkstemp(suffix="", prefix=template, dir=None, text=False): | |
"""User-callable function to create and return a unique temporary | |
file. The return value is a pair (fd, name) where fd is the | |
file descriptor returned by os.open, and name is the filename. | |
If 'suffix' is specified, the file name will end with that suffix, | |
otherwise there will be no suffix. | |
If 'prefix' is specified, the file name will begin with that prefix, | |
otherwise a default prefix is used. | |
If 'dir' is specified, the file will be created in that directory, | |
otherwise a default directory is used. | |
If 'text' is specified and true, the file is opened in text | |
mode. Else (the default) the file is opened in binary mode. On | |
some operating systems, this makes no difference. | |
The file is readable and writable only by the creating user ID. | |
If the operating system uses permission bits to indicate whether a | |
file is executable, the file is executable by no one. The file | |
descriptor is not inherited by children of this process. | |
Caller is responsible for deleting the file when done with it. | |
""" | |
if dir is None: | |
dir = gettempdir() | |
if text: | |
flags = _text_openflags | |
else: | |
flags = _bin_openflags | |
return _mkstemp_inner(dir, prefix, suffix, flags) | |
def mkdtemp(suffix="", prefix=template, dir=None): | |
"""User-callable function to create and return a unique temporary | |
directory. The return value is the pathname of the directory. | |
Arguments are as for mkstemp, except that the 'text' argument is | |
not accepted. | |
The directory is readable, writable, and searchable only by the | |
creating user. | |
Caller is responsible for deleting the directory when done with it. | |
""" | |
if dir is None: | |
dir = gettempdir() | |
names = _get_candidate_names() | |
for seq in xrange(TMP_MAX): | |
name = names.next() | |
file = _os.path.join(dir, prefix + name + suffix) | |
try: | |
_os.mkdir(file, 0700) | |
return file | |
except OSError, e: | |
if e.errno == _errno.EEXIST: | |
continue # try again | |
raise | |
raise IOError, (_errno.EEXIST, "No usable temporary directory name found") | |
def mktemp(suffix="", prefix=template, dir=None): | |
"""User-callable function to return a unique temporary file name. The | |
file is not created. | |
Arguments are as for mkstemp, except that the 'text' argument is | |
not accepted. | |
This function is unsafe and should not be used. The file name | |
refers to a file that did not exist at some point, but by the time | |
you get around to creating it, someone else may have beaten you to | |
the punch. | |
""" | |
## from warnings import warn as _warn | |
## _warn("mktemp is a potential security risk to your program", | |
## RuntimeWarning, stacklevel=2) | |
if dir is None: | |
dir = gettempdir() | |
names = _get_candidate_names() | |
for seq in xrange(TMP_MAX): | |
name = names.next() | |
file = _os.path.join(dir, prefix + name + suffix) | |
if not _exists(file): | |
return file | |
raise IOError, (_errno.EEXIST, "No usable temporary filename found") | |
class _TemporaryFileWrapper: | |
"""Temporary file wrapper | |
This class provides a wrapper around files opened for | |
temporary use. In particular, it seeks to automatically | |
remove the file when it is no longer needed. | |
""" | |
def __init__(self, file, name, delete=True): | |
self.file = file | |
self.name = name | |
self.close_called = False | |
self.delete = delete | |
def __getattr__(self, name): | |
# Attribute lookups are delegated to the underlying file | |
# and cached for non-numeric results | |
# (i.e. methods are cached, closed and friends are not) | |
file = self.__dict__['file'] | |
a = getattr(file, name) | |
if not issubclass(type(a), type(0)): | |
setattr(self, name, a) | |
return a | |
# The underlying __enter__ method returns the wrong object | |
# (self.file) so override it to return the wrapper | |
def __enter__(self): | |
self.file.__enter__() | |
return self | |
# NT provides delete-on-close as a primitive, so we don't need | |
# the wrapper to do anything special. We still use it so that | |
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile. | |
if _os.name != 'nt': | |
# Cache the unlinker so we don't get spurious errors at | |
# shutdown when the module-level "os" is None'd out. Note | |
# that this must be referenced as self.unlink, because the | |
# name TemporaryFileWrapper may also get None'd out before | |
# __del__ is called. | |
unlink = _os.unlink | |
def close(self): | |
if not self.close_called: | |
self.close_called = True | |
self.file.close() | |
if self.delete: | |
self.unlink(self.name) | |
def __del__(self): | |
self.close() | |
# Need to trap __exit__ as well to ensure the file gets | |
# deleted when used in a with statement | |
def __exit__(self, exc, value, tb): | |
result = self.file.__exit__(exc, value, tb) | |
self.close() | |
return result | |
else: | |
def __exit__(self, exc, value, tb): | |
self.file.__exit__(exc, value, tb) | |
def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="", | |
prefix=template, dir=None, delete=True): | |
"""Create and return a temporary file. | |
Arguments: | |
'prefix', 'suffix', 'dir' -- as for mkstemp. | |
'mode' -- the mode argument to os.fdopen (default "w+b"). | |
'bufsize' -- the buffer size argument to os.fdopen (default -1). | |
'delete' -- whether the file is deleted on close (default True). | |
The file is created as mkstemp() would do it. | |
Returns an object with a file-like interface; the name of the file | |
is accessible as file.name. The file will be automatically deleted | |
when it is closed unless the 'delete' argument is set to False. | |
""" | |
if dir is None: | |
dir = gettempdir() | |
if 'b' in mode: | |
flags = _bin_openflags | |
else: | |
flags = _text_openflags | |
# Setting O_TEMPORARY in the flags causes the OS to delete | |
# the file when it is closed. This is only supported by Windows. | |
if _os.name == 'nt' and delete: | |
flags |= _os.O_TEMPORARY | |
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags) | |
file = _os.fdopen(fd, mode, bufsize) | |
return _TemporaryFileWrapper(file, name, delete) | |
if _os.name != 'posix' or _os.sys.platform == 'cygwin': | |
# On non-POSIX and Cygwin systems, assume that we cannot unlink a file | |
# while it is open. | |
TemporaryFile = NamedTemporaryFile | |
else: | |
def TemporaryFile(mode='w+b', bufsize=-1, suffix="", | |
prefix=template, dir=None): | |
"""Create and return a temporary file. | |
Arguments: | |
'prefix', 'suffix', 'dir' -- as for mkstemp. | |
'mode' -- the mode argument to os.fdopen (default "w+b"). | |
'bufsize' -- the buffer size argument to os.fdopen (default -1). | |
The file is created as mkstemp() would do it. | |
Returns an object with a file-like interface. The file has no | |
name, and will cease to exist when it is closed. | |
""" | |
if dir is None: | |
dir = gettempdir() | |
if 'b' in mode: | |
flags = _bin_openflags | |
else: | |
flags = _text_openflags | |
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags) | |
try: | |
_os.unlink(name) | |
return _os.fdopen(fd, mode, bufsize) | |
except: | |
_os.close(fd) | |
raise | |
class SpooledTemporaryFile: | |
"""Temporary file wrapper, specialized to switch from | |
StringIO to a real file when it exceeds a certain size or | |
when a fileno is needed. | |
""" | |
_rolled = False | |
def __init__(self, max_size=0, mode='w+b', bufsize=-1, | |
suffix="", prefix=template, dir=None): | |
self._file = _StringIO() | |
self._max_size = max_size | |
self._rolled = False | |
self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir) | |
def _check(self, file): | |
if self._rolled: return | |
max_size = self._max_size | |
if max_size and file.tell() > max_size: | |
self.rollover() | |
def rollover(self): | |
if self._rolled: return | |
file = self._file | |
newfile = self._file = TemporaryFile(*self._TemporaryFileArgs) | |
del self._TemporaryFileArgs | |
newfile.write(file.getvalue()) | |
newfile.seek(file.tell(), 0) | |
self._rolled = True | |
# The method caching trick from NamedTemporaryFile | |
# won't work here, because _file may change from a | |
# _StringIO instance to a real file. So we list | |
# all the methods directly. | |
# Context management protocol | |
def __enter__(self): | |
if self._file.closed: | |
raise ValueError("Cannot enter context with closed file") | |
return self | |
def __exit__(self, exc, value, tb): | |
self._file.close() | |
# file protocol | |
def __iter__(self): | |
return self._file.__iter__() | |
def close(self): | |
self._file.close() | |
@property | |
def closed(self): | |
return self._file.closed | |
@property | |
def encoding(self): | |
return self._file.encoding | |
def fileno(self): | |
self.rollover() | |
return self._file.fileno() | |
def flush(self): | |
self._file.flush() | |
def isatty(self): | |
return self._file.isatty() | |
@property | |
def mode(self): | |
return self._file.mode | |
@property | |
def name(self): | |
return self._file.name | |
@property | |
def newlines(self): | |
return self._file.newlines | |
def next(self): | |
return self._file.next | |
def read(self, *args): | |
return self._file.read(*args) | |
def readline(self, *args): | |
return self._file.readline(*args) | |
def readlines(self, *args): | |
return self._file.readlines(*args) | |
def seek(self, *args): | |
self._file.seek(*args) | |
@property | |
def softspace(self): | |
return self._file.softspace | |
def tell(self): | |
return self._file.tell() | |
def truncate(self): | |
self._file.truncate() | |
def write(self, s): | |
file = self._file | |
rv = file.write(s) | |
self._check(file) | |
return rv | |
def writelines(self, iterable): | |
file = self._file | |
rv = file.writelines(iterable) | |
self._check(file) | |
return rv | |
def xreadlines(self, *args): | |
return self._file.xreadlines(*args) |