blob: 490618e1785d835cc0b6c9b8aabe3eefe0bf0980 [file] [log] [blame]
# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import unittest
import time
from nose.plugins.attrib import attr
from boto.redshift.layer1 import RedshiftConnection
from boto.redshift.exceptions import ClusterNotFoundFault
from boto.redshift.exceptions import ResizeNotFoundFault
class TestRedshiftLayer1Management(unittest.TestCase):
redshift = True
def setUp(self):
self.api = RedshiftConnection()
self.cluster_prefix = 'boto-redshift-cluster-%s'
self.node_type = 'dw.hs1.xlarge'
self.master_username = 'mrtest'
self.master_password = 'P4ssword'
self.db_name = 'simon'
# Redshift was taking ~20 minutes to bring clusters up in testing.
self.wait_time = 60 * 20
def cluster_id(self):
# This need to be unique per-test method.
return self.cluster_prefix % str(int(time.time()))
def create_cluster(self):
cluster_id = self.cluster_id()
self.api.create_cluster(
cluster_id, self.node_type,
self.master_username, self.master_password,
db_name=self.db_name, number_of_nodes=3
)
# Wait for it to come up.
time.sleep(self.wait_time)
self.addCleanup(self.delete_cluster_the_slow_way, cluster_id)
return cluster_id
def delete_cluster_the_slow_way(self, cluster_id):
# Because there might be other operations in progress. :(
time.sleep(self.wait_time)
self.api.delete_cluster(cluster_id, skip_final_cluster_snapshot=True)
@attr('notdefault')
def test_create_delete_cluster(self):
cluster_id = self.cluster_id()
self.api.create_cluster(
cluster_id, self.node_type,
self.master_username, self.master_password,
db_name=self.db_name, number_of_nodes=3
)
# Wait for it to come up.
time.sleep(self.wait_time)
self.api.delete_cluster(cluster_id, skip_final_cluster_snapshot=True)
@attr('notdefault')
def test_as_much_as_possible_before_teardown(self):
# Per @garnaat, for the sake of suite time, we'll test as much as we
# can before we teardown.
# Test a non-existent cluster ID.
with self.assertRaises(ClusterNotFoundFault):
self.api.describe_clusters('badpipelineid')
# Now create the cluster & move on.
cluster_id = self.create_cluster()
# Test never resized.
with self.assertRaises(ResizeNotFoundFault):
self.api.describe_resize(cluster_id)
# The cluster shows up in describe_clusters
clusters = self.api.describe_clusters()['DescribeClustersResponse']\
['DescribeClustersResult']\
['Clusters']
cluster_ids = [c['ClusterIdentifier'] for c in clusters]
self.assertIn(cluster_id, cluster_ids)
# The cluster shows up in describe_clusters w/ id
response = self.api.describe_clusters(cluster_id)
self.assertEqual(response['DescribeClustersResponse']\
['DescribeClustersResult']['Clusters'][0]\
['ClusterIdentifier'], cluster_id)
snapshot_id = "snap-%s" % cluster_id
# Test creating a snapshot.
response = self.api.create_cluster_snapshot(snapshot_id, cluster_id)
self.assertEqual(response['CreateClusterSnapshotResponse']\
['CreateClusterSnapshotResult']['Snapshot']\
['SnapshotIdentifier'], snapshot_id)
self.assertEqual(response['CreateClusterSnapshotResponse']\
['CreateClusterSnapshotResult']['Snapshot']\
['Status'], 'creating')
self.addCleanup(self.api.delete_cluster_snapshot, snapshot_id)
# More waiting. :(
time.sleep(self.wait_time)
# Describe the snapshots.
response = self.api.describe_cluster_snapshots(
cluster_identifier=cluster_id
)
snap = response['DescribeClusterSnapshotsResponse']\
['DescribeClusterSnapshotsResult']['Snapshots'][-1]
self.assertEqual(snap['SnapshotType'], 'manual')
self.assertEqual(snap['DBName'], self.db_name)