import torch
import os
import weakref
import multiprocessing
from multiprocessing.reduction import ForkingPickler
import sys

try:
    # Early load resource_sharer to prevent a partially initialized instance
    # from being inherited in a forked child process. The reduce_storage method
    # requires this module indirectly through DupFd(). The built-in mp.Queue
    # class pickles arguments in a background thread which may overlap with the
    # fork.
    import multiprocessing.resource_sharer
except ImportError:
    pass

class StorageRef(object):
    # An object with a cdata field which may be set to None. We subclass object
    # instead of using a dict() to support weak references.
    def __init__(self, ptr):
        self.cdata = ptr

    def __del__(self):
        torch.Storage._free_weak_ref(self.cdata)


# mapping from handles to StorageRef objects
shared_cache = weakref.WeakValueDictionary()

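# A minimal sketch (plain Python, no torch; the names `Ref`, `cache`, and
# `ref` are illustrative only) of why the cache pairs WeakValueDictionary
# with a trivial object subclass: built-in dict instances cannot be weakly
# referenced, but instances of an object subclass can, and the entry
# disappears on its own once the last strong reference to the value dies
# (immediately on CPython, which is refcounted):
#
#   import weakref
#
#   class Ref(object):            # stands in for StorageRef
#       def __init__(self, ptr):
#           self.cdata = ptr
#
#   cache = weakref.WeakValueDictionary()
#   ref = Ref(0xA000)
#   cache['handle'] = ref
#   assert 'handle' in cache      # alive while `ref` is held somewhere
#   del ref
#   assert 'handle' not in cache  # dropped automatically with the value
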
def rebuild_event(handle):
    return torch.cuda.Event(_handle=handle)


def reduce_event(event):
    return (rebuild_event, (event.ipc_handle(),))

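# Illustrative use of the event reduction (a sketch only; assumes a working
# CUDA setup and is not executed as part of this module): an event created
# with interprocess=True can be sent through a torch.multiprocessing queue,
# and the receiving process gets an event rebuilt from the IPC handle that
# it can wait on:
#
#   import torch
#   import torch.multiprocessing as mp
#
#   def worker(q):
#       e = q.get()              # rebuilt via rebuild_event(handle)
#       e.synchronize()          # waits for the work recorded by the parent
#
#   if __name__ == '__main__':
#       mp.set_start_method('spawn')
#       e = torch.cuda.Event(interprocess=True)
#       torch.cuda.FloatTensor(1000, 1000).normal_()   # some GPU work
#       e.record()
#       q = mp.Queue()
#       p = mp.Process(target=worker, args=(q,))
#       p.start()
#       q.put(e)                 # pickled via reduce_event
#       p.join()
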
def rebuild_tensor(cls, storage, metadata):
    storage_offset, size, stride = metadata
    return torch._utils._rebuild_tensor(storage, storage_offset, size, stride)


def rebuild_cuda_tensor(tensor_cls, tensor_size, tensor_stride, tensor_offset,
                        storage_cls, storage_device, storage_handle, storage_size):
    storage = storage_from_cache(storage_cls, storage_handle)
    if storage is None:
        torch.cuda._lazy_init()
        storage = storage_cls._new_shared_cuda(storage_device, storage_handle, storage_size)
        shared_cache[storage_handle] = storage._weak_ref(StorageRef)
    return torch._utils._rebuild_tensor(storage, tensor_offset, tensor_size, tensor_stride)

def reduce_tensor(tensor):
    storage = tensor.storage()

    # Note [CUDA IPC and the caching allocator]
    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # When you send a CUDA tensor over IPC, you might expect that you will
    # get out the same storage from the other end. However, the CUDA caching
    # allocator makes it difficult to preserve this invariant. Consider
    # the following situation: a tensor of size 0x100 points to offset 0x20 of
    # a storage at 0xA100 of size 0x100. (For simplicity, all of these
    # sizes are given in bytes). HOWEVER, with the caching allocator, this storage
    # might be part of a larger cudaMalloc allocation 0xA000 of size 0x4000.
    #
    # When we want to send this CUDA tensor over IPC, we must send the
    # *entire* cudaMalloc allocation, i.e., the 0xA000 region, not just
    # the storage 0xA100 (because that is what CUDA supports). So, on the
    # other end, there simply isn't any way to say, "Wait, you gave me
    # a bigger region (0xA000) than the one I wanted (0xA100)"; we have
    # to just make a storage for the entire caching allocator block.
    #
    # This is fine, because all we need to do is just adjust the offset
    # on the tensor itself: instead of:
    #
    #   Tensor(size=0x100, offset=0x020, storage=Storage(data=0xA100, size=0x0100))
    #
    # we have
    #
    #   Tensor(size=0x100, offset=0x120, storage=Storage(data=0xA000, size=0x4000))
    #
    # This strategy has a few implications:
    #
    # 1. When we serialize a CUDA tensor for IPC, we have to do it all in one
    #    go (non-compositionally), instead of first serializing storage, and
    #    then serializing tensor. This is because the base address of the
    #    storage allocation affects what offset we write into the tensor.
    #
    # 2. We MUST NOT let the new IPC tensor be resizable. Originally, a resize
    #    of the storage beyond 0x100 would merely have caused us to do a
    #    reallocation. You don't really want to do this, but if you did,
    #    all that would happen is that you would lose IPC sharing. But if
    #    you do this in the new world, we will happily let you write out of
    #    bounds of your "allocation", clobbering unrelated data in the cached
    #    allocator block. BAD!
    #
    # By the way, in old versions of PyTorch, we supported this situation
    # natively using a "storage view", which permitted multiple storages to be
    # views on each other. But this was the *only* use of storage views, so we
    # eliminated it so that we could just use tensor views to implement the same
    # thing.
    #
    if storage.is_cuda:
        (device, handle, storage_size, storage_offset) = storage._share_cuda_()
        tensor_offset = tensor.storage_offset()
        # WARNING! This call to _weak_ref could lead to O(n) deleter
        # behavior, if you repeatedly call it on the same Storage (all
        # other sites are guarded by shared_cache; maybe this site
        # should be too?)
        shared_cache[handle] = storage._weak_ref(StorageRef)
        return (rebuild_cuda_tensor,
                (type(tensor),
                 tensor.size(),
                 tensor.stride(),
                 tensor_offset + storage_offset,
                 type(storage),
                 device,
                 handle,
                 storage_size))

    metadata = (tensor.storage_offset(), tensor.size(), tensor.stride())
    return (rebuild_tensor, (type(tensor), storage, metadata))

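# Worked example of the offset adjustment above, reusing the numbers from
# Note [CUDA IPC and the caching allocator] (illustrative values only): the
# tensor sits at offset 0x020 of its storage, and _share_cuda_ reports that
# the storage itself starts 0x100 into the caching-allocator block that is
# actually shipped over IPC, so the rebuilt tensor gets
#
#   tensor_offset + storage_offset == 0x020 + 0x100 == 0x120
#
# as its offset into the (larger) rebuilt storage.
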
def fd_id(fd):
    # Returns a tuple which uniquely identifies a file descriptor. On Mac OS,
    # this doesn't work with shared memory handles, which is why we don't
    # support the "file_descriptor" sharing method on that platform.
    stat = os.fstat(fd)
    return (stat.st_ino, stat.st_dev)

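# Illustrative property (plain Python, no torch; '/tmp/example' is a
# hypothetical path): two descriptors referring to the same open file map
# to the same key, which is what lets rebuild_storage_fd below recognize a
# storage it has already attached even though every receive yields a fresh
# fd number:
#
#   import os
#   fd1 = os.open('/tmp/example', os.O_CREAT | os.O_RDWR)
#   fd2 = os.dup(fd1)
#   assert fd1 != fd2
#   assert fd_id(fd1) == fd_id(fd2)    # same (st_ino, st_dev)
#   os.close(fd1)
#   os.close(fd2)
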
def storage_from_cache(cls, key):
    storage_ref = shared_cache.get(key)
    if storage_ref is None:
        return None
    return cls._new_with_weak_ptr(storage_ref)

def rebuild_storage_fd(cls, df, size):
    if sys.version_info[0] == 2:
        fd = multiprocessing.reduction.rebuild_handle(df)
    else:
        fd = df.detach()
    try:
        storage = storage_from_cache(cls, fd_id(fd))
        if storage is not None:
            return storage
        storage = cls._new_shared_fd(fd, size)
        shared_cache[fd_id(fd)] = storage._weak_ref(StorageRef)
        return storage
    finally:
        os.close(fd)

def rebuild_storage_filename(cls, manager, handle, size):
    storage = storage_from_cache(cls, handle)
    if storage is not None:
        return storage._shared_decref()
    storage = cls._new_shared_filename(manager, handle, size)
    shared_cache[handle] = storage._weak_ref(StorageRef)
    return storage._shared_decref()


def rebuild_storage_empty(cls):
    return cls()

def reduce_storage(storage):
    from . import get_sharing_strategy
    if storage.is_cuda:
        raise RuntimeError("Cannot pickle CUDA storage; try pickling a CUDA tensor instead")
    elif get_sharing_strategy() == 'file_system':
        metadata = storage._share_filename_()
        cache_key = metadata[1]
        rebuild = rebuild_storage_filename
        storage._shared_incref()
    elif storage.size() == 0:
        # This is special cased because empty storages (with size 0)
        # cannot be mmapped.
        return (rebuild_storage_empty, (type(storage),))
    else:
        fd, size = storage._share_fd_()
        if sys.version_info[0] == 2:
            df = multiprocessing.reduction.reduce_handle(fd)
        else:
            df = multiprocessing.reduction.DupFd(fd)
        cache_key = fd_id(fd)
        metadata = (df, size)
        rebuild = rebuild_storage_fd

    # WARNING! This call to _weak_ref could lead to O(n) deleter
    # behavior, if you repeatedly call it on the same Storage (all
    # other sites are guarded by shared_cache; maybe this site
    # should be too?)
    shared_cache[cache_key] = storage._weak_ref(StorageRef)
    return (rebuild, (type(storage),) + metadata)

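# Illustrative selection between the two CPU paths above (a sketch, not
# executed here; the 'file_system' strategy must be supported on the
# platform):
#
#   import torch.multiprocessing as mp
#
#   mp.set_sharing_strategy('file_system')      # reduce_storage goes through
#                                               # _share_filename_()
#   mp.set_sharing_strategy('file_descriptor')  # the Linux default; goes
#                                               # through _share_fd_() / DupFd
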
def init_reductions():
    ForkingPickler.register(torch.cuda.Event, reduce_event)

    for t in torch._storage_classes:
        ForkingPickler.register(t, reduce_storage)

    for t in torch._tensor_classes:
        ForkingPickler.register(t, reduce_tensor)

    # TODO: Maybe this should be in tensor_classes? :)
    ForkingPickler.register(torch.Tensor, reduce_tensor)
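
# End-to-end sketch of what these registrations enable (illustrative, not
# executed here): torch.multiprocessing calls init_reductions() at import
# time, so a tensor put on one of its queues is moved into shared memory by
# reduce_storage/reduce_tensor rather than copied, and writes made by the
# child are visible in the parent:
#
#   import torch
#   import torch.multiprocessing as mp
#
#   def worker(q):
#       t = q.get()          # rebuilt via rebuild_tensor/rebuild_storage_fd
#       t.fill_(1)           # lands in the shared memory segment
#
#   if __name__ == '__main__':
#       t = torch.zeros(4)
#       q = mp.Queue()
#       p = mp.Process(target=worker, args=(q,))
#       p.start()
#       q.put(t)             # pickled via reduce_tensor -> reduce_storage
#       p.join()
#       assert t.eq(1).all()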