blob: f768d2234aeab282278f2e8e43e7212e27d36f8c [file] [edit]
#!/usr/bin/env python
import sys
import os
import shutil
import tempfile
import unittest
if sys.platform != 'win32':
from pexpect import pxssh
from .PexpectTestCase import PexpectTestCase
class SSHTestBase(PexpectTestCase):
    """Fixture that puts a fake ``ssh`` (and a ``python`` symlink) first on PATH.

    ``setUp`` creates a scratch directory containing a ``python`` symlink to
    ``self.PYTHONBIN`` and prepends that directory, followed by the
    ``fakessh`` directory located next to this file, to ``PATH``.
    ``tearDown`` removes the scratch directory and restores ``PATH``.
    """

    def setUp(self):
        """Create the scratch directory and rewrite ``PATH`` for the test."""
        super(SSHTestBase, self).setUp()
        self.tempdir = tempfile.mkdtemp()
        # None when PATH was not set at all; tearDown relies on that to
        # tell "unset" apart from "set but empty".
        self.orig_path = os.environ.get('PATH')
        os.symlink(self.PYTHONBIN, os.path.join(self.tempdir, 'python'))
        fakessh_dir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), 'fakessh'))
        search_path = [self.tempdir, fakessh_dir]
        # Skip an empty/unset original PATH so no trailing separator
        # (an empty PATH entry) is produced.
        if self.orig_path:
            search_path.append(self.orig_path)
        os.environ['PATH'] = os.pathsep.join(search_path)

    def tearDown(self):
        """Remove the scratch directory and restore the original ``PATH``."""
        try:
            shutil.rmtree(self.tempdir)
        finally:
            # Restore the environment even if removing the directory failed,
            # so one broken test cannot poison PATH for the rest of the run.
            if self.orig_path is not None:
                os.environ['PATH'] = self.orig_path
            else:
                os.environ.pop('PATH', None)
            # setUp chains to the parent class, so tearDown must as well.
            super(SSHTestBase, self).tearDown()
class PxsshTestCase(SSHTestBase):
    """Tests for ``pxssh`` run against the fake ``ssh`` found first on PATH.

    Tests that build ``pxssh`` with ``debug_command_string=True`` use the
    value returned by ``login()`` as the generated ssh command line and only
    inspect that string.
    """

    # Cipher option shared by the custom-command tests.
    _CIPHER_OPTION = ('-c aes128-ctr,aes192-ctr,aes256-ctr,arcfour256,'
                      'arcfour128,aes128-cbc,3des-cbc,blowfish-cbc,'
                      'cast128-cbc,aes192-cbc,aes256-cbc,arcfour')

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests: names do not start with
    # ``test``).
    # ------------------------------------------------------------------

    def _assert_command_contains(self, command, fragments, message):
        """Fail with *message* unless every fragment occurs in *command*."""
        missing = [fragment for fragment in fragments
                   if fragment not in command]
        if missing:
            self.fail('%s Missing: %r' % (message, missing))

    def _assert_login_refused(self, ssh, server, password, message,
                              **login_kwargs):
        """Fail with *message* unless ``ssh.login`` raises ExceptionPxssh."""
        try:
            ssh.login(server, 'me', password=password, **login_kwargs)
        except pxssh.ExceptionPxssh:
            pass
        else:
            self.fail(message)

    def _assert_ping_session(self, server):
        """Log in to *server*, exchange ping/pong, wait for a prompt, log out."""
        ssh = pxssh.pxssh()
        ssh.login(server, 'me', password='s3cret')
        ssh.sendline('ping')
        ssh.expect('pong', timeout=10)
        self.assertTrue(ssh.prompt(timeout=10))
        ssh.logout()

    def _tunnel_command(self, **login_kwargs):
        """Return the debug command string for a login with three tunnels."""
        ssh = pxssh.pxssh(debug_command_string=True)
        tunnels = {
            'local': ['2424:localhost:22'],
            'remote': ['2525:localhost:22'],
            'dynamic': [8888],
        }
        return ssh.login('server', 'me', password='s3cret',
                         ssh_tunnels=tunnels, **login_kwargs)

    def _debug_login_with_config(self, config_text):
        """Return ``login('server', ssh_config=...)`` for a temp config file.

        *config_text* (bytes) is written to a temporary file which is closed,
        and thereby deleted, once ``login()`` has returned or raised.
        """
        ssh = pxssh.pxssh(debug_command_string=True)
        with tempfile.NamedTemporaryFile() as config_file:
            config_file.write(config_text)
            # Push the bytes to disk so they are visible via the file name.
            config_file.flush()
            return ssh.login('server', ssh_config=config_file.name)

    def _assert_config_rejected(self, config_text, message):
        """Fail with *message* unless the config makes login raise TypeError."""
        try:
            self._debug_login_with_config(config_text)
        except TypeError:
            pass
        else:
            self.fail(message)

    def _check_custom_cmd_debug(self, cipher_option):
        """Check that a custom ``cmd`` shows up in the debug command string."""
        ssh = pxssh.pxssh(debug_command_string=True)
        command = ssh.login('server', 'me', password='s3cret',
                            cmd='ssh ' + cipher_option + ' -2')
        self._assert_command_contains(
            command, [cipher_option, '-2'],
            'String generated for custom ssh client command is incorrect.')

    def _login_and_exit_with_cmd(self, cipher_option):
        """Log in with a custom ssh command, send ``exit``, return the output.

        Returns ``str(ssh.before) + str(ssh.after)`` captured after waiting
        for the text ``Closed connection``.
        """
        ssh = pxssh.pxssh()
        ssh.login('server', 'me', password='s3cret',
                  cmd='ssh ' + cipher_option + ' -2')
        ssh.PROMPT = r'Closed connection'
        ssh.sendline('exit')
        ssh.prompt(timeout=5)
        return str(ssh.before) + str(ssh.after)

    # ------------------------------------------------------------------
    # Interactive logins against the fake ssh
    # ------------------------------------------------------------------

    def test_fake_ssh(self):
        """A correct password yields a working ping/pong session."""
        self._assert_ping_session('server')

    def test_wrong_pw(self):
        """A wrong password makes login raise ExceptionPxssh."""
        self._assert_login_refused(
            pxssh.pxssh(), 'server', 'wr0ng',
            'Password should have been refused')

    def test_failed_set_unique_prompt(self):
        """Login raises when ``set_unique_prompt`` reports failure."""
        ssh = pxssh.pxssh()
        ssh.set_unique_prompt = lambda: False
        self._assert_login_refused(
            ssh, 'server', 's3cret',
            'should have raised exception, pxssh.ExceptionPxssh',
            auto_prompt_reset=True)

    def test_connection_refused(self):
        """Logging in to ``noserver`` raises ExceptionPxssh."""
        self._assert_login_refused(
            pxssh.pxssh(), 'noserver', 's3cret',
            'should have raised exception, pxssh.ExceptionPxssh')

    # ------------------------------------------------------------------
    # Command-string generation (debug_command_string=True)
    # ------------------------------------------------------------------

    def test_ssh_tunnel_string(self):
        """Tunnel options appear in the generated command string."""
        self._assert_command_contains(
            self._tunnel_command(),
            ['-R 2525:localhost:22', '-L 2424:localhost:22', '-D 8888'],
            'String generated from tunneling is incorrect.')

    def test_remote_ssh_tunnel_string(self):
        """Tunnel options also appear when ``spawn_local_ssh`` is False."""
        self._assert_command_contains(
            self._tunnel_command(spawn_local_ssh=False),
            ['-R 2525:localhost:22', '-L 2424:localhost:22', '-D 8888'],
            'String generated from remote tunneling is incorrect.')

    def test_ssh_config_passing_string(self):
        """An ``ssh_config`` path is passed through as ``-F <path>``."""
        ssh = pxssh.pxssh(debug_command_string=True)
        with tempfile.NamedTemporaryFile() as config_file:
            config_path = config_file.name
            command = ssh.login('server', 'me', password='s3cret',
                                spawn_local_ssh=False,
                                ssh_config=config_path)
        self._assert_command_contains(
            command, ['-F ' + config_path],
            'String generated from SSH config passing is incorrect.')

    def test_username_or_ssh_config(self):
        """Login with neither a username nor an ssh_config raises TypeError."""
        ssh = pxssh.pxssh(debug_command_string=True)
        try:
            ssh.login('server')
        except TypeError:
            pass
        else:
            self.fail('Should have failed due to missing username and '
                      'missing ssh_config.')

    def test_ssh_config_user(self):
        """A config with a matching Host and a User is accepted.

        The keywords are deliberately mixed-case. The test passes as long as
        ``login()`` does not raise.
        """
        self._debug_login_with_config(b'HosT server\n'
                                      b'UsEr me\n'
                                      b'hOSt not-server\n')

    def test_ssh_config_no_username_empty_config(self):
        """An empty config file makes login raise TypeError."""
        self._assert_config_rejected(
            b'', 'Should have failed due to no Host.')

    def test_ssh_config_wrong_Host(self):
        """A config without a matching Host makes login raise TypeError."""
        self._assert_config_rejected(
            b'Host not-server\n'
            b'Host also-not-server\n',
            'Should have failed due to no matching Host.')

    def test_ssh_config_no_user(self):
        """A matching Host without a User makes login raise TypeError."""
        self._assert_config_rejected(
            b'Host server\n'
            b'Host not-server\n',
            'Should have failed due to no user.')

    def test_ssh_config_empty_user(self):
        """A matching Host with an empty User makes login raise TypeError."""
        self._assert_config_rejected(
            b'Host server\n'
            b'user \n'
            b'Host not-server\n',
            'Should have failed due to empty user.')

    def test_ssh_key_string(self):
        """``ssh_key=True`` adds ``-A``; a key path adds ``-i <path>``."""
        ssh = pxssh.pxssh(debug_command_string=True)
        command = ssh.login('server', 'me', password='s3cret', ssh_key=True)
        self._assert_command_contains(
            command, [' -A'],
            'String generated from forcing the SSH agent sock is incorrect.')

        with tempfile.NamedTemporaryFile() as key_file:
            key_path = key_file.name
            command = ssh.login('server', 'me', password='s3cret',
                                ssh_key=key_path)
        self._assert_command_contains(
            command, [' -i ' + key_path],
            'String generated from adding an SSH key is incorrect.')

    def test_custom_ssh_cmd_debug(self):
        """A custom ``cmd`` with a cipher list appears in the command string."""
        self._check_custom_cmd_debug(self._CIPHER_OPTION)

    def test_failed_custom_ssh_cmd_debug(self):
        """An invalid cipher is still echoed verbatim in the command string."""
        self._check_custom_cmd_debug('-c invalid_cipher')

    # ------------------------------------------------------------------
    # Custom ssh command, real (fake-ssh) sessions
    # ------------------------------------------------------------------

    def test_custom_ssh_cmd(self):
        """Login through a custom ``cmd`` succeeds and ``exit`` closes it."""
        try:
            transcript = self._login_and_exit_with_cmd(self._CIPHER_OPTION)
        except pxssh.ExceptionPxssh:
            self.fail('should not have raised exception, '
                      'pxssh.ExceptionPxssh')
        if 'Closed connection' not in transcript:
            self.fail('should have logged into Mock SSH client and exited')

    def test_failed_custom_ssh_cmd(self):
        """A custom ``cmd`` with an invalid cipher raises ExceptionPxssh."""
        try:
            self._login_and_exit_with_cmd('-c invalid_cipher')
        except pxssh.ExceptionPxssh:
            pass
        else:
            self.fail('should have raised exception, pxssh.ExceptionPxssh')

    # ------------------------------------------------------------------
    # Per-shell logins (the shell name is passed as part of the server
    # argument)
    # ------------------------------------------------------------------

    def test_login_bash(self):
        """Ping/pong session with the server argument ``server bash``."""
        self._assert_ping_session('server bash')

    def test_login_zsh(self):
        """Ping/pong session with the server argument ``server zsh``."""
        self._assert_ping_session('server zsh')

    def test_login_tcsh(self):
        """Ping/pong session with the server argument ``server tcsh``."""
        self._assert_ping_session('server tcsh')
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()