| """Tests common to tarfile and zipfile.""" |
| |
| import os |
| import sys |
| |
| from test.support import swap_attr |
| from test.support import os_helper |
| |
| class OverwriteTests: |
| |
| def setUp(self): |
| os.makedirs(self.testdir) |
| self.addCleanup(os_helper.rmtree, self.testdir) |
| |
| def create_file(self, path, content=b''): |
| with open(path, 'wb') as f: |
| f.write(content) |
| |
| def open(self, path): |
| raise NotImplementedError |
| |
| def extractall(self, ar): |
| raise NotImplementedError |
| |
| |
| def test_overwrite_file_as_file(self): |
| target = os.path.join(self.testdir, 'test') |
| self.create_file(target, b'content') |
| with self.open(self.ar_with_file) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.isfile(target)) |
| with open(target, 'rb') as f: |
| self.assertEqual(f.read(), b'newcontent') |
| |
| def test_overwrite_dir_as_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| os.mkdir(target) |
| with self.open(self.ar_with_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.isdir(target)) |
| |
| def test_overwrite_dir_as_implicit_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| os.mkdir(target) |
| with self.open(self.ar_with_implicit_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.isdir(target)) |
| self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) |
| with open(os.path.join(target, 'file'), 'rb') as f: |
| self.assertEqual(f.read(), b'newcontent') |
| |
| def test_overwrite_dir_as_file(self): |
| target = os.path.join(self.testdir, 'test') |
| os.mkdir(target) |
| with self.open(self.ar_with_file) as ar: |
| with self.assertRaises(PermissionError if sys.platform == 'win32' |
| else IsADirectoryError): |
| self.extractall(ar) |
| self.assertTrue(os.path.isdir(target)) |
| |
| def test_overwrite_file_as_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| self.create_file(target, b'content') |
| with self.open(self.ar_with_dir) as ar: |
| with self.assertRaises(FileExistsError): |
| self.extractall(ar) |
| self.assertTrue(os.path.isfile(target)) |
| with open(target, 'rb') as f: |
| self.assertEqual(f.read(), b'content') |
| |
| def test_overwrite_file_as_implicit_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| self.create_file(target, b'content') |
| with self.open(self.ar_with_implicit_dir) as ar: |
| with self.assertRaises(FileNotFoundError if sys.platform == 'win32' |
| else NotADirectoryError): |
| self.extractall(ar) |
| self.assertTrue(os.path.isfile(target)) |
| with open(target, 'rb') as f: |
| self.assertEqual(f.read(), b'content') |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_file_symlink_as_file(self): |
| # XXX: It is potential security vulnerability. |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| self.create_file(target2, b'content') |
| os.symlink('test2', target) |
| with self.open(self.ar_with_file) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertTrue(os.path.isfile(target2)) |
| with open(target2, 'rb') as f: |
| self.assertEqual(f.read(), b'newcontent') |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_broken_file_symlink_as_file(self): |
| # XXX: It is potential security vulnerability. |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| os.symlink('test2', target) |
| with self.open(self.ar_with_file) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertTrue(os.path.isfile(target2)) |
| with open(target2, 'rb') as f: |
| self.assertEqual(f.read(), b'newcontent') |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_dir_symlink_as_dir(self): |
| # XXX: It is potential security vulnerability. |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| os.mkdir(target2) |
| os.symlink('test2', target, target_is_directory=True) |
| with self.open(self.ar_with_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertTrue(os.path.isdir(target2)) |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_dir_symlink_as_implicit_dir(self): |
| # XXX: It is potential security vulnerability. |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| os.mkdir(target2) |
| os.symlink('test2', target, target_is_directory=True) |
| with self.open(self.ar_with_implicit_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertTrue(os.path.isdir(target2)) |
| self.assertTrue(os.path.isfile(os.path.join(target2, 'file'))) |
| with open(os.path.join(target2, 'file'), 'rb') as f: |
| self.assertEqual(f.read(), b'newcontent') |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_broken_dir_symlink_as_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| os.symlink('test2', target, target_is_directory=True) |
| with self.open(self.ar_with_dir) as ar: |
| with self.assertRaises(FileExistsError): |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertFalse(os.path.exists(target2)) |
| |
| @os_helper.skip_unless_symlink |
| def test_overwrite_broken_dir_symlink_as_implicit_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| target2 = os.path.join(self.testdir, 'test2') |
| os.symlink('test2', target, target_is_directory=True) |
| with self.open(self.ar_with_implicit_dir) as ar: |
| with self.assertRaises(FileExistsError): |
| self.extractall(ar) |
| self.assertTrue(os.path.islink(target)) |
| self.assertFalse(os.path.exists(target2)) |
| |
| def test_concurrent_extract_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| def concurrent_mkdir(*args, **kwargs): |
| orig_mkdir(*args, **kwargs) |
| orig_mkdir(*args, **kwargs) |
| with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: |
| with self.open(self.ar_with_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.isdir(target)) |
| |
| def test_concurrent_extract_implicit_dir(self): |
| target = os.path.join(self.testdir, 'test') |
| def concurrent_mkdir(*args, **kwargs): |
| orig_mkdir(*args, **kwargs) |
| orig_mkdir(*args, **kwargs) |
| with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: |
| with self.open(self.ar_with_implicit_dir) as ar: |
| self.extractall(ar) |
| self.assertTrue(os.path.isdir(target)) |
| self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) |