blob: 1e7f9f3a38796511fba2d900e21df618a9431a8b [file] [log] [blame]
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multiplexer
import (
"io"
"android.googlesource.com/platform/tools/gpu/framework/log"
)
// dispatcher funnels channel open/delete requests and incoming multiplexer
// traffic through a single go-routine, which is started by begin and stopped
// by end.
type dispatcher struct {
// requests carries requestOpenChannel and requestDeleteChannel values to the
// dispatcher go-routine.
requests chan interface{}
// nextChannelID is incremented by the dispatcher go-routine to obtain the id
// for each channel opened through a requestOpenChannel.
nextChannelID channelID
// stop is closed by end to ask the dispatcher go-routine to shut down.
stop chan struct{}
// done is closed by the dispatcher go-routine once it has stopped every
// channel and is about to return.
done chan struct{}
}
// requestOpenChannel is the request sent to the dispatcher go-routine to open
// a new channel.
type requestOpenChannel struct {
// result receives the outcome of the request from the dispatcher go-routine.
result chan resultOpenChannel
}
// resultOpenChannel is the reply to a requestOpenChannel.
// NOTE: the dispatcher builds this value positionally, so the field order
// (channel, err) must not change.
type resultOpenChannel struct {
// channel is the channel returned by channelMap.create.
channel *channel
// err is the error returned by channelMap.create.
err error
}
// requestDeleteChannel is the request sent to the dispatcher go-routine to
// stop and remove an existing channel.
type requestDeleteChannel struct {
// id identifies the channel to delete.
id channelID
// result receives the error (or nil) returned by channelMap.delete.
result chan error
}
// channelMap holds channels keyed by their channelID.
type channelMap map[channelID]*channel
// get looks up the channel registered under id.
// When the map holds no entry for id it returns a nil channel and
// ErrChannelClosed.
func (m *channelMap) get(id channelID) (*channel, error) {
	if c, ok := (*m)[id]; ok {
		return c, nil
	}
	return nil, ErrChannelClosed
}
// create builds a channel for id via newChannel and stores it in the map
// under that id, replacing whatever entry was there before.
// The returned error is currently always nil.
func (m *channelMap) create(id channelID, multiplexer *Multiplexer) (*channel, error) {
	created := newChannel(id, multiplexer)
	registry := *m
	registry[id] = created
	return created, nil // TODO: error
}
// delete stops the channel registered under id and removes it from the map.
// If get cannot find the channel, its error is returned and the map is left
// untouched; otherwise the result is nil.
func (m *channelMap) delete(ctx log.Context, id channelID) error {
	c, err := m.get(id)
	if err == nil {
		c.stop(ctx)
		delete(*m, id)
	}
	return err
}
// begin starts the dispatcher go-routine.
//
// It creates the request queue and the stop/done signalling channels, then
// spawns one go-routine that, until stop is closed, serves:
//   - requests queued on d.requests (open / delete channel),
//   - messages arriving on m.receiver.msgs,
//   - errors arriving on m.receiver.errs.
//
// The channel map is a local of begin that only this go-routine touches.
func (d *dispatcher) begin(ctx log.Context, m *Multiplexer) {
	d.requests = make(chan interface{}, requestQueueLength)
	d.stop = make(chan struct{})
	d.done = make(chan struct{})
	channels := channelMap{}

	// start the dispatcher go routine.
	go func() {
		for {
			select {
			case <-d.stop:
				// The dispatcher has been asked to stop.
				// Stop all channels and tear-down.
				for _, c := range channels {
					c.stop(ctx)
				}
				// NOTE(review): this write is not synchronised with
				// createChannel/deleteChannel reading d.requests from other
				// go-routines — confirm this is acceptable.
				d.requests = nil
				close(d.done)
				return

			case req := <-d.requests:
				// We have a new request.
				//
				// The requester stops waiting for its reply as soon as stop
				// is closed, so every reply is sent under a select that also
				// watches stop. A bare send could block forever with nobody
				// left to receive, and done would then never be closed.
				switch req := req.(type) {
				case requestOpenChannel:
					id := d.nextChannelID.increment()
					channel, err := channels.create(id, m)
					select {
					case req.result <- resultOpenChannel{channel, err}:
					case <-d.stop:
					}
				case requestDeleteChannel:
					err := channels.delete(ctx, req.id)
					select {
					case req.result <- err:
					case <-d.stop:
					}
				}

			case msg := <-m.receiver.msgs:
				// We have a new message.
				switch msg := msg.(type) {
				case *msgOpenChannel:
					channel, err := channels.create(remote(msg.channelID), m)
					if err == nil {
						// Run the callback on its own go-routine so it cannot
						// stall the dispatcher loop.
						go m.channelOpenedCallback(ctx, channel)
					}
				case *msgCloseChannel:
					// Best effort: a close for a channel that is no longer in
					// the map is deliberately ignored.
					_ = channels.delete(ctx, remote(msg.channelID))
				case *msgData:
					// Data for an unknown channel is dropped on the floor:
					// likely the channel was closed this side.
					if channel, err := channels.get(remote(msg.channelID)); err == nil {
						channel.receive(msg.data)
					}
				}

			case err := <-m.receiver.errs:
				if err == io.EOF {
					// NOTE(review): presumably Close ends up waiting for this
					// go-routine (via end), hence the separate go-routine —
					// confirm.
					go m.Close(ctx)
				} else {
					ctx.Fail(err, "Multiplexer receive error")
				}
			}
		}
	}()
}
// end stops the dispatcher, blocking until the dispatcher and all sockets are
// stopped.
//
// It must only be called after begin. Calling it a second time would close
// the already-closed stop channel and panic.
func (d *dispatcher) end() {
// Closing stop wakes the dispatcher go-routine's stop case.
close(d.stop)
// done is closed by the dispatcher go-routine after it has stopped every
// channel.
<-d.done
}
// createChannel enqueues the creation of a new channel on the dispatcher
// go-routine and waits for the outcome.
//
// It returns the channel and error reported by the dispatcher, or
// ErrMultiplexerClosed if the dispatcher's stop channel is closed before the
// request could be queued or before the reply was received.
func (d *dispatcher) createChannel() (*channel, error) {
	// The reply channel has room for one value so the dispatcher can always
	// deliver its answer without blocking, even when this call has already
	// returned because stop was closed. With an unbuffered channel the
	// dispatcher would block on the send forever and never finish stopping.
	request := requestOpenChannel{result: make(chan resultOpenChannel, 1)}

	// Queue the request, unless the dispatcher is stopping.
	select {
	case <-d.stop:
		return nil, ErrMultiplexerClosed
	case d.requests <- request:
	}

	// Wait for the reply, unless the dispatcher is stopping.
	select {
	case <-d.stop:
		return nil, ErrMultiplexerClosed
	case result := <-request.result:
		return result.channel, result.err
	}
}
// deleteChannel enqueues the destruction of the channel with the given id on
// the dispatcher go-routine and waits for the outcome.
//
// It returns the error (or nil) reported by the dispatcher, or
// ErrMultiplexerClosed if the dispatcher's stop channel is closed before the
// request could be queued or before the reply was received.
func (d *dispatcher) deleteChannel(id channelID) error {
	// The reply channel has room for one value so the dispatcher can always
	// deliver its answer without blocking, even when this call has already
	// returned because stop was closed. With an unbuffered channel the
	// dispatcher would block on the send forever and never finish stopping.
	request := requestDeleteChannel{id: id, result: make(chan error, 1)}

	// Queue the request, unless the dispatcher is stopping.
	select {
	case <-d.stop:
		return ErrMultiplexerClosed
	case d.requests <- request:
	}

	// Wait for the reply, unless the dispatcher is stopping.
	select {
	case <-d.stop:
		return ErrMultiplexerClosed
	case result := <-request.result:
		return result
	}
}