blob: ea842539101740ff7b73f6300f92449e5ab61472 [file] [log] [blame]
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"bufio"
"io"
"sync/atomic"
"android.googlesource.com/platform/tools/gpu/framework/binary"
"android.googlesource.com/platform/tools/gpu/framework/binary/cyclic"
"android.googlesource.com/platform/tools/gpu/framework/binary/vle"
"android.googlesource.com/platform/tools/gpu/framework/log"
"android.googlesource.com/platform/tools/gpu/framework/multiplexer"
)
// Handler is the callback type that Serve dispatches incoming rpc calls to.
// It receives the logging context for the call and the decoded call object,
// and returns either the result object to send back or an error to report
// to the caller instead.
type Handler func(ctx log.Context, call interface{}) (binary.Object, error)
// lastRPCID holds the identifier handed to the most recent rpc call. Serve
// bumps it atomically for every call and tags the call's log context with
// the new value, so it should only be accessed through sync/atomic.
var lastRPCID uint32 = 1
// Serve implements the receiving side of a client server rpc pair.
//
// It hands s and mtu to multiplexer.New together with a function that serves
// one call on the channel it is given: it reads a 4 byte protocol header
// followed by the encoded call object, passes the decoded call to handler, and
// encodes either the handler's result or its error back down the same channel.
// The channel is flushed and closed when that function returns.
//
// A header that cannot be read, an unrecognised header, or a call that cannot
// be decoded is logged and answered with an encoded error; handler is not
// invoked in those cases.
func Serve(ctx log.Context, s io.ReadWriteCloser, mtu int, handler Handler) {
	multiplexer.New(ctx, s, mtu, func(ctx log.Context, channel io.ReadWriteCloser) {
		// If Close fails, multiplexer already knows, so we ignore the error
		defer channel.Close()
		ctx = ctx.V("RPC", atomic.AddUint32(&lastRPCID, 1))
		w := bufio.NewWriterSize(channel, mtu)
		// Flush only fails if channel fails, multiplexer already knows, so we ignore the error.
		// It is deferred after Close so that it runs before the channel is closed.
		defer w.Flush()
		d := cyclic.Decoder(vle.Reader(channel))
		e := cyclic.Encoder(vle.Writer(w))

		// Function used for encoding an error.
		// Errors that already implement Err are sent as they are, anything else
		// is wrapped in a plain message-only Error.
		encErr := func(err error) {
			if rpcErr, ok := err.(Err); ok {
				e.Object(rpcErr)
			} else {
				e.Object(&Error{Msg: err.Error()})
			}
		}

		// Check the RPC header
		var h [4]byte
		d.Data(h[:])
		if err := d.Error(); err != nil {
			// The header itself could not be read (for example a short read).
			// Report the real cause instead of falling through and blaming a
			// zero or partially filled header for being invalid.
			// NOTE(review): assumes d.Error() reports failures from d.Data in the
			// same way it does for d.Object below — confirm.
			ctx.Error().Fail(err, "Decoding header")
			encErr(&ErrDecodingCall{Reason: err.Error()})
			return
		}
		switch h {
		case headerV0:
			// Legacy support for a single string error message type.
			encErr = func(err error) { e.Object(&Error{Msg: err.Error()}) }
		case headerV1:
			// Current protocol, the default error encoding applies.
		default:
			err := &ErrInvalidHeader{Header: h}
			ctx.Fail(err, "")
			encErr(err)
			return
		}

		// Decode the call
		val := d.Object()
		if err := d.Error(); err != nil {
			ctx.Error().Fail(err, "Decoding call")
			encErr(&ErrDecodingCall{Reason: err.Error()})
			return
		}

		// Invoke the call
		res, err := handler(ctx, val)

		// Encode the call result
		if err == nil {
			e.Object(res) // Success
		} else {
			encErr(err) // Failure
		}
		if err := e.Error(); err != nil {
			ctx.Error().V("value", val).Fail(err, "Encoding result")
			return
		}
	})
}