|  | """ | 
|  | Unit tests for CLI entry points. | 
|  | """ | 
|  |  | 
|  | from __future__ import print_function | 
|  |  | 
|  | import functools | 
|  | import io | 
|  | import os | 
|  | import sys | 
|  | import typing | 
|  | import unittest | 
|  | from contextlib import contextmanager, redirect_stdout, redirect_stderr | 
|  |  | 
|  | import rsa | 
|  | import rsa.cli | 
|  | import rsa.util | 
|  |  | 
|  |  | 
|  | @contextmanager | 
|  | def captured_output() -> typing.Generator: | 
|  | """Captures output to stdout and stderr""" | 
|  |  | 
|  | # According to mypy, we're not supposed to change buf_out.buffer. | 
|  | # However, this is just a test, and it works, hence the 'type: ignore'. | 
|  | buf_out = io.StringIO() | 
|  | buf_out.buffer = io.BytesIO()  # type: ignore | 
|  |  | 
|  | buf_err = io.StringIO() | 
|  | buf_err.buffer = io.BytesIO()  # type: ignore | 
|  |  | 
|  | with redirect_stdout(buf_out), redirect_stderr(buf_err): | 
|  | yield buf_out, buf_err | 
|  |  | 
|  |  | 
|  | def get_bytes_out(buf) -> bytes: | 
|  | return buf.buffer.getvalue() | 
|  |  | 
|  |  | 
|  | @contextmanager | 
|  | def cli_args(*new_argv): | 
|  | """Updates sys.argv[1:] for a single test.""" | 
|  |  | 
|  | old_args = sys.argv[:] | 
|  | sys.argv[1:] = [str(arg) for arg in new_argv] | 
|  |  | 
|  | try: | 
|  | yield | 
|  | finally: | 
|  | sys.argv[1:] = old_args | 
|  |  | 
|  |  | 
|  | def remove_if_exists(fname): | 
|  | """Removes a file if it exists.""" | 
|  |  | 
|  | if os.path.exists(fname): | 
|  | os.unlink(fname) | 
|  |  | 
|  |  | 
|  | def cleanup_files(*filenames): | 
|  | """Makes sure the files don't exist when the test runs, and deletes them afterward.""" | 
|  |  | 
|  | def remove(): | 
|  | for fname in filenames: | 
|  | remove_if_exists(fname) | 
|  |  | 
|  | def decorator(func): | 
|  | @functools.wraps(func) | 
|  | def wrapper(*args, **kwargs): | 
|  | remove() | 
|  | try: | 
|  | return func(*args, **kwargs) | 
|  | finally: | 
|  | remove() | 
|  |  | 
|  | return wrapper | 
|  |  | 
|  | return decorator | 
|  |  | 
|  |  | 
|  | class AbstractCliTest(unittest.TestCase): | 
|  | @classmethod | 
|  | def setUpClass(cls): | 
|  | # Ensure there is a key to use | 
|  | cls.pub_key, cls.priv_key = rsa.newkeys(512) | 
|  | cls.pub_fname = '%s.pub' % cls.__name__ | 
|  | cls.priv_fname = '%s.key' % cls.__name__ | 
|  |  | 
|  | with open(cls.pub_fname, 'wb') as outfile: | 
|  | outfile.write(cls.pub_key.save_pkcs1()) | 
|  |  | 
|  | with open(cls.priv_fname, 'wb') as outfile: | 
|  | outfile.write(cls.priv_key.save_pkcs1()) | 
|  |  | 
|  | @classmethod | 
|  | def tearDownClass(cls): | 
|  | if hasattr(cls, 'pub_fname'): | 
|  | remove_if_exists(cls.pub_fname) | 
|  | if hasattr(cls, 'priv_fname'): | 
|  | remove_if_exists(cls.priv_fname) | 
|  |  | 
|  | def assertExits(self, status_code, func, *args, **kwargs): | 
|  | try: | 
|  | func(*args, **kwargs) | 
|  | except SystemExit as ex: | 
|  | if status_code == ex.code: | 
|  | return | 
|  | self.fail('SystemExit() raised by %r, but exited with code %r, expected %r' % ( | 
|  | func, ex.code, status_code)) | 
|  | else: | 
|  | self.fail('SystemExit() not raised by %r' % func) | 
|  |  | 
|  |  | 
|  | class KeygenTest(AbstractCliTest): | 
|  | def test_keygen_no_args(self): | 
|  | with cli_args(): | 
|  | self.assertExits(1, rsa.cli.keygen) | 
|  |  | 
|  | def test_keygen_priv_stdout(self): | 
|  | with captured_output() as (out, err): | 
|  | with cli_args(128): | 
|  | rsa.cli.keygen() | 
|  |  | 
|  | lines = get_bytes_out(out).splitlines() | 
|  | self.assertEqual(b'-----BEGIN RSA PRIVATE KEY-----', lines[0]) | 
|  | self.assertEqual(b'-----END RSA PRIVATE KEY-----', lines[-1]) | 
|  |  | 
|  | # The key size should be shown on stderr | 
|  | self.assertTrue('128-bit key' in err.getvalue()) | 
|  |  | 
|  | @cleanup_files('test_cli_privkey_out.pem') | 
|  | def test_keygen_priv_out_pem(self): | 
|  | with captured_output() as (out, err): | 
|  | with cli_args('--out=test_cli_privkey_out.pem', '--form=PEM', 128): | 
|  | rsa.cli.keygen() | 
|  |  | 
|  | # The key size should be shown on stderr | 
|  | self.assertTrue('128-bit key' in err.getvalue()) | 
|  |  | 
|  | # The output file should be shown on stderr | 
|  | self.assertTrue('test_cli_privkey_out.pem' in err.getvalue()) | 
|  |  | 
|  | # If we can load the file as PEM, it's good enough. | 
|  | with open('test_cli_privkey_out.pem', 'rb') as pemfile: | 
|  | rsa.PrivateKey.load_pkcs1(pemfile.read()) | 
|  |  | 
|  | @cleanup_files('test_cli_privkey_out.der') | 
|  | def test_keygen_priv_out_der(self): | 
|  | with captured_output() as (out, err): | 
|  | with cli_args('--out=test_cli_privkey_out.der', '--form=DER', 128): | 
|  | rsa.cli.keygen() | 
|  |  | 
|  | # The key size should be shown on stderr | 
|  | self.assertTrue('128-bit key' in err.getvalue()) | 
|  |  | 
|  | # The output file should be shown on stderr | 
|  | self.assertTrue('test_cli_privkey_out.der' in err.getvalue()) | 
|  |  | 
|  | # If we can load the file as der, it's good enough. | 
|  | with open('test_cli_privkey_out.der', 'rb') as derfile: | 
|  | rsa.PrivateKey.load_pkcs1(derfile.read(), format='DER') | 
|  |  | 
|  | @cleanup_files('test_cli_privkey_out.pem', 'test_cli_pubkey_out.pem') | 
|  | def test_keygen_pub_out_pem(self): | 
|  | with captured_output() as (out, err): | 
|  | with cli_args('--out=test_cli_privkey_out.pem', | 
|  | '--pubout=test_cli_pubkey_out.pem', | 
|  | '--form=PEM', 256): | 
|  | rsa.cli.keygen() | 
|  |  | 
|  | # The key size should be shown on stderr | 
|  | self.assertTrue('256-bit key' in err.getvalue()) | 
|  |  | 
|  | # The output files should be shown on stderr | 
|  | self.assertTrue('test_cli_privkey_out.pem' in err.getvalue()) | 
|  | self.assertTrue('test_cli_pubkey_out.pem' in err.getvalue()) | 
|  |  | 
|  | # If we can load the file as PEM, it's good enough. | 
|  | with open('test_cli_pubkey_out.pem', 'rb') as pemfile: | 
|  | rsa.PublicKey.load_pkcs1(pemfile.read()) | 
|  |  | 
|  |  | 
|  | class EncryptDecryptTest(AbstractCliTest): | 
|  | def test_empty_decrypt(self): | 
|  | with cli_args(): | 
|  | self.assertExits(1, rsa.cli.decrypt) | 
|  |  | 
|  | def test_empty_encrypt(self): | 
|  | with cli_args(): | 
|  | self.assertExits(1, rsa.cli.encrypt) | 
|  |  | 
|  | @cleanup_files('encrypted.txt', 'cleartext.txt') | 
|  | def test_encrypt_decrypt(self): | 
|  | with open('cleartext.txt', 'wb') as outfile: | 
|  | outfile.write(b'Hello cleartext RSA users!') | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', '--out=encrypted.txt', self.pub_fname): | 
|  | with captured_output(): | 
|  | rsa.cli.encrypt() | 
|  |  | 
|  | with cli_args('-i', 'encrypted.txt', self.priv_fname): | 
|  | with captured_output() as (out, err): | 
|  | rsa.cli.decrypt() | 
|  |  | 
|  | # We should have the original cleartext on stdout now. | 
|  | output = get_bytes_out(out) | 
|  | self.assertEqual(b'Hello cleartext RSA users!', output) | 
|  |  | 
|  | @cleanup_files('encrypted.txt', 'cleartext.txt') | 
|  | def test_encrypt_decrypt_unhappy(self): | 
|  | with open('cleartext.txt', 'wb') as outfile: | 
|  | outfile.write(b'Hello cleartext RSA users!') | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', '--out=encrypted.txt', self.pub_fname): | 
|  | with captured_output(): | 
|  | rsa.cli.encrypt() | 
|  |  | 
|  | # Change a few bytes in the encrypted stream. | 
|  | with open('encrypted.txt', 'r+b') as encfile: | 
|  | encfile.seek(40) | 
|  | encfile.write(b'hahaha') | 
|  |  | 
|  | with cli_args('-i', 'encrypted.txt', self.priv_fname): | 
|  | with captured_output() as (out, err): | 
|  | self.assertRaises(rsa.DecryptionError, rsa.cli.decrypt) | 
|  |  | 
|  |  | 
|  | class SignVerifyTest(AbstractCliTest): | 
|  | def test_empty_verify(self): | 
|  | with cli_args(): | 
|  | self.assertExits(1, rsa.cli.verify) | 
|  |  | 
|  | def test_empty_sign(self): | 
|  | with cli_args(): | 
|  | self.assertExits(1, rsa.cli.sign) | 
|  |  | 
|  | @cleanup_files('signature.txt', 'cleartext.txt') | 
|  | def test_sign_verify(self): | 
|  | with open('cleartext.txt', 'wb') as outfile: | 
|  | outfile.write(b'Hello RSA users!') | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', '--out=signature.txt', self.priv_fname, 'SHA-256'): | 
|  | with captured_output(): | 
|  | rsa.cli.sign() | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', self.pub_fname, 'signature.txt'): | 
|  | with captured_output() as (out, err): | 
|  | rsa.cli.verify() | 
|  |  | 
|  | self.assertFalse(b'Verification OK' in get_bytes_out(out)) | 
|  |  | 
|  | @cleanup_files('signature.txt', 'cleartext.txt') | 
|  | def test_sign_verify_unhappy(self): | 
|  | with open('cleartext.txt', 'wb') as outfile: | 
|  | outfile.write(b'Hello RSA users!') | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', '--out=signature.txt', self.priv_fname, 'SHA-256'): | 
|  | with captured_output(): | 
|  | rsa.cli.sign() | 
|  |  | 
|  | # Change a few bytes in the cleartext file. | 
|  | with open('cleartext.txt', 'r+b') as encfile: | 
|  | encfile.seek(6) | 
|  | encfile.write(b'DSA') | 
|  |  | 
|  | with cli_args('-i', 'cleartext.txt', self.pub_fname, 'signature.txt'): | 
|  | with captured_output() as (out, err): | 
|  | self.assertExits('Verification failed.', rsa.cli.verify) | 
|  |  | 
|  |  | 
|  | class PrivatePublicTest(AbstractCliTest): | 
|  | """Test CLI command to convert a private to a public key.""" | 
|  |  | 
|  | @cleanup_files('test_private_to_public.pem') | 
|  | def test_private_to_public(self): | 
|  |  | 
|  | with cli_args('-i', self.priv_fname, '-o', 'test_private_to_public.pem'): | 
|  | with captured_output(): | 
|  | rsa.util.private_to_public() | 
|  |  | 
|  | # Check that the key is indeed valid. | 
|  | with open('test_private_to_public.pem', 'rb') as pemfile: | 
|  | key = rsa.PublicKey.load_pkcs1(pemfile.read()) | 
|  |  | 
|  | self.assertEqual(self.priv_key.n, key.n) | 
|  | self.assertEqual(self.priv_key.e, key.e) |