[caffe2] update load_save_test.py to also verify the chunking behavior (#53401)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/53401

This is a reland of D26641599 (https://github.com/pytorch/pytorch/commit/cd9ac54ea75d4a530b6b9dccaaf1167cabafc789) after rebasing onto D26802576 (https://github.com/pytorch/pytorch/commit/f595ba1bae9a75c6403e4624e7294b5635acd40c).

Add some small utility functions to read the blob names back from the minidb
file so that we can verify how many chunks were written for each blob.
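
For reference, the new parser expects minidb's simple record framing: each
record is a native-endian (key length, value length) int pair followed by the
raw key and value bytes, and chunked blobs are keyed as
"<blob_name>#%<chunk_id>". Below is a minimal sketch of that layout; the file
and blob names are hypothetical, and the framing is hand-rolled for
illustration rather than produced by the Caffe2 serializer:

```python
import struct
import tempfile
from pathlib import Path

def write_minidb_record(f, key: bytes, value: bytes) -> None:
    # The same framing _read_minidb_entries unpacks with struct.Struct("=ii"):
    # two native-endian ints (key length, value length), then the raw bytes.
    f.write(struct.pack("=ii", len(key), len(value)))
    f.write(key)
    f.write(value)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "toy.minidb"  # hypothetical file name
    with path.open("wb") as f:
        # Two chunks of a blob named "my_blob"; _read_chunk_info splits each
        # key on "#%" and groups both records under "my_blob".
        write_minidb_record(f, b"my_blob#%0", b"\x00" * 32)
        write_minidb_record(f, b"my_blob#%1", b"\x00" * 8)
```

On a file like this, _read_chunk_info returns {"my_blob": [entry0, entry1]},
which is exactly the per-blob chunk count that the new assertions check.
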
ghstack-source-id: 123567033

Test Plan: buck test caffe2/caffe2/python/operator_test:load_save_test

Reviewed By: mraway

Differential Revision: D26853942

fbshipit-source-id: 0b45078fdd279f547752c8fdb771e296374a00da
diff --git a/caffe2/python/operator_test/load_save_test.py b/caffe2/python/operator_test/load_save_test.py
index 1dacbc6..01310f0 100644
--- a/caffe2/python/operator_test/load_save_test.py
+++ b/caffe2/python/operator_test/load_save_test.py
@@ -1,12 +1,15 @@
 import errno
 import hypothesis.strategies as st
 from hypothesis import given, assume, settings
+import io
+import math
 import numpy as np
 import os
 import shutil
+import struct
 import unittest
 from pathlib import Path
-from typing import List, Optional, Tuple, Type
+from typing import Dict, Generator, List, NamedTuple, Optional, Tuple, Type
 
 from caffe2.proto import caffe2_pb2
 from caffe2.python import core, test_util, workspace
@@ -19,6 +22,11 @@
     max_gpuid = 0
 
 
+class MiniDBEntry(NamedTuple):
+    key: str
+    value_size: int
+
+
 # Utility class for other loading tests, don't add test functions here
 # Inherit from this test instead. If you add a test here,
 # each derived class will inherit it as well and cause test duplication
@@ -473,11 +481,55 @@
         for name, data in blobs:
             np.testing.assert_array_equal(workspace.FetchBlob(name), data)
 
-    def testSaveWithChunkSize(self) -> None:
+    def _read_minidb_entries(
+        self, path: Path
+    ) -> Generator[MiniDBEntry, None, None]:
+        """Read the entry information out of a minidb file.
+        """
+        header = struct.Struct("=ii")  # (key_len, value_len) as native-endian ints
+        with path.open("rb") as f:
+            while True:
+                buf = f.read(header.size)
+                if not buf:
+                    break
+                if len(buf) < header.size:
+                    raise Exception("early EOF in minidb header")
+                (key_len, value_len) = header.unpack(buf)
+                if key_len < 0 or value_len < 0:
+                    raise ValueError(
+                        f"invalid minidb header: ({key_len}, {value_len})"
+                    )
+                key = f.read(key_len)
+                if len(key) < key_len:
+                    raise Exception("early EOF in minidb key")
+                f.seek(value_len, io.SEEK_CUR)
+                yield MiniDBEntry(key=key.decode("utf-8"), value_size=value_len)
+
+    def _read_chunk_info(self, path: Path) -> Dict[str, List[MiniDBEntry]]:
+        """Read a minidb file and return the names of each blob and how many
+        chunks are stored for that blob.
+        """
+        chunk_id_separator = "#%"  # chunked blob keys look like "<name>#%<chunk_id>"
+        results: Dict[str, List[MiniDBEntry]] = {}
+        for entry in self._read_minidb_entries(path):
+            parts = entry.key.rsplit(chunk_id_separator, 1)
+            if len(parts) == 1:  # no chunk id suffix on this key
+                assert entry.key not in results
+                results[entry.key] = [entry]
+            else:
+                blob_name = parts[0]
+                results.setdefault(blob_name, [])
+                results[blob_name].append(entry)
+
+        return results
+
+    def _test_save_with_chunk_size(
+        self, num_elems: int, chunk_size: int, expected_num_chunks: int,
+    ) -> None:
         tmp_folder = self.make_tempdir()
         tmp_file = str(tmp_folder / "save.output")
 
-        blobs = self.create_test_blobs()
+        blobs = self.create_test_blobs(num_elems)
 
         # Saves the blobs to a local db.
         save_op = core.CreateOperator(
@@ -487,12 +539,46 @@
             absolute_path=1,
             db=tmp_file,
             db_type=self._db_type,
-            chunk_size=32,
+            chunk_size=chunk_size,
         )
         self.assertTrue(workspace.RunOperatorOnce(save_op))
 
         self.load_and_check_blobs(blobs, [tmp_file])
 
+        blob_chunks = self._read_chunk_info(Path(tmp_file))
+        for blob_name, chunks in blob_chunks.items():
+            self.assertEqual(len(chunks), expected_num_chunks)
+
+    def testSaveWithChunkSize(self) -> None:
+        num_elems = 1234
+        chunk_size = 32
+        expected_num_chunks = math.ceil(num_elems / chunk_size)
+        self._test_save_with_chunk_size(
+            num_elems=num_elems,
+            chunk_size=chunk_size,
+            expected_num_chunks=expected_num_chunks,
+        )
+
+    def testSaveWithDefaultChunkSize(self) -> None:
+        # This is the default value of the --caffe2_tensor_chunk_size flag from
+        # core/blob_serialization.cc
+        #
+        # Test with just slightly more than this to ensure that 2 chunks are
+        # used.
+        default_chunk_size = 1000000
+        self._test_save_with_chunk_size(
+            num_elems=default_chunk_size + 10,
+            chunk_size=-1,  # -1 selects the default chunk size
+            expected_num_chunks=2,
+        )
+
+    def testSaveWithNoChunking(self) -> None:
+        default_chunk_size = 1000000
+        self._test_save_with_chunk_size(
+            num_elems=default_chunk_size + 10,
+            chunk_size=0,  # 0 disables chunking
+            expected_num_chunks=1,
+        )
 
 
 if __name__ == '__main__':