csharp: support slice-by-slice deserialization
diff --git a/src/csharp/Grpc.Core.Api/DeserializationContext.cs b/src/csharp/Grpc.Core.Api/DeserializationContext.cs
index d69e0db..966bcfa 100644
--- a/src/csharp/Grpc.Core.Api/DeserializationContext.cs
+++ b/src/csharp/Grpc.Core.Api/DeserializationContext.cs
@@ -39,7 +39,7 @@
         /// Also, allocating a new buffer each time can put excessive pressure on GC, especially if
         /// the payload is more than 86700 bytes large (which means the newly allocated buffer will be placed in LOH,
         /// and LOH object can only be garbage collected via a full ("stop the world") GC run).
-        /// NOTE: Deserializers are expected not to call this method more than once per received message
+        /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message
         /// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
         /// </summary>
         /// <returns>byte array containing the entire payload.</returns>
@@ -47,5 +47,22 @@
         {
             throw new NotImplementedException();
         }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        /// <summary>
+        /// Gets the entire payload as a ReadOnlySequence.
+        /// The ReadOnlySequence is only valid for the duration of the deserializer routine and the caller must not access it after the deserializer returns.
+        /// Using the read only sequence is the most efficient way to access the message payload. Where possible it allows directly
+        /// accessing the received payload without needing to perform any buffer copying or buffer allocations.
+        /// NOTE: This method is only available in the netstandard2.0 build of the library.
+        /// NOTE: Deserializers are expected not to call this method (or other payload accessor methods) more than once per received message
+        /// (as there is no practical reason for doing so) and <c>DeserializationContext</c> implementations are free to assume so.
+        /// </summary>
+        /// <returns>read only sequence containing the entire payload.</returns>
+        public virtual System.Buffers.ReadOnlySequence<byte> PayloadAsReadOnlySequence()
+        {
+            throw new NotImplementedException();
+        }
+#endif        
     }
 }
diff --git a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
index 6c29530..8a32bc7 100755
--- a/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
+++ b/src/csharp/Grpc.Core.Api/Grpc.Core.Api.csproj
@@ -19,12 +19,20 @@
     <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
   </PropertyGroup>
 
+  <PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+    <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+  </PropertyGroup>
+
   <Import Project="..\Grpc.Core\SourceLink.csproj.include" />
 
   <ItemGroup>
     <PackageReference Include="System.Interactive.Async" Version="3.2.0" />
   </ItemGroup>
 
+  <ItemGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+    <PackageReference Include="System.Memory" Version="4.5.2" />
+  </ItemGroup>
+
   <ItemGroup Condition=" '$(TargetFramework)' == 'net45' ">
     <Reference Include="System" />
     <Reference Include="Microsoft.CSharp" />
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 23e5d7f..7fef2c7 100755
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -8,6 +8,10 @@
     <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
   </PropertyGroup>
 
+  <PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.1' ">
+    <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+  </PropertyGroup>
+
   <ItemGroup>
     <ProjectReference Include="../Grpc.Core/Grpc.Core.csproj" />
   </ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 5c7d48f..fd22161 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -35,6 +35,7 @@
         Server server;
         FakeNativeCall fakeCall;
         AsyncCallServer<string, string> asyncCallServer;
+        FakeBufferReaderManager fakeBufferReaderManager;
 
         [SetUp]
         public void Init()
@@ -52,11 +53,13 @@
                 Marshallers.StringMarshaller.ContextualSerializer, Marshallers.StringMarshaller.ContextualDeserializer,
                 server);
             asyncCallServer.InitializeForTesting(fakeCall);
+            fakeBufferReaderManager = new FakeBufferReaderManager();
         }
 
         [TearDown]
         public void Cleanup()
         {
+            fakeBufferReaderManager.Dispose();
             server.ShutdownAsync().Wait();
         }
 
@@ -77,7 +80,7 @@
             var moveNextTask = requestStream.MoveNext();
 
             fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             Assert.IsFalse(moveNextTask.Result);
 
             AssertFinished(asyncCallServer, fakeCall, finishedTask);
@@ -107,7 +110,7 @@
             // if a read completion's success==false, the request stream will silently finish
             // and we rely on C core cancelling the call.
             var moveNextTask = requestStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse());
             Assert.IsFalse(moveNextTask.Result);
 
             fakeCall.ReceivedCloseOnServerCallback.OnReceivedCloseOnServer(true, cancelled: true);
@@ -182,5 +185,10 @@
             Assert.IsTrue(finishedTask.IsCompleted);
             Assert.DoesNotThrow(() => finishedTask.Wait());
         }
+
+        IBufferReader CreateNullResponse()
+        {
+            return fakeBufferReaderManager.CreateNullPayloadBufferReader();
+        }
     }
 }
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 775849d..78c7f3a 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -21,6 +21,7 @@
 using System.Threading.Tasks;
 
 using Grpc.Core.Internal;
+using Grpc.Core.Utils;
 using NUnit.Framework;
 
 namespace Grpc.Core.Internal.Tests
@@ -33,6 +34,7 @@
         Channel channel;
         FakeNativeCall fakeCall;
         AsyncCall<string, string> asyncCall;
+        FakeBufferReaderManager fakeBufferReaderManager;
 
         [SetUp]
         public void Init()
@@ -43,12 +45,14 @@
 
             var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
             asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
+            fakeBufferReaderManager = new FakeBufferReaderManager();
         }
 
         [TearDown]
         public void Cleanup()
         {
             channel.ShutdownAsync().Wait();
+            fakeBufferReaderManager.Dispose();
         }
 
         [Test]
@@ -87,7 +91,7 @@
             var resultTask = asyncCall.UnaryCallAsync("request1");
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.InvalidArgument),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -168,7 +172,7 @@
             var resultTask = asyncCall.ClientStreamingCallAsync();
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.InvalidArgument),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
@@ -214,7 +218,7 @@
 
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.Internal),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -233,7 +237,7 @@
 
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.Internal),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             fakeCall.SendCompletionCallback.OnSendCompletion(false);
@@ -259,7 +263,7 @@
 
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.Internal),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -357,7 +361,7 @@
 
             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
                 CreateClientSideStatus(StatusCode.Cancelled),
-                null,
+                CreateNullResponse(),
                 new Metadata());
 
             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
@@ -390,7 +394,7 @@
             fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
 
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -405,7 +409,7 @@
 
             // try alternative order of completions
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
         }
@@ -417,7 +421,7 @@
             var responseStream = new ClientResponseStream<string, string>(asyncCall);
             var readTask = responseStream.MoveNext();
 
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, null);  // after a failed read, we rely on C core to deliver appropriate status code.
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse());  // after a failed read, we rely on C core to deliver appropriate status code.
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
 
             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
@@ -441,7 +445,7 @@
 
             var readTask3 = responseStream.MoveNext();
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
         }
@@ -479,7 +483,7 @@
             Assert.DoesNotThrowAsync(async () => await writeTask1);
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -493,7 +497,7 @@
             var responseStream = new ClientResponseStream<string, string>(asyncCall);
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -511,7 +515,7 @@
             var responseStream = new ClientResponseStream<string, string>(asyncCall);
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
 
             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
@@ -533,7 +537,7 @@
             Assert.IsFalse(writeTask.IsCompleted);
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
 
             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
@@ -552,7 +556,7 @@
             var writeTask = requestStream.WriteAsync("request1");
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
             fakeCall.SendCompletionCallback.OnSendCompletion(false);
 
@@ -576,7 +580,7 @@
             Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
 
             var readTask = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
 
             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
@@ -597,7 +601,7 @@
             Assert.AreEqual("response1", responseStream.Current);
 
             var readTask2 = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
 
             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
@@ -618,7 +622,7 @@
             Assert.AreEqual("response1", responseStream.Current);
 
             var readTask2 = responseStream.MoveNext();
-            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, null);
+            fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
 
             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
@@ -638,9 +642,14 @@
             return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
         }
 
-        byte[] CreateResponsePayload()
+        IBufferReader CreateResponsePayload()
         {
-            return Marshallers.StringMarshaller.Serializer("response1");
+            return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1"));
+        }
+
+        IBufferReader CreateNullResponse()
+        {
+            return fakeBufferReaderManager.CreateNullPayloadBufferReader();
         }
 
         static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
diff --git a/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs
new file mode 100644
index 0000000..63baab3
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/DefaultDeserializationContextTest.cs
@@ -0,0 +1,240 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
+namespace Grpc.Core.Internal.Tests
+{
+    public class DefaultDeserializationContextTest
+    {
+        FakeBufferReaderManager fakeBufferReaderManager;
+
+        [SetUp]
+        public void Init()
+        {
+            fakeBufferReaderManager = new FakeBufferReaderManager();
+        }
+
+        [TearDown]
+        public void Cleanup()
+        {
+            fakeBufferReaderManager.Dispose();
+        }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        [TestCase]
+        public void PayloadAsReadOnlySequence_ZeroSegmentPayload()
+        {
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+            Assert.AreEqual(0, context.PayloadLength);
+
+            var sequence = context.PayloadAsReadOnlySequence();
+
+            Assert.AreEqual(ReadOnlySequence<byte>.Empty, sequence);
+            Assert.IsTrue(sequence.IsEmpty);
+            Assert.IsTrue(sequence.IsSingleSegment);
+        }
+
+        [TestCase(0)]
+        [TestCase(1)]
+        [TestCase(10)]
+        [TestCase(100)]
+        [TestCase(1000)]
+        public void PayloadAsReadOnlySequence_SingleSegmentPayload(int segmentLength)
+        {
+            var origBuffer = GetTestBuffer(segmentLength);
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+            Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+            var sequence = context.PayloadAsReadOnlySequence();
+
+            Assert.AreEqual(origBuffer.Length, sequence.Length);
+            Assert.AreEqual(origBuffer.Length, sequence.First.Length);
+            Assert.IsTrue(sequence.IsSingleSegment);
+            CollectionAssert.AreEqual(origBuffer, sequence.First.ToArray());
+        }
+
+        [TestCase(0, 5, 10)]
+        [TestCase(1, 1, 1)]
+        [TestCase(10, 100, 1000)]
+        [TestCase(100, 100, 10)]
+        [TestCase(1000, 1000, 1000)]
+        public void PayloadAsReadOnlySequence_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+        {
+            var origBuffer1 = GetTestBuffer(segmentLen1);
+            var origBuffer2 = GetTestBuffer(segmentLen2);
+            var origBuffer3 = GetTestBuffer(segmentLen3);
+            int totalLen = origBuffer1.Length + origBuffer2.Length + origBuffer3.Length;
+
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 }));
+
+            Assert.AreEqual(totalLen, context.PayloadLength);
+
+            var sequence = context.PayloadAsReadOnlySequence();
+
+            Assert.AreEqual(totalLen, sequence.Length);
+
+            var segmentEnumerator = sequence.GetEnumerator();
+
+            Assert.IsTrue(segmentEnumerator.MoveNext());
+            CollectionAssert.AreEqual(origBuffer1, segmentEnumerator.Current.ToArray());
+
+            Assert.IsTrue(segmentEnumerator.MoveNext());
+            CollectionAssert.AreEqual(origBuffer2, segmentEnumerator.Current.ToArray());
+
+            Assert.IsTrue(segmentEnumerator.MoveNext());
+            CollectionAssert.AreEqual(origBuffer3, segmentEnumerator.Current.ToArray());
+
+            Assert.IsFalse(segmentEnumerator.MoveNext());
+        }
+#endif
+
+        [TestCase]
+        public void NullPayloadNotAllowed()
+        {
+            var context = new DefaultDeserializationContext();
+            Assert.Throws(typeof(InvalidOperationException), () => context.Initialize(fakeBufferReaderManager.CreateNullPayloadBufferReader()));
+        }
+
+        [TestCase]
+        public void PayloadAsNewByteBuffer_ZeroSegmentPayload()
+        {
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+            Assert.AreEqual(0, context.PayloadLength);
+
+            var payload = context.PayloadAsNewBuffer();
+            Assert.AreEqual(0, payload.Length);
+        }
+
+        [TestCase(0)]
+        [TestCase(1)]
+        [TestCase(10)]
+        [TestCase(100)]
+        [TestCase(1000)]
+        public void PayloadAsNewByteBuffer_SingleSegmentPayload(int segmentLength)
+        {
+            var origBuffer = GetTestBuffer(segmentLength);
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+            Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+            var payload = context.PayloadAsNewBuffer();
+            CollectionAssert.AreEqual(origBuffer, payload);
+        }
+
+        [TestCase(0, 5, 10)]
+        [TestCase(1, 1, 1)]
+        [TestCase(10, 100, 1000)]
+        [TestCase(100, 100, 10)]
+        [TestCase(1000, 1000, 1000)]
+        public void PayloadAsNewByteBuffer_MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+        {
+            var origBuffer1 = GetTestBuffer(segmentLen1);
+            var origBuffer2 = GetTestBuffer(segmentLen2);
+            var origBuffer3 = GetTestBuffer(segmentLen3);
+
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 }));
+
+            var payload = context.PayloadAsNewBuffer();
+
+            var concatenatedOrigBuffers = new List<byte>();
+            concatenatedOrigBuffers.AddRange(origBuffer1);
+            concatenatedOrigBuffers.AddRange(origBuffer2);
+            concatenatedOrigBuffers.AddRange(origBuffer3);
+
+            Assert.AreEqual(concatenatedOrigBuffers.Count, context.PayloadLength);
+            Assert.AreEqual(concatenatedOrigBuffers.Count, payload.Length);
+            CollectionAssert.AreEqual(concatenatedOrigBuffers, payload);
+        }
+
+        [TestCase]
+        public void GetPayloadMultipleTimesIsIllegal()
+        {
+            var origBuffer = GetTestBuffer(100);
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+            Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+            var payload = context.PayloadAsNewBuffer();
+            CollectionAssert.AreEqual(origBuffer, payload);
+
+            // Getting payload multiple times is illegal
+            Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsNewBuffer());
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+            Assert.Throws(typeof(InvalidOperationException), () => context.PayloadAsReadOnlySequence());
+#endif
+        }
+
+        [TestCase]
+        public void ResetContextAndReinitialize()
+        {
+            var origBuffer = GetTestBuffer(100);
+            var context = new DefaultDeserializationContext();
+            context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer));
+
+            Assert.AreEqual(origBuffer.Length, context.PayloadLength);
+
+            // Reset invalidates context
+            context.Reset();
+
+            Assert.AreEqual(0, context.PayloadLength);
+            Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsNewBuffer());
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+            Assert.Throws(typeof(NullReferenceException), () => context.PayloadAsReadOnlySequence());
+#endif
+
+            // Previously reset context can be initialized again
+            var origBuffer2 = GetTestBuffer(50);
+            context.Initialize(fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer2));
+
+            Assert.AreEqual(origBuffer2.Length, context.PayloadLength);
+            CollectionAssert.AreEqual(origBuffer2, context.PayloadAsNewBuffer());
+        }
+
+        private byte[] GetTestBuffer(int length)
+        {
+            var testBuffer = new byte[length];
+            for (int i = 0; i < testBuffer.Length; i++)
+            {
+                testBuffer[i] = (byte) i;
+            }
+            return testBuffer;
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs
new file mode 100644
index 0000000..d8d0c0a
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManager.cs
@@ -0,0 +1,118 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal.Tests
+{
+    // Creates instances of fake IBufferReader. All created instances will become invalid once Dispose is called.
+    internal class FakeBufferReaderManager : IDisposable
+    {
+        List<GCHandle> pinnedHandles = new List<GCHandle>();
+        bool disposed = false;
+        public IBufferReader CreateSingleSegmentBufferReader(byte[] data)
+        {
+            return CreateMultiSegmentBufferReader(new List<byte[]> { data });
+        }
+
+        public IBufferReader CreateMultiSegmentBufferReader(IEnumerable<byte[]> dataSegments)
+        {
+            GrpcPreconditions.CheckState(!disposed);
+            GrpcPreconditions.CheckNotNull(dataSegments);
+            var segments = new List<GCHandle>();
+            foreach (var data in dataSegments)
+            {
+                GrpcPreconditions.CheckNotNull(data);
+                segments.Add(GCHandle.Alloc(data, GCHandleType.Pinned));
+            }
+            pinnedHandles.AddRange(segments);  // all the allocated GCHandles will be freed on Dispose()
+            return new FakeBufferReader(segments);
+        }
+
+        public IBufferReader CreateNullPayloadBufferReader()
+        {
+            GrpcPreconditions.CheckState(!disposed);
+            return new FakeBufferReader(null);
+        }
+
+        public void Dispose()
+        {
+            if (!disposed)
+            {
+                disposed = true;
+                for (int i = 0; i < pinnedHandles.Count; i++)
+                {
+                    pinnedHandles[i].Free();
+                }
+            }
+        }
+
+        private class FakeBufferReader : IBufferReader
+        {
+            readonly List<GCHandle> bufferSegments;
+            readonly int? totalLength;
+            readonly IEnumerator<GCHandle> segmentEnumerator;
+
+            public FakeBufferReader(List<GCHandle> bufferSegments)
+            {
+                this.bufferSegments = bufferSegments;
+                this.totalLength = ComputeTotalLength(bufferSegments);
+                this.segmentEnumerator = bufferSegments?.GetEnumerator();
+            }
+
+            public int? TotalLength => totalLength;
+
+            public bool TryGetNextSlice(out Slice slice)
+            {
+                GrpcPreconditions.CheckNotNull(bufferSegments);
+                if (!segmentEnumerator.MoveNext())
+                {
+                    slice = default(Slice);
+                    return false;
+                }
+
+                var segment = segmentEnumerator.Current;
+                int sliceLen = ((byte[]) segment.Target).Length;
+                slice = new Slice(segment.AddrOfPinnedObject(), sliceLen);
+                return true;
+            }
+
+            static int? ComputeTotalLength(List<GCHandle> bufferSegments)
+            {
+                if (bufferSegments == null)
+                {
+                    return null;
+                }
+
+                int sum = 0;
+                foreach (var segment in bufferSegments)
+                {
+                    var data = (byte[]) segment.Target;
+                    sum += data.Length;
+                }
+                return sum;
+            }
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs
new file mode 100644
index 0000000..7c4ff65
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeBufferReaderManagerTest.cs
@@ -0,0 +1,121 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+    public class FakeBufferReaderManagerTest
+    {
+        FakeBufferReaderManager fakeBufferReaderManager;
+
+        [SetUp]
+        public void Init()
+        {
+            fakeBufferReaderManager = new FakeBufferReaderManager();
+        }
+
+        [TearDown]
+        public void Cleanup()
+        {
+            fakeBufferReaderManager.Dispose();
+        }
+
+        [TestCase]
+        public void NullPayload()
+        {
+            var fakeBufferReader = fakeBufferReaderManager.CreateNullPayloadBufferReader();
+            Assert.IsFalse(fakeBufferReader.TotalLength.HasValue);
+            Assert.Throws(typeof(ArgumentNullException), () => fakeBufferReader.TryGetNextSlice(out Slice slice));
+        }
+        [TestCase]
+        public void ZeroSegmentPayload()
+        {
+            var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {});
+            Assert.AreEqual(0, fakeBufferReader.TotalLength.Value);
+            Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice));
+        }
+
+        [TestCase(0)]
+        [TestCase(1)]
+        [TestCase(10)]
+        [TestCase(30)]
+        [TestCase(100)]
+        [TestCase(1000)]
+        public void SingleSegmentPayload(int bufferLen)
+        {
+            var origBuffer = GetTestBuffer(bufferLen);
+            var fakeBufferReader = fakeBufferReaderManager.CreateSingleSegmentBufferReader(origBuffer);
+            Assert.AreEqual(origBuffer.Length, fakeBufferReader.TotalLength.Value);
+
+            Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice));
+            AssertSliceDataEqual(origBuffer, slice);
+
+            Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice2));
+        }
+
+        [TestCase(0, 5, 10)]
+        [TestCase(1, 1, 1)]
+        [TestCase(10, 100, 1000)]
+        [TestCase(100, 100, 10)]
+        [TestCase(1000, 1000, 1000)]
+        public void MultiSegmentPayload(int segmentLen1, int segmentLen2, int segmentLen3)
+        {
+            var origBuffer1 = GetTestBuffer(segmentLen1);
+            var origBuffer2 = GetTestBuffer(segmentLen2);
+            var origBuffer3 = GetTestBuffer(segmentLen3);
+            var fakeBufferReader = fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer1, origBuffer2, origBuffer3 });
+
+            Assert.AreEqual(origBuffer1.Length + origBuffer2.Length + origBuffer3.Length, fakeBufferReader.TotalLength.Value);
+
+            Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice1));
+            AssertSliceDataEqual(origBuffer1, slice1);
+
+            Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice2));
+            AssertSliceDataEqual(origBuffer2, slice2);
+
+            Assert.IsTrue(fakeBufferReader.TryGetNextSlice(out Slice slice3));
+            AssertSliceDataEqual(origBuffer3, slice3);
+
+            Assert.IsFalse(fakeBufferReader.TryGetNextSlice(out Slice slice4));
+        }
+
+        private void AssertSliceDataEqual(byte[] expected, Slice actual)
+        {
+            var actualSliceData = new byte[actual.Length];
+            actual.CopyTo(new ArraySegment<byte>(actualSliceData));
+            CollectionAssert.AreEqual(expected, actualSliceData);
+        }
+
+        // create a buffer of given size and fill it with some data
+        private byte[] GetTestBuffer(int length)
+        {
+            var testBuffer = new byte[length];
+            for (int i = 0; i < testBuffer.Length; i++)
+            {
+                testBuffer[i] = (byte) i;
+            }
+            return testBuffer;
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs
new file mode 100644
index 0000000..7630785
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/ReusableSliceBufferTest.cs
@@ -0,0 +1,151 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
+namespace Grpc.Core.Internal.Tests
+{
+    // Converts IBufferReader into instances of ReadOnlySequence<byte>
+    // Objects representing the sequence segments are cached to decrease GC load.
+    public class ReusableSliceBufferTest
+    {
+        FakeBufferReaderManager fakeBufferReaderManager;
+
+        [SetUp]
+        public void Init()
+        {
+            fakeBufferReaderManager = new FakeBufferReaderManager();
+        }
+
+        [TearDown]
+        public void Cleanup()
+        {
+            fakeBufferReaderManager.Dispose();
+        }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        [TestCase]
+        public void NullPayload()
+        {
+            var sliceBuffer = new ReusableSliceBuffer();
+            Assert.Throws(typeof(ArgumentNullException), () => sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateNullPayloadBufferReader()));
+        }
+
+        [TestCase]
+        public void ZeroSegmentPayload()
+        {
+            var sliceBuffer = new ReusableSliceBuffer();
+            var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> {}));
+
+            Assert.AreEqual(ReadOnlySequence<byte>.Empty, sequence);
+            Assert.IsTrue(sequence.IsEmpty);
+            Assert.IsTrue(sequence.IsSingleSegment);
+        }
+
+        [TestCase]
+        public void SegmentsAreCached()
+        {
+            var bufferSegments1 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList();
+            var bufferSegments2 = Enumerable.Range(0, 100).Select((_) => GetTestBuffer(50)).ToList();
+
+            var sliceBuffer = new ReusableSliceBuffer();
+
+            var sequence1 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments1));
+            var memoryManagers1 = GetMemoryManagersForSequenceSegments(sequence1);
+
+            sliceBuffer.Invalidate();
+
+            var sequence2 = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments2));
+            var memoryManagers2 = GetMemoryManagersForSequenceSegments(sequence2);
+
+            // check memory managers are identical objects (i.e. they've been reused)
+            CollectionAssert.AreEquivalent(memoryManagers1, memoryManagers2);
+        }
+
+        [TestCase]
+        public void MultiSegmentPayload_LotsOfSegments()
+        {
+            var bufferSegments = Enumerable.Range(0, ReusableSliceBuffer.MaxCachedSegments + 100).Select((_) => GetTestBuffer(10)).ToList();
+
+            var sliceBuffer = new ReusableSliceBuffer();
+            var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(bufferSegments));
+
+            int index = 0;
+            foreach (var memory in sequence)
+            {
+                CollectionAssert.AreEqual(bufferSegments[index], memory.ToArray());
+                index ++;
+            }
+        }
+
+        [TestCase]
+        public void InvalidateMakesSequenceUnusable()
+        {
+            var origBuffer = GetTestBuffer(100);
+
+            var sliceBuffer = new ReusableSliceBuffer();
+            var sequence = sliceBuffer.PopulateFrom(fakeBufferReaderManager.CreateMultiSegmentBufferReader(new List<byte[]> { origBuffer }));
+
+            Assert.AreEqual(origBuffer.Length, sequence.Length);
+
+            sliceBuffer.Invalidate();
+
+            // Invalidate with make the returned sequence completely unusable and broken, users must not use it beyond the deserializer functions.
+            Assert.Throws(typeof(ArgumentOutOfRangeException), () => { var first = sequence.First; });
+        }
+
+        private List<MemoryManager<byte>> GetMemoryManagersForSequenceSegments(ReadOnlySequence<byte> sequence)
+        {
+            var result = new List<MemoryManager<byte>>();
+            foreach (var memory in sequence)
+            {
+                Assert.IsTrue(MemoryMarshal.TryGetMemoryManager(memory, out MemoryManager<byte> memoryManager));
+                result.Add(memoryManager);
+            }
+            return result;
+        }
+#else
+        [TestCase]
+        public void OnlySupportedOnNetCore()
+        {
+            // Test case needs to exist to make C# sanity test happy.
+        }
+#endif
+        private byte[] GetTestBuffer(int length)
+        {
+            var testBuffer = new byte[length];
+            for (int i = 0; i < testBuffer.Length; i++)
+            {
+                testBuffer[i] = (byte) i;
+            }
+            return testBuffer;
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs
new file mode 100644
index 0000000..eb090bb
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/SliceTest.cs
@@ -0,0 +1,83 @@
+#region Copyright notice and license
+
+// Copyright 2018 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+using System.Runtime.InteropServices;
+
+namespace Grpc.Core.Internal.Tests
+{
+    public class SliceTest
+    {
+        [TestCase(0)]
+        [TestCase(1)]
+        [TestCase(10)]
+        [TestCase(100)]
+        [TestCase(1000)]
+        public void SliceFromNativePtr_CopyToArraySegment(int bufferLength)
+        {
+            var origBuffer = GetTestBuffer(bufferLength);
+            var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned);
+            try
+            {
+                var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length);
+                Assert.AreEqual(bufferLength, slice.Length);
+
+                var newBuffer = new byte[bufferLength];
+                slice.CopyTo(new ArraySegment<byte>(newBuffer));
+                CollectionAssert.AreEqual(origBuffer, newBuffer);
+            }
+            finally
+            {
+                gcHandle.Free();
+            }
+        }
+
+        [TestCase]
+        public void SliceFromNativePtr_CopyToArraySegmentTooSmall()
+        {
+            var origBuffer = GetTestBuffer(100);
+            var gcHandle = GCHandle.Alloc(origBuffer, GCHandleType.Pinned);
+            try
+            {
+                var slice = new Slice(gcHandle.AddrOfPinnedObject(), origBuffer.Length);
+                var tooSmall = new byte[origBuffer.Length - 1];
+                Assert.Catch(typeof(ArgumentException), () => slice.CopyTo(new ArraySegment<byte>(tooSmall)));
+            }
+            finally
+            {
+                gcHandle.Free();
+            }
+        }
+
+        // create a buffer of given size and fill it with some data
+        private byte[] GetTestBuffer(int length)
+        {
+            var testBuffer = new byte[length];
+            for (int i = 0; i < testBuffer.Length; i++)
+            {
+                testBuffer[i] = (byte) i;
+            }
+            return testBuffer;
+        }
+    }
+}
diff --git a/src/csharp/Grpc.Core/Grpc.Core.csproj b/src/csharp/Grpc.Core/Grpc.Core.csproj
index b7c191e..afd60e7 100755
--- a/src/csharp/Grpc.Core/Grpc.Core.csproj
+++ b/src/csharp/Grpc.Core/Grpc.Core.csproj
@@ -19,6 +19,15 @@
     <TreatWarningsAsErrors>true</TreatWarningsAsErrors>
   </PropertyGroup>
 
+  <PropertyGroup>
+    <AllowUnsafeBlocks>true</AllowUnsafeBlocks>
+  </PropertyGroup>
+
+  <PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard2.0' ">
+    <LangVersion>7.2</LangVersion>
+    <DefineConstants>$(DefineConstants);GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY</DefineConstants>
+  </PropertyGroup>
+
   <ItemGroup>
     <Compile Include="..\Grpc.Core.Api\Version.cs" />
   </ItemGroup>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 785081c..a1c6881 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -111,7 +111,7 @@
                             {
                                 using (profiler.NewScope("AsyncCall.UnaryCall.HandleBatch"))
                                 {
-                                    HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessage(), ctx.GetReceivedInitialMetadata());
+                                    HandleUnaryResponse(success, ctx.GetReceivedStatusOnClient(), ctx.GetReceivedMessageReader(), ctx.GetReceivedInitialMetadata());
                                 }
                             }
                             catch (Exception e)
@@ -537,14 +537,14 @@
         /// <summary>
         /// Handler for unary response completion.
         /// </summary>
-        private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+        private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
         {
             // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
             // success will be always set to true.
 
             TaskCompletionSource<object> delayedStreamingWriteTcs = null;
             TResponse msg = default(TResponse);
-            var deserializeException = TryDeserialize(receivedMessage, out msg);
+            var deserializeException = TryDeserialize(receivedMessageReader, out msg);
 
             bool releasedResources;
             lock (myLock)
@@ -634,9 +634,9 @@
 
         IUnaryResponseClientCallback UnaryResponseClientCallback => this;
 
-        void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
+        void IUnaryResponseClientCallback.OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders)
         {
-            HandleUnaryResponse(success, receivedStatus, receivedMessage, responseHeaders);
+            HandleUnaryResponse(success, receivedStatus, receivedMessageReader, responseHeaders);
         }
 
         IReceivedStatusOnClientCallback ReceivedStatusOnClientCallback => this;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 39c9f7c..9497371 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -228,12 +228,12 @@
             }
         }
 
-        protected Exception TryDeserialize(byte[] payload, out TRead msg)
+        protected Exception TryDeserialize(IBufferReader reader, out TRead msg)
         {
             DefaultDeserializationContext context = null;
             try
             {
-                context = DefaultDeserializationContext.GetInitializedThreadLocal(payload);
+                context = DefaultDeserializationContext.GetInitializedThreadLocal(reader);
                 msg = deserializer(context);
                 return null;
             }
@@ -245,7 +245,6 @@
             finally
             {
                 context?.Reset();
-
             }
         }
 
@@ -333,21 +332,21 @@
         /// <summary>
         /// Handles streaming read completion.
         /// </summary>
-        protected void HandleReadFinished(bool success, byte[] receivedMessage)
+        protected void HandleReadFinished(bool success, IBufferReader receivedMessageReader)
         {
             // if success == false, received message will be null. It that case we will
             // treat this completion as the last read an rely on C core to handle the failed
             // read (e.g. deliver approriate statusCode on the clientside).
 
             TRead msg = default(TRead);
-            var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
+            var deserializeException = (success && receivedMessageReader.TotalLength.HasValue) ? TryDeserialize(receivedMessageReader, out msg) : null;
 
             TaskCompletionSource<TRead> origTcs = null;
             bool releasedResources;
             lock (myLock)
             {
                 origTcs = streamingReadTcs;
-                if (receivedMessage == null)
+                if (!receivedMessageReader.TotalLength.HasValue)
                 {
                     // This was the last read.
                     readingDone = true;
@@ -391,9 +390,9 @@
 
         IReceivedMessageCallback ReceivedMessageCallback => this;
 
-        void IReceivedMessageCallback.OnReceivedMessage(bool success, byte[] receivedMessage)
+        void IReceivedMessageCallback.OnReceivedMessage(bool success, IBufferReader receivedMessageReader)
         {
-            HandleReadFinished(success, receivedMessage);
+            HandleReadFinished(success, receivedMessageReader);
         }
     }
 }
diff --git a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
index 085e7fa..61af26e 100644
--- a/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/BatchContextSafeHandle.cs
@@ -30,10 +30,17 @@
         void OnComplete(bool success);
     }
 
+    internal interface IBufferReader
+    {
+        int? TotalLength { get; }
+
+        bool TryGetNextSlice(out Slice slice);
+    }
+
     /// <summary>
     /// grpcsharp_batch_context
     /// </summary>
-    internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>
+    internal class BatchContextSafeHandle : SafeHandleZeroIsInvalid, IOpCompletionCallback, IPooledObject<BatchContextSafeHandle>, IBufferReader
     {
         static readonly NativeMethods Native = NativeMethods.Get();
         static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<BatchContextSafeHandle>();
@@ -106,6 +113,25 @@
             return data;
         }
 
+        public bool GetReceivedMessageNextSlicePeek(out Slice slice)
+        {
+            UIntPtr sliceLen;
+            IntPtr sliceDataPtr;
+            
+            if (0 == Native.grpcsharp_batch_context_recv_message_next_slice_peek(this, out sliceLen, out sliceDataPtr))
+            {
+                slice = default(Slice);
+                return false;
+            }
+            slice = new Slice(sliceDataPtr, (int) sliceLen);
+            return true;
+        }
+
+        public IBufferReader GetReceivedMessageReader()
+        {
+            return this;
+        }
+
         // Gets data of receive_close_on_server completion.
         public bool GetReceivedCloseOnServerCancelled()
         {
@@ -153,6 +179,20 @@
             }
         }
 
+        int? IBufferReader.TotalLength
+        {
+            get
+            {
+                var len = Native.grpcsharp_batch_context_recv_message_length(this);
+                return len != new IntPtr(-1) ? (int?) len : null;
+            }
+        }
+
+        bool IBufferReader.TryGetNextSlice(out Slice slice)
+        {
+            return GetReceivedMessageNextSlicePeek(out slice);
+        }
+
         struct CompletionCallbackData
         {
             public CompletionCallbackData(BatchCompletionDelegate callback, object state)
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index a3ef3e6..858d2a6 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -35,11 +35,11 @@
         // Completion handlers are pre-allocated to avoid unneccessary delegate allocations.
         // The "state" field is used to store the actual callback to invoke.
         static readonly BatchCompletionDelegate CompletionHandler_IUnaryResponseClientCallback =
-            (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessage(), context.GetReceivedInitialMetadata());
+            (success, context, state) => ((IUnaryResponseClientCallback)state).OnUnaryResponseClient(success, context.GetReceivedStatusOnClient(), context.GetReceivedMessageReader(), context.GetReceivedInitialMetadata());
         static readonly BatchCompletionDelegate CompletionHandler_IReceivedStatusOnClientCallback =
             (success, context, state) => ((IReceivedStatusOnClientCallback)state).OnReceivedStatusOnClient(success, context.GetReceivedStatusOnClient());
         static readonly BatchCompletionDelegate CompletionHandler_IReceivedMessageCallback =
-            (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessage());
+            (success, context, state) => ((IReceivedMessageCallback)state).OnReceivedMessage(success, context.GetReceivedMessageReader());
         static readonly BatchCompletionDelegate CompletionHandler_IReceivedResponseHeadersCallback =
             (success, context, state) => ((IReceivedResponseHeadersCallback)state).OnReceivedResponseHeaders(success, context.GetReceivedInitialMetadata());
         static readonly BatchCompletionDelegate CompletionHandler_ISendCompletionCallback =
diff --git a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
index 7ace80e..bac7bbe 100644
--- a/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
+++ b/src/csharp/Grpc.Core/Internal/DefaultDeserializationContext.cs
@@ -20,6 +20,10 @@
 using System;
 using System.Threading;
 
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+using System.Buffers;
+#endif
+
 namespace Grpc.Core.Internal
 {
     internal class DefaultDeserializationContext : DeserializationContext
@@ -27,40 +31,74 @@
         static readonly ThreadLocal<DefaultDeserializationContext> threadLocalInstance =
             new ThreadLocal<DefaultDeserializationContext>(() => new DefaultDeserializationContext(), false);
 
-        byte[] payload;
-        bool alreadyCalledPayloadAsNewBuffer;
+        IBufferReader bufferReader;
+        int payloadLength;
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        ReusableSliceBuffer cachedSliceBuffer = new ReusableSliceBuffer();
+#endif
 
         public DefaultDeserializationContext()
         {
             Reset();
         }
 
-        public override int PayloadLength => payload.Length;
+        public override int PayloadLength => payloadLength;
 
         public override byte[] PayloadAsNewBuffer()
         {
-            GrpcPreconditions.CheckState(!alreadyCalledPayloadAsNewBuffer);
-            alreadyCalledPayloadAsNewBuffer = true;
-            return payload;
+            var buffer = new byte[payloadLength];
+            FillContinguousBuffer(bufferReader, buffer);
+            return buffer;
         }
 
-        public void Initialize(byte[] payload)
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        public override ReadOnlySequence<byte> PayloadAsReadOnlySequence()
         {
-            this.payload = GrpcPreconditions.CheckNotNull(payload);
-            this.alreadyCalledPayloadAsNewBuffer = false;
+            var sequence = cachedSliceBuffer.PopulateFrom(bufferReader);
+            GrpcPreconditions.CheckState(sequence.Length == payloadLength);
+            return sequence;
+        }
+#endif
+
+        public void Initialize(IBufferReader bufferReader)
+        {
+            this.bufferReader = GrpcPreconditions.CheckNotNull(bufferReader);
+            this.payloadLength = bufferReader.TotalLength.Value;  // payload must not be null
         }
 
         public void Reset()
         {
-            this.payload = null;
-            this.alreadyCalledPayloadAsNewBuffer = true;  // mark payload as read
+            this.bufferReader = null;
+            this.payloadLength = 0;
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+            this.cachedSliceBuffer.Invalidate();
+#endif
         }
 
-        public static DefaultDeserializationContext GetInitializedThreadLocal(byte[] payload)
+        public static DefaultDeserializationContext GetInitializedThreadLocal(IBufferReader bufferReader)
         {
             var instance = threadLocalInstance.Value;
-            instance.Initialize(payload);
+            instance.Initialize(bufferReader);
             return instance;
         }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        private void FillContinguousBuffer(IBufferReader reader, byte[] destination)
+        {
+            PayloadAsReadOnlySequence().CopyTo(new Span<byte>(destination));
+        }
+#else
+        private void FillContinguousBuffer(IBufferReader reader, byte[] destination)
+        {
+            int offset = 0;
+            while (reader.TryGetNextSlice(out Slice slice))
+            {
+                slice.CopyTo(new ArraySegment<byte>(destination, offset, (int)slice.Length));
+                offset += (int)slice.Length;
+            }
+            // check that we filled the entire destination
+            GrpcPreconditions.CheckState(offset == payloadLength);
+        }
+#endif
     }
 }
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index 5c35b2b..98117c6 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -22,7 +22,7 @@
 {
     internal interface IUnaryResponseClientCallback
     {
-        void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders);
+        void OnUnaryResponseClient(bool success, ClientSideStatus receivedStatus, IBufferReader receivedMessageReader, Metadata responseHeaders);
     }
 
     // Received status for streaming response calls.
@@ -33,7 +33,7 @@
 
     internal interface IReceivedMessageCallback
     {
-        void OnReceivedMessage(bool success, byte[] receivedMessage);
+        void OnReceivedMessage(bool success, IBufferReader receivedMessageReader);
     }
 
     internal interface IReceivedResponseHeadersCallback
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
index a1387af..9752a8e 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.Generated.cs
@@ -41,6 +41,7 @@
         public readonly Delegates.grpcsharp_batch_context_recv_initial_metadata_delegate grpcsharp_batch_context_recv_initial_metadata;
         public readonly Delegates.grpcsharp_batch_context_recv_message_length_delegate grpcsharp_batch_context_recv_message_length;
         public readonly Delegates.grpcsharp_batch_context_recv_message_to_buffer_delegate grpcsharp_batch_context_recv_message_to_buffer;
+        public readonly Delegates.grpcsharp_batch_context_recv_message_next_slice_peek_delegate grpcsharp_batch_context_recv_message_next_slice_peek;
         public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate grpcsharp_batch_context_recv_status_on_client_status;
         public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate grpcsharp_batch_context_recv_status_on_client_details;
         public readonly Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -142,6 +143,7 @@
             this.grpcsharp_batch_context_recv_initial_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_initial_metadata_delegate>(library);
             this.grpcsharp_batch_context_recv_message_length = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_length_delegate>(library);
             this.grpcsharp_batch_context_recv_message_to_buffer = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_to_buffer_delegate>(library);
+            this.grpcsharp_batch_context_recv_message_next_slice_peek = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_message_next_slice_peek_delegate>(library);
             this.grpcsharp_batch_context_recv_status_on_client_status = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_status_delegate>(library);
             this.grpcsharp_batch_context_recv_status_on_client_details = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_details_delegate>(library);
             this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = GetMethodDelegate<Delegates.grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate>(library);
@@ -242,6 +244,7 @@
             this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_initial_metadata;
             this.grpcsharp_batch_context_recv_message_length = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_length;
             this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_to_buffer;
+            this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromStaticLib.grpcsharp_batch_context_recv_message_next_slice_peek;
             this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_status;
             this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_details;
             this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromStaticLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -342,6 +345,7 @@
             this.grpcsharp_batch_context_recv_initial_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_initial_metadata;
             this.grpcsharp_batch_context_recv_message_length = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_length;
             this.grpcsharp_batch_context_recv_message_to_buffer = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_to_buffer;
+            this.grpcsharp_batch_context_recv_message_next_slice_peek = DllImportsFromSharedLib.grpcsharp_batch_context_recv_message_next_slice_peek;
             this.grpcsharp_batch_context_recv_status_on_client_status = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_status;
             this.grpcsharp_batch_context_recv_status_on_client_details = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_details;
             this.grpcsharp_batch_context_recv_status_on_client_trailing_metadata = DllImportsFromSharedLib.grpcsharp_batch_context_recv_status_on_client_trailing_metadata;
@@ -445,6 +449,7 @@
             public delegate IntPtr grpcsharp_batch_context_recv_initial_metadata_delegate(BatchContextSafeHandle ctx);
             public delegate IntPtr grpcsharp_batch_context_recv_message_length_delegate(BatchContextSafeHandle ctx);
             public delegate void grpcsharp_batch_context_recv_message_to_buffer_delegate(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
+            public delegate int grpcsharp_batch_context_recv_message_next_slice_peek_delegate(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
             public delegate StatusCode grpcsharp_batch_context_recv_status_on_client_status_delegate(BatchContextSafeHandle ctx);
             public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_details_delegate(BatchContextSafeHandle ctx, out UIntPtr detailsLength);
             public delegate IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata_delegate(BatchContextSafeHandle ctx);
@@ -565,6 +570,9 @@
             public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
             
             [DllImport(ImportName)]
+            public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
+            
+            [DllImport(ImportName)]
             public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
             
             [DllImport(ImportName)]
@@ -861,6 +869,9 @@
             public static extern void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen);
             
             [DllImport(ImportName)]
+            public static extern int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr);
+            
+            [DllImport(ImportName)]
             public static extern StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx);
             
             [DllImport(ImportName)]
diff --git a/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs
new file mode 100644
index 0000000..fb8d2e7
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/ReusableSliceBuffer.cs
@@ -0,0 +1,148 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+
+using Grpc.Core.Utils;
+using System;
+using System.Threading;
+
+using System.Buffers;
+
+namespace Grpc.Core.Internal
+{
+    internal class ReusableSliceBuffer
+    {
+        public const int MaxCachedSegments = 1024;  // ~4MB payload for 4K slices
+
+        readonly SliceSegment[] cachedSegments = new SliceSegment[MaxCachedSegments];
+        int populatedSegmentCount = 0;
+
+        public ReadOnlySequence<byte> PopulateFrom(IBufferReader bufferReader)
+        {
+            long offset = 0;
+            int index = 0;
+            SliceSegment prevSegment = null;
+            while (bufferReader.TryGetNextSlice(out Slice slice))
+            {
+                // Initialize cached segment if still null or just allocate a new segment if we already reached MaxCachedSegments
+                var current = index < cachedSegments.Length ? cachedSegments[index] : new SliceSegment();
+                if (current == null)
+                {
+                    current = cachedSegments[index] = new SliceSegment();
+                }
+
+                current.Reset(slice, offset);
+                prevSegment?.SetNext(current);
+
+                index ++;
+                offset += slice.Length;
+                prevSegment = current;
+            }
+            populatedSegmentCount = index;
+
+            // Not necessary for ending the ReadOnlySequence, but for making sure we
+            // don't keep more than MaxCachedSegments alive.
+            prevSegment?.SetNext(null);
+
+            if (index == 0)
+            {
+                return ReadOnlySequence<byte>.Empty;
+            }
+
+            var firstSegment = cachedSegments[0];
+            var lastSegment = prevSegment;
+            return new ReadOnlySequence<byte>(firstSegment, 0, lastSegment, lastSegment.Memory.Length);
+        }
+
+        public void Invalidate()
+        {
+            if (populatedSegmentCount == 0)
+            {
+                return;
+            }
+            var segment = cachedSegments[0];
+            while (segment != null)
+            {
+                segment.Reset(new Slice(IntPtr.Zero, 0), 0);
+                segment.SetNext(null);
+                segment = (SliceSegment) segment.Next;
+            }
+            populatedSegmentCount = 0;
+        }
+
+        // Represents a segment in ReadOnlySequence
+        // Segment is backed by Slice and the instances are reusable.
+        private class SliceSegment : ReadOnlySequenceSegment<byte>
+        {
+            readonly SliceMemoryManager pointerMemoryManager = new SliceMemoryManager();
+
+            public void Reset(Slice slice, long runningIndex)
+            {
+                pointerMemoryManager.Reset(slice);
+                Memory = pointerMemoryManager.Memory;  // maybe not always necessary
+                RunningIndex = runningIndex;
+            }
+
+            public void SetNext(ReadOnlySequenceSegment<byte> next)
+            {
+                Next = next;
+            }        
+        }
+
+        // Allow creating instances of Memory<byte> from Slice.
+        // Represents a chunk of native memory, but doesn't manage its lifetime.
+        // Instances of this class are reuseable - they can be reset to point to a different memory chunk.
+        // That is important to make the instances cacheable (rather then creating new instances
+        // the old ones will be reused to reduce GC pressure).
+        private class SliceMemoryManager : MemoryManager<byte>
+        {
+            private Slice slice;
+
+            public void Reset(Slice slice)
+            {
+                this.slice = slice;
+            }
+
+            public void Reset()
+            {
+                Reset(new Slice(IntPtr.Zero, 0));
+            }
+
+            public override Span<byte> GetSpan()
+            {
+                return slice.ToSpanUnsafe();
+            }
+
+            public override MemoryHandle Pin(int elementIndex = 0)
+            {
+                throw new NotSupportedException();
+            }
+
+            public override void Unpin()
+            {
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                // NOP
+            }
+        }
+    }
+}
+#endif
diff --git a/src/csharp/Grpc.Core/Internal/Slice.cs b/src/csharp/Grpc.Core/Internal/Slice.cs
new file mode 100644
index 0000000..22eb953
--- /dev/null
+++ b/src/csharp/Grpc.Core/Internal/Slice.cs
@@ -0,0 +1,68 @@
+#region Copyright notice and license
+
+// Copyright 2019 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#endregion
+
+using System;
+using System.Runtime.InteropServices;
+using System.Threading;
+using Grpc.Core.Utils;
+
+namespace Grpc.Core.Internal
+{
+    /// <summary>
+    /// Slice of native memory.
+    /// Rough equivalent of grpc_slice (but doesn't support inlined slices, just a pointer to data and length)
+    /// </summary>
+    internal struct Slice
+    {
+        private readonly IntPtr dataPtr;
+        private readonly int length;
+     
+        public Slice(IntPtr dataPtr, int length)
+        {
+            this.dataPtr = dataPtr;
+            this.length = length;
+        }
+
+        public int Length => length;
+
+        // copies data of the slice to given span.
+        // there needs to be enough space in the destination buffer
+        public void CopyTo(ArraySegment<byte> destination)
+        {
+            Marshal.Copy(dataPtr, destination.Array, destination.Offset, length);
+        }
+
+#if GRPC_CSHARP_SUPPORT_SYSTEM_MEMORY
+        public Span<byte> ToSpanUnsafe()
+        {
+            unsafe
+            {
+                return new Span<byte>((byte*) dataPtr, length);
+            }
+        }
+#endif
+
+        /// <summary>
+        /// Returns a <see cref="System.String"/> that represents the current <see cref="Grpc.Core.Internal.Slice"/>.
+        /// </summary>
+        public override string ToString()
+        {
+            return string.Format("[Slice: dataPtr={0}, length={1}]", dataPtr, length);
+        }
+    }
+}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 91d3957..b42e1e8 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -59,12 +59,16 @@
   } send_status_from_server;
   grpc_metadata_array recv_initial_metadata;
   grpc_byte_buffer* recv_message;
+  grpc_byte_buffer_reader* recv_message_reader;
   struct {
     grpc_metadata_array trailing_metadata;
     grpc_status_code status;
     grpc_slice status_details;
   } recv_status_on_client;
   int recv_close_on_server_cancelled;
+
+  /* reserve space for byte_buffer_reader */
+  grpc_byte_buffer_reader reserved_recv_message_reader;
 } grpcsharp_batch_context;
 
 GPR_EXPORT grpcsharp_batch_context* GPR_CALLTYPE
@@ -206,6 +210,9 @@
 
   grpcsharp_metadata_array_destroy_metadata_only(&(ctx->recv_initial_metadata));
 
+  if (ctx->recv_message_reader) {
+    grpc_byte_buffer_reader_destroy(ctx->recv_message_reader);
+  }
   grpc_byte_buffer_destroy(ctx->recv_message);
 
   grpcsharp_metadata_array_destroy_metadata_only(
@@ -287,6 +294,45 @@
   grpc_byte_buffer_reader_destroy(&reader);
 }
 
+/*
+ * Gets the next slice from recv_message byte buffer.
+ * Returns 1 if a slice was get successfully, 0 if there are no more slices to
+ * read. Set slice_len to the length of the slice and the slice_data_ptr to
+ * point to slice's data. Caller must ensure that the byte buffer being read
+ * from stays alive as long as the data of the slice are being accessed
+ * (grpc_byte_buffer_reader_peek method is used internally)
+ *
+ * Remarks:
+ * Slices can only be iterated once.
+ * Initializes recv_message_buffer_reader if it was not initialized yet.
+ */
+GPR_EXPORT int GPR_CALLTYPE
+grpcsharp_batch_context_recv_message_next_slice_peek(
+    grpcsharp_batch_context* ctx, size_t* slice_len, uint8_t** slice_data_ptr) {
+  *slice_len = 0;
+  *slice_data_ptr = NULL;
+
+  if (!ctx->recv_message) {
+    return 0;
+  }
+
+  if (!ctx->recv_message_reader) {
+    ctx->recv_message_reader = &ctx->reserved_recv_message_reader;
+    GPR_ASSERT(grpc_byte_buffer_reader_init(ctx->recv_message_reader,
+                                            ctx->recv_message));
+  }
+
+  grpc_slice* slice_ptr;
+  if (!grpc_byte_buffer_reader_peek(ctx->recv_message_reader, &slice_ptr)) {
+    return 0;
+  }
+
+  /* recv_message buffer must not be deleted before all the data is read */
+  *slice_len = GRPC_SLICE_LENGTH(*slice_ptr);
+  *slice_data_ptr = GRPC_SLICE_START_PTR(*slice_ptr);
+  return 1;
+}
+
 GPR_EXPORT grpc_status_code GPR_CALLTYPE
 grpcsharp_batch_context_recv_status_on_client_status(
     const grpcsharp_batch_context* ctx) {
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index c1e7fc1..cacdb30 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -7,8 +7,12 @@
     "Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
     "Grpc.Core.Internal.Tests.CompletionQueueEventTest",
     "Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
+    "Grpc.Core.Internal.Tests.DefaultDeserializationContextTest",
     "Grpc.Core.Internal.Tests.DefaultObjectPoolTest",
+    "Grpc.Core.Internal.Tests.FakeBufferReaderManagerTest",
     "Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
+    "Grpc.Core.Internal.Tests.ReusableSliceBufferTest",
+    "Grpc.Core.Internal.Tests.SliceTest",
     "Grpc.Core.Internal.Tests.TimespecTest",
     "Grpc.Core.Tests.AppDomainUnloadTest",
     "Grpc.Core.Tests.AuthContextTest",
diff --git a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
index 0e9d56f..1da79e2 100644
--- a/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
+++ b/src/csharp/unitypackage/unitypackage_skeleton/Plugins/Grpc.Core/runtimes/grpc_csharp_ext_dummy_stubs.c
@@ -50,6 +50,10 @@
   fprintf(stderr, "Should never reach here");
   abort();
 }
+void grpcsharp_batch_context_recv_message_next_slice_peek() {
+  fprintf(stderr, "Should never reach here");
+  abort();
+}
 void grpcsharp_batch_context_recv_status_on_client_status() {
   fprintf(stderr, "Should never reach here");
   abort();
diff --git a/templates/src/csharp/Grpc.Core/Internal/native_methods.include b/templates/src/csharp/Grpc.Core/Internal/native_methods.include
index e8ec4c8..dab9599 100644
--- a/templates/src/csharp/Grpc.Core/Internal/native_methods.include
+++ b/templates/src/csharp/Grpc.Core/Internal/native_methods.include
@@ -7,6 +7,7 @@
     'IntPtr grpcsharp_batch_context_recv_initial_metadata(BatchContextSafeHandle ctx)',
     'IntPtr grpcsharp_batch_context_recv_message_length(BatchContextSafeHandle ctx)',
     'void grpcsharp_batch_context_recv_message_to_buffer(BatchContextSafeHandle ctx, byte[] buffer, UIntPtr bufferLen)',
+    'int grpcsharp_batch_context_recv_message_next_slice_peek(BatchContextSafeHandle ctx, out UIntPtr sliceLen, out IntPtr sliceDataPtr)',
     'StatusCode grpcsharp_batch_context_recv_status_on_client_status(BatchContextSafeHandle ctx)',
     'IntPtr grpcsharp_batch_context_recv_status_on_client_details(BatchContextSafeHandle ctx, out UIntPtr detailsLength)',
     'IntPtr grpcsharp_batch_context_recv_status_on_client_trailing_metadata(BatchContextSafeHandle ctx)',