blob: 717520d22ace939a1b52d5a3c2c1ddf62465fea8 [file] [log] [blame]
/*
* Copyright 2015 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cherry
import (
"bytes"
"fmt"
"io"
"log"
"net"
"os/exec"
"strings"
"time"
)
// CommLinkTcpIp
type CommLinkParams struct {
SpawnProcessPath string
TargetAddress string
TargetPort int
}
type CommLinkTcpIp struct {
params CommLinkParams
spawnProcessCommand *exec.Cmd
spawnProcessOutput bytes.Buffer
socket net.Conn
appendCommLogLine func(string)
}
func NewCommLinkTcpIp (params CommLinkParams, appendCherryLogLine func(string)) *CommLinkTcpIp {
appendCommLogLine := func(content string) {
log.Printf("[comm] %s\n", content)
appendCherryLogLine(content)
}
return &CommLinkTcpIp {
params: params,
appendCommLogLine: appendCommLogLine,
}
}
// Constants
const (
KEEP_ALIVE_INTERVAL_SECONDS = 5
KEEP_ALIVE_TIMEOUT_MINUTES = 20
STOP_REQUEST_TIMEOUT_SECONDS = 10
)
// TestExecutorEvent
type TestExecutorEvent interface {
}
type EventProcessStarted struct {
}
type EventProcessLaunchFailed struct {
reason string
}
type EventSessionInfoRead struct {
sessionInfo BatchSessionInfo
}
type EventInfoLogRead struct {
infoLog string
}
type EventTestCaseStarted struct {
testCasePath string
}
type EventTestCaseFinished struct {
path string
status TestStatusCode
log string
}
type EventExecutionFinished struct {
}
// LineSplitter
type LineSplitter struct {
buffer string
}
func (s *LineSplitter) Push (data string) {
s.buffer = s.buffer + data
}
func (s *LineSplitter) GetLines () []string {
pieces := strings.Split(s.buffer, "\n")
s.buffer = pieces[len(pieces)-1]
return pieces[:len(pieces)-1]
}
// TcpIpLink
func (link *CommLinkTcpIp) handleConnection (conn net.Conn, eventChannel chan TestExecutorEvent, stopRequest <-chan struct{}) {
defer log.Printf("[comm] handleConnection() returning..\n")
defer func(){ eventChannel <- EventExecutionFinished{} }()
type received struct {
msg Message
err error
}
receiveQueue := make(chan received, 32)
// Spawn message reader.
// \todo [petri] clean way to close this thread .. currently exits with error when socket closed
go func() {
for {
msg, err := ReadMessage(conn)
receiveQueue <- received{ msg, err }
if err != nil { return }
}
}()
log.Printf("[comm] writer goroutine active!\n")
// Keepalive state.
lastKeepaliveSent := time.Now()
lastKeepaliveReceived := time.Now()
// QPA parsing.
lineSplitter := LineSplitter{}
qpaParserQueue := make(chan TestExecutorEvent, 4)
qpaParser := CreateLogContainerParser(qpaParserQueue)
isStopRequested := false // stopRequest signaled at least once?
isStopExecMsgPending := false // stopRequest signaled after we last sent MsgStopExecution?
var firstStopRequestArrival time.Time
// \todo [nuutti] This is a kludge to avoid sending MsgStopExecution before we've
// received any log data; seems that sending it too soon won't have the
// desired effect. Should investigate if this is a problem in the
// execserver's end.
hasAnyLogDataArrived := false
handleParserQueue := func () {
// Get result from queue if available.
select {
case event := <-qpaParserQueue:
// \todo [petri] handle some events locally? only write to channel in parser?
eventChannel <- event
default:
// Nothing
}
}
// Handle connection.
linkLoop:
for {
select {
case <-stopRequest:
log.Printf("[comm] got stop request\n")
isStopExecMsgPending = true
if !isStopRequested {
isStopRequested = true
firstStopRequestArrival = time.Now()
}
case received := <- receiveQueue:
msg := received.msg
err := received.err
if err == io.EOF {
link.appendCommLogLine("Remote host closed connection")
break linkLoop
} else if err != nil {
log.Printf("[comm] GOT ERROR FROM READER: %s\n", err)
link.appendCommLogLine("Comm link terminating")
break linkLoop
} else {
// Handle incoming messages.
switch msg.(type) {
case *MsgProcessStarted:
log.Printf("[comm] process started!\n")
eventChannel <- EventProcessStarted{}
case *MsgProcessLaunchFailed:
launchFailed := msg.(*MsgProcessLaunchFailed)
log.Printf("[comm] WARNING: remote process launch failed: %s!\n", launchFailed.Reason)
eventChannel <- EventProcessLaunchFailed{ launchFailed.Reason }
case *MsgProcessFinished:
log.Printf("[comm] remote process finished!\n")
conn.Close() // \todo [petri] use CloseRead() to avoid error about reading from closed connection
case *MsgProcessLogData:
hasAnyLogDataArrived = true
logData := msg.(*MsgProcessLogData)
lineSplitter.Push(logData.Content)
for _, line := range lineSplitter.GetLines() {
qpaParser.ParseLine(line)
handleParserQueue()
}
case *MsgInfo:
info := msg.(*MsgInfo)
eventChannel <- EventInfoLogRead{ info.Content }
case *MsgKeepAlive:
lastKeepaliveReceived = time.Now()
default:
log.Printf("[comm] warning: unknown message received: %T\n", msg)
}
}
// \todo [petri] compute based on keepalive timing
case <-time.After(100 * time.Millisecond):
// nada..
}
// Check client keepalive timeout.
timeoutDiff := time.Now().Sub(lastKeepaliveReceived)
if timeoutDiff >= KEEP_ALIVE_TIMEOUT_MINUTES * time.Minute {
link.appendCommLogLine("Client timeout")
// \todo[petri] handle timeout properly
break linkLoop
}
if isStopRequested && time.Now().Sub(firstStopRequestArrival) >= STOP_REQUEST_TIMEOUT_SECONDS * time.Second {
link.appendCommLogLine("Stop request timeout")
break linkLoop
}
// Send keepalive.
keepaliveDiff := time.Now().Sub(lastKeepaliveSent)
if keepaliveDiff >= KEEP_ALIVE_INTERVAL_SECONDS * time.Second {
log.Printf("[comm] send keepalive\n")
msgKeepAlive := MsgKeepAlive{}
err := WriteMessage(conn, &msgKeepAlive)
if err != nil {
break linkLoop
}
lastKeepaliveSent = time.Now()
}
// Send MsgStopExecution if needed.
if hasAnyLogDataArrived && isStopExecMsgPending {
link.appendCommLogLine("Sending stop command to execserver")
err := WriteMessage(conn, &MsgStopExecution{})
if err != nil {
link.appendCommLogLine(fmt.Sprintf("Got error when sending stop command: %s", err))
break linkLoop
}
isStopExecMsgPending = false
}
}
// \note If test run stopped abnormally, a crashed test case may be unterminated;
// terminate it now so it doesn't keep us from continuing in the future.
if !isStopRequested {
qpaParser.Terminate()
handleParserQueue()
}
}
func (link *CommLinkTcpIp) Start () error {
params := link.params
// Spawn local process (if any).
if link.params.SpawnProcessPath != "" {
link.appendCommLogLine(fmt.Sprintf("Spawn process '%s'", params.SpawnProcessPath))
parts := strings.Split(params.SpawnProcessPath, " ") // split executable and args
cmdName := parts[0]
portArg := fmt.Sprintf("--port=%d", link.params.TargetPort)
cmdArgs := append([]string{ portArg }, parts[1:]...)
cmd := exec.Command(cmdName, cmdArgs...)
output := bytes.Buffer{}
cmd.Stdin = strings.NewReader("<local process input>")
cmd.Stdout = &output
err := cmd.Start()
if err != nil { return err }
link.spawnProcessCommand = cmd
link.spawnProcessOutput = output
// Sleep for a bit to let process start up properly.
// \todo [petri] better way to handle this?
time.Sleep(300 * time.Millisecond)
}
// Connect to target.
addr := fmt.Sprintf("%s:%d", params.TargetAddress, params.TargetPort)
link.appendCommLogLine(fmt.Sprintf("Connect to '%s'", addr))
conn, err := net.Dial("tcp", addr)
if err != nil {
link.maybeKillLocalProcess()
return err
}
link.socket = conn
// Send hello.
// \todo [petri] Candy never sends this..?
// hello := MsgHello{ Version: ?? }
// err = WriteMessage(conn, &msgExec)
// if err != nil { return err }
return nil
}
func (link *CommLinkTcpIp) Stop () {
log.Printf("[comm] CommLinkTcpIp.Stop()\n")
// Close connection to execserver.
if link.socket != nil {
link.socket.Close()
}
link.maybeKillLocalProcess()
}
func (link *CommLinkTcpIp) maybeKillLocalProcess () {
// Shut down local process, if any.
if link.spawnProcessCommand != nil {
link.appendCommLogLine("Shutting down local process...")
cmd := link.spawnProcessCommand
// Kill process.
err := cmd.Process.Kill()
if err != nil {
link.appendCommLogLine(fmt.Sprintf("WARNING: error terminating local process: %s", err))
} else {
link.appendCommLogLine("Local process terminated successfully")
}
}
}
type CommLinkExecParams struct {
// For example:
binaryName string // "Debug/deqp-gles2.exe",
commandLine string // "--deqp-watchdog=enable --deqp-crashhandler=enable --deqp-visibility=hidden --deqp-gl-config-name=rgba8888d24s8ms0",
workingDir string // "../../candy-build/deqp-wgl/modules/gles2",
testCasePaths string // "{dEQP-GLES2{..}}"
}
func (link *CommLinkTcpIp) Execute (params CommLinkExecParams, eventChannel chan TestExecutorEvent, stopRequest <-chan struct{}) error {
log.Printf("[comm] CommLinkTcpIp.Execute(binary='%s')\n", params.binaryName)
// Send test execution message.
msgExec := MsgExecuteBinary{
BinaryName: params.binaryName,
Parameters: params.commandLine,
WorkingDir: params.workingDir,
CaseList: params.testCasePaths,
}
link.appendCommLogLine(fmt.Sprintf("Sending command to execute binary '%s' to execserver", params.binaryName))
err := WriteMessage(link.socket, &msgExec)
if err != nil { return err }
// Spawn goroutine for connection.
go link.handleConnection(link.socket, eventChannel, stopRequest)
return nil
}