blob: b787f7dd59e7bc1de417df4b47c27fe5432904b6 [file] [log] [blame]
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multiplexer
import (
"io"
"sync"
"android.googlesource.com/platform/tools/gpu/framework/task"
)
type channel struct {
id channelID
next chan []byte
current []byte
shutdown task.Signal
readLock sync.Mutex
stop task.Task
multiplexer *Multiplexer
}
func newChannel(id channelID, m *Multiplexer) *channel {
shutdown, stop := task.NewSignal()
return &channel{
id: id,
next: make(chan []byte),
shutdown: shutdown,
stop: task.Once(stop),
multiplexer: m,
}
}
func (c *channel) receive(p []byte) {
select {
case <-c.shutdown:
case c.next <- p:
}
}
func (c *channel) Close() error {
if c.shutdown.Fired() {
return ErrChannelClosed
}
c.stop(nil)
c.multiplexer.dispatcher.deleteChannel(c.id)
return c.multiplexer.sender.sendCloseChannel(c.id)
}
func (c *channel) Read(p []byte) (n int, err error) {
c.readLock.Lock()
defer c.readLock.Unlock()
for len(p) > 0 {
if len(c.current) == 0 {
select {
case <-c.shutdown:
return n, io.EOF
case c.current = <-c.next:
}
}
bytes := copy(p, c.current)
p, c.current = p[bytes:], c.current[bytes:]
n += bytes
}
return n, nil
}
func (c *channel) Write(p []byte) (n int, err error) {
if c.shutdown.Fired() {
return 0, ErrChannelClosed
}
return c.multiplexer.sender.sendData(c.id, p)
}