netty: Respect io.netty.allocator.type="unpooled" when getting Netty Allocator (#10543)

Fixes #10292
diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java
index 4af2f84..96f19aa 100644
--- a/netty/src/main/java/io/grpc/netty/Utils.java
+++ b/netty/src/main/java/io/grpc/netty/Utils.java
@@ -37,6 +37,7 @@
 import io.grpc.netty.NettySocketSupport.NativeSocketOptions;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelConfig;
 import io.netty.channel.ChannelFactory;
@@ -60,6 +61,7 @@
 import java.lang.reflect.Constructor;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.UnresolvedAddressException;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -135,6 +137,13 @@
   public static ByteBufAllocator getByteBufAllocator(boolean forceHeapBuffer) {
     if (Boolean.parseBoolean(
             System.getProperty("io.grpc.netty.useCustomAllocator", "true"))) {
+
+      String allocType = System.getProperty("io.netty.allocator.type", "pooled");
+      if (allocType.toLowerCase(Locale.ROOT).equals("unpooled")) {
+        logger.log(Level.FINE, "Using unpooled allocator");
+        return UnpooledByteBufAllocator.DEFAULT;
+      }
+
       boolean defaultPreferDirect = PooledByteBufAllocator.defaultPreferDirect();
       logger.log(
           Level.FINE,
diff --git a/netty/src/test/java/io/grpc/netty/UtilsTest.java b/netty/src/test/java/io/grpc/netty/UtilsTest.java
index 90330a5..95f5773 100644
--- a/netty/src/test/java/io/grpc/netty/UtilsTest.java
+++ b/netty/src/test/java/io/grpc/netty/UtilsTest.java
@@ -19,6 +19,7 @@
 import static com.google.common.truth.Truth.assertThat;
 import static com.google.common.truth.TruthJUnit.assume;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 
@@ -28,6 +29,7 @@
 import io.grpc.Metadata;
 import io.grpc.Status;
 import io.grpc.internal.GrpcUtil;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFactory;
 import io.netty.channel.ChannelOption;
@@ -71,6 +73,21 @@
   }
 
   @Test
+  public void testGetBufferAllocator() {
+    ByteBufAllocator heapAllocator = Utils.getByteBufAllocator(true);
+    ByteBufAllocator directAllocator = Utils.getByteBufAllocator(false);
+    assertNotEquals(heapAllocator, directAllocator);
+
+    System.setProperty("io.netty.allocator.type", "unpooled");
+    ByteBufAllocator unpooled1 = Utils.getByteBufAllocator(false);
+    assertThat(unpooled1.getClass().getName()).isNotEqualTo("UnpooledByteBufAllocator");
+
+    System.setProperty("io.netty.allocator.type", "pooled");
+    ByteBufAllocator unpooled2 = Utils.getByteBufAllocator(false);
+    assertEquals(directAllocator, unpooled2);
+  }
+
+  @Test
   public void convertClientHeaders_sanitizes() {
     Metadata metaData = new Metadata();