| // Copyright (C) 2016 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package multiplexer |
| |
| import ( |
| "io" |
| |
| "android.googlesource.com/platform/tools/gpu/framework/binary" |
| "android.googlesource.com/platform/tools/gpu/framework/binary/vle" |
| "android.googlesource.com/platform/tools/gpu/framework/log" |
| ) |
| |
| type sender struct { |
| queue chan sendItem |
| stop chan struct{} |
| done chan struct{} |
| } |
| |
| type sendCloseChannel struct { |
| c channelID |
| r chan<- error |
| } |
| |
| type sendOpenChannel struct { |
| c channelID |
| r chan<- error |
| } |
| |
| type sendData struct { |
| c channelID |
| d []byte |
| r chan<- sendDataResult |
| n int // bytes written |
| } |
| |
// sendDataResult is the outcome reported for a sendData request.
type sendDataResult struct {
	// e is the encoding error, or nil on success.
	e error
	// n is the number of payload bytes that were written.
	n int
}
| |
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
| |
| func (m sendCloseChannel) channel() channelID { return m.c } |
| func (m sendOpenChannel) channel() channelID { return m.c } |
| func (m sendData) channel() channelID { return m.c } |
| |
| type sendItem interface { |
| channel() channelID |
| } |
| |
| func encodeOpenChannel(w binary.Writer, c channelID) error { |
| w.Simple(msgTypeOpenChannel) |
| w.Simple(&msgOpenChannel{channelID: c}) |
| return w.Error() |
| } |
| |
| func encodeCloseChannel(w binary.Writer, c channelID) error { |
| w.Simple(msgTypeCloseChannel) |
| w.Simple(&msgCloseChannel{channelID: c}) |
| return w.Error() |
| } |
| |
| func encodeData(w binary.Writer, c channelID, d []byte) error { |
| w.Simple(msgTypeData) |
| w.Simple(&msgData{channelID: c, data: d}) |
| return w.Error() |
| } |
| |
| func (s *sender) sendCloseChannel(c channelID) error { |
| r := make(chan error) |
| if err := s.enqueue(sendCloseChannel{c, r}); err != nil { |
| return err |
| } |
| select { |
| case err := <-r: |
| return err |
| case <-s.stop: |
| return ErrMultiplexerClosed |
| } |
| } |
| |
| func (s *sender) sendOpenChannel(c channelID) error { |
| r := make(chan error) |
| if err := s.enqueue(sendOpenChannel{c, r}); err != nil { |
| return err |
| } |
| select { |
| case err := <-r: |
| return err |
| case <-s.stop: |
| return ErrMultiplexerClosed |
| } |
| } |
| |
| func (s *sender) sendData(c channelID, d []byte) (int, error) { |
| r := make(chan sendDataResult) |
| if err := s.enqueue(sendData{c, d, r, 0}); err != nil { |
| return 0, err |
| } |
| select { |
| case res := <-r: |
| return res.n, res.e |
| case <-s.stop: |
| return 0, ErrMultiplexerClosed |
| } |
| } |
| |
| func (s *sender) enqueue(item sendItem) error { |
| select { |
| case s.queue <- item: |
| return nil |
| case <-s.stop: |
| return ErrMultiplexerClosed |
| } |
| } |
| |
| func (s *sender) begin(ctx log.Context, mtu int, out io.Writer) { |
| if s.queue != nil { |
| panic("sender.begin() called twice") |
| } |
| |
| c, done, stop := make(chan sendItem, sendQueueLength), make(chan struct{}), make(chan struct{}) |
| s.queue, s.done, s.stop = c, done, stop |
| |
| go func() { |
| defer close(done) |
| w := vle.Writer(out) |
| m := sendMap{} |
| for { |
| if len(m) == 0 { |
| // If there's nothing being worked on, block until we have something. |
| select { |
| case item := <-c: |
| m.pushBack(item) |
| case <-stop: |
| return |
| } |
| } else { |
| // If we're busy, grab more work only if there's something there. |
| select { |
| case item := <-c: |
| m.pushBack(item) |
| case <-stop: |
| return |
| default: |
| } |
| } |
| |
| for s := range m { // Send something on each of the pending channels... |
| switch i := m.popFront(s).(type) { |
| case sendOpenChannel: |
| i.r <- encodeOpenChannel(w, i.c) |
| case sendCloseChannel: |
| i.r <- encodeCloseChannel(w, i.c) |
| case sendData: |
| n := min(len(i.d), mtu) |
| err := encodeData(w, i.c, i.d[:n]) |
| if err == nil { |
| i.d, i.n = i.d[n:], i.n+n |
| if len(i.d) > 0 { |
| m.pushFront(i) // more to send later |
| continue |
| } |
| } |
| i.r <- sendDataResult{err, i.n} |
| } |
| } |
| } |
| }() |
| } |
| |
| func (s *sender) end() { |
| close(s.stop) |
| <-s.done |
| } |