# blob: 5e3cb6c04cc45f570c478ae37dbb4e2dc0ffbbba
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import fileinput
import json
import os
import re
from time import sleep
import logging
class Workload(object):
    """
    Base class for workload specifications

    To use this class, you'll need to instantiate it, then call :meth:`conf` on
    the instance. The details of that method are specific to each subclass.

    :param target: Devlib target to run workload on. May be None, in which case
                   an RT-App configuration file can be generated but the
                   workload cannot be run, and calibration features will be
                   missing.
    :param name: Human-readable name for the workload.
    """

    def __init__(self,
                 target,
                 name):
        # Target device configuration
        self.target = target

        # Specific class of this workload
        self.wtype = None

        # Name of this workload
        self.name = name

        # The dictionary of tasks descriptors generated by this workload
        self.tasks = {}

        # The cpus on which the workload will be executed
        self.cpus = None

        # The cgroup on which the workload will be executed
        # NOTE: requires cgroups to be properly configured and associated
        #       tools deployed on the target
        self.cgroup = None

        # The command to execute a workload (defined by a derived class)
        self.command = None

        # The workload executor, to be defined by a subclass
        self.executor = None

        # Output messages generated by commands executed on the device
        self.output = {}

        # Derived classes callback methods
        self.steps = {
            'postrun': None,
        }

        # Task specific configuration parameters
        self.duration = None
        self.run_dir = None
        self.exc_id = None

        # Setup kind specific parameters
        self.kind = None

        # Scheduler class configuration
        self.sched = None

        # Map of task/s parameters
        self.params = {}

        # Setup logging
        self._log = logging.getLogger('Workload')
        self._log.info('Setup new workload %s', self.name)

    def __callback(self, step, **kwords):
        """
        Invoke the callback registered for ``step``, if there is one.

        The keyword arguments are handed to the callback as a single
        dictionary positional argument.

        :param step: Name of the execution stage.
        :raises ValueError: if ``step`` is not a known stage.
        """
        if step not in self.steps:
            raise ValueError(
                'Callbacks for [{}] step not supported'.format(step))
        callback = self.steps[step]
        if callback is None:
            return
        self._log.debug('Callback [%s]...', step)
        callback(kwords)

    def setCallback(self, step, func):
        """
        Add a callback to be called during an execution stage.

        Intended for use by subclasses. Only one callback can exist for each
        stage. Available callback stages are:

        "postrun"
            Called after the workload has finished executing, unless it's being
            run in the background. Receives a ``params`` dictionary with
            ``params["destdir"]`` set to the host directory to store workload
            output in.

        :param step: Name of the step at which to call the callback.
        :param func: Callback function.
        """
        # Not every callable has a __name__ (e.g. functools.partial objects)
        func_name = getattr(func, '__name__', repr(func))
        self._log.debug('Setup step [%s] callback to [%s] function',
                        step, func_name)
        self.steps[step] = func

    def getCpusMask(self, cpus=None):
        """
        Build the affinity bitmask for a list of CPUs.

        :param cpus: CPU ids to include in the mask. When empty or None, the
                     CPUs reported by ``target.list_online_cpus()`` are used.
        :returns: integer with bit ``n`` set for each CPU ``n`` in the list.
        """
        selected = cpus or self.target.list_online_cpus()
        mask = 0x0
        for cpu in selected:
            mask |= (1 << cpu)
        self._log.debug('CPUs mask for %s: 0x%X', selected, mask)
        return mask

    def conf(self,
             kind,
             params,
             duration,
             cpus=None,
             sched=None,
             run_dir=None,
             exc_id=0):
        """
        Configure workload. See documentation for subclasses

        :param kind: Either ``'profile'`` or ``'custom'``.
        :param params: Workload parameters, stored under ``self.params[kind]``.
        :param duration: Stored as ``self.duration``.
        :param cpus: CPUs on which to run the workload; :meth:`run` uses it
                     when no CPUs are given there.
        :param sched: Scheduler configuration. Defaults to
                      ``{'policy': 'OTHER'}``.
        :param run_dir: Target directory for the workload. Defaults to the
                        working directory of the target.
        :param exc_id: Stored as ``self.exc_id``.
        :raises ValueError: if ``kind`` is not supported.
        """
        self.cpus = cpus
        # Build a fresh dictionary per call: a literal default in the
        # signature would be shared by every instance relying on it.
        self.sched = {'policy': 'OTHER'} if sched is None else sched
        self.duration = duration
        self.run_dir = run_dir
        self.exc_id = exc_id

        # Setup kind specific parameters
        self.kind = kind

        # Map of task/s parameters
        self.params = {}

        # Initialize run folder. This needs a target: without one the
        # workload can still be configured, but not run.
        if self.target is not None:
            if self.run_dir is None:
                self.run_dir = self.target.working_directory
            self.target.execute('mkdir -p {}'.format(self.run_dir))

        # Configure a profile workload
        if kind == 'profile':
            self._log.debug('Configuring a profile-based workload...')
            self.params['profile'] = params

        # Configure a custom workload
        elif kind == 'custom':
            self._log.debug('Configuring custom workload...')
            self.params['custom'] = params

        else:
            self._log.error('%s is not a supported RTApp workload kind', kind)
            raise ValueError('RTApp workload kind not supported')

    def __compose_command(self, cpus):
        """
        Build the command line for :meth:`run`: the base command wrapped, as
        required, by ``taskset`` and by the cgroup launcher.
        """
        command = self.command
        if not command:
            self._log.error('Error: empty executor command')
            raise ValueError('Workload command not defined: nothing to run')

        # Prepend eventually required taskset command
        pinned_cpus = cpus or self.cpus
        if pinned_cpus:
            taskset_cmd = '{}/taskset 0x{:X}'.format(
                self.target.executables_directory,
                self.getCpusMask(pinned_cpus))
            command = '{} {}'.format(taskset_cmd, command)

        if self.cgroup:
            if not hasattr(self.target, 'cgroups'):
                raise ValueError('To run workload in a cgroup, add "cgroups" '
                                 'devlib module to target/test configuration')
            command = self.target.cgroups.run_into_cmd(self.cgroup, command)

        return command

    def __collect_trace(self, ftrace, out_dir):
        """
        Pull the trace into ``out_dir``, creating the folder if needed, and
        return the path of the trace file on the host.
        """
        # `test_label` is not defined by this class; fall back to the
        # workload name when no subclass has provided one.
        label = getattr(self, 'test_label', None) or self.name
        ftrace_dat = os.path.join(out_dir, label + '.dat')

        dirname = os.path.dirname(ftrace_dat)
        # An empty dirname means "current directory": nothing to create
        if dirname and not os.path.exists(dirname):
            self._log.debug('Create ftrace results folder [%s]', dirname)
            os.makedirs(dirname)

        self._log.info('Pulling trace file into [%s]...', ftrace_dat)
        ftrace.get_trace(ftrace_dat)
        return ftrace_dat

    def run(self,
            ftrace=None,
            cgroup=None,
            cpus=None,
            background=False,
            out_dir='./',
            as_root=False,
            start_pause_s=None,
            end_pause_s=None):
        """
        This method starts the execution of the workload. If the user provides
        an ftrace object, the method will also collect a trace.

        :param ftrace: FTrace object to collect a trace. If using
                       :class:`TestEnv`, you can use :attr:`TestEnv.ftrace`
                       for this.
        :type ftrace: :mod:`devlib.trace.FTraceCollector`

        :param cgroup: specifies the cgroup name in which the workload has to
                       run
        :type cgroup: str

        :param cpus: the CPUs on which to run the workload.

                     .. note:: if specified it overrides the CPUs specified at
                               configuration time
        :type cpus: list(int)

        :param background: run the workload in background. In this case the
                           executor output is empty and the "postrun" callback
                           is not called.
        :type background: bool

        :param out_dir: output directory where to store the collected trace or
                        other workload report (if any)
        :type out_dir: str

        :param as_root: run the workload as root on the target
        :type as_root: bool

        :param start_pause_s: time to wait before executing the workload in
                              seconds. If ftrace is provided, trace collection
                              is started before waiting.
        :type start_pause_s: float

        :param end_pause_s: time to wait after executing the workload in
                            seconds. If ftrace is provided, trace collection is
                            stopped after this wait time.
        :type end_pause_s: float

        :returns: host path of the collected trace file, or None when no
                  ftrace object was given.
        :raises ValueError: if no command has been defined, or if a cgroup is
                            requested but the target has no ``cgroups`` module.
        """
        self.cgroup = cgroup

        # Compose the actual execution command starting from the base command
        # defined by the derived class
        _command = self.__compose_command(cpus)

        # Start FTrace (if required)
        if ftrace:
            ftrace.start()

        # Wait `start_pause` seconds before running the workload
        if start_pause_s:
            self._log.info('Waiting %f seconds before starting workload execution',
                           start_pause_s)
            sleep(start_pause_s)

        # Start task in background if required
        if background:
            self._log.debug('WlGen [background]: %s', _command)
            self.target.background(_command, as_root=as_root)
            self.output['executor'] = ''

        # Start task in foreground
        else:
            self._log.info('Workload execution START:')
            self._log.info(' %s', _command)
            # Run command and wait for it to complete
            results = self.target.execute(_command, as_root=as_root)
            self.output['executor'] = results

        # Wait `end_pause` seconds before stopping ftrace
        if end_pause_s:
            self._log.info('Waiting %f seconds before stopping trace collection',
                           end_pause_s)
            sleep(end_pause_s)

        # Stop FTrace (if required)
        # NOTE(review): the trace is stopped and pulled here for background
        # runs too, i.e. right after the workload has been launched. Confirm
        # whether background callers expect to stop the trace themselves.
        ftrace_dat = None
        if ftrace:
            ftrace.stop()
            ftrace_dat = self.__collect_trace(ftrace, out_dir)

        if not background:
            self.__callback('postrun', destdir=out_dir)
            self._log.debug('Workload execution COMPLETED')

        return ftrace_dat

    def getOutput(self, step='executor'):
        """
        Return the output recorded for ``step``.

        :raises KeyError: if nothing has been recorded for ``step`` yet.
        """
        return self.output[step]

    def listAll(self, kill=False):
        """
        Log all the instances of the current executor found on the target.

        :param kill: when True, also send SIGKILL to each listed instance.
        """
        if self.executor is None:
            return

        # Show all the instances for the current executor.
        # grep exits with a non-zero status when nothing matches, which is
        # not an error here.
        listing = self.target.execute(
            'ps | grep {0:s}'.format(self.executor),
            check_exit_code=False)
        if hasattr(listing, 'splitlines'):
            lines = listing.splitlines()
        else:
            lines = listing or []

        for line in lines:
            fields = line.split()
            # Fields 0, 1 and 8 are read below; skip anything shorter.
            # presumably the 9-column `ps` layout (USER PID ... NAME) -- TODO
            # confirm against the targets in use.
            if len(fields) < 9:
                continue
            self._log.info('%5s: %s (%s)', fields[1], fields[8], fields[0])
            if kill:
                self.target.execute('kill -9 {0:s}'.format(fields[1]))

    def killAll(self):
        """Kill all the instances of the current executor, if one is set."""
        if self.executor is None:
            return
        self._log.info('Killing all [%s] instances:', self.executor)
        self.listAll(True)