blob: 1096690b24807f289657977d56a5ae4c509ff23d [file] [log] [blame]
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the commander module."""
import datetime
import hamcrest
import mock
import webtest
from tradefed_cluster import command_event_test_util
from tradefed_cluster import command_manager
from tradefed_cluster import command_monitor
from tradefed_cluster import command_task_store
from tradefed_cluster import commander
from tradefed_cluster import common
from tradefed_cluster import datastore_entities
from tradefed_cluster import datastore_test_util
from tradefed_cluster import env_config
from tradefed_cluster import metric
from tradefed_cluster import request_manager
from tradefed_cluster import testbed_dependent_test
from tradefed_cluster.plugins import base as plugin_base
from tradefed_cluster.util import command_util
import unittest
REQUEST_ID = "1"
COMMAND_ID = 5629499534213120
TIMESTAMP = datetime.datetime(2017, 3, 8)
class CommanderTest(testbed_dependent_test.TestbedDependentTest):
def setUp(self):
super(CommanderTest, self).setUp()
datastore_test_util.CreateRequest(
user="user1",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line",
cluster="cluster",
run_target="run_target")
],
request_id=REQUEST_ID)
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequest(self, plugin, schedule_tasks, monitor):
request_id1 = "1001"
request_id2 = "1002"
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line=(
"command_line0 --run-target run_target --cluster cluster"),
cluster="cluster",
run_target="run_target")
],
request_id=request_id1,
plugin_data={"ants_invocation_id": "i123",
"ants_work_unit_id": "w123"})
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line=(
"command_line0 --run-target run_target --cluster cluster"),
cluster="cluster",
run_target="run_target")
],
request_id=request_id2,
priority=100,
queue_timeout_seconds=86400,
affinity_tag="affinity_tag")
commander._ProcessRequest(request_id1)
commander._ProcessRequest(request_id2)
self.assertEqual(2, schedule_tasks.call_count)
self.assertEqual(2, monitor.call_count)
commands_0 = request_manager.GetCommands(request_id1)
self.assertEqual(1, len(commands_0))
command = commands_0[0]
self.assertEqual("command_line0", command.command_line)
self.assertEqual("run_target", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
self.assertIsNone(command.affinity_tag)
commands_1 = request_manager.GetCommands(request_id2)
self.assertEqual(1, len(commands_1))
command = commands_1[0]
self.assertEqual("command_line0", command.command_line)
self.assertEqual("run_target", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
self.assertEqual(100, command.priority)
self.assertEqual(86400, command.queue_timeout_seconds)
self.assertEqual("affinity_tag", command.affinity_tag)
monitor.assert_has_calls([mock.call(commands_0), mock.call(commands_1)])
plugin.assert_has_calls([
mock.call.OnCreateCommands([
plugin_base.CommandInfo(
command_id=5629499534213120,
command_line="command_line0",
run_count=1,
shard_count=1,
shard_index=0)
], {
"ants_invocation_id": "i123",
"ants_work_unit_id": "w123"
}, {}),
mock.call.OnCreateCommands([
plugin_base.CommandInfo(
command_id=5066549580791808,
command_line="command_line0",
run_count=1,
shard_count=1,
shard_index=0)
], None, {}),
])
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(request_manager, "CancelRequest")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequest_invalidRequest(self,
plugin,
cancel_request,
schedule_tasks):
request_id1 = "1001"
datastore_test_util.CreateRequest(
request_id=request_id1,
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0",
cluster="cluster",
run_target=None)
])
commander._ProcessRequest(request_id1)
self.assertFalse(schedule_tasks.called)
commands = request_manager.GetCommands(request_id1)
self.assertEqual(0, len(commands))
cancel_request.assert_called_once_with(
request_id1,
common.CancelReason.INVALID_REQUEST)
plugin.assert_has_calls([])
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequests_shardedRequests(self, plugin, schedule_tasks):
"""Tests processing of sharded requests."""
request_id = "1001"
datastore_test_util.CreateRequest(
request_id=request_id,
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0",
cluster="cluster",
run_target="bullhead",
shard_count=3)
])
commander._ProcessRequest(request_id)
commands = request_manager.GetCommands(request_id)
self.assertEqual(3, len(commands))
shards = []
for command in commands:
command_line = command_util.CommandLine(command.command_line)
shards.append(command_line.GetOption("--shard-index"))
command_line.RemoveOptions(["--shard-index"])
self.assertEqual("command_line0 --shard-count 3",
command_line.ToTFString())
self.assertEqual("bullhead", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
self.assertCountEqual(["0", "1", "2"], shards)
plugin.assert_has_calls([])
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequests_RequestlocalSharding(self, plugin, schedule_tasks):
"""Tests processing of sharded requests with local sharding."""
request_id = "1001"
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0 --shard-count 3",
cluster="cluster",
run_target="bullhead")
],
request_id=request_id)
commander._ProcessRequest(request_id)
commands = request_manager.GetCommands(request_id)
self.assertEqual(1, len(commands))
for command in commands:
command_line = command_util.CommandLine(command.command_line)
command_line.RemoveOptions(["--shard-index"])
self.assertEqual("command_line0 --shard-count 3",
command_line.ToTFString())
self.assertEqual("bullhead", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
plugin.assert_has_calls([])
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequests_shardedRequests_oneShard(self,
plugin,
schedule_tasks):
"""Tests processing of sharded requests with a single shard."""
request_id = "1001"
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0",
cluster="cluster",
run_target="bullhead",
shard_count=1)
],
request_id=request_id)
commander._ProcessRequest(request_id)
commands = request_manager.GetCommands(request_id)
self.assertEqual(1, len(commands))
command = commands[0]
command_line = command_util.CommandLine(command.command_line)
command_line.RemoveOptions(["--shard-index"])
# If only one shard is specified, the --shard-count option is removed.
self.assertEqual("command_line0", command_line.ToTFString())
self.assertEqual("bullhead", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
plugin.assert_has_calls([])
@mock.patch.object(request_manager, "CancelRequest")
def testProcessRequests_missingRunTarget(self, cancel_request):
request_id = "1001"
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0 --shard-count 1",
cluster="cluster",
run_target=None)
],
request_id=request_id)
commander._ProcessRequest(request_id)
cancel_request.assert_called_once_with(
request_id, common.CancelReason.INVALID_REQUEST)
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequest_escapeInCommandLine(self, plugin, schedule_tasks):
request_id1 = "1001"
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line=(
"command_line0"' --arg \'option=\'"\'"\'value\'"\'"\'\''),
cluster="cluster",
run_target="run_target")
],
request_id=request_id1)
commander._ProcessRequest(request_id1)
self.assertEqual(1, schedule_tasks.call_count)
commands = request_manager.GetCommands(request_id1)
self.assertEqual(1, len(commands))
command = commands[0]
self.assertEqual(
"command_line0 --arg option='value'",
command.command_line)
self.assertEqual("run_target", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
plugin.assert_has_calls([])
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequest_withMultipleCommands(
self, plugin, schedule_tasks, monitor):
request_id = "1001"
command_infos = [
datastore_entities.CommandInfo(
command_line="command_line %04d" % i,
cluster="cluster %04d" % i,
run_target="run_target %04d" % i,
run_count=1,
shard_count=1)
for i in range(500)
]
request = datastore_test_util.CreateRequest(
request_id=request_id,
user="user",
command_infos=command_infos,
max_concurrent_tasks=100,
plugin_data={
"FOO": "foo",
"BAR": "'bar",
})
commander._ProcessRequest(request_id)
commands = request_manager.GetCommands(request_id)
commands.sort(key=lambda x: x.command_line)
self.assertEqual(len(command_infos), len(commands))
for command_info, command in zip(command_infos, commands):
self.assertEqual(command_info.command_line, command.command_line)
self.assertEqual(command_info.cluster, command.cluster)
self.assertEqual(command_info.run_target, command.run_target)
self.assertEqual(command_info.run_count, command.run_count)
self.assertIsNone(command.shard_count)
plugin.OnCreateCommands.assert_has_calls([
mock.call([
plugin_base.CommandInfo(
command_id=int(command.key.id()),
command_line=command.command_line,
run_count=command.run_count,
shard_count=1,
shard_index=0)
for command in commands
], request.plugin_data, {}),
])
schedule_tasks.assert_called_once_with(
commands[:request.max_concurrent_tasks], update_request_state=False)
monitor.assert_called_once_with(
commands[:request.max_concurrent_tasks])
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(command_manager, "ScheduleTasks")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessRequest_withTestBench(self, plugin, schedule_tasks, monitor):
request_id = "1001"
test_bench = datastore_test_util.CreateTestBench("cluster", "run_target")
datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line=(
"command_line0"),
cluster="cluster",
run_target="run_target",
test_bench=test_bench)
],
request_id=request_id,
plugin_data={"ants_invocation_id": "i123",
"ants_work_unit_id": "w123"})
commander._ProcessRequest(request_id)
self.assertEqual(1, schedule_tasks.call_count)
self.assertEqual(1, monitor.call_count)
commands = request_manager.GetCommands(request_id)
self.assertEqual(1, len(commands))
command = commands[0]
self.assertEqual("command_line0", command.command_line)
self.assertEqual("run_target", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
self.assertEqual(test_bench, command.test_bench)
def _CreateCommand(
self, request_id=REQUEST_ID, run_count=1, priority=None,
command_line="command_line1"):
"""Helper to create a command."""
command = command_manager.CreateCommands(
request_id=request_id,
command_infos=[
datastore_entities.CommandInfo(
command_line=command_line,
cluster="cluster",
run_target="run_target",
run_count=run_count,
shard_count=1),
],
priority=priority,
shard_indexes=[0],
request_plugin_data={
"ants_invocation_id": "i123",
"command_ants_work_unit_id": "w123"
})[0]
return command
@mock.patch.object(metric, "RecordCommandAttemptMetric")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessCommandEvent_notUpdated(
self, plugin, attempt_metric):
"""Test ProcessCommandEvent with no update."""
command = self._CreateCommand()
command_manager.ScheduleTasks([command])
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
command_task_store.LeaseTask(tasks[0].task_id)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.QUEUED, request.state)
_, request_id, _, command_id = command.key.flat()
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, "attempt0",
common.InvocationEventType.INVOCATION_STARTED, time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
self.assertEqual(tasks[0].leasable, False)
self.assertEqual(tasks[0].attempt_index, 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.QUEUED, request.state)
attempt_metric.assert_not_called()
plugin.assert_has_calls([
mock.call.OnCreateCommands([
plugin_base.CommandInfo(
command_id=COMMAND_ID,
command_line="command_line1",
run_count=1,
shard_count=1,
shard_index=0)
], {
"ants_invocation_id": "i123",
"command_ants_work_unit_id": "w123"
}, {}),
])
@mock.patch.object(metric, "RecordCommandAttemptMetric")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessCommandEvent_notUpdatedButFinal(self, plugin, attempt_metric):
command = self._CreateCommand()
command_manager.ScheduleTasks([command])
_, request_id, _, command_id = command.key.flat()
# Setup to ensure we are properly in the error state
for i in range(command_manager.MAX_ERROR_COUNT_BASE):
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
command_task_store.LeaseTask(tasks[0].task_id)
attempt = command_event_test_util.CreateCommandAttempt(
command, str(i), common.CommandState.RUNNING, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, str(i),
common.InvocationEventType.INVOCATION_COMPLETED,
error="error", task=tasks[0], time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.ERROR, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.ERROR, request.state)
attempt_metric.reset_mock()
plugin.reset_mock()
# Process last event again to ensure that we don't update
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.ERROR, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.ERROR, request.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="ERROR")
plugin.assert_has_calls([
mock.call.OnProcessCommandEvent(
command,
hamcrest.match_equality(
hamcrest.all_of(
hamcrest.has_property("command_id", attempt.command_id),
hamcrest.has_property("attempt_id", attempt.attempt_id),
hamcrest.has_property("task_id", attempt.task_id),
)),
event_data={
"summary": "summary",
"total_test_count": 1000,
"failed_test_count": 100,
"passed_test_count": 900,
"failed_test_run_count": 10,
"device_lost_detected": 0,
"error": "error"
}),
])
@mock.patch.object(metric, "RecordCommandAttemptMetric")
def testProcessCommandEvent_nonFinal_reschedule(self, attempt_metric):
# Test ProcessCommandEvent for a non-final state with rescheduling
command = self._CreateCommand()
command_manager.ScheduleTasks([command])
_, request_id, _, command_id = command.key.flat()
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
command_task_store.LeaseTask(tasks[0].task_id)
command_event_test_util.CreateCommandAttempt(
command, "attempt0", common.CommandState.UNKNOWN, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, "attempt0",
common.InvocationEventType.INVOCATION_COMPLETED,
error="error", task=tasks[0], time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
self.assertEqual(tasks[0].leasable, True)
self.assertEqual(tasks[0].attempt_index, 1)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.QUEUED, request.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="ERROR")
@mock.patch.object(metric, "RecordCommandAttemptMetric")
def testProcessCommandEvent_nonFinal_delete(self, attempt_metric):
# Test ProcessCommandEvent for a non-final state with deletion
command = self._CreateCommand(run_count=2)
command_manager.ScheduleTasks([command])
_, request_id, _, command_id = command.key.flat()
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 2)
command_task_store.LeaseTask(tasks[0].task_id)
command_event_test_util.CreateCommandAttempt(
command, "attempt0", common.CommandState.UNKNOWN, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, "attempt0",
common.InvocationEventType.INVOCATION_COMPLETED,
task=tasks[0], time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
self.assertEqual(tasks[0].task_id, "%s-%s-1" % (request_id, command_id))
self.assertEqual(tasks[0].leasable, True)
self.assertEqual(tasks[0].run_index, 1)
self.assertEqual(tasks[0].attempt_index, 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.QUEUED, request.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="COMPLETED")
@mock.patch.object(metric, "RecordCommandAttemptMetric")
@mock.patch.object(env_config.CONFIG, "plugin")
def testProcessCommandEvent_final(self, plugin, attempt_metric):
# Test ProcessCommandEvent for a final state
command = self._CreateCommand()
command_manager.ScheduleTasks([command])
_, request_id, _, command_id = command.key.flat()
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
command_task_store.LeaseTask(tasks[0].task_id)
attempt = command_event_test_util.CreateCommandAttempt(
command, "attempt0", common.CommandState.UNKNOWN, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, "attempt0",
common.InvocationEventType.INVOCATION_COMPLETED,
task=tasks[0], time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.COMPLETED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.COMPLETED, request.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="COMPLETED")
plugin.assert_has_calls([
mock.call.OnCreateCommands([
plugin_base.CommandInfo(
command_id=COMMAND_ID,
command_line="command_line1",
run_count=1,
shard_count=1,
shard_index=0)
], {
"ants_invocation_id": "i123",
"command_ants_work_unit_id": "w123"
}, {}),
mock.call.OnProcessCommandEvent(
command,
hamcrest.match_equality(
hamcrest.all_of(
hamcrest.has_property("command_id", attempt.command_id),
hamcrest.has_property("attempt_id", attempt.attempt_id),
hamcrest.has_property("task_id", attempt.task_id),
)),
event_data={
"total_test_count": 1000,
"device_lost_detected": 0,
"failed_test_run_count": 10,
"passed_test_count": 900,
"failed_test_count": 100,
"summary": "summary"
}),
])
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(metric, "RecordCommandAttemptMetric")
def testProcessCommandEvent_pendingCommands(
self, attempt_metric, monitor):
# Test ProcessCommandEvent for a non-final state with deletion
request_id = "1001"
command_infos = [
datastore_entities.CommandInfo(
command_line="command_line %04d" % i,
cluster="cluster %04d" % i,
run_target="run_target %04d" % i,
run_count=1,
shard_count=1)
for i in range(10)
]
request = datastore_test_util.CreateRequest(
request_id=request_id,
user="user",
command_infos=command_infos,
max_concurrent_tasks=5,
plugin_data={
"FOO": "foo",
"BAR": "'bar",
})
commands = command_manager.CreateCommands(
request_id=request_id,
command_infos=command_infos,
priority=request.priority,
shard_indexes=[0] * len(command_infos))
command_manager.ScheduleTasks(commands[:5])
_, request_id, _, command_id = commands[0].key.flat()
pending_commands = command_manager.GetCommands(
request_id, common.CommandState.UNKNOWN)
self.assertEqual(5, len(pending_commands))
queued_commands = command_manager.GetCommands(
request_id, common.CommandState.QUEUED)
self.assertEqual(5, len(queued_commands))
tasks = command_manager.GetActiveTasks(commands[0])
self.assertEqual(1, len(tasks))
command_task_store.LeaseTask(tasks[0].task_id)
command_event_test_util.CreateCommandAttempt(
commands[0], "attempt0", common.CommandState.UNKNOWN, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id, command_id, "attempt0",
common.InvocationEventType.INVOCATION_COMPLETED,
task=tasks[0], time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(commands[0])
self.assertEqual(0, len(tasks))
command = commands[0].key.get(use_cache=False)
self.assertEqual(common.CommandState.COMPLETED, command.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="COMPLETED")
next_command = pending_commands[0]
monitor.assert_called_once_with([next_command])
next_command = pending_commands[0].key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, next_command.state)
pending_commands = command_manager.GetCommands(
request_id, common.CommandState.UNKNOWN)
self.assertEqual(4, len(pending_commands))
queued_commands = command_manager.GetCommands(
request_id, common.CommandState.QUEUED)
self.assertEqual(5, len(queued_commands))
completed_commands = command_manager.GetCommands(
request_id, common.CommandState.COMPLETED)
self.assertEqual(1, len(completed_commands))
@mock.patch.object(metric, "RecordCommandAttemptMetric")
def testProcessCommandEvent_differentAttemptId(self, attempt_metric):
# Test ProcessCommandEvent for an event that does not match the task attempt
command = self._CreateCommand()
command_manager.ScheduleTasks([command])
_, request_id, _, command_id = command.key.flat()
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
tasks[0].attempt_id = "attempt1" # different attempt
tasks[0].put()
command_task_store.LeaseTask(tasks[0].task_id)
command_event_test_util.CreateCommandAttempt(
command, "attempt0", common.CommandState.UNKNOWN, task=tasks[0])
event = command_event_test_util.CreateTestCommandEvent(
request_id,
command_id,
"attempt0",
common.InvocationEventType.INVOCATION_COMPLETED,
error="error",
task=tasks[0],
time=TIMESTAMP)
commander.ProcessCommandEvent(event)
tasks = command_manager.GetActiveTasks(command)
self.assertEqual(len(tasks), 1)
# Task should not be rescheduled
self.assertEqual(tasks[0].leasable, False)
self.assertEqual(tasks[0].attempt_index, 0)
command = command.key.get(use_cache=False)
self.assertEqual(common.CommandState.QUEUED, command.state)
request = command.key.parent().get(use_cache=False)
self.assertEqual(common.RequestState.QUEUED, request.state)
attempt_metric.assert_called_once_with(
cluster_id=command.cluster,
run_target=command.run_target,
state="ERROR")
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(command_manager, "ScheduleTasks")
def testCheckPendingCommands(self, schedule_tasks, monitor):
request_id = "1001"
command_infos = [
datastore_entities.CommandInfo(
command_line="command_line %04d" % i,
cluster="cluster %04d" % i,
run_target="run_target %04d" % i,
run_count=1,
shard_count=1)
for i in range(10)
]
request = datastore_test_util.CreateRequest(
request_id=request_id,
user="user",
command_infos=command_infos,
max_concurrent_tasks=5,
plugin_data={
"FOO": "foo",
"BAR": "'bar",
})
command_manager.CreateCommands(
request_id=request_id,
command_infos=command_infos,
priority=request.priority,
shard_indexes=[0] * len(command_infos))
commands = command_manager.GetCommands(request_id)
for i, command in enumerate(commands):
if i < 2:
command.state = common.CommandState.COMPLETED
elif i < 5:
command.state = common.CommandState.QUEUED
else:
command.state = common.CommandState.UNKNOWN
command.put()
request_summary = request_manager.RequestSummary()
request_summary.completed_count = 2
request_summary.queued_count = 3
request_summary.pending_count = 5
commander._CheckPendingCommands(request, request_summary)
schedule_tasks.assert_called_once_with(commands[5:7])
monitor.assert_called_once_with(commands[5:7])
@mock.patch.object(command_monitor, "Monitor")
@mock.patch.object(command_manager, "ScheduleTasks")
def testCheckPendingCommands_canceledRequest(
self, schedule_tasks, monitor):
request_id = "1001"
command_infos = [
datastore_entities.CommandInfo(
command_line="command_line %04d" % i,
cluster="cluster %04d" % i,
run_target="run_target %04d" % i,
run_count=1,
shard_count=1)
for i in range(10)
]
request = datastore_test_util.CreateRequest(
request_id=request_id,
user="user",
command_infos=command_infos,
max_concurrent_tasks=5,
plugin_data={
"FOO": "foo",
"BAR": "'bar",
})
command_manager.CreateCommands(
request_id=request_id,
command_infos=command_infos,
priority=request.priority,
shard_indexes=[0] * len(command_infos))
request.state = common.RequestState.CANCELED
request.put()
commands = command_manager.GetCommands(request_id)
for i, command in enumerate(commands):
if i < 2:
command.state = common.CommandState.COMPLETED
elif i < 5:
command.state = common.CommandState.QUEUED
else:
command.state = common.CommandState.UNKNOWN
command.put()
request_summary = request_manager.RequestSummary()
request_summary.completed_count = 2
request_summary.queued_count = 3
request_summary.pending_count = 5
commander._CheckPendingCommands(request, request_summary)
schedule_tasks.assert_not_called()
monitor.assert_not_called()
class HandleRequestTest(testbed_dependent_test.TestbedDependentTest):
def setUp(self):
super(HandleRequestTest, self).setUp()
self.testapp = webtest.TestApp(commander.APP)
self.plugin_patcher = mock.patch(
"__main__.env_config.CONFIG.plugin")
self.plugin_patcher.start()
def tearDown(self):
super(HandleRequestTest, self).tearDown()
self.plugin_patcher.stop()
@mock.patch.object(command_manager, "ScheduleTasks")
def testPost(self, schedule_tasks):
request_id = "request_id"
request = datastore_test_util.CreateRequest(
user="user",
command_infos=[
datastore_entities.CommandInfo(
command_line="command_line0",
cluster="cluster",
run_target="bullhead")
],
request_id=request_id,
plugin_data={"ants_invocation_id": "i123", "ants_work_unit_id": "w123"})
request_manager.AddToQueue(request)
tasks = self.mock_task_scheduler.GetTasks()
self.assertEqual(len(tasks), 1)
self.testapp.post(commander.REQUEST_HANDLER_PATH, tasks[0].payload)
commands = request_manager.GetCommands(request_id)
self.assertEqual(1, len(commands))
command = commands[0]
self.assertEqual("command_line0",
command.command_line)
self.assertEqual("bullhead", command.run_target)
self.assertEqual(1, command.run_count)
self.assertEqual("cluster", command.cluster)
self.assertIsNone(command.priority)
self.assertIsNone(command.queue_timeout_seconds)
if __name__ == "__main__":
unittest.main()