| // Copyright (C) 2016 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package multiplexer |
| |
| import ( |
| "io" |
| |
| "android.googlesource.com/platform/tools/gpu/framework/log" |
| ) |
| |
| type dispatcher struct { |
| requests chan interface{} |
| nextChannelID channelID |
| stop chan struct{} |
| done chan struct{} |
| } |
| |
| type requestOpenChannel struct { |
| result chan resultOpenChannel |
| } |
| type resultOpenChannel struct { |
| channel *channel |
| err error |
| } |
| |
| type requestDeleteChannel struct { |
| id channelID |
| result chan error |
| } |
| |
| type channelMap map[channelID]*channel |
| |
| func (m *channelMap) get(id channelID) (*channel, error) { |
| channel, found := (*m)[id] |
| if !found { |
| return nil, ErrChannelClosed |
| } |
| return channel, nil |
| } |
| |
| func (m *channelMap) create(id channelID, multiplexer *Multiplexer) (*channel, error) { |
| channel := newChannel(id, multiplexer) |
| (*m)[id] = channel |
| return channel, nil // TODO: error |
| } |
| |
| func (m *channelMap) delete(ctx log.Context, id channelID) error { |
| channel, err := m.get(id) |
| if err != nil { |
| return err |
| } |
| channel.stop(ctx) |
| delete(*m, id) |
| return err |
| } |
| |
| // begin starts the dispatcher go-routine. |
| func (d *dispatcher) begin(ctx log.Context, m *Multiplexer) { |
| d.requests = make(chan interface{}, requestQueueLength) |
| d.stop = make(chan struct{}) |
| d.done = make(chan struct{}) |
| |
| channels := channelMap{} |
| |
| // start the dispatcher go routine. |
| go func() { |
| for { |
| select { |
| case <-d.stop: |
| // The dispatcher has been asked to stop. |
| // Stop all channels and tear-down. |
| for _, c := range channels { |
| c.stop(ctx) |
| } |
| d.requests = nil |
| close(d.done) |
| return |
| |
| case req := <-d.requests: |
| // We have a new request. |
| switch req := req.(type) { |
| case requestOpenChannel: |
| id := d.nextChannelID.increment() |
| channel, err := channels.create(id, m) |
| req.result <- resultOpenChannel{channel, err} |
| |
| case requestDeleteChannel: |
| req.result <- channels.delete(ctx, req.id) |
| } |
| |
| case msg := <-m.receiver.msgs: |
| // We have a new message. |
| switch msg := msg.(type) { |
| case *msgOpenChannel: |
| channel, err := channels.create(remote(msg.channelID), m) |
| if err == nil { |
| go m.channelOpenedCallback(ctx, channel) |
| } |
| |
| case *msgCloseChannel: |
| channels.delete(ctx, remote(msg.channelID)) |
| |
| case *msgData: |
| if channel, err := channels.get(remote(msg.channelID)); err == nil { |
| channel.receive(msg.data) |
| } else { |
| // Likely this channel was closed this side, and we're receiving data |
| // that should be dropped on the floor. |
| } |
| } |
| |
| case err := <-m.receiver.errs: |
| if err == io.EOF { |
| go m.Close(ctx) |
| } else { |
| ctx.Fail(err, "Multiplexer receive error") |
| } |
| } |
| } |
| }() |
| } |
| |
| // end stops the dispatcher, blocking until the dispatcher and all sockets are |
| // stopped. |
| func (d *dispatcher) end() { |
| close(d.stop) |
| <-d.done |
| } |
| |
| // createChannel enqueues the creation of a new channel on the dispatcher |
| // go-routine. |
| func (d *dispatcher) createChannel() (*channel, error) { |
| request := requestOpenChannel{result: make(chan resultOpenChannel)} |
| select { |
| case <-d.stop: |
| return nil, ErrMultiplexerClosed |
| case d.requests <- request: |
| } |
| select { |
| case <-d.stop: |
| return nil, ErrMultiplexerClosed |
| case result := <-request.result: |
| return result.channel, result.err |
| } |
| } |
| |
| // deleteChannel enqueues the destruction of a new channel on the dispacher |
| // go-routine. |
| func (d *dispatcher) deleteChannel(id channelID) error { |
| request := requestDeleteChannel{id: id, result: make(chan error)} |
| select { |
| case <-d.stop: |
| return ErrMultiplexerClosed |
| case d.requests <- request: |
| } |
| select { |
| case <-d.stop: |
| return ErrMultiplexerClosed |
| case result := <-request.result: |
| return result |
| } |
| } |