/* Copyright 2017 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// The algorithm for dynamic partition has the following steps:
// 1. Let N be the number of elements in partitions. We initialize a new
//    vector indices_in with the values 0, 1, 2, ..., N-1.
// 2. We apply cub::DeviceRadixSort::SortPairs to the key-value pairs given
//    by partitions and indices_in. This results in two new vectors
//    partitions_out and indices_out, with partitions_out sorted.
// 3. The first dimension of outputs[i] is equal to the number of i-values in
//    partitions_out. We determine it in two steps:
//    - apply cub::DeviceReduce::ReduceByKey to count how many times each
//      value appears in partitions_out,
//    - move the results to partition_count. This handles missing values
//      (corresponding to empty parts).
// 4. Because partition_count is on the GPU, we bring it asynchronously to
//    the CPU. Then we can allocate the output tensors.
// 5. Finally, we use indices_out and the gather functor to collect the
//    output. This works because, for each interval of i-values, indices_out
//    points to the slices which should form output[i].
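//
// As an illustration (this example is not used anywhere in the code), take
// num_partitions = 2, partitions = [1, 0, 1] and data = [[1, 2], [3, 4],
// [5, 6]]. Then indices_in = [0, 1, 2]; sorting yields
// partitions_out = [0, 1, 1] and indices_out = [1, 0, 2]; counting yields
// partition_count = [1, 2]; and gathering with indices_out produces
// outputs[0] = [[3, 4]] and outputs[1] = [[1, 2], [5, 6]].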

#if GOOGLE_CUDA

#define EIGEN_USE_GPU

#include "third_party/cub/device/device_radix_sort.cuh"
#include "third_party/cub/device/device_reduce.cuh"
#include "third_party/cub/iterator/constant_input_iterator.cuh"
#include "third_party/cub/thread/thread_operators.cuh"
#include "tensorflow/core/common_runtime/gpu/gpu_event_mgr.h"
#include "tensorflow/core/framework/op_kernel.h"
#include "tensorflow/core/framework/register_types.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/bounds_check.h"
#include "tensorflow/core/kernels/fill_functor.h"
#include "tensorflow/core/kernels/gather_functor_gpu.cu.h"
#include "tensorflow/core/util/cuda_kernel_helper.h"
#include "tensorflow/core/util/transform_output_iterator.h"

namespace tensorflow {

typedef Eigen::GpuDevice GPUDevice;

namespace {

template <typename T>
__global__ void RangeInitKernel(const T start, const T delta, const int32 size,
                                T* out) {
  CUDA_1D_KERNEL_LOOP(i, size) { out[i] = start + i * delta; }
}

__global__ void MoveValuesKernel(const int32* keys, const int32* values,
                                 const int32* size, int32 out_size,
                                 int32* out) {
  int32 N = min(ldg(size), out_size);
  CUDA_1D_KERNEL_LOOP(i, N) {
    int32 key = ldg(keys + i);
    int32 value = ldg(values + i);
    if (FastBoundsCheck(key, out_size)) out[key] = value;
  }
}

// Initialize out with range start, start + delta, start + 2 * delta, ...
// This is needed because tf.range has no GPU implementation.
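// For example, RangeInit(d, 0, 1, 4, out) fills out with {0, 1, 2, 3}.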
template <typename T>
void RangeInit(const GPUDevice& d, const T start, const T delta,
               const int32 size, typename TTypes<T>::Flat out) {
  CudaLaunchConfig config = GetCudaLaunchConfig(size, d);
  RangeInitKernel<T>
      <<<config.block_count, config.thread_per_block, 0, d.stream()>>>(
          start, delta, size, out.data());
}

// Given *num_runs pairs (key, value), this function moves each value to
// position key in the array out.
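// For example, keys = {0, 2}, values = {4, 7} and *num_runs = 2 result in
// out[0] = 4 and out[2] = 7, with the other entries of out left untouched.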
void MoveValues(const GPUDevice& d, int32* keys, int32* values,
                int32* num_runs, int32 out_size, int32* out) {
  // Because num_runs is located on the GPU, we cannot access it directly.
  // So we launch the kernel with size = out_size.
  // This is valid for correct inputs, because then out_size >= *num_runs.
  // For invalid inputs, we may have out_size < *num_runs. In this case we
  // only handle the first out_size values.
  CudaLaunchConfig config = GetCudaLaunchConfig(out_size, d);
  MoveValuesKernel<<<config.block_count, config.thread_per_block, 0,
                     d.stream()>>>(keys, values, num_runs, out_size, out);
}

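// Wrapper around the gather kernel: for in-range indices, out[i, :] is set to
// params[indices[i], :], where params holds gather_dim_size slices of
// slice_size elements each and out holds indices_size such slices
// (out_size = indices_size * slice_size).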
template <typename T>
void CallGatherKernel(const GPUDevice& d, const T* params,
                      const int32* indices, T* out, int64 gather_dim_size,
                      int64 indices_size, int64 slice_size, int64 out_size) {
  CudaLaunchConfig config = GetCudaLaunchConfig(out_size, d);
  GatherOpKernel<T, int32, true>
      <<<config.block_count, config.thread_per_block, 0, d.stream()>>>(
          params, indices, out, gather_dim_size, indices_size, slice_size,
          out_size);
}

struct IdentityOp {
  __device__ int32 __forceinline__ operator()(const int32& a) const {
    return a;
  }
};

// Define an output iterator that only allows assignment to
// positions in the range [base, base + limit).
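// Writes outside this range are silently dropped, so malformed inputs (more
// than num_partitions distinct partition indices) cannot write out of bounds.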
class BoundedOutputIterator
    : public TransformOutputIterator<int32, int32, IdentityOp> {
 private:
  int32 limit;
  int32* base;

  struct BoundedReference : Reference {
    int32 limit;
    int32* base;
    // Constructor
    __host__ __device__ __forceinline__
    BoundedReference(int32* ptr, int32* base, IdentityOp op, int32 limit)
        : Reference(ptr, op), limit(limit), base(base) {}

    // Assignment
    __host__ __device__ __forceinline__ int32 operator=(int32 val) {
      if (ptr - base < limit && ptr - base >= 0) *ptr = val;
      return val;
    }
  };

 public:
  typedef BoundedOutputIterator self_type;
  typedef BoundedReference reference;

  __host__ __device__ __forceinline__ BoundedOutputIterator(int32* ptr,
                                                             IdentityOp op,
                                                             int32 size)
      : TransformOutputIterator(ptr, op), limit(size), base(ptr) {}

  __host__ __device__ __forceinline__
  BoundedOutputIterator(int32* ptr, int32* base, IdentityOp op, int32 size)
      : TransformOutputIterator(ptr, op), limit(size), base(base) {}

  // Indirection
  __host__ __device__ __forceinline__ reference operator*() const {
    return BoundedReference(ptr, base, conversion_op, limit);
  }

  // Array subscript
  __host__ __device__ __forceinline__ reference operator[](int32 n) const {
    return BoundedReference(ptr + n, base, conversion_op, limit);
  }

  // Addition
  __host__ __device__ __forceinline__ self_type operator+(int32 n) const {
    self_type retval(ptr + n, base, conversion_op, limit);
    return retval;
  }

  // Subtraction
  __host__ __device__ __forceinline__ self_type operator-(int32 n) const {
    self_type retval(ptr - n, base, conversion_op, limit);
    return retval;
  }
};

}  // namespace

// The current implementation has memory cost on GPU
// I + P + max(3N + R + P, O + N), where:
// I - the size of the input
// N - the size of the partitions tensor
// R - the temporary storage used by cub::RadixSort, about 2N
// P - the number of partitions
// O - the size of the output
// So roughly the cost is I + P + max(5N, O + N).
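// For example, for float data of shape [N, K] with K >= 4 the second term is
// O + N, so the footprint is roughly I + O + N + P = 2 * N * K + N + P values.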
template <typename T>
class DynamicPartitionOpGPU : public AsyncOpKernel {
 public:
  explicit DynamicPartitionOpGPU(OpKernelConstruction* c) : AsyncOpKernel(c) {
    OP_REQUIRES_OK(c, c->GetAttr("num_partitions", &num_partitions_));
    OP_REQUIRES(c, num_partitions_ >= 1,
                errors::InvalidArgument("num_partitions must be at least 1"));
  }

  void AllocateTempSpace(OpKernelContext* c, int32 N, Tensor* indices_in,
                         Tensor* partitions_out, Tensor* indices_out,
                         DoneCallback done) {
    int32 M = std::max(N, num_partitions_);
    // indices_in is made slightly larger (M = max(N, num_partitions_)
    // elements) because it is later reused to hold the unique-keys output of
    // cub::DeviceReduce::ReduceByKey.
    OP_REQUIRES_OK_ASYNC(
        c, c->allocate_temp(DT_INT32, TensorShape({M}), indices_in), done);
    OP_REQUIRES_OK_ASYNC(
        c, c->allocate_temp(DT_INT32, TensorShape({N}), partitions_out), done);
    OP_REQUIRES_OK_ASYNC(
        c, c->allocate_temp(DT_INT32, TensorShape({N}), indices_out), done);
  }

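  // Allocates the output list: outputs[p] gets first dimension
  // partition_count[p], with the remaining dimensions copied from data.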
  void AllocateOutputs(OpKernelContext* c, const Tensor* data,
                       const Tensor* partitions, const Tensor* partition_count,
                       OpOutputList* Tout, DoneCallback done) {
    auto e_part_count = partition_count->flat<int32>();
    // Allocate output tensors of the right size
    OP_REQUIRES_OK_ASYNC(c, c->output_list("outputs", Tout), done);
    for (int p = 0; p < num_partitions_; p++) {
      TensorShape shape;
      shape.AddDim(e_part_count(p));
      for (int i = partitions->dims(); i < data->dims(); i++) {
        shape.AddDim(data->dim_size(i));
      }
      Tensor* out;
      OP_REQUIRES_OK_ASYNC(c, Tout->allocate(p, shape, &out), done);
    }
  }

  void ComputeAsync(OpKernelContext* c, DoneCallback done) {
    const Tensor& data = c->input(0);
    const Tensor& partitions = c->input(1);

    OP_REQUIRES_ASYNC(
        c, TensorShapeUtils::StartsWith(data.shape(), partitions.shape()),
        errors::InvalidArgument(
            "data.shape must start with partitions.shape, ",
            "got data.shape = ", data.shape().DebugString(),
            ", partitions.shape = ", partitions.shape().DebugString()),
        done);

    Tensor partition_count;

    // We must handle the case of empty partitions separately,
    // because kernels don't work with 0-sized tensors.
    if (partitions.NumElements() == 0) {
      AllocatorAttributes alloc_attr;
      alloc_attr.set_on_host(true);
      OP_REQUIRES_OK_ASYNC(
          c,
          c->allocate_temp(DT_INT32, TensorShape({num_partitions_}),
                           &partition_count, alloc_attr),
          done);
      auto e_part_count = partition_count.flat<int32>();
      for (int i = 0; i < num_partitions_; i++) e_part_count(i) = 0;
      OpOutputList outputs;
      this->AllocateOutputs(c, &data, &partitions, &partition_count, &outputs,
                            done);
      if (c->status().ok()) done();
      return;
    }

    // Prepare for counting.
    OP_REQUIRES_OK_ASYNC(
        c,
        c->allocate_temp(DT_INT32, TensorShape({num_partitions_}),
                         &partition_count),
        done);
    Tensor indices_out;
    // Count how many times each partition index occurs.
    // Also sort the indices by partition and output them in indices_out,
    // in preparation for the gather step below.
    this->CountAndSortParts(c, &partitions, &partition_count, &indices_out,
                            done);
    if (!c->status().ok()) return;

    // In order to allocate the output tensors we have to copy partition_count
    // to the CPU.
    auto* stream = c->op_device_context()->stream();
    OP_REQUIRES_ASYNC(c, stream, errors::Internal("No GPU stream available."),
                      done);
    Tensor cpu_tensor;
    AllocatorAttributes alloc_attr;
    alloc_attr.set_on_host(true);
    alloc_attr.set_gpu_compatible(true);
    OP_REQUIRES_OK_ASYNC(
        c,
        c->allocate_temp(partition_count.dtype(), partition_count.shape(),
                         &cpu_tensor, alloc_attr),
        done);
    se::DeviceMemoryBase wrapped(partition_count.flat<int32>().data(),
                                 num_partitions_ * sizeof(int32));
    const bool status =
        stream
            ->ThenMemcpy(cpu_tensor.flat<int32>().data(), wrapped,
                         num_partitions_ * sizeof(int32))
            .ok();
    OP_REQUIRES_ASYNC(
        c, status,
        errors::Internal("Failed to launch copy from device to host."), done);

    // Keep a reference to partition_count so that the buffer
    // is not deallocated at the end of the function, before
    // memcpy is completed.
    TensorReference partition_ref(partition_count);
    auto wrapped_callback = [this, c, &data, &partitions, indices_out,
                             partition_ref, cpu_tensor, done]() {
      OpOutputList outputs;
      this->AllocateOutputs(c, &data, &partitions, &cpu_tensor, &outputs,
                            done);
      if (!c->status().ok()) {
        partition_ref.Unref();
        return;
      }
      int32 N = partitions.NumElements();
      int64 slice_size = data.NumElements() / N;
      this->GatherSlices(c, &data, &indices_out, N, slice_size, outputs);
      partition_ref.Unref();
      done();
    };

    c->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
        stream, wrapped_callback);
  }

 protected:
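  // Steps 1-2 of the algorithm above: fill indices_in with 0, ..., N-1 and
  // sort the (partition, index) pairs by partition value using
  // cub::DeviceRadixSort::SortPairs.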
  void RadixSort(OpKernelContext* c, const Tensor* partitions,
                 Tensor* indices_in, Tensor* partitions_out,
                 Tensor* indices_out, DoneCallback done) {
    int32 N = partitions->NumElements();
    const GPUDevice& device = c->eigen_device<GPUDevice>();
    const cudaStream_t& cu_stream = GetCudaStream(c);

    // Initialize the indices_in tensor using the Range GPU kernel.
    RangeInit(device, 0, 1, N, indices_in->flat<int32>());
    // Obtain the pointers to inner buffers.
    const int32* partitions_ptr = partitions->flat<int32>().data();
    int32* partitions_out_ptr = partitions_out->flat<int32>().data();
    int32* indices_in_ptr = indices_in->flat<int32>().data();
    int32* indices_out_ptr = indices_out->flat<int32>().data();
    // Determine temporary device storage requirements.
    Tensor cub_temp_storage;
    size_t temp_storage_bytes = 0;
    cub::DeviceRadixSort::SortPairs(
        NULL, temp_storage_bytes, partitions_ptr, partitions_out_ptr,
        indices_in_ptr, indices_out_ptr, N, 0, sizeof(int32) * 8, cu_stream);
    // Allocate temporary storage.
    OP_REQUIRES_OK_ASYNC(
        c,
        c->allocate_temp(DT_INT8,
                         TensorShape({static_cast<int64>(temp_storage_bytes)}),
                         &cub_temp_storage),
        done);
    // Radix-sort the partition information.
    cub::DeviceRadixSort::SortPairs(
        cub_temp_storage.flat<int8>().data(), temp_storage_bytes,
        partitions_ptr, partitions_out_ptr, indices_in_ptr, indices_out_ptr, N,
        0, sizeof(int32) * 8, cu_stream);
  }  // At this point cub_temp_storage will be marked for deallocation.

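  // Steps 2-3 of the algorithm above: sort the element indices by partition
  // (RadixSort) and count, via cub::DeviceReduce::ReduceByKey followed by
  // MoveValues, how many elements fall into each of the num_partitions_
  // parts. The counts end up in partition_count and the sorted indices in
  // indices_out.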
  void CountAndSortParts(OpKernelContext* c, const Tensor* partitions,
                         Tensor* partition_count, Tensor* indices_out,
                         DoneCallback done) {
    const GPUDevice& device = c->eigen_device<GPUDevice>();
    const cudaStream_t& cu_stream = GetCudaStream(c);
    int32 N = partitions->NumElements();
    Tensor indices_in;
    Tensor partitions_out;
    Tensor aggregates_out;

    // Allocate memory for Radix-Sort.
    this->AllocateTempSpace(c, N, &indices_in, &partitions_out, indices_out,
                            done);
    if (!c->status().ok()) return;
    this->RadixSort(c, partitions, &indices_in, &partitions_out, indices_out,
                    done);
    if (!c->status().ok()) return;
    // We will now apply a reduce operation to count how many times
    // each index appears in partitions.

    // Zero-out the partition_count tensor.
    functor::SetZeroFunctor<GPUDevice, int32> zero_functor;
    zero_functor(device, partition_count->flat<int32>());
    // Allocate memory for aggregates_out.
    OP_REQUIRES_OK_ASYNC(
        c,
        c->allocate_temp(DT_INT32, TensorShape({num_partitions_}),
                         &aggregates_out),
        done);
    // Obtain the pointers to inner buffers.
    int32* keys_in_ptr = partitions_out.flat<int32>().data();
    // Here we reuse the indices_in tensor for the unique keys output.
    int32* unique_out_ptr = indices_in.flat<int32>().data();
    int32* aggregates_out_ptr = aggregates_out.flat<int32>().data();
    // We wrap the pointers in bounded output iterators to guard against
    // wrong inputs (more than num_partitions distinct indices).
    IdentityOp id_op;
    BoundedOutputIterator unique_out_it(unique_out_ptr, id_op,
                                        num_partitions_);
    BoundedOutputIterator aggregates_out_it(aggregates_out_ptr, id_op,
                                            num_partitions_);

    cub::ConstantInputIterator<int32> values_in(1);
    cub::Sum reduction_op;

    // Allocate space on GPU for the number of runs. This is required by CUB.
    Tensor num_runs;
    OP_REQUIRES_OK_ASYNC(
        c, c->allocate_temp(DT_INT32, TensorShape({1}), &num_runs), done);
    int32* num_runs_ptr = num_runs.flat<int32>().data();

    // Determine temporary device storage requirements
    Tensor cub_temp_storage;
    size_t temp_storage_bytes = 0;
    cub::DeviceReduce::ReduceByKey(NULL, temp_storage_bytes, keys_in_ptr,
                                   unique_out_it, values_in, aggregates_out_it,
                                   num_runs_ptr, reduction_op, N, cu_stream);
    // Allocate temporary storage.
    OP_REQUIRES_OK_ASYNC(
        c,
        c->allocate_temp(DT_INT8,
                         TensorShape({static_cast<int64>(temp_storage_bytes)}),
                         &cub_temp_storage),
        done);
    // Run reduce-by-key. The effect is that we count how many times
    // each index appears in partitions. The distinct indices are stored
    // in unique_out, while the count is stored in aggregates_out.
    // The total number of distinct indices is stored in num_runs.
    cub::DeviceReduce::ReduceByKey(cub_temp_storage.flat<int8>().data(),
                                   temp_storage_bytes, keys_in_ptr,
                                   unique_out_it, values_in, aggregates_out_it,
                                   num_runs_ptr, reduction_op, N, cu_stream);
    // We are not done yet. unique_out only contains the indices that appeared
    // at least once in partitions. We move each value from aggregates_out
    // to the corresponding position in partition_count. This will handle
    // possibly empty parts.
    MoveValues(device, unique_out_ptr, aggregates_out_ptr, num_runs_ptr,
               num_partitions_, partition_count->flat<int32>().data());
  }  // At this point indices_in, partitions_out, aggregates_out
     // and cub_temp_storage will be marked for deallocation.

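  // Step 5 of the algorithm above: for each partition p, gather the
  // outs[p]->dim_size(0) slices of data selected by the next entries of
  // indices into outs[p].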
  void GatherSlices(OpKernelContext* c, const Tensor* data,
                    const Tensor* indices, int32 N, int64 slice_size,
                    OpOutputList& outs) {
    const GPUDevice& device = c->eigen_device<GPUDevice>();
    const int32* ind_base = indices->flat<int32>().data();
    const T* data_base = data->flat<T>().data();

    for (int p = 0; p < num_partitions_; p++) {
      int32 indices_size = outs[p]->dim_size(0);
      int64 out_size = outs[p]->NumElements();
      T* out_base = outs[p]->flat<T>().data();
      if (out_size > 0)
        CallGatherKernel<T>(device, data_base, ind_base, out_base, N,
                            indices_size, slice_size, out_size);
      ind_base += indices_size;
    }
  }

  int32 num_partitions_;
};

#define REGISTER_DYNAMIC_PARTITION_GPU(T)                                 \
  REGISTER_KERNEL_BUILDER(                                                \
      Name("DynamicPartition").Device(DEVICE_GPU).TypeConstraint<T>("T"), \
      DynamicPartitionOpGPU<T>)

TF_CALL_GPU_NUMBER_TYPES(REGISTER_DYNAMIC_PARTITION_GPU);
TF_CALL_complex64(REGISTER_DYNAMIC_PARTITION_GPU);
TF_CALL_complex128(REGISTER_DYNAMIC_PARTITION_GPU);
#undef REGISTER_DYNAMIC_PARTITION_GPU

}  // namespace tensorflow

#endif  // GOOGLE_CUDA