// blob: 0804e87e6690b542faf9a2d79f73a6a3fcdbe7e5 [file] [log] [blame]
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multiplexer
import (
"io"
"android.googlesource.com/platform/tools/gpu/framework/binary"
"android.googlesource.com/platform/tools/gpu/framework/binary/vle"
"android.googlesource.com/platform/tools/gpu/framework/log"
)
// sender funnels outgoing multiplexer messages through a single background
// goroutine. The goroutine is started by begin and shut down by end.
type sender struct {
	// queue hands pending items from enqueue to the goroutine started by begin.
	// begin treats a non-nil queue as proof that it has already been called.
	queue chan sendItem
	// stop is closed by end to make the goroutine and any blocked callers give up.
	// done is closed by the goroutine as it exits; end waits on it.
	stop, done chan struct{}
}
// sendCloseChannel is the queued request to emit a close-channel message.
type sendCloseChannel struct {
	c channelID    // channel the message refers to
	r chan<- error // receives the result of encoding the message
}
// sendOpenChannel is the queued request to emit an open-channel message.
type sendOpenChannel struct {
	c channelID    // channel the message refers to
	r chan<- error // receives the result of encoding the message
}
// sendData is the queued request to write a payload to a channel. The sender
// goroutine consumes d from the front in pieces of at most mtu bytes and
// accumulates the running total in n.
type sendData struct {
	c channelID             // destination channel
	d []byte                // payload bytes that have not been sent yet
	r chan<- sendDataResult // receives the final outcome of the write
	n int                   // bytes written
}
// sendDataResult is the reply delivered for a sendData request.
type sendDataResult struct {
	e error // error reported by the encoder, nil on success
	n int   // payload bytes written before the reply was produced
}
// min returns the smaller of the two integers a and b.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// channel returns the channel that the close request refers to.
func (req sendCloseChannel) channel() channelID {
	return req.c
}

// channel returns the channel that the open request refers to.
func (req sendOpenChannel) channel() channelID {
	return req.c
}

// channel returns the channel that the payload is destined for.
func (req sendData) channel() channelID {
	return req.c
}
// sendItem is the common shape of everything that can be placed on the
// sender's queue.
type sendItem interface {
	// channel returns the identifier of the channel the item refers to.
	channel() channelID
}
// encodeOpenChannel writes an open-channel message for channel c to w: first
// the msgTypeOpenChannel message type, then the msgOpenChannel body. It
// returns the error state of w after both writes have been attempted.
func encodeOpenChannel(w binary.Writer, c channelID) error {
	body := msgOpenChannel{channelID: c}
	w.Simple(msgTypeOpenChannel)
	w.Simple(&body)
	return w.Error()
}
// encodeCloseChannel writes a close-channel message for channel c to w: first
// the msgTypeCloseChannel message type, then the msgCloseChannel body. It
// returns the error state of w after both writes have been attempted.
func encodeCloseChannel(w binary.Writer, c channelID) error {
	body := msgCloseChannel{channelID: c}
	w.Simple(msgTypeCloseChannel)
	w.Simple(&body)
	return w.Error()
}
// encodeData writes a data message carrying payload d for channel c to w:
// first the msgTypeData message type, then the msgData body. It returns the
// error state of w after both writes have been attempted.
func encodeData(w binary.Writer, c channelID, d []byte) error {
	body := msgData{channelID: c, data: d}
	w.Simple(msgTypeData)
	w.Simple(&body)
	return w.Error()
}
// sendCloseChannel queues a close-channel message for channel c and blocks
// until the sender goroutine reports the result of encoding it.
//
// It returns ErrMultiplexerClosed if the sender is stopped before the request
// could be queued or before a result arrived; otherwise it returns the error
// (possibly nil) produced by the sender goroutine.
func (s *sender) sendCloseChannel(c channelID) error {
	// The reply channel has room for one value so the sender goroutine can
	// always deliver its result without blocking, even if this call has already
	// returned because s.stop was closed. With an unbuffered channel the
	// goroutine would stay stuck on that send and never close s.done, leaving
	// end() waiting forever.
	r := make(chan error, 1)
	if err := s.enqueue(sendCloseChannel{c, r}); err != nil {
		return err
	}
	select {
	case err := <-r:
		return err
	case <-s.stop:
		return ErrMultiplexerClosed
	}
}
// sendOpenChannel queues an open-channel message for channel c and blocks
// until the sender goroutine reports the result of encoding it.
//
// It returns ErrMultiplexerClosed if the sender is stopped before the request
// could be queued or before a result arrived; otherwise it returns the error
// (possibly nil) produced by the sender goroutine.
func (s *sender) sendOpenChannel(c channelID) error {
	// The reply channel has room for one value so the sender goroutine can
	// always deliver its result without blocking, even if this call has already
	// returned because s.stop was closed. With an unbuffered channel the
	// goroutine would stay stuck on that send and never close s.done, leaving
	// end() waiting forever.
	r := make(chan error, 1)
	if err := s.enqueue(sendOpenChannel{c, r}); err != nil {
		return err
	}
	select {
	case err := <-r:
		return err
	case <-s.stop:
		return ErrMultiplexerClosed
	}
}
// sendData queues payload d for channel c and blocks until the sender
// goroutine reports how the write went.
//
// It returns the byte count and error reported by the sender goroutine, or
// (0, ErrMultiplexerClosed) if the sender is stopped before the request could
// be queued or before a result arrived.
func (s *sender) sendData(c channelID, d []byte) (int, error) {
	// The reply channel has room for one value so the sender goroutine can
	// always deliver its result without blocking, even if this call has already
	// returned because s.stop was closed. With an unbuffered channel the
	// goroutine would stay stuck on that send and never close s.done, leaving
	// end() waiting forever.
	r := make(chan sendDataResult, 1)
	if err := s.enqueue(sendData{c, d, r, 0}); err != nil {
		return 0, err
	}
	select {
	case res := <-r:
		return res.n, res.e
	case <-s.stop:
		return 0, ErrMultiplexerClosed
	}
}
// enqueue hands item to the sender goroutine, blocking while the queue is
// full. It returns nil once the item has been queued, or ErrMultiplexerClosed
// if the sender is stopped first.
func (s *sender) enqueue(item sendItem) error {
	select {
	case <-s.stop:
		return ErrMultiplexerClosed
	case s.queue <- item:
		return nil
	}
}
// begin starts the sender. It allocates the queue, stop and done channels and
// launches the goroutine that takes items off the queue and writes the
// corresponding messages to out. The goroutine runs until stop is closed (see
// end) and closes done as it exits.
//
// Payloads are written as data messages of at most mtu bytes each. On every
// pass the goroutine handles one item for each key pending in its sendMap; a
// payload that is not finished is pushed back to the front for the next pass.
//
// ctx is not used by this function. begin panics if it is called a second time
// on the same sender.
//
// NOTE(review): mtu is expected to be positive. With mtu == 0 a non-empty
// payload never makes progress, and a negative mtu makes the slice expression
// below panic inside the goroutine — confirm callers never pass such values.
func (s *sender) begin(ctx log.Context, mtu int, out io.Writer) {
	if s.queue != nil {
		panic("sender.begin() called twice")
	}
	c, done, stop := make(chan sendItem, sendQueueLength), make(chan struct{}), make(chan struct{})
	s.queue, s.done, s.stop = c, done, stop
	go func() {
		defer close(done)
		w := vle.Writer(out)
		m := sendMap{}
		for {
			if len(m) == 0 {
				// If there's nothing being worked on, block until we have something.
				select {
				case item := <-c:
					m.pushBack(item)
				case <-stop:
					return
				}
			} else {
				// If we're busy, grab more work only if there's something there.
				select {
				case item := <-c:
					m.pushBack(item)
				case <-stop:
					return
				default:
				}
			}
			// Send something on each of the pending channels...
			//
			// Every reply below is sent with a select on stop. The callers stop
			// listening on their reply channel once stop is closed, so a plain
			// send could block forever, done would never be closed, and end()
			// would hang.
			for id := range m {
				switch i := m.popFront(id).(type) {
				case sendOpenChannel:
					err := encodeOpenChannel(w, i.c)
					select {
					case i.r <- err:
					case <-stop:
						return
					}
				case sendCloseChannel:
					err := encodeCloseChannel(w, i.c)
					select {
					case i.r <- err:
					case <-stop:
						return
					}
				case sendData:
					n := min(len(i.d), mtu)
					err := encodeData(w, i.c, i.d[:n])
					if err == nil {
						i.d, i.n = i.d[n:], i.n+n
						if len(i.d) > 0 {
							m.pushFront(i) // more to send later
							continue
						}
					}
					// Either the payload is complete or the encoder failed;
					// report the bytes written so far together with the error.
					select {
					case i.r <- sendDataResult{err, i.n}:
					case <-stop:
						return
					}
				}
			}
		}
	}()
}
// end shuts the sender down: it closes the stop channel and then blocks until
// the sender goroutine has closed done on its way out.
func (s *sender) end() {
	stop, done := s.stop, s.done
	close(stop)
	<-done
}