# blob: 07458bc385d61e359d6bb15618e573a27cba5dbe [file] [log] [blame]
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""This unittest covers both file_storage and serialization modules."""
import os
import shutil
import tempfile
import time
import unittest

from memory_inspector.core import memory_map
from memory_inspector.core import native_heap
from memory_inspector.core import stacktrace
from memory_inspector.core import symbol
from memory_inspector.data import file_storage
class FileStorageTest(unittest.TestCase):
  """Store/load round-trip tests for file_storage.Storage and its archives.

  Each test runs against a fresh temporary directory that is created in
  setUp and removed in tearDown.
  """

  def setUp(self):
    """Creates a temporary directory and a Storage instance rooted in it."""
    self._storage_path = tempfile.mkdtemp()
    self._storage = file_storage.Storage(self._storage_path)

  def tearDown(self):
    """Removes the temporary storage directory and anything left inside it."""
    # os.removedirs() raises on a non-empty directory and also prunes empty
    # parent directories. A test failing before its own cleanup would then
    # leak the directory and report a second, unrelated error from here.
    shutil.rmtree(self._storage_path, ignore_errors=True)

  def testSettings(self):
    """Settings stored under two names are loaded back independently."""
    # The two dicts differ on purpose, so that a mix-up between the 'one' and
    # the 'two' entries cannot go unnoticed.
    settings_1 = {'foo': 1, 'bar': 2}
    settings_2 = {'foo': 3, 'bar': 4}
    self._storage.StoreSettings('one', settings_1)
    self._storage.StoreSettings('two', settings_2)
    self._DeepCompare(settings_1, self._storage.LoadSettings('one'))
    self._DeepCompare(settings_2, self._storage.LoadSettings('two'))
    # Overwrite both entries with empty settings (presumably this clears them
    # on disk -- confirm against file_storage.Storage.StoreSettings).
    self._storage.StoreSettings('one', {})
    self._storage.StoreSettings('two', {})

  def testArchives(self):
    """Deleting one archive leaves the other ones listed."""
    for name in ('foo', 'bar', 'baz'):
      self._storage.OpenArchive(name, create=True)
    self._storage.DeleteArchive('bar')
    archives = self._storage.ListArchives()
    self.assertIn('foo', archives)
    self.assertNotIn('bar', archives)
    self.assertIn('baz', archives)
    self._storage.DeleteArchive('foo')
    self._storage.DeleteArchive('baz')

  def testSnapshots(self):
    """Each snapshot only reports the data that was stored into it."""
    archive = self._storage.OpenArchive('snapshots', create=True)
    t1 = archive.StartNewSnapshot()
    archive.StoreMemMaps(memory_map.Map())
    time.sleep(0.01)  # Max snapshot resolution is in the order of usecs.
    t2 = archive.StartNewSnapshot()
    archive.StoreMemMaps(memory_map.Map())
    archive.StoreNativeHeap(native_heap.NativeHeap())
    snapshots = archive.ListSnapshots()
    self.assertIn(t1, snapshots)
    self.assertIn(t2, snapshots)
    # Only the second snapshot had a native heap stored into it.
    self.assertTrue(archive.HasMemMaps(t1))
    self.assertFalse(archive.HasNativeHeap(t1))
    self.assertTrue(archive.HasMemMaps(t2))
    self.assertTrue(archive.HasNativeHeap(t2))
    self._storage.DeleteArchive('snapshots')

  def testMmap(self):
    """A memory map with two entries survives a store/load round trip."""
    archive = self._storage.OpenArchive('mmap', create=True)
    timestamp = archive.StartNewSnapshot()
    mmap = memory_map.Map()
    map_entry1 = memory_map.MapEntry(4096, 8191, 'rw--', '/foo', 0)
    map_entry2 = memory_map.MapEntry(65536, 81919, 'rw--', '/bar', 4096)
    map_entry2.resident_pages = [5]
    mmap.Add(map_entry1)
    mmap.Add(map_entry2)
    archive.StoreMemMaps(mmap)
    mmap_deser = archive.LoadMemMaps(timestamp)
    self._DeepCompare(mmap, mmap_deser)
    self._storage.DeleteArchive('mmap')

  def testNativeHeap(self):
    """A native heap with three allocations survives a round trip."""
    archive = self._storage.OpenArchive('nheap', create=True)
    timestamp = archive.StartNewSnapshot()
    nh = native_heap.NativeHeap()
    for i in range(1, 4):
      # Every allocation gets its own two-frame stack trace.
      stack_trace = stacktrace.Stacktrace()
      frame = nh.GetStackFrame(i * 10 + 1)
      frame.SetExecFileInfo('foo.so', 1)
      stack_trace.Add(frame)
      frame = nh.GetStackFrame(i * 10 + 2)
      frame.SetExecFileInfo('bar.so', 2)
      stack_trace.Add(frame)
      nh.Add(native_heap.Allocation(i * 2, i * 3, stack_trace))
    archive.StoreNativeHeap(nh)
    nh_deser = archive.LoadNativeHeap(timestamp)
    self._DeepCompare(nh, nh_deser)
    self._storage.DeleteArchive('nheap')

  def testSymbols(self):
    """A symbol db with three symbols survives a store/load round trip."""
    archive = self._storage.OpenArchive('symbols', create=True)
    symbols = symbol.Symbols()
    # Symbol db is global per archive, no need to StartNewSnapshot.
    symbols.Add('foo.so', 1, symbol.Symbol('sym1', 'file1.c', 11))
    symbols.Add('bar.so', 2, symbol.Symbol('sym2', 'file2.c', 12))
    sym3 = symbol.Symbol('sym3', 'file2.c', 13)
    sym3.AddSourceLineInfo('outer_file.c', 23)
    symbols.Add('baz.so', 3, sym3)
    archive.StoreSymbols(symbols)
    symbols_deser = archive.LoadSymbols()
    self._DeepCompare(symbols, symbols_deser)
    self._storage.DeleteArchive('symbols')

  def _DeepCompare(self, a, b, prefix=''):
    """Recursively asserts that two objects (original and deserialized) match.

    None only matches None. Numbers and strings are compared by value. Any
    other pair must have the same type and is then compared element by
    element (lists, tuples), key by key (dicts) or attribute by attribute
    through __dict__ (everything else).

    Args:
      a: the original object.
      b: the deserialized object.
      prefix: path of the value being compared, used in failure messages.
    """
    self.assertEqual(a is None, b is None, prefix + ' None mismatch')
    if a is None:
      return

    # Compared by value, so that e.g. int vs long or str vs unicode produced
    # by a serialization round trip still count as equal.
    basic_types = (int, long, float, basestring)
    if isinstance(a, basic_types) and isinstance(b, basic_types):
      self.assertEqual(a, b, prefix)
      return

    self.assertEqual(type(a), type(b), prefix + ' type (%s vs %s)' % (
        type(a), type(b)))

    if isinstance(a, (list, tuple)):
      self.assertEqual(len(a), len(b), prefix + ' len (%d vs %d)' % (
          len(a), len(b)))
      for i, (item_a, item_b) in enumerate(zip(a, b)):
        self._DeepCompare(item_a, item_b, prefix + '[%d]' % i)
      return

    if isinstance(a, dict):
      # Sort the keys: the iteration order of two dicts holding the same keys
      # is not guaranteed to be the same.
      keys_a = sorted(a)
      keys_b = sorted(b)
      self.assertEqual(keys_a, keys_b, prefix + ' keys (%s vs %s)' % (
          keys_a, keys_b))
      for key in keys_a:
        self._DeepCompare(a[key], b[key], prefix + '.' + str(key))
      return

    # Arbitrary objects: compare their instance attributes.
    self._DeepCompare(a.__dict__, b.__dict__, prefix)