csharp: support slice-by-slice deserialization
diff --git a/src/csharp/Grpc.Core.Api/DeserializationContext.cs b/src/csharp/Grpc.Core.Api/DeserializationContext.cs
index d69e0db..966bcfa 100644
--- a/src/csharp/Grpc.Core.Api/DeserializationContext.cs
+++ b/src/csharp/Grpc.Core.Api/DeserializationContext.cs
@@ -39,7 +39,7 @@
/// Also, allocating a new buffer each time can put excessive pressure on GC, especially if
/// the payload is more than 86700 bytes large (which means the newly allocated buffer will be placed in LOH,
/// and LOH object can only be garbage collected via a full ("stop the world") GC run).
- /// NOTE: Deserializers are expected not to call this method more than once per received message
+ /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message
/// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
/// </summary>
/// <returns>byte array containing the entire payload.</returns>
@@ -47,5 +47,22 @@
{
throw new NotImplementedException();
}
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ /// <summary>
+ /// Gets the entire payload as a ReadOnlySequence.
+ /// The ReadOnlySequence is only valid for the duration of the deserializer routine and the caller must not access it after the deserializer returns.
+ /// Using the read only sequence is the most efficient way to access the message payload. Where possible it allows directly
+ /// accessing the received payload without needing to perform any buffer copying or buffer allocations.
+ /// NOTE: This method is only available in the netstandard2.0 build of the library.
+ /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message
+ /// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
+ /// </summary>
+ /// <returns>read only sequence containing the entire payload.</returns>
+ public virtual System.Buffers.ReadOnlySequence<byte> PayloadAsReadOnlySequence()
+ {
+ throw new NotImplementedException();
+ }
+#endif
}
}
diff --git a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
index 6c29530..8a32bc7 100755
--- a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
+++ b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
@@ -19,12 +19,20 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
+ <PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+ <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+ </PropertyGroup>
+
<Import Project="..\Grpc.Core\SourceLink.csproj.include" />
<ItemGroup>
<PackageReference Include="System.Interactive.Async" Version="3.2.0" />
</ItemGroup>
+ <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+ <PackageReference Include="System.Memory" Version="4.5.2" />
+ </ItemGroup>
+
<ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 23e5d7f..7fef2c7 100755
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -8,6 +8,10 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
+ <PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.1' ">
+ <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+ </PropertyGroup>
+
<ItemGroup>
<ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
</ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 5c7d48f..fd22161 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -35,6 +35,7 @@
Server server;
FakeNativeCall fakeCall;
AsyncCallServer<string, string> asyncCallServer;
+ FakeBufferReaderManager fakeBufferReaderManager;
[SetUp]
public void Init()
@@ -52,11 +53,13 @@
Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer,
server);
asyncCallServer.InitializeForTesting(fakeCall);
+ fakeBufferReaderManager = new FakeBufferReaderManager();
}
[TearDown]
public void Cleanup()
{
+ fakeBufferReaderManager.Dispose();
server.ShutdownAsync().Wait();
}
@@ -77,7 +80,7 @@
var moveNextTask = requestStream.MoveNext();
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
Assert.IsFalse(moveNextTask.Result);
AssertFinished(asyncCallServer, fakeCall, finishedTask);
@@ -107,7 +110,7 @@
// if a read completion's success==false, the request stream will silently finish
// and we rely on C core cancelling the call.
var moveNextTask = requestStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse());
Assert.IsFalse(moveNextTask.Result);
fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
@@ -182,5 +185,10 @@
Assert.IsTrue(finishedTask.IsCompleted);
Assert.DoesNotThrow(() => finishedTask.Wait());
}
+
+ IBufferReader CreateNullResponse()
+ {
+ return fakeBufferReaderManager.CreateNullPayloadBufferReader();
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 775849d..78c7f3a 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -21,6 +21,7 @@
using System.Threading.Tasks;
using Grpc.Core.Internal;
+using Grpc.Core.Utils;
using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
@@ -33,6 +34,7 @@
Channel channel;
FakeNativeCall fakeCall;
AsyncCall<string, string> asyncCall;
+ FakeBufferReaderManager fakeBufferReaderManager;
[SetUp]
public void Init()
@@ -43,12 +45,14 @@
var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
+ fakeBufferReaderManager = new FakeBufferReaderManager();
}
[TearDown]
public void Cleanup()
{
channel.ShutdownAsync().Wait();
+ fakeBufferReaderManager.Dispose();
}
[Test]
@@ -87,7 +91,7 @@
var resultTask = asyncCall.UnaryCallAsync("request1");
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- null,
+ CreateNullResponse(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -168,7 +172,7 @@
var resultTask = asyncCall.ClientStreamingCallAsync();
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.InvalidArgument),
- null,
+ CreateNullResponse(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -214,7 +218,7 @@
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
- null,
+ CreateNullResponse(),
new Metadata());
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -233,7 +237,7 @@
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
- null,
+ CreateNullResponse(),
new Metadata());
fakeCall.SendCompletionCallback.OnSendCompletion(false);
@@ -259,7 +263,7 @@
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Internal),
- null,
+ CreateNullResponse(),
new Metadata());
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -357,7 +361,7 @@
fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
CreateClientSideStatus(StatusCode.Cancelled),
- null,
+ CreateNullResponse(),
new Metadata());
AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@@ -390,7 +394,7 @@
fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -405,7 +409,7 @@
// try alternative order of completions
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
}
@@ -417,7 +421,7 @@
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse()); // after a failed read, we rely on C core to deliver appropriate status code.
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
@@ -441,7 +445,7 @@
var readTask3 = responseStream.MoveNext();
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
}
@@ -479,7 +483,7 @@
Assert.DoesNotThrowAsync(async () => await writeTask1);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -493,7 +497,7 @@
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -511,7 +515,7 @@
var responseStream = new ClientResponseStream<string, string>(asyncCall);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -533,7 +537,7 @@
Assert.IsFalse(writeTask.IsCompleted);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -552,7 +556,7 @@
var writeTask = requestStream.WriteAsync("request1");
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
fakeCall.SendCompletionCallback.OnSendCompletion(false);
@@ -576,7 +580,7 @@
Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
var readTask = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
@@ -597,7 +601,7 @@
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
@@ -618,7 +622,7 @@
Assert.AreEqual("response1", responseStream.Current);
var readTask2 = responseStream.MoveNext();
- fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+ fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
@@ -638,9 +642,14 @@
return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
}
- byte[] CreateResponsePayload()
+ IBufferReader CreateResponsePayload()
{
- return Marshallers.StringMarshaller.Serializer("response1");
+ return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1"));
+ }
+
+ IBufferReader CreateNullResponse()
+ {
+ return fakeBufferReaderManager.CreateNullPayloadBufferReader();
}
static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs
new file mode 100644
index 0000000..63baab3
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs
@@ -0,0 +1,240 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class DefaultDeserializationContextTest
+ {
+ FakeBufferReaderManager fakeBufferReaderManager;
+
+ [SetUp]
+ public void Init()
+ {
+ fakeBufferReaderManager = new FakeBufferReaderManager();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ fakeBufferReaderManager.Dispose();
+ }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ [TestCase]
+ public void PayloadAsReadOnlySequence_ZeroSegmentPayload()
+ {
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+ Assert.AreEqual(0, context.PayloadLength);
+
+ var sequence = context.PayloadAsReadOnlySequence();
+
+ Assert.AreEqual(ReadOnlySequence<byte>.Empty, sequence);
+ Assert.IsTrue(sequence.IsEmpty);
+ Assert.IsTrue(sequence.IsSingleSegment);
+ }
+
+ [TestCase(0)]
+ [TestCase(1)]
+ [TestCase(10)]
+ [TestCase(100)]
+ [TestCase(1000)]
+ public void PayloadAsReadOnlySequence_SingleSegmentPayload(int segmentLength)
+ {
+ var origBuffer = GetTestBuffer(segmentLength);
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+ Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+ var sequence = context.PayloadAsReadOnlySequence();
+
+ Assert.AreEqual(origBuffer.Length, sequence.Length);
+ Assert.AreEqual(origBuffer.Length, sequence.First.Length);
+ Assert.IsTrue(sequence.IsSingleSegment);
+ CollectionAssert.AreEqual(origBuffer, sequence.First.ToArray());
+ }
+
+ [TestCase(0, 5, 10)]
+ [TestCase(1, 1, 1)]
+ [TestCase(10, 100, 1000)]
+ [TestCase(100, 100, 10)]
+ [TestCase(1000, 1000, 1000)]
+ public void PayloadAsReadOnlySequence_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+ {
+ var origBuffer1 = GetTestBuffer(segmentLen1);
+ var origBuffer2 = GetTestBuffer(segmentLen2);
+ var origBuffer3 = GetTestBuffer(segmentLen3);
+ int totalLen = origBuffer1.Length + origBuffer2.Length + origBuffer3.Length;
+
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 }));
+
+ Assert.AreEqual(totalLen, context.PayloadLength);
+
+ var sequence = context.PayloadAsReadOnlySequence();
+
+ Assert.AreEqual(totalLen, sequence.Length);
+
+ var segmentEnumerator = sequence.GetEnumerator();
+
+ Assert.IsTrue(segmentEnumerator.MoveNext());
+ CollectionAssert.AreEqual(origBuffer1, segmentEnumerator.Current.ToArray());
+
+ Assert.IsTrue(segmentEnumerator.MoveNext());
+ CollectionAssert.AreEqual(origBuffer2, segmentEnumerator.Current.ToArray());
+
+ Assert.IsTrue(segmentEnumerator.MoveNext());
+ CollectionAssert.AreEqual(origBuffer3, segmentEnumerator.Current.ToArray());
+
+ Assert.IsFalse(segmentEnumerator.MoveNext());
+ }
+#endif
+
+ [TestCase]
+ public void NullPayloadNotAllowed()
+ {
+ var context = new DefaultDeserializationContext();
+ Assert.Throws(typeof(InvalidOperationException), () => context.Initialize(fakeBufferReaderManager.CreateNullPayloadBufferReader()));
+ }
+
+ [TestCase]
+ public void PayloadAsNewByteBuffer_ZeroSegmentPayload()
+ {
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+ Assert.AreEqual(0, context.PayloadLength);
+
+ var payload = context.PayloadAsNewBuffer();
+ Assert.AreEqual(0, payload.Length);
+ }
+
+ [TestCase(0)]
+ [TestCase(1)]
+ [TestCase(10)]
+ [TestCase(100)]
+ [TestCase(1000)]
+ public void PayloadAsNewByteBuffer_SingleSegmentPayload(int segmentLength)
+ {
+ var origBuffer = GetTestBuffer(segmentLength);
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+ Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+ var payload = context.PayloadAsNewBuffer();
+ CollectionAssert.AreEqual(origBuffer, payload);
+ }
+
+ [TestCase(0, 5, 10)]
+ [TestCase(1, 1, 1)]
+ [TestCase(10, 100, 1000)]
+ [TestCase(100, 100, 10)]
+ [TestCase(1000, 1000, 1000)]
+ public void PayloadAsNewByteBuffer_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+ {
+ var origBuffer1 = GetTestBuffer(segmentLen1);
+ var origBuffer2 = GetTestBuffer(segmentLen2);
+ var origBuffer3 = GetTestBuffer(segmentLen3);
+
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 }));
+
+ var payload = context.PayloadAsNewBuffer();
+
+ var concatenatedOrigBuffers = new List<byte>();
+ concatenatedOrigBuffers.AddRange(origBuffer1);
+ concatenatedOrigBuffers.AddRange(origBuffer2);
+ concatenatedOrigBuffers.AddRange(origBuffer3);
+
+ Assert.AreEqual(concatenatedOrigBuffers.Count, context.PayloadLength);
+ Assert.AreEqual(concatenatedOrigBuffers.Count, payload.Length);
+ CollectionAssert.AreEqual(concatenatedOrigBuffers, payload);
+ }
+
+ [TestCase]
+ public void GetPayloadMultipleTimesIsIllegal()
+ {
+ var origBuffer = GetTestBuffer(100);
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+ Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+ var payload = context.PayloadAsNewBuffer();
+ CollectionAssert.AreEqual(origBuffer, payload);
+
+ // Getting payload multiple times is illegal
+ Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsNewBuffer());
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsReadOnlySequence());
+#endif
+ }
+
+ [TestCase]
+ public void ResetContextAndReinitialize()
+ {
+ var origBuffer = GetTestBuffer(100);
+ var context = new DefaultDeserializationContext();
+ context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+ Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+ // Reset invalidates context
+ context.Reset();
+
+ Assert.AreEqual(0, context.PayloadLength);
+ Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsNewBuffer());
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsReadOnlySequence());
+#endif
+
+ // Previously reset context can be initialized again
+ var origBuffer2 = GetTestBuffer(50);
+ context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer2));
+
+ Assert.AreEqual(origBuffer2.Length, context.PayloadLength);
+ CollectionAssert.AreEqual(origBuffer2, context.PayloadAsNewBuffer());
+ }
+
+ private byte[] GetTestBuffer(int length)
+ {
+ var testBuffer = new byte[length];
+ for (int i = 0; i < testBuffer.Length; i++)
+ {
+ testBuffer[i] = (byte) i;
+ }
+ return testBuffer;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs
new file mode 100644
index 0000000..d8d0c0a
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs
@@ -0,0 +1,118 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal.Tests
+{
+ // Creates instances of fake IBufferReader. All created instances will become invalid once Dispose is called.
+ internal class FakeBufferReaderManager : IDisposable
+ {
+ List<GCHandle> pinnedHandles = new List<GCHandle>();
+ bool disposed = false;
+ public IBufferReader CreateSingleSegmentBufferReader(byte[] data)
+ {
+ return CreateMultiSegmentBufferReader(new List<byte[]> { data });
+ }
+
+ public IBufferReader CreateMultiSegmentBufferReader(IEnumerable<byte[]> dataSegments)
+ {
+ GrpcPreconditions.CheckState(!disposed);
+ GrpcPreconditions.CheckNotNull(dataSegments);
+ var segments = new List<GCHandle>();
+ foreach (var data in dataSegments)
+ {
+ GrpcPreconditions.CheckNotNull(data);
+ segments.Add(GCHandle.Alloc(data, GCHandleType.Pinned));
+ }
+ pinnedHandles.AddRange(segments); // all the allocated GCHandles will be freed on Dispose()
+ return new FakeBufferReader(segments);
+ }
+
+ public IBufferReader CreateNullPayloadBufferReader()
+ {
+ GrpcPreconditions.CheckState(!disposed);
+ return new FakeBufferReader(null);
+ }
+
+ public void Dispose()
+ {
+ if (!disposed)
+ {
+ disposed = true;
+ for (int i = 0; i < pinnedHandles.Count; i++)
+ {
+ pinnedHandles[i].Free();
+ }
+ }
+ }
+
+ private class FakeBufferReader : IBufferReader
+ {
+ readonly List<GCHandle> bufferSegments;
+ readonly int? totalLength;
+ readonly IEnumerator<GCHandle> segmentEnumerator;
+
+ public FakeBufferReader(List<GCHandle> bufferSegments)
+ {
+ this.bufferSegments = bufferSegments;
+ this.totalLength = ComputeTotalLength(bufferSegments);
+ this.segmentEnumerator = bufferSegments?.GetEnumerator();
+ }
+
+ public int? TotalLength => totalLength;
+
+ public bool TryGetNextSlice(out Slice slice)
+ {
+ GrpcPreconditions.CheckNotNull(bufferSegments);
+ if (!segmentEnumerator.MoveNext())
+ {
+ slice = default(Slice);
+ return false;
+ }
+
+ var segment = segmentEnumerator.Current;
+ int sliceLen = ((byte[]) segment.Target).Length;
+ slice = new Slice(segment.AddrOfPinnedObject(), sliceLen);
+ return true;
+ }
+
+ static int? ComputeTotalLength(List<GCHandle> bufferSegments)
+ {
+ if (bufferSegments == null)
+ {
+ return null;
+ }
+
+ int sum = 0;
+ foreach (var segment in bufferSegments)
+ {
+ var data = (byte[]) segment.Target;
+ sum += data.Length;
+ }
+ return sum;
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs
new file mode 100644
index 0000000..7c4ff65
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs
@@ -0,0 +1,121 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class FakeBufferReaderManagerTest
+ {
+ FakeBufferReaderManager fakeBufferReaderManager;
+
+ [SetUp]
+ public void Init()
+ {
+ fakeBufferReaderManager = new FakeBufferReaderManager();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ fakeBufferReaderManager.Dispose();
+ }
+
+ [TestCase]
+ public void NullPayload()
+ {
+ var fakeBufferReader = fakeBufferReaderManager.CreateNullPayloadBufferReader();
+ Assert.IsFalse(fakeBufferReader.TotalLength.HasValue);
+ Assert.Throws(typeof(ArgumentNullException), () => fakeBufferReader.TryGetNextSlice(out Slice slice));
+ }
+ [TestCase]
+ public void ZeroSegmentPayload()
+ {
+ var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {});
+ Assert.AreEqual(0, fakeBufferReader.TotalLength.Value);
+ Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice));
+ }
+
+ [TestCase(0)]
+ [TestCase(1)]
+ [TestCase(10)]
+ [TestCase(30)]
+ [TestCase(100)]
+ [TestCase(1000)]
+ public void SingleSegmentPayload(int bufferLen)
+ {
+ var origBuffer = GetTestBuffer(bufferLen);
+ var fakeBufferReader = fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer);
+ Assert.AreEqual(origBuffer.Length, fakeBufferReader.TotalLength.Value);
+
+ Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice));
+ AssertSliceDataEqual(origBuffer, slice);
+
+ Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice2));
+ }
+
+ [TestCase(0, 5, 10)]
+ [TestCase(1, 1, 1)]
+ [TestCase(10, 100, 1000)]
+ [TestCase(100, 100, 10)]
+ [TestCase(1000, 1000, 1000)]
+ public void MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+ {
+ var origBuffer1 = GetTestBuffer(segmentLen1);
+ var origBuffer2 = GetTestBuffer(segmentLen2);
+ var origBuffer3 = GetTestBuffer(segmentLen3);
+ var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 });
+
+ Assert.AreEqual(origBuffer1.Length + origBuffer2.Length + origBuffer3.Length, fakeBufferReader.TotalLength.Value);
+
+ Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice1));
+ AssertSliceDataEqual(origBuffer1, slice1);
+
+ Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice2));
+ AssertSliceDataEqual(origBuffer2, slice2);
+
+ Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice3));
+ AssertSliceDataEqual(origBuffer3, slice3);
+
+ Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice4));
+ }
+
+ private void AssertSliceDataEqual(byte[] expected, Slice actual)
+ {
+ var actualSliceData = new byte[actual.Length];
+ actual.CopyTo(new ArraySegment<byte>(actualSliceData));
+ CollectionAssert.AreEqual(expected, actualSliceData);
+ }
+
+ // create a buffer of given size and fill it with some data
+ private byte[] GetTestBuffer(int length)
+ {
+ var testBuffer = new byte[length];
+ for (int i = 0; i < testBuffer.Length; i++)
+ {
+ testBuffer[i] = (byte) i;
+ }
+ return testBuffer;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs
new file mode 100644
index 0000000..7630785
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs
@@ -0,0 +1,151 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
+namespace Grpc.Core.Internal.Tests
+{
+ // Converts IBufferReader into instances of ReadOnlySequence<byte>
+ // Objects representing the sequence segments are cached to decrease GC load.
+ public class ReusableSliceBufferTest
+ {
+ FakeBufferReaderManager fakeBufferReaderManager;
+
+ [SetUp]
+ public void Init()
+ {
+ fakeBufferReaderManager = new FakeBufferReaderManager();
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ fakeBufferReaderManager.Dispose();
+ }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ [TestCase]
+ public void NullPayload()
+ {
+ var sliceBuffer = new ReusableSliceBuffer();
+ Assert.Throws(typeof(ArgumentNullException), () => sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateNullPayloadBufferReader()));
+ }
+
+ [TestCase]
+ public void ZeroSegmentPayload()
+ {
+ var sliceBuffer = new ReusableSliceBuffer();
+ var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+ Assert.AreEqual(ReadOnlySequence<byte>.Empty, sequence);
+ Assert.IsTrue(sequence.IsEmpty);
+ Assert.IsTrue(sequence.IsSingleSegment);
+ }
+
+ [TestCase]
+ public void SegmentsAreCached()
+ {
+ var bufferSegments1 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList();
+ var bufferSegments2 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList();
+
+ var sliceBuffer = new ReusableSliceBuffer();
+
+ var sequence1 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments1));
+ var memoryManagers1 = GetMemoryManagersForSequenceSegments(sequence1);
+
+ sliceBuffer.Invalidate();
+
+ var sequence2 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments2));
+ var memoryManagers2 = GetMemoryManagersForSequenceSegments(sequence2);
+
+ // check memory managers are identical objects (i.e. they've been reused)
+ CollectionAssert.AreEquivalent(memoryManagers1, memoryManagers2);
+ }
+
+ [TestCase]
+ public void MultiSegmentPayload_LotsOfSegments()
+ {
+ var bufferSegments = Enumerable.Range(0, ReusableSliceBuffer.MaxCachedSegments + 100).Select((_) => GetTestBuffer(10)).ToList();
+
+ var sliceBuffer = new ReusableSliceBuffer();
+ var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments));
+
+ int index = 0;
+ foreach (var memory in sequence)
+ {
+ CollectionAssert.AreEqual(bufferSegments[index], memory.ToArray());
+ index ++;
+ }
+ }
+
+ [TestCase]
+ public void InvalidateMakesSequenceUnusable()
+ {
+ var origBuffer = GetTestBuffer(100);
+
+ var sliceBuffer = new ReusableSliceBuffer();
+ var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer }));
+
+ Assert.AreEqual(origBuffer.Length, sequence.Length);
+
+ sliceBuffer.Invalidate();
+
+ // Invalidate with make the returned sequence completely unusable and broken, users must not use it beyond the deserializer functions.
+ Assert.Throws(typeof(ArgumentOutOfRangeException), () => { var first = sequence.First; });
+ }
+
+ private List<MemoryManager<byte>> GetMemoryManagersForSequenceSegments(ReadOnlySequence<byte> sequence)
+ {
+ var result = new List<MemoryManager<byte>>();
+ foreach (var memory in sequence)
+ {
+ Assert.IsTrue(MemoryMarshal.TryGetMemoryManager(memory, out MemoryManager<byte> memoryManager));
+ result.Add(memoryManager);
+ }
+ return result;
+ }
+#else
+ [TestCase]
+ public void OnlySupportedOnNetCore()
+ {
+ // Test case needs to exist to make C# sanity test happy.
+ }
+#endif
+ private byte[] GetTestBuffer(int length)
+ {
+ var testBuffer = new byte[length];
+ for (int i = 0; i < testBuffer.Length; i++)
+ {
+ testBuffer[i] = (byte) i;
+ }
+ return testBuffer;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs
new file mode 100644
index 0000000..eb090bb
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs
@@ -0,0 +1,83 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Internal.Tests
+{
+ public class SliceTest
+ {
+ [TestCase(0)]
+ [TestCase(1)]
+ [TestCase(10)]
+ [TestCase(100)]
+ [TestCase(1000)]
+ public void SliceFromNativePtr_CopyToArraySegment(int bufferLength)
+ {
+ var origBuffer = GetTestBuffer(bufferLength);
+ var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned);
+ try
+ {
+ var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length);
+ Assert.AreEqual(bufferLength, slice.Length);
+
+ var newBuffer = new byte[bufferLength];
+ slice.CopyTo(new ArraySegment<byte>(newBuffer));
+ CollectionAssert.AreEqual(origBuffer, newBuffer);
+ }
+ finally
+ {
+ gcHandle.Free();
+ }
+ }
+
+ [TestCase]
+ public void SliceFromNativePtr_CopyToArraySegmentTooSmall()
+ {
+ var origBuffer = GetTestBuffer(100);
+ var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned);
+ try
+ {
+ var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length);
+ var tooSmall = new byte[origBuffer.Length - 1];
+ Assert.Catch(typeof(ArgumentException), () => slice.CopyTo(new ArraySegment<byte>(tooSmall)));
+ }
+ finally
+ {
+ gcHandle.Free();
+ }
+ }
+
+ // create a buffer of given size and fill it with some data
+ private byte[] GetTestBuffer(int length)
+ {
+ var testBuffer = new byte[length];
+ for (int i = 0; i < testBuffer.Length; i++)
+ {
+ testBuffer[i] = (byte) i;
+ }
+ return testBuffer;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index b7c191e..afd60e7 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -19,6 +19,15 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
</PropertyGroup>
+ <PropertyGroup>
+ <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+ </PropertyGroup>
+
+ <PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+ <LangVersion>7.2</LangVersion>
+ <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+ </PropertyGroup>
+
<ItemGroup>
<Compile Include="..\Grpc.Core.Api\Version.cs" />
</ItemGroup>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 785081c..a1c6881 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -111,7 +111,7 @@
{
using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
{
- HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+ HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessageReader(), ctx.GetReceivedInitialMetadata());
}
}
catch (Exception e)
@@ -537,14 +537,14 @@
/// <summary>
/// Handler for unary response completion.
/// </summary>
- private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+ private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
{
// NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
// success will be always set to true.
TaskCompletionSource<object> delayedStreamingWriteTcs = null;
TResponse msg = default(TResponse);
- var deserializeException = TryDeserialize(receivedMessage, out msg);
+ var deserializeException = TryDeserialize(receivedMessageReader, out msg);
bool releasedResources;
lock (myLock)
@@ -634,9 +634,9 @@
IUnaryResponseClientCallback UnaryResponseClientCallback => this;
- void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+ void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
{
- HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
+ HandleUnaryResponse(success, receivedStatus, receivedMessageReader, responseHeaders);
}
IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 39c9f7c..9497371 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -228,12 +228,12 @@
}
}
- protected Exception TryDeserialize(byte[] payload, out TRead msg)
+ protected Exception TryDeserialize(IBufferReader reader, out TRead msg)
{
DefaultDeserializationContext context = null;
try
{
- context = DefaultDeserializationContext.GetInitializedThreadLocal(payload);
+ context = DefaultDeserializationContext.GetInitializedThreadLocal(reader);
msg = deserializer(context);
return null;
}
@@ -245,7 +245,6 @@
finally
{
context?.Reset();
-
}
}
@@ -333,21 +332,21 @@
/// <summary>
/// Handles streaming read completion.
/// </summary>
- protected void HandleReadFinished(bool success, byte[] receivedMessage)
+ protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader)
{
// if success == false, received message will be null. It that case we will
// treat this completion as the last read an rely on C core to handle the failed
// read (e.g. deliver approriate statusCode on the clientside).
TRead msg = default(TRead);
- var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
+ var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null;
TaskCompletionSource<TRead> origTcs = null;
bool releasedResources;
lock (myLock)
{
origTcs = streamingReadTcs;
- if (receivedMessage == null)
+ if (!receivedMessageReader.TotalLength.HasValue)
{
// This was the last read.
readingDone = true;
@@ -391,9 +390,9 @@
IReceivedMessageCallback ReceivedMessageCallback => this;
- void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
+ void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)
{
- HandleReadFinished(success, receivedMessage);
+ HandleReadFinished(success, receivedMessageReader);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 085e7fa..61af26e 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -30,10 +30,17 @@
void OnComplete(bool success);
}
+ internal interface IBufferReader
+ {
+ int? TotalLength { get; }
+
+ bool TryGetNextSlice(out Slice slice);
+ }
+
/// <summary>
/// grpcsharp_batch_context
/// </summary>
- internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>
+ internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>, IBufferReader
{
static readonly NativeMethods Native = NativeMethods.Get();
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
@@ -106,6 +113,25 @@
return data;
}
+ public bool GetReceivedMessageNextSlicePeek(out Slice slice)
+ {
+ UIntPtr sliceLen;
+ IntPtr sliceDataPtr;
+
+ if (0 == Native.grpcsharp_batch_context_recv_message_next_slice_peek(this, out sliceLen, out sliceDataPtr))
+ {
+ slice = default(Slice);
+ return false;
+ }
+ slice = new Slice(sliceDataPtr, (int) sliceLen);
+ return true;
+ }
+
+ public IBufferReader GetReceivedMessageReader()
+ {
+ return this;
+ }
+
// Gets data of receive_close_on_server completion.
public bool GetReceivedCloseOnServerCancelled()
{
@@ -153,6 +179,20 @@
}
}
+ int? IBufferReader.TotalLength
+ {
+ get
+ {
+ var len = Native.grpcsharp_batch_context_recv_message_length(this);
+ return len != new IntPtr(-1) ? (int?) len : null;
+ }
+ }
+
+ bool IBufferReader.TryGetNextSlice(out Slice slice)
+ {
+ return GetReceivedMessageNextSlicePeek(out slice);
+ }
+
struct CompletionCallbackData
{
public CompletionCallbackData(BatchCompletionDelegate callback, object state)
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index a3ef3e6..858d2a6 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -35,11 +35,11 @@
// Completion handlers are pre-allocated to avoid unneccessary delegate allocations.
// The "state" field is used to store the actual callback to invoke.
static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback =
- (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata());
+ (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessageReader(), context.GetReceivedInitialMetadata());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback =
(success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback =
- (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage());
+ (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessageReader());
static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback =
(success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata());
static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback =
diff --git a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
index 7ace80e..bac7bbe 100644
--- a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
+++ b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
@@ -20,6 +20,10 @@
using System;
using System.Threading;
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
namespace Grpc.Core.Internal
{
internal class DefaultDeserializationContext : DeserializationContext
@@ -27,40 +31,74 @@
static readonly ThreadLocal<DefaultDeserializationContext> threadLocalInstance =
new ThreadLocal<DefaultDeserializationContext>(() => new DefaultDeserializationContext(), false);
- byte[] payload;
- bool alreadyCalledPayloadAsNewBuffer;
+ IBufferReader bufferReader;
+ int payloadLength;
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ ReusableSliceBuffer cachedSliceBuffer = new ReusableSliceBuffer();
+#endif
public DefaultDeserializationContext()
{
Reset();
}
- public override int PayloadLength => payload.Length;
+ public override int PayloadLength => payloadLength;
public override byte[] PayloadAsNewBuffer()
{
- GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
- alreadyCalledPayloadAsNewBuffer = true;
- return payload;
+ var buffer = new byte[payloadLength];
+ FillContinguousBuffer(bufferReader, buffer);
+ return buffer;
}
- public void Initialize(byte[] payload)
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ public override ReadOnlySequence<byte> PayloadAsReadOnlySequence()
{
- this.payload = GrpcPreconditions.CheckNotNull(payload);
- this.alreadyCalledPayloadAsNewBuffer = false;
+ var sequence = cachedSliceBuffer.PopulateFrom(bufferReader);
+ GrpcPreconditions.CheckState(sequence.Length == payloadLength);
+ return sequence;
+ }
+#endif
+
+ public void Initialize(IBufferReader bufferReader)
+ {
+ this.bufferReader = GrpcPreconditions.CheckNotNull(bufferReader);
+ this.payloadLength = bufferReader.TotalLength.Value; // payload must not be null
}
public void Reset()
{
- this.payload = null;
- this.alreadyCalledPayloadAsNewBuffer = true; // mark payload as read
+ this.bufferReader = null;
+ this.payloadLength = 0;
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ this.cachedSliceBuffer.Invalidate();
+#endif
}
- public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload)
+ public static DefaultDeserializationContext GetInitializedThreadLocal(IBufferReader bufferReader)
{
var instance = threadLocalInstance.Value;
- instance.Initialize(payload);
+ instance.Initialize(bufferReader);
return instance;
}
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ private void FillContinguousBuffer(IBufferReader reader, byte[] destination)
+ {
+ PayloadAsReadOnlySequence().CopyTo(new Span<byte>(destination));
+ }
+#else
+ private void FillContinguousBuffer(IBufferReader reader, byte[] destination)
+ {
+ int offset = 0;
+ while (reader.TryGetNextSlice(out Slice slice))
+ {
+ slice.CopyTo(new ArraySegment<byte>(destination, offset, (int)slice.Length));
+ offset += (int)slice.Length;
+ }
+ // check that we filled the entire destination
+ GrpcPreconditions.CheckState(offset == payloadLength);
+ }
+#endif
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index 5c35b2b..98117c6 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -22,7 +22,7 @@
{
internal interface IUnaryResponseClientCallback
{
- void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+ void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders);
}
// Received status for streaming response calls.
@@ -33,7 +33,7 @@
internal interface IReceivedMessageCallback
{
- void OnReceivedMessage(bool success, byte[] receivedMessage);
+ void OnReceivedMessage(bool success, IBufferReader receivedMessageReader);
}
internal interface IReceivedResponseHeadersCallback
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
index a1387af..9752a8e 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
@@ -41,6 +41,7 @@
public readonly Delegates.grpcsharp_batch_context_recv_initial_metadata_delegate grpcsharp_batch_context_recv_initial_metadata;
public readonly Delegates.grpcsharp_batch_context_recv_message_length_delegate grpcsharp_batch_context_recv_message_length;
public readonly Delegates.grpcsharp_batch_context_recv_message_to_buffer_delegate grpcsharp_batch_context_recv_message_to_buffer;
+ public readonly Delegates.grpcsharp_batch_context_recv_message_next_slice_peek_delegate grpcsharp_batch_context_recv_message_next_slice_peek;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -142,6 +143,7 @@
this.grpcsharp_batch_context_recv_initial_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_initial_metadata_delegate>(library);
this.grpcsharp_batch_context_recv_message_length = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_length_delegate>(library);
this.grpcsharp_batch_context_recv_message_to_buffer = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_to_buffer_delegate>(library);
+ this.grpcsharp_batch_context_recv_message_next_slice_peek = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_next_slice_peek_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
@@ -242,6 +244,7 @@
this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_initial_metadata;
this.grpcsharp_batch_context_recv_message_length = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_length;
this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_to_buffer;
+ this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_next_slice_peek;
this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_status;
this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_details;
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -342,6 +345,7 @@
this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_initial_metadata;
this.grpcsharp_batch_context_recv_message_length = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_length;
this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_to_buffer;
+ this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_next_slice_peek;
this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_status;
this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_details;
this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -445,6 +449,7 @@
public delegate IntPtr grpcsharp_batch_context_recv_initial_metadata_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_recv_message_length_delegate(BatchContextSafeHandle ctx);
public delegate void grpcsharp_batch_context_recv_message_to_buffer_delegate(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
+ public delegate int grpcsharp_batch_context_recv_message_next_slice_peek_delegate(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength);
public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
@@ -565,6 +570,9 @@
public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport(ImportName)]
+ public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
+
+ [DllImport(ImportName)]
public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport(ImportName)]
@@ -861,6 +869,9 @@
public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
[DllImport(ImportName)]
+ public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
+
+ [DllImport(ImportName)]
public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
[DllImport(ImportName)]
diff --git a/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs
new file mode 100644
index 0000000..fb8d2e7
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs
@@ -0,0 +1,148 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+
+using Grpc.Core.Utils;
+using System;
+using System.Threading;
+
+using System.Buffers;
+
+namespace Grpc.Core.Internal
+{
+ internal class ReusableSliceBuffer
+ {
+ public const int MaxCachedSegments = 1024; // ~4MB payload for 4K slices
+
+ readonly SliceSegment[] cachedSegments = new SliceSegment[MaxCachedSegments];
+ int populatedSegmentCount = 0;
+
+ public ReadOnlySequence<byte> PopulateFrom(IBufferReader bufferReader)
+ {
+ long offset = 0;
+ int index = 0;
+ SliceSegment prevSegment = null;
+ while (bufferReader.TryGetNextSlice(out Slice slice))
+ {
+ // Initialize cached segment if still null or just allocate a new segment if we already reached MaxCachedSegments
+ var current = index < cachedSegments.Length ? cachedSegments[index] : new SliceSegment();
+ if (current == null)
+ {
+ current = cachedSegments[index] = new SliceSegment();
+ }
+
+ current.Reset(slice, offset);
+ prevSegment?.SetNext(current);
+
+ index ++;
+ offset += slice.Length;
+ prevSegment = current;
+ }
+ populatedSegmentCount = index;
+
+ // Not necessary for ending the ReadOnlySequence, but for making sure we
+ // don't keep more than MaxCachedSegments alive.
+ prevSegment?.SetNext(null);
+
+ if (index == 0)
+ {
+ return ReadOnlySequence<byte>.Empty;
+ }
+
+ var firstSegment = cachedSegments[0];
+ var lastSegment = prevSegment;
+ return new ReadOnlySequence<byte>(firstSegment, 0, lastSegment, lastSegment.Memory.Length);
+ }
+
+ public void Invalidate()
+ {
+ if (populatedSegmentCount == 0)
+ {
+ return;
+ }
+ var segment = cachedSegments[0];
+ while (segment != null)
+ {
+ segment.Reset(new Slice(IntPtr.Zero, 0), 0);
+ segment.SetNext(null);
+ segment = (SliceSegment) segment.Next;
+ }
+ populatedSegmentCount = 0;
+ }
+
+ // Represents a segment in ReadOnlySequence
+ // Segment is backed by Slice and the instances are reusable.
+ private class SliceSegment : ReadOnlySequenceSegment<byte>
+ {
+ readonly SliceMemoryManager pointerMemoryManager = new SliceMemoryManager();
+
+ public void Reset(Slice slice, long runningIndex)
+ {
+ pointerMemoryManager.Reset(slice);
+ Memory = pointerMemoryManager.Memory; // maybe not always necessary
+ RunningIndex = runningIndex;
+ }
+
+ public void SetNext(ReadOnlySequenceSegment<byte> next)
+ {
+ Next = next;
+ }
+ }
+
+ // Allow creating instances of Memory<byte> from Slice.
+ // Represents a chunk of native memory, but doesn't manage its lifetime.
+ // Instances of this class are reuseable - they can be reset to point to a different memory chunk.
+ // That is important to make the instances cacheable (rather then creating new instances
+ // the old ones will be reused to reduce GC pressure).
+ private class SliceMemoryManager : MemoryManager<byte>
+ {
+ private Slice slice;
+
+ public void Reset(Slice slice)
+ {
+ this.slice = slice;
+ }
+
+ public void Reset()
+ {
+ Reset(new Slice(IntPtr.Zero, 0));
+ }
+
+ public override Span<byte> GetSpan()
+ {
+ return slice.ToSpanUnsafe();
+ }
+
+ public override MemoryHandle Pin(int elementIndex = 0)
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void Unpin()
+ {
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ // NOP
+ }
+ }
+ }
+}
+#endif
diff --git a/src/csharp/Grpc.Core/Internal/Slice.cs b/src/csharp/Grpc.Core/Internal/Slice.cs
new file mode 100644
index 0000000..22eb953
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/Slice.cs
@@ -0,0 +1,68 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+ /// <summary>
+ /// Slice of native memory.
+ /// Rough equivalent of grpc_slice (but doesn't support inlined slices, just a pointer to data and length)
+ /// </summary>
+ internal struct Slice
+ {
+ private readonly IntPtr dataPtr;
+ private readonly int length;
+
+ public Slice(IntPtr dataPtr, int length)
+ {
+ this.dataPtr = dataPtr;
+ this.length = length;
+ }
+
+ public int Length => length;
+
+ // copies data of the slice to given span.
+ // there needs to be enough space in the destination buffer
+ public void CopyTo(ArraySegment<byte> destination)
+ {
+ Marshal.Copy(dataPtr, destination.Array, destination.Offset, length);
+ }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+ public Span<byte> ToSpanUnsafe()
+ {
+ unsafe
+ {
+ return new Span<byte>((byte*) dataPtr, length);
+ }
+ }
+#endif
+
+ /// <summary>
+ /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Internal.Slice"/>.
+ /// </summary>
+ public override string ToString()
+ {
+ return string.Format("[Slice: dataPtr={0}, length={1}]", dataPtr, length);
+ }
+ }
+}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 91d3957..b42e1e8 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -59,12 +59,16 @@
} send_status_from_server;
grpc_metadata_array recv_initial_metadata;
grpc_byte_buffer* recv_message;
+ grpc_byte_buffer_reader* recv_message_reader;
struct {
grpc_metadata_array trailing_metadata;
grpc_status_code status;
grpc_slice status_details;
} recv_status_on_client;
int recv_close_on_server_cancelled;
+
+ /* reserve space for byte_buffer_reader */
+ grpc_byte_buffer_reader reserved_recv_message_reader;
} grpcsharp_batch_context;
GPR_EXPORT grpcsharp_batch_context* GPR_CALLTYPE
@@ -206,6 +210,9 @@
grpcsharp_metadata_array_destroy_metadata_only(&(ctx->recv_initial_metadata));
+ if (ctx->recv_message_reader) {
+ grpc_byte_buffer_reader_destroy(ctx->recv_message_reader);
+ }
grpc_byte_buffer_destroy(ctx->recv_message);
grpcsharp_metadata_array_destroy_metadata_only(
@@ -287,6 +294,45 @@
grpc_byte_buffer_reader_destroy(&reader);
}
+/*
+ * Gets the next slice from recv_message byte buffer.
+ * Returns 1 if a slice was get successfully, 0 if there are no more slices to
+ * read. Set slice_len to the length of the slice and the slice_data_ptr to
+ * point to slice's data. Caller must ensure that the byte buffer being read
+ * from stays alive as long as the data of the slice are being accessed
+ * (grpc_byte_buffer_reader_peek method is used internally)
+ *
+ * Remarks:
+ * Slices can only be iterated once.
+ * Initializes recv_message_buffer_reader if it was not initialized yet.
+ */
+GPR_EXPORT int GPR_CALLTYPE
+grpcsharp_batch_context_recv_message_next_slice_peek(
+ grpcsharp_batch_context* ctx, size_t* slice_len, uint8_t** slice_data_ptr) {
+ *slice_len = 0;
+ *slice_data_ptr = NULL;
+
+ if (!ctx->recv_message) {
+ return 0;
+ }
+
+ if (!ctx->recv_message_reader) {
+ ctx->recv_message_reader = &ctx->reserved_recv_message_reader;
+ GPR_ASSERT(grpc_byte_buffer_reader_init(ctx->recv_message_reader,
+ ctx->recv_message));
+ }
+
+ grpc_slice* slice_ptr;
+ if (!grpc_byte_buffer_reader_peek(ctx->recv_message_reader, &slice_ptr)) {
+ return 0;
+ }
+
+ /* recv_message buffer must not be deleted before all the data is read */
+ *slice_len = GRPC_SLICE_LENGTH(*slice_ptr);
+ *slice_data_ptr = GRPC_SLICE_START_PTR(*slice_ptr);
+ return 1;
+}
+
GPR_EXPORT grpc_status_code GPR_CALLTYPE
grpcsharp_batch_context_recv_status_on_client_status(
const grpcsharp_batch_context* ctx) {
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index c1e7fc1..cacdb30 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -7,8 +7,12 @@
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",
"Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
+ "Grpc.Core.Internal.Tests.DefaultDeserializationContextTest",
"Grpc.Core.Internal.Tests.DefaultObjectPoolTest",
+ "Grpc.Core.Internal.Tests.FakeBufferReaderManagerTest",
"Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
+ "Grpc.Core.Internal.Tests.ReusableSliceBufferTest",
+ "Grpc.Core.Internal.Tests.SliceTest",
"Grpc.Core.Internal.Tests.TimespecTest",
"Grpc.Core.Tests.AppDomainUnloadTest",
"Grpc.Core.Tests.AuthContextTest",
diff --git a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
index 0e9d56f..1da79e2 100644
--- a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
+++ b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
@@ -50,6 +50,10 @@
fprintf(stderr, "Should never reach here");
abort();
}
+void grpcsharp_batch_context_recv_message_next_slice_peek() {
+ fprintf(stderr, "Should never reach here");
+ abort();
+}
void grpcsharp_batch_context_recv_status_on_client_status() {
fprintf(stderr, "Should never reach here");
abort();
diff --git a/templates/src/csharp/Grpc.Core/Internal/native_methods.include b/templates/src/csharp/Grpc.Core/Internal/native_methods.include
index e8ec4c8..dab9599 100644
--- a/templates/src/csharp/Grpc.Core/Internal/native_methods.include
+++ b/templates/src/csharp/Grpc.Core/Internal/native_methods.include
@@ -7,6 +7,7 @@
'IntPtr grpcsharp_batch_context_recv_initial_metadata(BatchContextSafeHandle ctx)',
'IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx)',
'void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen)',
+ 'int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr)',
'StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx)',
'IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx, out UIntPtr detailsLength)',
'IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata(BatchContextSafeHandle ctx)',