#!/usr/bin/env python
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
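
"""Unit tests for GithubFileSystem, run against fake fetchers and test data."""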

import json
from cStringIO import StringIO
import unittest
from zipfile import ZipFile

from caching_file_system import CachingFileSystem
from file_system import FileNotFoundError, StatInfo
from fake_url_fetcher import FakeURLFSFetcher
from local_file_system import LocalFileSystem
from new_github_file_system import GithubFileSystem
from object_store_creator import ObjectStoreCreator
from test_file_system import TestFileSystem


# A made-up SHA hash value used by the tests.
FAKE_HASH = '7becb9f554dec76bd0fc12c1d32dbaff1d134a4d'


def _ZipFromFiles(file_dict):
  """Returns the raw bytes of a zip archive containing file_dict's files."""
  string = StringIO()
  zipfile = ZipFile(string, 'w')
  for filename, contents in file_dict.iteritems():
    zipfile.writestr(filename, contents)
  zipfile.close()
  return string.getvalue()


class TestGithubFileSystem(unittest.TestCase):
  def setUp(self):
    self._gfs = GithubFileSystem.ForTest('repo', FakeURLFSFetcher.CreateLocal())
    # Start and finish the repository load.
    self._cgfs = CachingFileSystem(self._gfs, ObjectStoreCreator.ForTest())

  def testReadDirectory(self):
    self._gfs.Refresh().Get()
    self.assertEqual(
        sorted(['requirements.txt', '.gitignore', 'README.md', 'src/']),
        sorted(self._gfs.ReadSingle('').Get()))
    self.assertEqual(
        sorted(['__init__.notpy', 'hello.notpy']),
        sorted(self._gfs.ReadSingle('src/').Get()))

  def testReadFile(self):
    self._gfs.Refresh().Get()
    expected = (
        '# Compiled Python files\n'
        '*.pyc\n'
    )
    self.assertEqual(expected, self._gfs.ReadSingle('.gitignore').Get())

  def testMultipleReads(self):
    self._gfs.Refresh().Get()
    self.assertEqual(
        self._gfs.ReadSingle('requirements.txt').Get(),
        self._gfs.ReadSingle('requirements.txt').Get())

  def testReads(self):
    self._gfs.Refresh().Get()
    expected = {
        'src/': sorted(['hello.notpy', '__init__.notpy']),
        '': sorted(['requirements.txt', '.gitignore', 'README.md', 'src/'])
    }
    read = self._gfs.Read(['', 'src/']).Get()
    self.assertEqual(expected['src/'], sorted(read['src/']))
    self.assertEqual(expected[''], sorted(read['']))

  def testStat(self):
    self._gfs.Refresh().Get()
    dir_stat = StatInfo(FAKE_HASH, {
        'hello.notpy': StatInfo(FAKE_HASH),
        '__init__.notpy': StatInfo(FAKE_HASH)
    })
    self.assertEqual(StatInfo(FAKE_HASH), self._gfs.Stat('README.md'))
    self.assertEqual(StatInfo(FAKE_HASH), self._gfs.Stat('src/hello.notpy'))
    self.assertEqual(dir_stat, self._gfs.Stat('src/'))

  def testBadReads(self):
    self._gfs.Refresh().Get()
    self.assertRaises(FileNotFoundError, self._gfs.Stat, 'DONT_README.md')
    self.assertRaises(FileNotFoundError,
                      self._gfs.ReadSingle('DONT_README.md').Get)

  def testCachingFileSystem(self):
    self._cgfs.Refresh().Get()
    initial_cgfs_read_one = self._cgfs.ReadSingle('src/hello.notpy').Get()

    self.assertEqual(initial_cgfs_read_one,
                     self._gfs.ReadSingle('src/hello.notpy').Get())
    self.assertEqual(initial_cgfs_read_one,
                     self._cgfs.ReadSingle('src/hello.notpy').Get())

    initial_cgfs_read_two = self._cgfs.Read(
        ['README.md', 'requirements.txt']).Get()

    self.assertEqual(
        initial_cgfs_read_two,
        self._gfs.Read(['README.md', 'requirements.txt']).Get())
    self.assertEqual(
        initial_cgfs_read_two,
        self._cgfs.Read(['README.md', 'requirements.txt']).Get())

  def testWithoutRefresh(self):
    # Without a prior Refresh() the file system holds no data, so reads fail.
    self.assertRaises(FileNotFoundError, self._gfs.ReadSingle('src/').Get)

  def testRefresh(self):
    def make_sha_json(hash_value):
      from copy import deepcopy
      # Derive a distinct tree sha from hash_value so that successive
      # refreshes see a changed commit.
      commit_json = json.loads(deepcopy(LocalFileSystem('').ReadSingle(
          'test_data/github_file_system/test_owner/repo/commits/HEAD').Get()))
      commit_json['commit']['tree']['sha'] = 'hash' + hash_value[4:]
      return json.dumps(commit_json)

    files = {
        'zipfile/': '',
        'zipfile/hello.txt': 'world',
        'zipfile/readme': 'test zip',
        'zipfile/dir/file1': 'contents',
        'zipfile/dir/file2': 'more contents'
    }
    string = _ZipFromFiles(files)
    test_files = {
      'test_owner': {
        'changing-repo': {
          'commits': {
            'HEAD': make_sha_json(FAKE_HASH)
          },
          'zipball': string
        }
      }
    }
    test_file_system = TestFileSystem(test_files)
    gfs = GithubFileSystem.ForTest(
        'changing-repo', FakeURLFSFetcher.Create(test_file_system), path='')

    gfs.Refresh().Get()
    initial_dir_read = sorted(gfs.ReadSingle('').Get())
    initial_file_read = gfs.ReadSingle('dir/file1').Get()

    # Change the underlying data.
    files['zipfile/hello.txt'] = 'hello world again'
    files['zipfile/new-file'] = 'a new file'
    files['zipfile/dir/file1'] = 'changed file'
    test_files['test_owner']['changing-repo']['zipball'] = _ZipFromFiles(files)
    test_files['test_owner']['changing-repo']['commits']['HEAD'] = (
        make_sha_json(FAKE_HASH + 'hash'))

    # Check that the changes have not affected the file system yet.
    self.assertEqual(initial_dir_read, sorted(gfs.ReadSingle('').Get()))
    self.assertEqual(initial_file_read, gfs.ReadSingle('dir/file1').Get())

    gfs.Refresh().Get()

    # Check that the changes have affected the file system.
    self.assertTrue('new-file' in gfs.ReadSingle('').Get())
    self.assertEqual(files['zipfile/dir/file1'],
                     gfs.ReadSingle('dir/file1').Get())


if __name__ == '__main__':
  unittest.main()