[caffe2] update load_save_test.py to also verify the chunking behavior (#53401)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/53401
This is a reland of D26641599 (https://github.com/pytorch/pytorch/commit/cd9ac54ea75d4a530b6b9dccaaf1167cabafc789) after rebasing onto D26802576 (https://github.com/pytorch/pytorch/commit/f595ba1bae9a75c6403e4624e7294b5635acd40c).
Add some small utility functions to read the blob names back from the minidb
file so that we can verify how many chunks were written for each blob.
ghstack-source-id: 123567033
Test Plan: buck test caffe2/caffe2/python/operator_test:load_save_test
Reviewed By: mraway
Differential Revision: D26853942
fbshipit-source-id: 0b45078fdd279f547752c8fdb771e296374a00da
diff --git a/caffe2/python/operator_test/load_save_test.py b/caffe2/python/operator_test/load_save_test.py
index 1dacbc6..01310f0 100644
--- a/caffe2/python/operator_test/load_save_test.py
+++ b/caffe2/python/operator_test/load_save_test.py
@@ -1,12 +1,15 @@
import errno
import hypothesis.strategies as st
from hypothesis import given, assume, settings
+import io
+import math
import numpy as np
import os
import shutil
+import struct
import unittest
from pathlib import Path
-from typing import List, Optional, Tuple, Type
+from typing import Dict, Generator, List, NamedTuple, Optional, Tuple, Type
from caffe2.proto import caffe2_pb2
from caffe2.python import core, test_util, workspace
@@ -19,6 +22,11 @@
max_gpuid = 0
+class MiniDBEntry(NamedTuple):
+ key: str
+ value_size: int
+
+
# Utility class for other loading tests, don't add test functions here
# Inherit from this test instead. If you add a test here,
# each derived class will inherit it as well and cause test duplication
@@ -473,11 +481,55 @@
for name, data in blobs:
np.testing.assert_array_equal(workspace.FetchBlob(name), data)
- def testSaveWithChunkSize(self) -> None:
+ def _read_minidb_entries(
+ self, path: Path
+ ) -> Generator[MiniDBEntry, None, None]:
+ """Read the entry information out of a minidb file.
+ """
+ header = struct.Struct("=ii")
+ with path.open("rb") as f:
+ while True:
+ buf = f.read(header.size)
+ if not buf:
+ break
+ if len(buf) < header.size:
+ raise Exception("early EOF in minidb header")
+ (key_len, value_len) = header.unpack(buf)
+ if key_len < 0 or value_len < 0:
+ raise Exception(
+ f"invalid minidb header: ({key_len}, {value_len})"
+ )
+ key = f.read(key_len)
+ if len(key) < key_len:
+ raise Exception("early EOF in minidb key")
+ f.seek(value_len, io.SEEK_CUR)
+ yield MiniDBEntry(key=key.decode("utf-8"), value_size=value_len)
+
+ def _read_chunk_info(self, path: Path) -> Dict[str, List[MiniDBEntry]]:
+ """Read a minidb file and return the names of each blob and how many
+ chunks are stored for that blob.
+ """
+ chunk_id_separator = "#%"
+ results: Dict[str, List[MiniDBEntry]] = {}
+ for entry in self._read_minidb_entries(path):
+ parts = entry.key.rsplit(chunk_id_separator, 1)
+        if len(parts) == 1:
+ assert entry.key not in results
+ results[entry.key] = [entry]
+ else:
+ blob_name = parts[0]
+ results.setdefault(blob_name, [])
+ results[blob_name].append(entry)
+
+ return results
+
+ def _test_save_with_chunk_size(
+ self, num_elems: int, chunk_size: int, expected_num_chunks: int,
+ ) -> None:
tmp_folder = self.make_tempdir()
tmp_file = str(tmp_folder / "save.output")
- blobs = self.create_test_blobs()
+ blobs = self.create_test_blobs(num_elems)
# Saves the blobs to a local db.
save_op = core.CreateOperator(
@@ -487,12 +539,46 @@
absolute_path=1,
db=tmp_file,
db_type=self._db_type,
- chunk_size=32,
+ chunk_size=chunk_size,
)
self.assertTrue(workspace.RunOperatorOnce(save_op))
self.load_and_check_blobs(blobs, [tmp_file])
+ blob_chunks = self._read_chunk_info(Path(tmp_file))
+ for blob_name, chunks in blob_chunks.items():
+ self.assertEqual(len(chunks), expected_num_chunks)
+
+ def testSaveWithChunkSize(self) -> None:
+ num_elems = 1234
+ chunk_size = 32
+ expected_num_chunks = math.ceil(num_elems / chunk_size)
+ self._test_save_with_chunk_size(
+ num_elems=num_elems,
+ chunk_size=chunk_size,
+ expected_num_chunks=expected_num_chunks,
+ )
+
+ def testSaveWithDefaultChunkSize(self) -> None:
+ # This is the default value of the --caffe2_tensor_chunk_size flag from
+ # core/blob_serialization.cc
+ #
+ # Test with just slightly more than this to ensure that 2 chunks are
+ # used.
+ default_chunk_size = 1000000
+ self._test_save_with_chunk_size(
+ num_elems=default_chunk_size + 10,
+ chunk_size=-1,
+ expected_num_chunks=2,
+ )
+
+ def testSaveWithNoChunking(self) -> None:
+ default_chunk_size = 1000000
+ self._test_save_with_chunk_size(
+ num_elems=default_chunk_size + 10,
+ chunk_size=0,
+ expected_num_chunks=1,
+ )
if __name__ == '__main__':