| # Copyright 2013 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from file_system import FileSystem, FileNotFoundError, StatInfo |
| from future import Future |
| from path_util import AssertIsValid, AssertIsDirectory, IsDirectory |
| |
| |
| def MoveTo(base, obj): |
| '''Returns an object as |obj| moved to |base|. That is, |
| MoveTo('foo/bar', {'a': 'b'}) -> {'foo': {'bar': {'a': 'b'}}} |
| ''' |
| AssertIsDirectory(base) |
| result = {} |
| leaf = result |
| for k in base.rstrip('/').split('/'): |
| leaf[k] = {} |
| leaf = leaf[k] |
| leaf.update(obj) |
| return result |
| |
| |
| def MoveAllTo(base, obj): |
| '''Moves every value in |obj| to |base|. See MoveTo. |
| ''' |
| result = {} |
| for key, value in obj.iteritems(): |
| result[key] = MoveTo(base, value) |
| return result |
| |
| |
| def _List(file_system): |
| '''Returns a list of '/' separated paths derived from |file_system|. |
| For example, {'index.html': '', 'www': {'file.txt': ''}} would return |
| ['index.html', 'www/file.txt']. |
| ''' |
| assert isinstance(file_system, dict) |
| result = {} |
| def update_result(item, path): |
| AssertIsValid(path) |
| if isinstance(item, dict): |
| if path != '': |
| path += '/' |
| result[path] = [p if isinstance(content, basestring) else (p + '/') |
| for p, content in item.iteritems()] |
| for subpath, subitem in item.iteritems(): |
| update_result(subitem, path + subpath) |
| elif isinstance(item, basestring): |
| result[path] = item |
| else: |
| raise ValueError('Unsupported item type: %s' % type(item)) |
| update_result(file_system, '') |
| return result |
| |
| |
| class _StatTracker(object): |
| '''Maintains the versions of paths in a file system. The versions of files |
| are changed either by |Increment| or |SetVersion|. The versions of |
| directories are derived from the versions of files within it. |
| ''' |
| |
| def __init__(self): |
| self._path_stats = {} |
| self._global_stat = 0 |
| |
| def Increment(self, path=None, by=1): |
| if path is None: |
| self._global_stat += by |
| else: |
| self.SetVersion(path, self._path_stats.get(path, 0) + by) |
| |
| def SetVersion(self, path, new_version): |
| if IsDirectory(path): |
| raise ValueError('Only files have an incrementable stat, ' |
| 'but "%s" is a directory' % path) |
| |
| # Update version of that file. |
| self._path_stats[path] = new_version |
| |
| # Update all parent directory versions as well. |
| slash_index = 0 # (deliberately including '' in the dir paths) |
| while slash_index != -1: |
| dir_path = path[:slash_index] + '/' |
| self._path_stats[dir_path] = max(self._path_stats.get(dir_path, 0), |
| new_version) |
| if dir_path == '/': |
| # Legacy support for '/' being the root of the file system rather |
| # than ''. Eventually when the path normalisation logic is complete |
| # this will be impossible and this logic will change slightly. |
| self._path_stats[''] = self._path_stats['/'] |
| slash_index = path.find('/', slash_index + 1) |
| |
| def GetVersion(self, path): |
| return self._global_stat + self._path_stats.get(path, 0) |
| |
| |
| class TestFileSystem(FileSystem): |
| '''A FileSystem backed by an object. Create with an object representing file |
| paths such that {'a': {'b': 'hello'}} will resolve Read('a/b') as 'hello', |
| Read('a/') as ['b'], and Stat determined by a value incremented via |
| IncrementStat. |
| ''' |
| |
| def __init__(self, obj, relative_to=None, identity=None): |
| assert obj is not None |
| if relative_to is not None: |
| obj = MoveTo(relative_to, obj) |
| self._identity = identity or type(self).__name__ |
| self._path_values = _List(obj) |
| self._stat_tracker = _StatTracker() |
| |
| # |
| # FileSystem implementation. |
| # |
| |
| def Read(self, paths, skip_not_found=False): |
| for path in paths: |
| if path not in self._path_values: |
| if skip_not_found: continue |
| return FileNotFoundError.RaiseInFuture( |
| '%s not in %s' % (path, '\n'.join(self._path_values))) |
| return Future(value=dict((k, v) for k, v in self._path_values.iteritems() |
| if k in paths)) |
| |
| def Refresh(self): |
| return Future(value=()) |
| |
| def Stat(self, path): |
| read_result = self.ReadSingle(path).Get() |
| stat_result = StatInfo(str(self._stat_tracker.GetVersion(path))) |
| if isinstance(read_result, list): |
| stat_result.child_versions = dict( |
| (file_result, |
| str(self._stat_tracker.GetVersion('%s%s' % (path, file_result)))) |
| for file_result in read_result) |
| return stat_result |
| |
| # |
| # Testing methods. |
| # |
| |
| def IncrementStat(self, path=None, by=1): |
| self._stat_tracker.Increment(path, by=by) |
| |
| def GetIdentity(self): |
| return self._identity |