# blob: 0ef0a0d15b29d31cb0b1fe8ea3f3c1866ca5d5be [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import unittest
import webapp2
import webtest
from dashboard import shrink_timestamp_revisions
from dashboard import testing_common
from dashboard import utils
from dashboard.models import anomaly
from dashboard.models import graph_data
class ShrinkTimestampRevisionsTest(testing_common.TestCase):
  """Tests for the /shrink_timestamp_revisions request handler."""

  # URL path on which the handler under test is mounted in the test app.
  _HANDLER_PATH = '/shrink_timestamp_revisions'

  def setUp(self):
    """Creates a WSGI test app that routes only to the handler under test."""
    super(ShrinkTimestampRevisionsTest, self).setUp()
    routes = [
        (self._HANDLER_PATH,
         shrink_timestamp_revisions.ShrinkTimestampRevisionsHandler),
    ]
    self.testapp = webtest.TestApp(webapp2.WSGIApplication(routes))

  def _PostAndRunTasks(self, ancestor):
    """Posts a request for |ancestor|, then calls ExecuteTaskQueueTasks.

    Args:
      ancestor: Value sent as the 'ancestor' request parameter.
    """
    self.testapp.post(self._HANDLER_PATH, {'ancestor': ancestor})
    self.ExecuteTaskQueueTasks(
        self._HANDLER_PATH, shrink_timestamp_revisions._QUEUE_NAME)

  def testConvertTimestamp(self):
    """Checks _ConvertTimestamp against known (expected, input) pairs."""
    convert = shrink_timestamp_revisions._ConvertTimestamp
    cases = [
        (-5139, 1320001000),  # Before 2013
        (417, 1360001000),  # In Feb 2013
        (5972, 1400001000),  # In 2014
        (5972, 1400001010),  # 10 seconds later; same result as above
        (12917, 1450001010),  # In 2015
        (19861, 1500001010),  # In 2017
    ]
    for expected, timestamp in cases:
      self.assertEqual(expected, convert(timestamp))

  def testPost_NoParameters_ReportsError(self):
    """A POST without parameters yields a 500 and an explanatory body."""
    resp = self.testapp.post(self._HANDLER_PATH, status=500)
    self.assertEqual('Missing ancestor parameter.\n', resp.body)

  def testGet_SameAsPost(self):
    """GET and POST without parameters produce the same response body."""
    get_body = self.testapp.get(self._HANDLER_PATH, status=500).body
    post_body = self.testapp.post(self._HANDLER_PATH, status=500).body
    self.assertEqual(get_body, post_body)

  def testPost_WithAncestor_AllRowsMoved(self):
    """Only rows under the requested ancestor end up with small revisions."""
    testing_common.AddTests(
        ['M'], ['b1', 'b2'], {'foo': {'bar': {}, 'baz': {}}})
    # range(1425001000, 1430001000, 6000) includes 834 numbers.
    for path in ('M/b1/foo/bar', 'M/b1/foo/baz', 'M/b2/foo/bar'):
      testing_common.AddRows(path, set(range(1425001000, 1430001000, 6000)))

    self._PostAndRunTasks('M/b1')

    def fetch_rows(path):
      # All Row entities whose parent_test is the test at |path|.
      parent_key = utils.TestKey(path)
      return graph_data.Row.query(
          graph_data.Row.parent_test == parent_key).fetch()

    moved_bar = fetch_rows('M/b1/foo/bar')
    moved_baz = fetch_rows('M/b1/foo/baz')
    untouched = fetch_rows('M/b2/foo/bar')

    # NOTE(review): only a lower bound is checked for the b1 tests --
    # presumably because distinct timestamps can map to the same converted
    # revision (see testConvertTimestamp); confirm against the handler.
    self.assertGreater(len(moved_bar), 600)
    self.assertGreater(len(moved_baz), 600)
    # Rows under M/b2 are outside the requested ancestor.
    self.assertEqual(834, len(untouched))

    for row in moved_bar + moved_baz:
      self.assertLess(row.revision, 300000)
    for row in untouched:
      self.assertGreater(row.revision, 300000)

  def testGet_WithAncestor_AllAlertsUpdated(self):
    """Anomaly start and end revisions are small after the handler runs.

    NOTE(review): despite the "testGet" prefix, the request below is issued
    with POST.
    """
    testing_common.AddTests(
        ['M'], ['b1', 'b2'], {'foo': {'bar': {}, 'baz': {}}})
    testing_common.AddRows(
        'M/b1/foo/bar', set(range(1431001000, 1432001000, 6000)))
    test_key = utils.TestKey('M/b1/foo/bar')
    # range(1431001000, 1431081000, 6000) includes 14 numbers.
    for start in range(1431001000, 1431081000, 6000):
      entity = anomaly.Anomaly(
          start_revision=start, end_revision=start+12000, test=test_key,
          median_before_anomaly=100, median_after_anomaly=200)
      entity.put()

    self._PostAndRunTasks('M')

    anomalies = anomaly.Anomaly.query().fetch()
    self.assertEqual(14, len(anomalies))
    for alert in anomalies:
      self.assertLess(alert.start_revision, 300000)
      self.assertLess(alert.end_revision, 300000)
# Run the tests in this module when the file is executed directly as a
# script; importing the module does not trigger a test run.
if __name__ == '__main__':
  unittest.main()