| // Copyright 2016 syzkaller project authors. All rights reserved. |
| // Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. |
| |
| package vmimpl |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "sync" |
| ) |
| |
// OutputMerger combines the output of several readers into one channel.
// Readers are attached with Add or AddDecoder; each one is served by its own
// goroutine.
type OutputMerger struct {
	// Output receives the merged output chunks. It is closed by Wait.
	Output chan []byte
	// Err receives a MergerError when a reader stops. Errors that do not
	// fit into the channel buffer are dropped.
	Err chan error

	// teeMu serializes writes to tee across reader goroutines.
	teeMu sync.Mutex
	// tee, when non-nil, receives a copy of the raw output.
	tee io.Writer
	// wg counts the running reader goroutines.
	wg sync.WaitGroup
}
| |
// MergerError is the error reported on OutputMerger.Err when reading from one
// of the merged readers stops.
type MergerError struct {
	// Name is the label the reader was registered under.
	Name string
	// R is the reader that produced the error.
	R io.ReadCloser
	// Err is the underlying error returned by R.Read.
	Err error
}

// Error implements the error interface.
func (e MergerError) Error() string {
	return fmt.Sprintf("failed to read from %v: %v", e.Name, e.Err)
}

// Unwrap returns the underlying read error so that errors.Is and errors.As
// can inspect the cause (for example io.EOF) through a MergerError.
func (e MergerError) Unwrap() error {
	return e.Err
}
| |
| func NewOutputMerger(tee io.Writer) *OutputMerger { |
| return &OutputMerger{ |
| Output: make(chan []byte, 1000), |
| Err: make(chan error, 1), |
| tee: tee, |
| } |
| } |
| |
| func (merger *OutputMerger) Wait() { |
| merger.wg.Wait() |
| close(merger.Output) |
| } |
| |
| func (merger *OutputMerger) Add(name string, r io.ReadCloser) { |
| merger.AddDecoder(name, r, nil) |
| } |
| |
| func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser, |
| decoder func(data []byte) (start, size int, decoded []byte)) { |
| merger.wg.Add(1) |
| go func() { |
| var pending []byte |
| var proto []byte |
| var buf [4 << 10]byte |
| for { |
| n, err := r.Read(buf[:]) |
| if n != 0 { |
| if decoder != nil { |
| proto = append(proto, buf[:n]...) |
| start, size, decoded := decoder(proto) |
| proto = proto[start+size:] |
| if len(decoded) != 0 { |
| merger.Output <- decoded // note: this can block |
| } |
| } |
| pending = append(pending, buf[:n]...) |
| if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 { |
| out := pending[:pos+1] |
| if merger.tee != nil { |
| merger.teeMu.Lock() |
| merger.tee.Write(out) |
| merger.teeMu.Unlock() |
| } |
| select { |
| case merger.Output <- append([]byte{}, out...): |
| r := copy(pending[:], pending[pos+1:]) |
| pending = pending[:r] |
| default: |
| } |
| } |
| } |
| if err != nil { |
| if len(pending) != 0 { |
| pending = append(pending, '\n') |
| if merger.tee != nil { |
| merger.teeMu.Lock() |
| merger.tee.Write(pending) |
| merger.teeMu.Unlock() |
| } |
| select { |
| case merger.Output <- pending: |
| default: |
| } |
| } |
| r.Close() |
| select { |
| case merger.Err <- MergerError{name, r, err}: |
| default: |
| } |
| merger.wg.Done() |
| return |
| } |
| } |
| }() |
| } |