blob: 2cce22115ea41bba3f1b8ce1e15d7f520fd27c06 [file] [log] [blame]
#!/usr/bin/env python
# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
import json
from tests.unit import unittest
from tests.unit import AWSMockServiceTestCase
from mock import Mock
from boto.sns.connection import SNSConnection
QUEUE_POLICY = {
u'Policy':
(u'{"Version":"2008-10-17","Id":"arn:aws:sqs:us-east-1:'
'idnum:testqueuepolicy/SQSDefaultPolicy","Statement":'
'[{"Sid":"sidnum","Effect":"Allow","Principal":{"AWS":"*"},'
'"Action":"SQS:GetQueueUrl","Resource":'
'"arn:aws:sqs:us-east-1:idnum:testqueuepolicy"}]}')}
class TestSNSConnection(AWSMockServiceTestCase):
connection_class = SNSConnection
def setUp(self):
super(TestSNSConnection, self).setUp()
def default_body(self):
return b"{}"
def test_sqs_with_existing_policy(self):
self.set_http_response(status_code=200)
queue = Mock()
queue.get_attributes.return_value = QUEUE_POLICY
queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename'
self.service_connection.subscribe_sqs_queue('topic_arn', queue)
self.assert_request_parameters({
'Action': 'Subscribe',
'ContentType': 'JSON',
'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename',
'Protocol': 'sqs',
'TopicArn': 'topic_arn',
'Version': '2010-03-31',
}, ignore_params_values=[])
# Verify that the queue policy was properly updated.
actual_policy = json.loads(queue.set_attribute.call_args[0][1])
self.assertEqual(actual_policy['Version'], '2008-10-17')
# A new statement should be appended to the end of the statement list.
self.assertEqual(len(actual_policy['Statement']), 2)
self.assertEqual(actual_policy['Statement'][1]['Action'],
'SQS:SendMessage')
def test_sqs_with_no_previous_policy(self):
self.set_http_response(status_code=200)
queue = Mock()
queue.get_attributes.return_value = {}
queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename'
self.service_connection.subscribe_sqs_queue('topic_arn', queue)
self.assert_request_parameters({
'Action': 'Subscribe',
'ContentType': 'JSON',
'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename',
'Protocol': 'sqs',
'TopicArn': 'topic_arn',
'Version': '2010-03-31',
}, ignore_params_values=[])
actual_policy = json.loads(queue.set_attribute.call_args[0][1])
# Only a single statement should be part of the policy.
self.assertEqual(len(actual_policy['Statement']), 1)
def test_publish_with_positional_args(self):
self.set_http_response(status_code=200)
self.service_connection.publish('topic', 'message', 'subject')
self.assert_request_parameters({
'Action': 'Publish',
'TopicArn': 'topic',
'Subject': 'subject',
'Message': 'message',
}, ignore_params_values=['Version', 'ContentType'])
def test_publish_with_kwargs(self):
self.set_http_response(status_code=200)
self.service_connection.publish(topic='topic',
message='message',
subject='subject')
self.assert_request_parameters({
'Action': 'Publish',
'TopicArn': 'topic',
'Subject': 'subject',
'Message': 'message',
}, ignore_params_values=['Version', 'ContentType'])
def test_publish_with_target_arn(self):
self.set_http_response(status_code=200)
self.service_connection.publish(target_arn='target_arn',
message='message',
subject='subject')
self.assert_request_parameters({
'Action': 'Publish',
'TargetArn': 'target_arn',
'Subject': 'subject',
'Message': 'message',
}, ignore_params_values=['Version', 'ContentType'])
def test_create_platform_application(self):
self.set_http_response(status_code=200)
self.service_connection.create_platform_application(
name='MyApp',
platform='APNS',
attributes={
'PlatformPrincipal': 'a ssl certificate',
'PlatformCredential': 'a private key'
}
)
self.assert_request_parameters({
'Action': 'CreatePlatformApplication',
'Name': 'MyApp',
'Platform': 'APNS',
'Attributes.entry.1.key': 'PlatformCredential',
'Attributes.entry.1.value': 'a private key',
'Attributes.entry.2.key': 'PlatformPrincipal',
'Attributes.entry.2.value': 'a ssl certificate',
}, ignore_params_values=['Version', 'ContentType'])
def test_set_platform_application_attributes(self):
self.set_http_response(status_code=200)
self.service_connection.set_platform_application_attributes(
platform_application_arn='arn:myapp',
attributes={'PlatformPrincipal': 'a ssl certificate',
'PlatformCredential': 'a private key'})
self.assert_request_parameters({
'Action': 'SetPlatformApplicationAttributes',
'PlatformApplicationArn': 'arn:myapp',
'Attributes.entry.1.key': 'PlatformCredential',
'Attributes.entry.1.value': 'a private key',
'Attributes.entry.2.key': 'PlatformPrincipal',
'Attributes.entry.2.value': 'a ssl certificate',
}, ignore_params_values=['Version', 'ContentType'])
def test_create_platform_endpoint(self):
self.set_http_response(status_code=200)
self.service_connection.create_platform_endpoint(
platform_application_arn='arn:myapp',
token='abcde12345',
custom_user_data='john',
attributes={'Enabled': False})
self.assert_request_parameters({
'Action': 'CreatePlatformEndpoint',
'PlatformApplicationArn': 'arn:myapp',
'Token': 'abcde12345',
'CustomUserData': 'john',
'Attributes.entry.1.key': 'Enabled',
'Attributes.entry.1.value': False,
}, ignore_params_values=['Version', 'ContentType'])
def test_set_endpoint_attributes(self):
self.set_http_response(status_code=200)
self.service_connection.set_endpoint_attributes(
endpoint_arn='arn:myendpoint',
attributes={'CustomUserData': 'john',
'Enabled': False})
self.assert_request_parameters({
'Action': 'SetEndpointAttributes',
'EndpointArn': 'arn:myendpoint',
'Attributes.entry.1.key': 'CustomUserData',
'Attributes.entry.1.value': 'john',
'Attributes.entry.2.key': 'Enabled',
'Attributes.entry.2.value': False,
}, ignore_params_values=['Version', 'ContentType'])
def test_message_is_required(self):
self.set_http_response(status_code=200)
with self.assertRaises(TypeError):
self.service_connection.publish(topic='topic', subject='subject')
def test_publish_with_json(self):
self.set_http_response(status_code=200)
self.service_connection.publish(
message=json.dumps({
'default': 'Ignored.',
'GCM': {
'data': 'goes here',
}
}),
message_structure='json',
subject='subject',
target_arn='target_arn'
)
self.assert_request_parameters({
'Action': 'Publish',
'TargetArn': 'target_arn',
'Subject': 'subject',
'MessageStructure': 'json',
}, ignore_params_values=['Version', 'ContentType', 'Message'])
self.assertDictEqual(
json.loads(self.actual_request.params["Message"]),
{"default": "Ignored.", "GCM": {"data": "goes here"}})
def test_publish_with_utf8_message(self):
self.set_http_response(status_code=200)
subject = message = u'We \u2665 utf-8'.encode('utf-8')
self.service_connection.publish('topic', message, subject)
self.assert_request_parameters({
'Action': 'Publish',
'TopicArn': 'topic',
'Subject': subject,
'Message': message,
}, ignore_params_values=['Version', 'ContentType'])
def test_publish_with_attributes(self):
self.set_http_response(status_code=200)
self.service_connection.publish(
message=json.dumps({
'default': 'Ignored.',
'GCM': {
'data': 'goes here',
}
}, sort_keys=True),
message_structure='json',
subject='subject',
target_arn='target_arn',
message_attributes={
'name1': {
'data_type': 'Number',
'string_value': '42'
},
'name2': {
'data_type': 'String',
'string_value': 'Bob'
},
},
)
self.assert_request_parameters({
'Action': 'Publish',
'TargetArn': 'target_arn',
'Subject': 'subject',
'Message': '{"GCM": {"data": "goes here"}, "default": "Ignored."}',
'MessageStructure': 'json',
'MessageAttributes.entry.1.Name': 'name1',
'MessageAttributes.entry.1.Value.DataType': 'Number',
'MessageAttributes.entry.1.Value.StringValue': '42',
'MessageAttributes.entry.2.Name': 'name2',
'MessageAttributes.entry.2.Value.DataType': 'String',
'MessageAttributes.entry.2.Value.StringValue': 'Bob',
}, ignore_params_values=['Version', 'ContentType'])
if __name__ == '__main__':
unittest.main()