blob: f7a7ccde7046cf5abe8d5abcd06623ba3bf34edc [file] [log] [blame]
/*
* Copyright 2015 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cherry
import (
"encoding/xml"
"log"
"../rtdb"
"strings"
"time"
"fmt"
"mime/multipart"
"unicode/utf8"
"strconv"
"bufio"
"bytes"
"os"
"regexp"
)
// Util
// btoi converts a bool to an int: 1 for true, 0 for false.
// Used to turn status comparisons into stats-delta counters.
func btoi(v bool) int {
	// Early return instead of if/else — keeps the happy path flat.
	if v {
		return 1
	}
	return 0
}
// TestPackageInfo describes one imported test package: its name, the
// executable that runs its cases, and the imported test case hierarchy.
// \todo [petri] get rid of binaryName, binaryDir if/when all modules in same executable?
type TestPackageInfo struct {
	name         string        // Package name, i.e. the first dot-separated component of its case paths.
	binaryName   string        // Test binary name; substituted for ${TestPackageName} in batch exec params (see executeBatch).
	binaryDir    string        // Test binary directory; substituted for ${TestPackageDir} in batch exec params.
	testCaseTree *TestCaseTree // Hierarchical test case tree imported from the case listing file.
	testCaseList []string      // linearized test case list
}
// Descriptor for load list: tells importTestPackage where to find a package's
// binary and its XML test case listing file.
type TestPackageDescriptor struct {
	packageName      string // Package name, e.g. "dEQP-GLES2".
	binaryName       string // Test binary name for this package.
	binaryDir        string // Directory of the test binary.
	testCaseFileName string // Path of the XML file listing the package's test cases.
}
// TestRunner owns the imported test packages and coordinates batch execution
// and batch import through two background goroutines started in NewTestRunner.
type TestRunner struct {
	rtdbServer *rtdb.Server // Backing database for all batch/test objects.
	// \todo [petri] these should be dynamic, not loaded at init time!
	testPackages     map[string]TestPackageInfo // Imported packages, keyed by package name.
	fullTestCaseList []string                   // Concatenation of all packages' linearized case lists.
	// Control channel for batch executions, read in runner.handleQueue().
	queueControl chan<- batchExecQueueControl
	// Control channel for batch imports, read in runner.handleImports().
	importControl chan<- batchImportControl
}
// testCaseSummary is the distilled outcome of a single test case as parsed
// from its log text by parseTestCaseSummary.
type testCaseSummary struct {
	caseType TestCaseType   // Empty when the log did not state a case type.
	status   TestStatusCode // Empty when the log did not state a status code.
	result   string         // Free-form result description text.
}
// parseTestCaseType maps a raw case type string onto the matching TestCaseType
// constant. Unrecognized strings yield the empty TestCaseType.
func parseTestCaseType(caseTypeString string) TestCaseType {
	knownTypes := []TestCaseType{
		TEST_CASE_TYPE_SELF_VALIDATE,
		TEST_CASE_TYPE_PERFORMANCE,
		TEST_CASE_TYPE_ACCURACY,
		TEST_CASE_TYPE_CAPABILITY,
	}
	for _, caseType := range knownTypes {
		if caseTypeString == string(caseType) {
			return caseType
		}
	}
	return TestCaseType("")
}
// resultParserRegex is the fallback parser used when a test case log is not
// well-formed XML (e.g. the producing process terminated mid-write).
// Compiled once at package scope instead of on every parseTestCaseSummary
// call, and switched to leftmost-longest matching once, up front.
var resultParserRegex = func() *regexp.Regexp {
	re := regexp.MustCompile(`<TestCaseResult [^>]*CaseType="([^"]*)"[^>]*>` +
		`(?s).*` +
		`<Result StatusCode="([^"]*)">([^<]*)</Result>`)
	re.Longest()
	return re
}()

// parseTestCaseSummary extracts the case type, status code and result text
// from a test case log. It first attempts a strict XML parse; on failure it
// falls back to a regexp scan, and if that fails too it returns a zero-value
// summary.
func parseTestCaseSummary(logText string) testCaseSummary {
	type xmlResult struct {
		Value      string `xml:",chardata"`
		StatusCode string `xml:"StatusCode,attr"`
	}
	type xmlRoot struct {
		XMLName  xml.Name  `xml:"TestCaseResult"`
		Version  string    `xml:"Version,attr"`
		CaseType string    `xml:"CaseType,attr"`
		Result   xmlResult `xml:"Result"`
	}

	// Just for the sake of the XML parser, replace invalid UTF-8 with something
	// valid. DecodeRune reports (RuneError, 1) exactly for an invalid encoding,
	// so only genuinely bad bytes are overwritten.
	logTextBytes := []byte(logText)
	for ndx := 0; ndx < len(logTextBytes); {
		r, size := utf8.DecodeRune(logTextBytes[ndx:])
		if r == utf8.RuneError && size == 1 {
			logTextBytes[ndx] = '?'
		}
		ndx += size
	}

	var root xmlRoot
	if err := xml.Unmarshal(logTextBytes, &root); err == nil {
		return testCaseSummary{
			caseType: parseTestCaseType(root.CaseType),
			status:   parseTestStatusCode(root.Result.StatusCode),
			result:   root.Result.Value,
		}
	}

	log.Printf("[runner] Test log was not well-formed XML, fallback to manual parsing\n")

	// Parsing XML failed. Maybe the generating process terminated abnormally and
	// the resulting xml is not well-formed. Try to recover by using a regexp.
	if matches := resultParserRegex.FindSubmatch(logTextBytes); matches != nil {
		return testCaseSummary{
			caseType: parseTestCaseType(string(matches[1])),
			status:   parseTestStatusCode(string(matches[2])),
			result:   string(matches[3]),
		}
	}

	// Fallback failed too.
	log.Printf("[runner] fallback parsing failed\n")
	return testCaseSummary{}
}
// augmentTestCaseInfo reads the log of a finished test case and builds a test
// case header and result with appropriate case type and status fields filled
// in (falling back to the parsed log summary, then to defaults).
func augmentTestCaseInfo(testCaseInfo EventTestCaseFinished) (retHeader TestCaseHeader, retResult TestCaseResult) {
	summary := parseTestCaseSummary(testCaseInfo.log)

	// Prefer the status reported with the event; when absent, use the one from
	// the parsed log, and fall back to "internal error" if that is missing too.
	status := testCaseInfo.status
	if status == TestStatusCode("") {
		status = summary.status
		if status == TestStatusCode("") {
			status = TEST_STATUS_CODE_INTERNAL_ERROR
		}
	}

	// Default the case type to self-validate when the log didn't state one.
	caseType := summary.caseType
	if caseType == TestCaseType("") {
		caseType = TEST_CASE_TYPE_SELF_VALIDATE
	}

	retHeader = TestCaseHeader{
		CaseType: caseType,
		Status:   status,
		Result:   summary.result,
	}
	retResult = TestCaseResult{
		Path: testCaseInfo.path,
		Log:  testCaseInfo.log,
	}
	return
}
// testStatusCodeStatsDelta builds a stats delta with a 1 in exactly the
// counter matching the given status (or all zeros for statuses without a
// dedicated counter, e.g. pending/running).
func testStatusCodeStatsDelta(status TestStatusCode) (delta TestCaseTreeGroupStatusDelta) {
	delta.DeltaSuccess = btoi(status == TEST_STATUS_CODE_PASS)
	delta.DeltaFailure = btoi(status == TEST_STATUS_CODE_FAIL)
	delta.DeltaCrash = btoi(status == TEST_STATUS_CODE_CRASH)
	delta.DeltaTimeout = btoi(status == TEST_STATUS_CODE_TIMEOUT)
	delta.DeltaQualityWarning = btoi(status == TEST_STATUS_CODE_QUALITY_WARNING)
	delta.DeltaCompatibilityWarning = btoi(status == TEST_STATUS_CODE_COMPATIBILITY_WARNING)
	delta.DeltaNotSupported = btoi(status == TEST_STATUS_CODE_NOT_SUPPORTED)
	delta.DeltaResourceError = btoi(status == TEST_STATUS_CODE_RESOURCE_ERROR)
	delta.DeltaInternalError = btoi(status == TEST_STATUS_CODE_INTERNAL_ERROR)
	return
}
// finishTestCase uploads a finished test case's header, result log, and the
// stats deltas of all its ancestor tree groups to rtdb in one op set.
func (runner *TestRunner) finishTestCase(batchResultId string, testCaseInfo EventTestCaseFinished) {
	header, result := augmentTestCaseInfo(testCaseInfo)
	log.Printf("[runner] finishTestCase(%q, %s, %s)\n", result.Path, header.Status, header.Result)

	opSet := rtdb.NewOpSet()
	caseObjId := batchResultId + "/" + result.Path
	opSet.Call(typeTestCaseHeader, caseObjId, "SetResult", header.CaseType, header.Status, header.Result)
	opSet.Call(typeTestCaseResult, caseObjId, "SetLog", result.Log)

	// Update all test case groups along the way to root. The delta depends only
	// on the status, so compute it once outside the loop.
	delta := testStatusCodeStatsDelta(header.Status)
	parts := strings.Split(result.Path, ".")
	for ndx := range parts {
		groupObjId := batchResultId + "/" + strings.Join(parts[:ndx], ".")
		opSet.Call(typeTestCaseTreeGroup, groupObjId, "UpdateStats", delta)
	}

	if err := runner.rtdbServer.ExecuteOpSet(opSet); err != nil {
		panic(err)
	}
}
// setTestCaseStatus writes a new status code for a single test case header.
func (runner *TestRunner) setTestCaseStatus(batchResultId string, casePath string, status TestStatusCode) {
	opSet := rtdb.NewOpSet()
	opSet.Call(typeTestCaseHeader, batchResultId+"/"+casePath, "SetStatus", status)
	if err := runner.rtdbServer.ExecuteOpSet(opSet); err != nil {
		panic(err)
	}
}
// intMin returns the smaller of a and b.
func intMin(a int, b int) int {
	// Early return instead of if/else — keeps the happy path flat.
	if a <= b {
		return a
	}
	return b
}
// getNextTestCaseBatch picks the next sub-batch to launch: the longest prefix
// of testCasePaths whose cases all belong to the same package (the first
// dot-separated path component), capped at 1000 cases.
// \note Only launch at most some constant amount of cases at a time, because
//       transmitting a very big test case list (trie) can cause trouble.
// \note All test cases in the returned slice share the same package.
func getNextTestCaseBatch(testCasePaths []string) (string, []string) {
	maxBatchSize := len(testCasePaths)
	if maxBatchSize > 1000 {
		maxBatchSize = 1000
	}
	packageName := strings.Split(testCasePaths[0], ".")[0]
	batchSize := 1
	for batchSize < maxBatchSize && strings.Split(testCasePaths[batchSize], ".")[0] == packageName {
		batchSize++
	}
	return packageName, testCasePaths[:batchSize]
}
// Execute a batch that has been initialized - i.e., the batch result and its
// test result objects have already been created and set to pending or other
// suitable status.
//
// The function marks the batch running, launches the execserver over ADB when
// the target is an Android device, then executes the remaining cases in
// same-package sub-batches over a comm link, retrying each sub-batch up to
// three times on connection errors. Results stream into rtdb as they arrive;
// a value on stopRequest cancels the run. On exit the batch is marked
// finished/interrupted/canceled and removed from the active list.
// Human-readable progress lines are sent to executionLogAppend.
func (runner *TestRunner) executeBatch(batchResultId string, batchParams BatchExecParams, testCasePaths []string, stopRequest <-chan struct{}, executionLogAppend chan<- string) {
	// Per-attempt outcome of executing one sub-batch over the comm link.
	type BatchExecutionStatus int
	const (
		BATCH_EXEC_STATUS_RUNNING BatchExecutionStatus = iota
		BATCH_EXEC_STATUS_DONE
		BATCH_EXEC_STATUS_LINK_ERROR
		BATCH_EXEC_STATUS_GENERIC_ERROR
	)

	// Start running batch result.
	{
		opSet := rtdb.NewOpSet()
		opSet.Call(typeBatchResult, batchResultId, "SetStatus", BATCH_STATUS_CODE_RUNNING)
		err := runner.rtdbServer.ExecuteOpSet(opSet)
		if err != nil {
			panic(err)
		}
	}

	// Logging helpers; everything below ends up in the batch's execution log.
	var appendExecutionLog = func(content string) {
		executionLogAppend <- content
	}
	// appendTestInfoLog wraps raw info-log chunks from the test binary in
	// begin/end markers so they stand out in the execution log.
	var appendTestInfoLog = func(content string) {
		wrappedContent := "**** Test binary info log chunk begins ****\n"
		wrappedContent += content
		wrappedContent += "**** Test binary info log chunk ends ****\n"
		appendExecutionLog(wrappedContent)
	}
	var appendCherryLogLine = func(content string) {
		appendExecutionLog(content + "\n")
	}
	// appendRunnerLogLine additionally echoes the line to the server log.
	var appendRunnerLogLine = func(content string) {
		log.Printf("[runner] %s\n", content)
		appendCherryLogLine(content)
	}

	appendRunnerLogLine(fmt.Sprintf("Starting test batch execution at %v", time.Now().Format(defaultHumanReadableTimeFormat)))

	var deviceConfig DeviceConfig
	adbOk := true
	runCanceled := false
	{
		err := runner.rtdbServer.GetObject(batchParams.DeviceId, &deviceConfig)
		if err != nil {
			panic(err)
		}
	}
	if deviceConfig.IsADBDevice {
		err := LaunchAndroidExecServer(deviceConfig.ADBSerialNumber, batchParams.TargetPort)
		if err != nil {
			appendRunnerLogLine(fmt.Sprintf("Failed to launch ExecServer on Android via ADB: %v", err))
			adbOk = false
		}
	}

	if adbOk {
		// Processed cases (to avoid re-executing them).
		processedCasePaths := make(map[string]bool)

		// Spawn execution as long as more cases to handle.
		appendRunnerLogLine(fmt.Sprintf("Execute %d tests...", len(testCasePaths)))
		for len(testCasePaths) > 0 {
			// Choose next batch to execute & encode case list trie.
			packageName, launchCaseList := getNextTestCaseBatch(testCasePaths)
			encodedCaseList := prefixEncode(launchCaseList)
			appendRunnerLogLine(fmt.Sprintf("Launch %d cases from package '%s'...", len(launchCaseList), packageName))
			testPackage := runner.testPackages[packageName]

			didProgress := false
			var executionStatus BatchExecutionStatus = BATCH_EXEC_STATUS_RUNNING

			// Try a few times (in case of connection errors).
			for tryNdx := 0; tryNdx < 3; tryNdx++ {
				if tryNdx > 0 {
					appendRunnerLogLine(fmt.Sprintf("Try again: %d", tryNdx))
					// Linear backoff between retries.
					time.Sleep((time.Duration)(tryNdx) * 500 * time.Millisecond)
				}

				// If previous error was link error, relaunch execserver just to be sure
				if executionStatus == BATCH_EXEC_STATUS_LINK_ERROR && deviceConfig.IsADBDevice {
					appendRunnerLogLine("Relaunching execserver")
					err := RelaunchAndroidExecServer(deviceConfig.ADBSerialNumber, batchParams.TargetPort)
					if err != nil {
						appendRunnerLogLine(fmt.Sprintf("Failed to relaunch ExecServer on Android via ADB: %v", err))
						continue // Just try again, if tries left
					}
				}

				// Create link to target.
				linkParams := CommLinkParams{
					SpawnProcessPath: batchParams.SpawnLocalProcess,
					TargetAddress:    batchParams.TargetAddress,
					TargetPort:       batchParams.TargetPort,
				}
				link := NewCommLinkTcpIp(linkParams, appendCherryLogLine)
				err := link.Start()
				if err != nil {
					appendRunnerLogLine(fmt.Sprintf("WARNING: failed to start link: %s", err))
					continue // Just try again, if tries left
				}

				// Execute test case on target device.
				// ${TestPackageName} / ${TestPackageDir} in the batch params are
				// substituted with the selected package's binary name/dir.
				execEventChan := make(chan TestExecutorEvent, 4)
				linkStopRequest := make(chan struct{}, 1)
				execParams := CommLinkExecParams{
					binaryName:    strings.Replace(batchParams.TestBinaryName, "${TestPackageName}", testPackage.binaryName, -1),
					commandLine:   batchParams.TestBinaryCommandLine,
					workingDir:    strings.Replace(batchParams.TestBinaryWorkingDir, "${TestPackageDir}", testPackage.binaryDir, -1),
					testCasePaths: encodedCaseList,
				}
				err = link.Execute(execParams, execEventChan, linkStopRequest)
				if err != nil {
					appendRunnerLogLine(fmt.Sprintf("WARNING: connecting to target device failed: %s", err))
					link.Stop()
					continue
				}

				currentlyRunningCases := make(map[string]bool) // Paths of the test cases currently running.

				// Handle all events from comm link, as well as stop requests.
				executionStatus = BATCH_EXEC_STATUS_RUNNING
				for executionStatus == BATCH_EXEC_STATUS_RUNNING {
					select {
					case <-stopRequest:
						runCanceled = true
						appendRunnerLogLine("Got stop request")
						// Non-blocking send: the link only needs one stop signal.
						select {
						case linkStopRequest <- struct{}{}:
							appendRunnerLogLine("Sent stop request to comm link")
						default:
							appendRunnerLogLine("Stop request already sent to comm link")
						}
					case event := <-execEventChan:
						switch event.(type) {
						case EventSessionInfoRead:
							appendRunnerLogLine("Session info received")
							opSet := rtdb.NewOpSet()
							opSet.Call(typeBatchResult, batchResultId, "SetSessionInfo", event.(EventSessionInfoRead).sessionInfo)
							err := runner.rtdbServer.ExecuteOpSet(opSet)
							if err != nil {
								panic(err)
							}
						case EventInfoLogRead:
							logContent := event.(EventInfoLogRead).infoLog
							appendTestInfoLog(logContent)
						case EventTestCaseStarted:
							testCasePath := event.(EventTestCaseStarted).testCasePath
							if _, isAlreadyProcessed := processedCasePaths[testCasePath]; isAlreadyProcessed {
								appendRunnerLogLine(fmt.Sprintf("WARNING: got EventTestCaseStarted for already-processed test case '%s'; ignoring", testCasePath))
							} else {
								runner.setTestCaseStatus(batchResultId, testCasePath, TEST_STATUS_CODE_RUNNING)
								currentlyRunningCases[testCasePath] = true
							}
						case EventTestCaseFinished:
							testCaseInfo := event.(EventTestCaseFinished)
							path := testCaseInfo.path
							if _, isCurrentlyRunning := currentlyRunningCases[path]; !isCurrentlyRunning {
								if _, isAlreadyProcessed := processedCasePaths[path]; !isAlreadyProcessed {
									appendRunnerLogLine(fmt.Sprintf("WARNING: got EventTestCaseFinished for test case '%s' that isn't running; ignoring", path))
								}
							} else {
								delete(currentlyRunningCases, path)
								processedCasePaths[path] = true
								runner.finishTestCase(batchResultId, testCaseInfo) // upload to rtdb
							}
						case EventProcessStarted:
							appendRunnerLogLine("Test process started")
						case EventProcessLaunchFailed:
							launchFailed := event.(EventProcessLaunchFailed)
							appendRunnerLogLine(fmt.Sprintf("Process launch failed: %s", launchFailed.reason))
							executionStatus = BATCH_EXEC_STATUS_GENERIC_ERROR
						case EventExecutionFinished:
							appendRunnerLogLine(fmt.Sprintf("Test execution finished with status %#v", event.(EventExecutionFinished).status))
							switch (event.(EventExecutionFinished).status) {
							case EXEC_STATUS_DONE:
								executionStatus = BATCH_EXEC_STATUS_DONE
							case EXEC_STATUS_LINK_ERROR:
								executionStatus = BATCH_EXEC_STATUS_LINK_ERROR
							case EXEC_STATUS_TIMEOUT:
								executionStatus = BATCH_EXEC_STATUS_GENERIC_ERROR
							default:
								appendRunnerLogLine(fmt.Sprintf("WARNING: unknown end status received: %#v", event.(EventExecutionFinished).status))
								executionStatus = BATCH_EXEC_STATUS_GENERIC_ERROR
							}
						default:
							appendRunnerLogLine(fmt.Sprintf("WARNING: unknown execute event received: %#v", event))
						}
					}
				}

				// Disconnect from target.
				// \todo [petri] keep link active for longer?
				link.Stop()

				// Reset unfinished (running) cases to pending, so they can be re-run in the future.
				for testCasePath, _ := range currentlyRunningCases {
					runner.setTestCaseStatus(batchResultId, testCasePath, TEST_STATUS_CODE_PENDING)
				}

				// Remove processed cases from the list (in-place compaction of the slice).
				dstNdx := 0
				for srcNdx := 0; srcNdx < len(testCasePaths); srcNdx++ {
					casePath := testCasePaths[srcNdx]
					if _, ok := processedCasePaths[casePath]; !ok {
						testCasePaths[dstNdx] = testCasePaths[srcNdx]
						dstNdx++
					}
				}
				numProcessed := len(testCasePaths) - dstNdx
				if numProcessed > 0 {
					appendRunnerLogLine(fmt.Sprintf("%d test case(s) processed", numProcessed))
					testCasePaths = testCasePaths[0:dstNdx]
					didProgress = true
				}
				if runCanceled {
					appendRunnerLogLine("Run canceled")
				}
				// Progress or cancellation ends the retry loop for this sub-batch.
				if runCanceled || didProgress {
					break
				}
				appendRunnerLogLine("WARNING: no test cases processed")
			}

			// Exit loop if run was stopped or no progress was made.
			if runCanceled || !didProgress {
				break
			}
		}
	}

	// Mark the batch inactive and set its status: canceled beats interrupted,
	// and any remaining unexecuted cases mean the batch was interrupted.
	var batchStatus BatchStatusCode
	if runCanceled {
		batchStatus = BATCH_STATUS_CODE_CANCELED
	} else if len(testCasePaths) > 0 {
		batchStatus = BATCH_STATUS_CODE_INTERRUPTED
	} else {
		batchStatus = BATCH_STATUS_CODE_FINISHED
	}

	// Write status of batch result (in batchResult itself and in list of active batchResults).
	{
		opSet := rtdb.NewOpSet()
		opSet.Call(typeBatchResult, batchResultId, "SetStatus", batchStatus)
		opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Remove", batchResultId)
		err := runner.rtdbServer.ExecuteOpSet(opSet)
		if err != nil {
			panic(err)
		}
	}

	if deviceConfig.IsADBDevice {
		err := RemoveADBPortForward(deviceConfig.ADBSerialNumber, batchParams.TargetPort)
		if err != nil {
			appendRunnerLogLine(fmt.Sprintf("WARNING: Failed to remove ADB port forward: %v", err))
		}
	}

	appendRunnerLogLine(fmt.Sprintf("Ending test batch execution at %v", time.Now().Format(defaultHumanReadableTimeFormat)))
}
// Create and initialize the test header, result, and tree node objects for
// the given batch. The DB operations are executed in chunks so the DB is not
// blocked for too long at once, and the batch's InitProgress is updated along
// the way to inform the user.
func (runner *TestRunner) initializeBatchResult(batchResultId string) {
	log.Printf("[runner] initialize test headers and results for batch: batchResultId=%q\n", batchResultId)
	{
		totalOps := batchResultHierarchyInitOps(runner.rtdbServer, batchResultId)
		// Do transactions in chunks, because there can a big amount of operations,
		// and we don't want the DB to be blocked for too long at once.
		// Also keep updating the batch's InitProgress to inform the user.
		startTime := time.Now()
		{
			numOpsDone := 0
			chunkOps := rtdb.NewOpSet()
			for target, opList := range totalOps.ObjectOps {
				chunkOps.ObjectOps[target] = opList
				numOpsDone++
				// Flush every 500 accumulated targets, tagging the chunk with the
				// current progress fraction.
				if len(chunkOps.ObjectOps) == 500 {
					chunkOps.Call(typeBatchResult, batchResultId, "SetInitProgress", float32(numOpsDone)/float32(len(totalOps.ObjectOps)))
					err := runner.rtdbServer.ExecuteOpSet(chunkOps)
					if err != nil {
						panic(err)
					}
					chunkOps = rtdb.NewOpSet()
				}
			}
			// Final (possibly partial) chunk; marks progress as complete.
			chunkOps.Call(typeBatchResult, batchResultId, "SetInitProgress", float32(1.0))
			err := runner.rtdbServer.ExecuteOpSet(chunkOps)
			if err != nil {
				panic(err)
			}
		}
		elapsed := time.Now().Sub(startTime)
		log.Printf("[runner] initializeBatchResult hierarchy init op exec duration: %v\n", elapsed)
	}

	// \note Setting the batch result status to "pending" (from "initializing") should be the last
	//       operation here, so in case Cherry e.g. crashes it won't think the batch was initialized
	//       when it really wasn't.
	opSet := rtdb.NewOpSet()
	opSet.Call(typeBatchResult, batchResultId, "SetStatus", BATCH_STATUS_CODE_PENDING)
	err := runner.rtdbServer.ExecuteOpSet(opSet)
	if err != nil {
		panic(err)
	}
}
// filterPendingCasePaths returns the subset of testCasePaths whose test case
// headers in the given batch are still in the pending state, so only
// unfinished cases get (re-)executed.
func (runner *TestRunner) filterPendingCasePaths(batchResultId string, testCasePaths []string) []string {
	pending := make([]string, 0)
	for _, casePath := range testCasePaths {
		var header TestCaseHeader
		if err := runner.rtdbServer.GetObject(batchResultId+"/"+casePath, &header); err != nil {
			panic(err)
		}
		if header.Status == TEST_STATUS_CODE_PENDING {
			pending = append(pending, casePath)
		}
	}
	return pending
}
// importTestPackage reads the XML test case listing of one test package and
// builds a TestPackageInfo from it. On any failure the error is logged and
// nil is returned (import failures are non-fatal).
func importTestPackage(packageName string, binaryName string, binaryDir string, testCaseFileName string) *TestPackageInfo {
	log.Printf("[config] importTestPackage('%s')\n", packageName)

	xmlFile, err := os.Open(testCaseFileName)
	if err != nil {
		log.Printf("Failed to open test case file (%s : %s)\n", testCaseFileName, err.Error())
		return nil
	}
	defer xmlFile.Close()

	tree, err := importTestCaseTree(xmlFile, packageName)
	if err != nil {
		log.Printf("Failed to parse test case file (%s : %s)\n", testCaseFileName, err.Error())
		return nil
	}

	return &TestPackageInfo{
		name:         packageName,
		binaryName:   binaryName,
		binaryDir:    binaryDir,
		testCaseTree: tree,
		testCaseList: tree.GetLinearizedList(),
	}
}
// Return the list of imported packages. A package that fails to import is
// logged and silently left out of the returned list.
func importTestPackages(descriptors []TestPackageDescriptor) []*TestPackageInfo {
	packageList := []*TestPackageInfo{}
	for _, d := range descriptors {
		pkg := importTestPackage(d.packageName, d.binaryName, d.binaryDir, d.testCaseFileName)
		if pkg == nil {
			log.Printf("Failed to import package '%s'\n", d.packageName)
			continue
		}
		packageList = append(packageList, pkg)
	}
	return packageList
}
// NewTestRunner imports all known test packages, builds the combined test
// case list, and starts the background goroutines that handle the execution
// queue and batch imports.
func NewTestRunner(rtdbServer *rtdb.Server) *TestRunner {
	const dataDir = "data/"
	// Built-in package descriptors: name, binary, path, case listing file.
	descriptors := []TestPackageDescriptor{
		{"dE-IT", "de-internal-tests", "internal", dataDir + "dE-IT-cases.xml"},
		{"dEQP-EGL", "deqp-egl", "egl", dataDir + "dEQP-EGL-cases.xml"},
		{"dEQP-GLES2", "deqp-gles2", "gles2", dataDir + "dEQP-GLES2-cases.xml"},
		{"dEQP-GLES3", "deqp-gles3", "gles3", dataDir + "dEQP-GLES3-cases.xml"},
		{"dEQP-GLES31", "deqp-gles31", "gles31", dataDir + "dEQP-GLES31-cases.xml"},
		{"dEQP-VK", "deqp-vk", "../external/vulkancts/modules/vulkan", dataDir + "dEQP-VK-cases.xml"},
	}
	packageList := importTestPackages(descriptors)

	// Concatenate all test case names to one big list used, e.g., in test launch view
	fullTestCaseList := make([]string, 0)
	for _, pkg := range packageList {
		fullTestCaseList = append(fullTestCaseList, pkg.testCaseList...)
	}
	log.Printf("[config] total cases imported: %d\n", len(fullTestCaseList))

	// Index packages by name; used in the server side test launch.
	testPackages := make(map[string]TestPackageInfo, len(packageList))
	for _, pkg := range packageList {
		testPackages[pkg.name] = *pkg
	}

	queueControl := make(chan batchExecQueueControl)
	importControl := make(chan batchImportControl)
	runner := &TestRunner{
		rtdbServer:       rtdbServer,
		testPackages:     testPackages,
		fullTestCaseList: fullTestCaseList,
		queueControl:     queueControl,
		importControl:    importControl,
	}
	go runner.handleQueue(queueControl)
	go runner.handleImports(importControl)
	return runner
}
// AddressQueueId builds the "address:port" identifier of the execution queue
// for a given target endpoint.
func AddressQueueId(address string, port int) string {
	portStr := strconv.FormatInt(int64(port), 10)
	return address + ":" + portStr
}
// batchQueueId derives a batch's execution queue id from its exec params
// (one queue per target address:port endpoint).
func batchQueueId(params BatchExecParams) string {
	return AddressQueueId(params.TargetAddress, params.TargetPort)
}
// Create a new batch and start executing asynchronously. The case list is the
// runner's full imported case list narrowed by batchParams.TestNameFilters.
// Returns the id of the newly created batch result.
func (runner *TestRunner) ExecuteTestBatch(batchName string, batchParams BatchExecParams, timestamp time.Time) (string, error) {
	// Resolve test case list to execute.
	// \todo [petri] fetch testCaseList dynamically from target?
	log.Printf("[runner] test name filters: %q\n", batchParams.TestNameFilters)
	testCasePaths := filterTestCaseNames(runner.fullTestCaseList, batchParams.TestNameFilters)
	log.Printf("[runner] filtered from %d cases to %d\n", len(runner.fullTestCaseList), len(testCasePaths))
	return runner.ExecuteTestBatchWithCaseList(batchName, batchParams, timestamp, testCasePaths)
}
// ExecuteTestBatchWithTestSet starts a batch execution using the filter list
// stored in the given test set object, asynchronously.
func (runner *TestRunner) ExecuteTestBatchWithTestSet(batchName string, batchParams BatchExecParams, timestamp time.Time, testSetId string) (string, error) {
	var testSet TestSet
	// Bug fix: check the fetch error BEFORE touching testSet; previously
	// testSet.Filters was logged before the error check.
	err := runner.rtdbServer.GetObject(testSetId, &testSet)
	if err != nil {
		panic(err)
	}
	log.Printf("Start test set '%s' with %d filters.\n", testSetId, len(testSet.Filters))
	// Log only the first few filters to keep the output bounded.
	for ndx, filter := range testSet.Filters {
		if ndx > 5 {
			log.Printf("...")
			break
		}
		log.Printf(" Filter: '%s'", filter)
	}
	return runner.ExecuteTestBatchWithCaseList(batchName, batchParams, timestamp, testSet.Filters)
}
// Create a new batch with the given explicit case list (ignoring
// batchParams.TestNameFilters) and start executing asynchronously.
// Returns the id of the newly created batch result.
func (runner *TestRunner) ExecuteTestBatchWithCaseList(batchName string, batchParams BatchExecParams, timestamp time.Time, testCasePaths []string) (string, error) {
	batchResultId := runner.rtdbServer.MakeUniqueID()

	// Empty batch result.
	batchResult := BatchResult{
		Name:       batchName,
		ExecParams: batchParams,
		Status:     BATCH_STATUS_CODE_INITIALIZING,
		Timestamp:  timestamp,
	}
	// Header to batch result list.
	header := BatchResultHeader{
		Id:   batchResultId,
		Name: batchName,
	}

	// Store in rtdb. \note Also initialize root group node already, so that the
	// batch result list may show something sensible.
	opSet := rtdb.NewOpSet()
	opSet.Call(typeBatchResultList, "batchResultList", "Append", header)
	opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Append", batchResultId)
	opSet.Call(typeBatchResult, batchResultId, "Init", batchResult)
	opSet.Call(typeTestCaseList, batchResultId, "Init", TestCaseList{Paths: testCasePaths})
	opSet.Call(typeTestCaseTreeGroup, batchResultId+"/", "Init", TestCaseTreeGroup{NumTotalCases: len(testCasePaths)})
	opSet.Call(typeBatchResultExecutionLog, batchResultId, "Init")
	if err := runner.rtdbServer.ExecuteOpSet(opSet); err != nil {
		panic(err)
	}

	// Initialize and enqueue in background.
	go func() {
		runner.initializeBatchResult(batchResultId)
		runner.queueControl <- batchExecQueueControlEnqueue{
			batchResultId: batchResultId,
			queueId:       batchQueueId(batchParams),
		}
	}()
	return batchResultId, nil
}
// Send a stop request to the given batch execution or import. An importing
// batch is stopped through the import controller; anything else goes through
// the execution queue of its target device.
func (runner *TestRunner) StopBatchExecution(batchResultId string) error {
	var batchResult BatchResult
	if err := runner.rtdbServer.GetObject(batchResultId, &batchResult); err != nil {
		return err
	}
	if batchResult.Status == BATCH_STATUS_CODE_IMPORTING {
		runner.importControl <- batchImportControlStop{
			batchResultId: batchResultId,
		}
		return nil
	}
	runner.queueControl <- batchExecQueueControlStopBatch{
		batchResultId: batchResultId,
		queueId:       batchQueueId(batchResult.ExecParams),
	}
	return nil
}
// Resume the execution of a previously stopped batch, asynchronously. Only
// canceled or interrupted batches may be continued; the batch is set back to
// pending, re-added to the active list, and re-enqueued on its device queue.
func (runner *TestRunner) ContinueBatchExecution(batchResultId string) error {
	var batchResult BatchResult
	if err := runner.rtdbServer.GetObject(batchResultId, &batchResult); err != nil {
		return err
	}
	if batchResult.Status != BATCH_STATUS_CODE_CANCELED && batchResult.Status != BATCH_STATUS_CODE_INTERRUPTED {
		return fmt.Errorf("[runner] WARNING: trying to continue batch '%s' with status '%s'", batchResultId, batchResult.Status)
	}

	opSet := rtdb.NewOpSet()
	opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Append", batchResultId)
	opSet.Call(typeBatchResult, batchResultId, "SetStatus", BATCH_STATUS_CODE_PENDING)
	if err := runner.rtdbServer.ExecuteOpSet(opSet); err != nil {
		panic(err)
	}

	runner.queueControl <- batchExecQueueControlEnqueue{
		batchResultId: batchResultId,
		queueId:       batchQueueId(batchResult.ExecParams),
	}
	return nil
}
// MoveBatchInQueue asks the queue handler to shift the given batch by offset
// positions within its device's execution queue.
func (runner *TestRunner) MoveBatchInQueue(batchResultId string, offset int) error {
	var batchResult BatchResult
	if err := runner.rtdbServer.GetObject(batchResultId, &batchResult); err != nil {
		return err
	}
	runner.queueControl <- batchExecQueueControlMove{
		batchResultId: batchResultId,
		queueId:       batchQueueId(batchResult.ExecParams),
		offset:        offset,
	}
	return nil
}
// QueryBatchExecutionLog fetches a batch's execution log via the queue
// handler (the tail of a running batch's log lives in memory, not in the DB,
// so the query has to go through the handler).
func (runner *TestRunner) QueryBatchExecutionLog(batchResultId string) (string, error) {
	var batchResult BatchResult
	if err := runner.rtdbServer.GetObject(batchResultId, &batchResult); err != nil {
		return "", fmt.Errorf("when trying to get batch result object: %v", err)
	}

	logChan := make(chan string, 1)
	runner.queueControl <- batchExecQueueControlExecutionLogQuery{
		batchResultId: batchResultId,
		queueId:       batchQueueId(batchResult.ExecParams),
		dst:           logChan,
	}
	return <-logChan, nil
}
// ImportBatch creates a new batch result from an uploaded qpa log stream and
// imports its contents into the DB. The stream is parsed line by line; test
// results and tree-group statistics are accumulated into op sets and flushed
// to rtdb in chunks so a large upload doesn't block the DB.
// totalContentLength (-1 when unknown) is used only for approximate progress
// reporting. The import can be canceled via the import controller's stop
// request channel.
func (runner *TestRunner) ImportBatch(batchResultDefaultName string, qpaReader *multipart.Part, totalContentLength int64) error {
	batchResultId := runner.rtdbServer.MakeUniqueID()

	// Register this import with the import controller and obtain a receive-only
	// view of its stop request channel.
	var stopRequest <-chan struct{}
	{
		stopRequestBidir := make(chan struct{})
		stopRequest = stopRequestBidir
		runner.importControl <- batchImportControlStarted{batchResultId, stopRequestBidir}
	}

	// Parser events arrive on qpaParserQueue; countingQpaReader tracks consumed
	// bytes (used for progress reporting and chunked DB writes).
	qpaParserQueue := make(chan TestExecutorEvent, 4)
	qpaParser := CreateLogContainerParser(qpaParserQueue)
	countingQpaReader := NewCountingReader(qpaReader)
	scanner := bufio.NewScanner(countingQpaReader)
	scanner.Buffer(nil, 256*1024) // Allow input lines up to 256 KiB.

	// Initialize batch result and root group. Add to batch result list.
	{
		opSet := rtdb.NewOpSet()
		batchResult := BatchResult{
			Name:         batchResultDefaultName,
			Status:       BATCH_STATUS_CODE_IMPORTING,
			InitProgress: 0.0,
		}
		batchResultHeader := BatchResultHeader{
			Id:   batchResultId,
			Name: batchResultDefaultName,
		}
		opSet.Call(typeBatchResultList, "batchResultList", "Append", batchResultHeader)
		opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Append", batchResultId)
		opSet.Call(typeBatchResult, batchResultId, "Init", batchResult)
		opSet.Call(typeTestCaseList, batchResultId, "Init", TestCaseList{Paths: []string{}})
		opSet.Call(typeTestCaseTreeGroup, batchResultId+"/", "Init", TestCaseTreeGroup{})
		opSet.Call(typeBatchResultExecutionLog, batchResultId, "Init")
		err := runner.rtdbServer.ExecuteOpSet(opSet)
		if err != nil {
			panic(err)
		}
	}

	isBatchFinished := true // Whether the batch contains only finished cases, and contains an #endSession.

	// Start reading input and writing to db.
	{
		opSet := rtdb.NewOpSet()
		keepReading := true
		eofReached := false
		lastDbExecReadCount := int64(0) // The value of countingQpaReader.Count() when the latest write to DB was done.
		existingTreeNodePaths := make(map[string]struct{})
		existingTreeNodePaths[""] = struct{}{} // \note Root node was already added above.
		for keepReading {
			select {
			case <-stopRequest:
				keepReading = false
			case event := <-qpaParserQueue:
				switch event.(type) {
				case EventSessionInfoRead:
					sessionInfo := event.(EventSessionInfoRead).sessionInfo
					if sessionInfo.DeviceId == "" {
						sessionInfo.DeviceId = "Unknown"
					}
					if sessionInfo.ResultName != "" {
						// The qpa stream carries a result name; override the default.
						opSet.Call(typeBatchResultList, "batchResultList", "SetBatchResultName", batchResultId, sessionInfo.ResultName)
						opSet.Call(typeBatchResult, batchResultId, "SetName", sessionInfo.ResultName)
					}
					opSet.Call(typeBatchResult, batchResultId, "SetSessionInfo", sessionInfo)
					{
						// Create a device config entry if this device isn't in the DB yet.
						err := runner.rtdbServer.GetObject(sessionInfo.DeviceId, &DeviceConfig{})
						if err != nil {
							var newConfig DeviceConfig
							if sessionInfo.ADBSerialNumber != "" {
								newConfig.IsADBDevice = true
								newConfig.ADBSerialNumber = sessionInfo.ADBSerialNumber
								newConfig.TargetAddress = "127.0.0.1"
								newConfig.TargetPort = 50016
							} else {
								if sessionInfo.DeviceId == "Unknown" {
									newConfig.Name = "Unknown"
								} else {
									newConfig.Name = "Unnamed"
								}
								opSet.Call(typeDeviceConfigList, "deviceConfigList", "Append", DeviceConfigHeader{sessionInfo.DeviceId})
							}
							opSet.Call(typeDeviceConfig, sessionInfo.DeviceId, "Init", newConfig)
						}
					}
				case EventTestCaseFinished:
					testCaseHeader, testCaseResult := augmentTestCaseInfo(event.(EventTestCaseFinished))
					caseObjId := batchResultId + "/" + testCaseResult.Path
					opSet.Call(typeTestCaseHeader, caseObjId, "Init", testCaseHeader)
					opSet.Call(typeTestCaseResult, caseObjId, "Init", testCaseResult)
					opSet.Call(typeTestCaseList, batchResultId, "Append", testCaseResult.Path)
					if testCaseHeader.Status == TEST_STATUS_CODE_PENDING {
						isBatchFinished = false
					}
					{
						// Update (and lazily create) every tree group node on the
						// case's path, from root down.
						statsDelta := testStatusCodeStatsDelta(testCaseHeader.Status)
						pathParts := strings.Split(testCaseResult.Path, ".")
						for ndx := 0; ndx < len(pathParts); ndx++ {
							nodePath := strings.Join(pathParts[:ndx], ".")
							nodeObjId := batchResultId + "/" + nodePath
							if _, ok := existingTreeNodePaths[nodePath]; !ok {
								existingTreeNodePaths[nodePath] = struct{}{}
								opSet.Call(typeTestCaseTreeGroup, nodeObjId, "Init", TestCaseTreeGroup{})
							}
							opSet.Call(typeTestCaseTreeGroup, nodeObjId, "AddCase")
							if testCaseHeader.Status != TEST_STATUS_CODE_PENDING {
								opSet.Call(typeTestCaseTreeGroup, nodeObjId, "UpdateStats", statsDelta)
							}
						}
					}
				}
			default:
				// No event pending: feed the parser the next input line.
				if (eofReached) {
					keepReading = false
				} else if scanner.Scan() {
					qpaParser.ParseLine(scanner.Text())
				} else {
					// EOF reached
					qpaParser.Terminate()
					eofReached = true
				}
			}
			// \note Write to DB in chunks (every 4 MiB of consumed input).
			readCount := countingQpaReader.Count()
			if readCount-lastDbExecReadCount >= 4*1024*1024 {
				if totalContentLength != -1 { // Content-Length = -1 mean unknown
					approximateProgress := float32(float64(readCount) / float64(totalContentLength))
					opSet.Call(typeBatchResult, batchResultId, "SetInitProgress", approximateProgress)
				}
				err := runner.rtdbServer.ExecuteOpSet(opSet)
				if err != nil {
					panic(err)
				}
				opSet = rtdb.NewOpSet()
				lastDbExecReadCount = readCount
			}
		}
		// Flush any remaining operations and finalize the progress indicator.
		opSet.Call(typeBatchResult, batchResultId, "SetInitProgress", float32(1.0))
		err := runner.rtdbServer.ExecuteOpSet(opSet)
		if err != nil {
			panic(err)
		}
	}

	// A session still open at end of input means the batch was not finished.
	if qpaParser.IsInsideSession() {
		isBatchFinished = false
	}

	// Record the final status and remove the batch from the active list.
	{
		opSet := rtdb.NewOpSet()
		var status BatchStatusCode
		if isBatchFinished {
			status = BATCH_STATUS_CODE_FINISHED
		} else {
			status = BATCH_STATUS_CODE_INTERRUPTED
		}
		opSet.Call(typeBatchResult, batchResultId, "SetStatus", status)
		opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Remove", batchResultId)
		err := runner.rtdbServer.ExecuteOpSet(opSet)
		if err != nil {
			panic(err)
		}
	}

	runner.importControl <- batchImportControlFinished{batchResultId}
	return nil
}
// Batch execution queue controlling utilities.
// batchExecQueueControl is the marker interface for commands consumed by
// runner.handleQueue. It has no methods, so any of the batchExecQueueControl*
// command structs below satisfies it; handleQueue dispatches on concrete type.
type batchExecQueueControl interface {
}
// Enqueue a new batch execution. If the target queue is empty, the batch
// starts executing immediately; otherwise it waits for its turn.
type batchExecQueueControlEnqueue struct {
	batchResultId string // Id of the batch result to execute.
	queueId       string // Id of the (e.g. per-device) queue to append to.
}
// Request stopping a batch run. A currently running batch (front of the
// queue) gets a stop signal; a still-pending batch is cancelled and removed
// from the queue.
type batchExecQueueControlStopBatch struct {
	batchResultId string // Id of the batch result to stop.
	queueId       string // Queue the batch is expected to be in.
}
// Move a batch to a different position in the queue. Moving the currently
// running batch (or moving another batch into its slot) is rejected.
type batchExecQueueControlMove struct {
	batchResultId string // Id of the batch result to move.
	queueId       string // Queue the batch is expected to be in.
	offset        int    // Relative offset; negative values move toward the front.
}
// Query the execution log of a batch. The log will be sent to dst.
// \note This exists because the last part of the execution log of
//       a running batch is kept in memory, not in DB.
type batchExecQueueControlExecutionLogQuery struct {
	batchResultId string // Id of the batch result whose log is queried.
	queueId       string // Queue the batch is expected to be in.
	dst           chan<- string // Receives the full execution log as a single string.
}
// isPartiallyExecutedBatch reports whether the batch identified by
// batchResultId already has at least one recorded test case result,
// i.e. an earlier execution got at least partway through it.
// Panics if the batch's root tree group cannot be read from the DB.
func (runner *TestRunner) isPartiallyExecutedBatch (batchResultId string) bool {
	// The root group of a batch's case tree lives at "<batchResultId>/".
	var root TestCaseTreeGroup
	if err := runner.rtdbServer.GetObject(batchResultId+"/", &root); err != nil {
		panic(err)
	}
	return root.NumResults() != 0
}
// getBatchResultPastExecutionLog returns the DB-persisted portion of a batch
// result's execution log: the entire log if the batch isn't running, or — if
// it is running — everything except the current execution's in-memory tail.
// Panics if the log object cannot be read from the DB.
func (runner *TestRunner) getBatchResultPastExecutionLog (batchResultId string) string {
	var persisted BatchResultExecutionLog
	if err := runner.rtdbServer.GetObject(batchResultId, &persisted); err != nil {
		panic(err)
	}
	return persisted.Content
}
// Handle the (e.g. per-device) batch execution queues. New batch enqueues and other queue operations
// are read from the channel. Removes finished/stopped batches from the queue.
// Runs forever; intended to be launched as a goroutine.
// \todo [nuutti] There's currently no proper way of reporting errors (such as trying to stop a
//                batch in a non-existing queue). Do we need a response channel of some kind?
func (runner *TestRunner) handleQueue (queueControl <-chan batchExecQueueControl) {
	// The executor goroutine sends its queue id here when a batch run ends.
	execEndSignal := make(chan string)
	// execute runs a single batch to completion (or until stopped), maintaining
	// its execution log, and signals execEndSignal when done.
	execute := func (enq batchExecQueueControlEnqueue, stopRequest <-chan struct{}, executionLogQuery <-chan chan<- string) {
		// Handle the execution log in a separate goroutine rather than in
		// the main loop of runner.executeBatch, so that any stalls in
		// executeBatch don't delay execution log queries.
		// The handler is controlled via the following channels.
		executionLogAppend := make(chan string)   // Append a string to the log; don't commit to DB yet.
		executionLogCommit := make(chan struct{}) // Commit uncommitted changes to DB.
		executionLogStop := make(chan struct{})   // Stop the goroutine.
		go func() {
			executionLog := bytes.Buffer{}
			for {
				select {
				case dst := <-executionLogQuery:
					// Full log = already-committed DB part + uncommitted in-memory tail.
					dst <- runner.getBatchResultPastExecutionLog(enq.batchResultId) + executionLog.String()
				case str := <-executionLogAppend:
					executionLog.WriteString(str)
				case <-executionLogCommit:
					opSet := rtdb.NewOpSet()
					opSet.Call(typeBatchResultExecutionLog, enq.batchResultId, "Append", executionLog.String())
					err := runner.rtdbServer.ExecuteOpSet(opSet)
					if err != nil { panic(err) }
					executionLog.Reset()
				case <-executionLogStop:
					if executionLog.Len() > 0 { panic("Execution log handler stopped, but non-committed data remains") }
					return
				}
			}
		}()
		executionLogAppend <- fmt.Sprintf("Batch reached front of its queue at %v\n", time.Now().Format(defaultHumanReadableTimeFormat))
		var batchResult BatchResult
		err := runner.rtdbServer.GetObject(enq.batchResultId, &batchResult)
		if err != nil { panic(err) }
		var testCaseList TestCaseList
		err = runner.rtdbServer.GetObject(enq.batchResultId, &testCaseList)
		if err != nil { panic(err) }
		casePaths := testCaseList.Paths
		if runner.isPartiallyExecutedBatch(enq.batchResultId) {
			// Resuming an earlier, partial run: only execute cases still pending.
			executionLogAppend <- "Batch is partially executed, filtering pending cases\n"
			casePaths = runner.filterPendingCasePaths(enq.batchResultId, casePaths)
		}
		runner.executeBatch(enq.batchResultId, batchResult.ExecParams, casePaths, stopRequest, executionLogAppend)
		// Commit the remaining log tail before announcing the end of the run;
		// stop the log handler last so no appends are lost.
		executionLogCommit <- struct{}{}
		execEndSignal <- enq.queueId
		executionLogStop <- struct{}{}
	}
	// Per-queue channels for talking to the currently running execution.
	queueStopRequest := make(map[string]chan<- struct{})
	queueExecutionLogQuery := make(map[string]chan<- chan<- string)
	// launch starts executing the batch at the front of its queue.
	launch := func (enq batchExecQueueControlEnqueue) {
		// Buffer of 1 so a pending stop request / log query never blocks the sender.
		stopRequest := make(chan struct{}, 1)
		executionLogQuery := make(chan chan<- string, 1)
		queueStopRequest[enq.queueId] = stopRequest
		queueExecutionLogQuery[enq.queueId] = executionLogQuery
		go execute(enq, stopRequest, executionLogQuery)
	}
	for {
		select {
		case command := <-queueControl:
			switch cmd := command.(type) {
			case batchExecQueueControlEnqueue:
				var queue DeviceBatchQueue
				err := runner.rtdbServer.GetObject(cmd.queueId, &queue)
				if err != nil {
					// Queue does not exist; create it.
					opSet := rtdb.NewOpSet()
					opSet.Call(typeDeviceBatchQueueList, "deviceBatchQueueList", "Append", cmd.queueId)
					opSet.Call(typeDeviceBatchQueue, cmd.queueId, "Init")
					err = runner.rtdbServer.ExecuteOpSet(opSet)
					if err != nil { panic(err) }
					log.Printf("[runner] created queue '%s'", cmd.queueId)
				}
				opSet := rtdb.NewOpSet()
				opSet.Call(typeDeviceBatchQueue, cmd.queueId, "Append", cmd.batchResultId)
				err = runner.rtdbServer.ExecuteOpSet(opSet)
				if err != nil { panic(err) }
				if len(queue.BatchResultIds) == 0 { // \note queue is the queue before appending.
					launch(cmd)
				}
			case batchExecQueueControlStopBatch:
				var queue DeviceBatchQueue
				err := runner.rtdbServer.GetObject(cmd.queueId, &queue)
				if err != nil {
					log.Printf("[runner] WARNING: stop request for non-existent queue '%s'", cmd.queueId)
					continue
				}
				found := false
				for ndx, enqueuedId := range queue.BatchResultIds {
					if enqueuedId == cmd.batchResultId {
						if ndx == 0 {
							// Batch is currently running; signal its executor. Non-blocking
							// send: if the buffered channel is full, a request is already pending.
							select {
							case queueStopRequest[cmd.queueId] <- struct{}{}:
								log.Printf("[runner] stop request sent for batch '%s'\n", cmd.batchResultId)
							default:
								log.Printf("[runner] stop request already sent for batch '%s'\n", cmd.batchResultId)
							}
						} else {
							log.Printf("[runner] cancelled pending batch '%s'\n", cmd.batchResultId)
							// Set batch status, and remove it from the queue and active batch list.
							opSet := rtdb.NewOpSet()
							opSet.Call(typeBatchResult, cmd.batchResultId, "SetStatus", BATCH_STATUS_CODE_CANCELED)
							opSet.Call(typeActiveBatchResultList, "activeBatchResultList", "Remove", cmd.batchResultId)
							opSet.Call(typeDeviceBatchQueue, cmd.queueId, "Remove", cmd.batchResultId)
							err = runner.rtdbServer.ExecuteOpSet(opSet)
							if err != nil { panic(err) }
						}
						found = true
						break
					}
				}
				if !found {
					log.Printf("[runner] WARNING: stop request for batch '%s', does not exist in queue '%s'\n", cmd.batchResultId, cmd.queueId)
				}
			case batchExecQueueControlMove:
				var queue DeviceBatchQueue
				err := runner.rtdbServer.GetObject(cmd.queueId, &queue)
				if err != nil {
					log.Printf("[runner] WARNING: move command for non-existent queue '%s'", cmd.queueId)
					continue
				}
				found := false
				for srcNdx, enqueuedId := range queue.BatchResultIds {
					if enqueuedId == cmd.batchResultId {
						dstNdx := srcNdx + cmd.offset
						if srcNdx == 0 || dstNdx == 0 {
							// \todo [nuutti] Support moving running batch? We'd have to automatically
							//                stop it first, which can be slow, so it could get confusing?
							log.Printf("[runner] WARNING: trying to move batch to/from the currently running position in queue\n")
						} else {
							if dstNdx < 0 || dstNdx >= len(queue.BatchResultIds) {
								log.Printf("[runner] WARNING: trying to move batch to position %d\n", dstNdx)
							} else {
								opSet := rtdb.NewOpSet()
								opSet.Call(typeDeviceBatchQueue, cmd.queueId, "Move", srcNdx, dstNdx)
								err := runner.rtdbServer.ExecuteOpSet(opSet)
								if err != nil { panic(err) }
							}
						}
						found = true
						break
					}
				}
				if !found {
					log.Printf("[runner] WARNING: move command for batch '%s', does not exist in queue '%s'\n", cmd.batchResultId, cmd.queueId)
				}
			case batchExecQueueControlExecutionLogQuery:
				var queue DeviceBatchQueue
				err := runner.rtdbServer.GetObject(cmd.queueId, &queue)
				if err != nil { cmd.dst <- runner.getBatchResultPastExecutionLog(cmd.batchResultId); continue }
				querySent := false
				for ndx, enqueueId := range queue.BatchResultIds {
					if enqueueId == cmd.batchResultId {
						if ndx == 0 {
							// Batch is running; its log handler appends the in-memory tail.
							queueExecutionLogQuery[cmd.queueId] <- cmd.dst
							querySent = true
						}
						break
					}
				}
				if !querySent {
					// Batch isn't running; the DB already holds the complete log.
					cmd.dst <- runner.getBatchResultPastExecutionLog(cmd.batchResultId)
				}
			}
		case queueId := <-execEndSignal:
			var queue DeviceBatchQueue
			err := runner.rtdbServer.GetObject(queueId, &queue)
			if err != nil { panic(err) } // \note This shouldn't happen (a batch run ends while it's not even in the queue).
			opSet := rtdb.NewOpSet()
			opSet.Call(typeDeviceBatchQueue, queueId, "Remove", queue.BatchResultIds[0])
			err = runner.rtdbServer.ExecuteOpSet(opSet)
			if err != nil { panic(err) }
			if len(queue.BatchResultIds) > 1 { // \note queue is the queue before removal.
				launch(batchExecQueueControlEnqueue{
					batchResultId: queue.BatchResultIds[1],
					queueId:       queueId,
				})
			}
		}
	}
}
// Batch import controlling utilities.
// batchImportControl is the marker interface for commands consumed by
// runner.handleImports; the concrete command types are the
// batchImportControl* structs below.
type batchImportControl interface {
}
// Notifies that an import has started for a batch and registers the channel
// used to request that the import stop.
type batchImportControlStarted struct {
	batchResultId string // Id of the batch result being imported.
	stopChannel   chan<- struct{} // Signalled to ask the importer to stop.
}
// Notifies that the import of a batch has finished.
type batchImportControlFinished struct {
	batchResultId string // Id of the batch result whose import ended.
}
// Requests stopping an in-progress import of a batch.
type batchImportControlStop struct {
	batchResultId string // Id of the batch result whose import should stop.
}
// handleImports tracks batch result imports that are currently in progress,
// reacting to start/finish/stop notifications received on importControl.
// Runs forever; intended to be launched as a goroutine.
// Panics on protocol violations (duplicate start, finish without start).
func (runner *TestRunner) handleImports (importControl <-chan batchImportControl) {
	// Maps a batch result id to the stop channel of its running import.
	activeImports := make(map[string]chan<- struct{})
	for {
		switch cmd := (<-importControl).(type) {
		case batchImportControlStarted:
			if _, dup := activeImports[cmd.batchResultId]; dup {
				panic("[runner] Duplicate import for batch " + cmd.batchResultId)
			}
			activeImports[cmd.batchResultId] = cmd.stopChannel
		case batchImportControlFinished:
			if _, ok := activeImports[cmd.batchResultId]; !ok {
				panic("[runner] Non-existent import reported as finished for batch " + cmd.batchResultId)
			}
			delete(activeImports, cmd.batchResultId)
		case batchImportControlStop:
			stop, importing := activeImports[cmd.batchResultId]
			if !importing {
				log.Printf("[runner] WARNING: tried to stop import for non-importing batch %s\n", cmd.batchResultId)
			} else {
				stop <- struct{}{}
			}
		}
	}
}