// blob: 32746bcfcec97b75d4de18e17a8fa12567f6501d [file] [log] [blame]
// Copyright (C) 2015 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"time"
"android.googlesource.com/platform/tools/gpu/binary"
"android.googlesource.com/platform/tools/gpu/log"
)
// span locates one stored resource inside the archive's data file.
type span struct {
	// offset is the byte position of the resource's first byte; size is its
	// length in bytes. A size of zero is written to the index by Delete.
	offset, size int64
}
// decode reads the offset followed by the size from d, stopping at the first
// decoding error and returning it.
func (s *span) decode(d *binary.Decoder) (err error) {
	if s.offset, err = d.Int64(); err != nil {
		return err
	}
	s.size, err = d.Int64()
	return err
}
// encode writes the offset followed by the size to e. The size is only
// written when the offset was written successfully.
func (s span) encode(e *binary.Encoder) error {
	err := e.Int64(s.offset)
	if err == nil {
		err = e.Int64(s.size)
	}
	return err
}
// record is a single index entry: the identifier of a resource together with
// the span describing where its bytes live in the data file.
type record struct {
// id identifies the resource.
id binary.ID
// span is the location of the resource in the data file.
span span
}
// decode reads the identifier and then the span from d, returning the first
// error encountered.
func (r *record) decode(d *binary.Decoder) (err error) {
	if err = r.id.Decode(d); err == nil {
		err = r.span.decode(d)
	}
	return err
}
// encode writes the identifier and then the span to e. The span is only
// written when the identifier was written successfully.
func (r record) encode(e *binary.Encoder) error {
	idErr := r.id.Encode(e)
	if idErr == nil {
		return r.span.encode(e)
	}
	return idErr
}
// compactingArchive is a Store backed by an append-only data file and an
// append-only index file. All access to its state is funnelled through
// closures sent on worker, which a single goroutine executes one at a time.
type compactingArchive struct {
	// worker is the buffered queue of operations to run on the worker
	// goroutine. workerSync is an unbuffered queue serviced by the same
	// goroutine; the compactor uses it to hand over its completion function.
	worker, workerSync chan<- func()
	// index holds the encoded records; data holds the resource bytes they
	// refer to.
	index, data *os.File
	// records maps each live resource id to its location in data.
	records map[binary.ID]span
	// waste is the number of bytes in data belonging to records that have been
	// superseded or deleted. size is the number of bytes of resource data
	// accounted for when the index was loaded or last compacted.
	waste, size int64
	// path is the directory containing the archive files.
	path string
	// compactionSize is the threshold that size and three times waste must
	// exceed before Store starts a compaction.
	compactionSize int64
	// insertCh is non-nil only while a compaction is in flight; updates made
	// during that time are forwarded on it to the compactor goroutine.
	insertCh chan<- keyValue
	// compacting is true from the start of a compaction until its completion
	// function has run.
	compacting bool
}
// Names of the files that make up an archive directory.
const (
	// The live archive.
	dataFilename  = "data"
	indexFilename = "index"

	// Files being written by an in-flight compaction.
	compactingIndexFilename = "compacting_" + indexFilename
	compactingDataFilename  = "compacting_" + dataFilename

	// The previous live files, set aside while a compaction flips the new
	// files into place.
	oldDataFilename  = "old_" + dataFilename
	oldIndexFilename = "old_" + indexFilename
)
// exists reports whether os.Stat succeeds for path. Any Stat failure,
// including permission errors, is reported as false.
func exists(path string) bool {
	if _, statErr := os.Stat(path); statErr != nil {
		return false
	}
	return true
}
// CreateCompactingArchive opens the archive stored in the directory path,
// creating the directory and an empty archive if none exists, and returns it
// as a Store. The archive reclaims space by compacting its data file once
// enough of it is occupied by superseded or deleted resources.
//
// If a previous process was interrupted part way through flipping compacted
// files into place, the interrupted flip is recovered first. The index is then
// replayed to rebuild the in-memory record table, and a worker goroutine is
// started to serialise all further access to the archive.
//
// Any I/O failure while opening, recovering or replaying the archive panics.
func CreateCompactingArchive(path string) Store {
	if err := os.MkdirAll(path, 0755); err != nil {
		panic(err)
	}
	iFn := filepath.Join(path, indexFilename)
	dFn := filepath.Join(path, dataFilename)
	ciFn := filepath.Join(path, compactingIndexFilename)
	cdFn := filepath.Join(path, compactingDataFilename)
	oiFn := filepath.Join(path, oldIndexFilename)
	odFn := filepath.Join(path, oldDataFilename)

	// openExisting opens name for reading and appending. It reports false if
	// the file does not exist and panics on any other error.
	openExisting := func(name string) (*os.File, bool) {
		f, err := os.OpenFile(name, os.O_APPEND|os.O_RDWR, 0666)
		if err == nil {
			return f, true
		}
		if os.IsNotExist(err) {
			return nil, false
		}
		panic(err)
	}
	// mustOpen opens name for reading and appending with the additional flags
	// in extra, panicking on failure.
	mustOpen := func(name string, extra int) *os.File {
		f, err := os.OpenFile(name, extra|os.O_APPEND|os.O_RDWR, 0666)
		if err != nil {
			panic(err)
		}
		return f
	}
	mustRename := func(from, to string) {
		if err := os.Rename(from, to); err != nil {
			panic(err)
		}
	}

	// The flip at the end of a compaction is a sequence of renames (data to
	// old_data, index to old_index, compacting_data to data, compacting_index
	// to index) and so can not be atomic. These are the rules used to recover
	// from a flip that was interrupted:
	//  - If the index file is present, the data is in either data or old_data.
	//  - Otherwise, if the data file is present, the index is compacting_index.
	//  - If neither is present, the archive is in old_index and old_data.
	//  - Otherwise there is no archive yet, so create the files.
	index, haveIndex := openExisting(iFn)
	data, haveData := openExisting(dFn)
	switch {
	case haveIndex && haveData:
		// Nothing to recover.
	case haveIndex:
		// The data file had been set aside but the index had not.
		mustRename(odFn, dFn)
		data = mustOpen(dFn, 0)
	case haveData:
		// The compacted data is in place. Move the index from compacting_index.
		mustRename(ciFn, iFn)
		index = mustOpen(iFn, 0)
	case exists(odFn) && exists(oiFn):
		// Both originals had been set aside. Restore them, index file first.
		mustRename(oiFn, iFn)
		mustRename(odFn, dFn)
		index = mustOpen(iFn, 0)
		data = mustOpen(dFn, 0)
	default:
		// No archive on disk. Create both files.
		index = mustOpen(iFn, os.O_CREATE)
		data = mustOpen(dFn, os.O_CREATE)
	}

	// Both files are open. Tidy up anything left behind by an interrupted
	// compaction; the files are usually absent, so failures are ignored.
	os.Remove(oiFn)
	os.Remove(odFn)
	os.Remove(ciFn)
	os.Remove(cdFn)

	waste := int64(0)
	size := int64(0)
	records := make(map[binary.ID]span)
	// Use a buffered reader to quickly read the archive records.
	d := binary.NewDecoder(bufio.NewReaderSize(index, 256<<10))
	for {
		var r record
		err := r.decode(d)
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		if prevSpan, ok := records[r.id]; ok {
			// A later record supersedes the earlier one for the same id.
			waste += prevSpan.size
		}
		if r.span.size != 0 {
			size += r.span.size
			records[r.id] = r.span
		} else {
			// A zero-sized record is the tombstone written by Delete. Drop
			// the id so that a deleted resource does not reappear after a
			// restart and its bytes are not counted as waste a second time.
			delete(records, r.id)
		}
	}

	worker := make(chan func(), 64)
	workerSync := make(chan func())
	go func() {
		// Run queued operations one at a time until both channels have been
		// closed. A closed channel is replaced by nil so that select stops
		// choosing it.
		for worker != nil || workerSync != nil {
			select {
			case f, ok := <-worker:
				if ok {
					f()
				} else {
					worker = nil
				}
			case f, ok := <-workerSync:
				if ok {
					f()
				} else {
					workerSync = nil
				}
			}
		}
	}()
	// NOTE(review): compactionSize is left at its zero value here; confirm
	// where the compaction threshold is expected to be configured.
	return &compactingArchive{
		worker:     worker,
		workerSync: workerSync,
		index:      index,
		data:       data,
		records:    records,
		waste:      waste,
		size:       size,
		path:       path,
		insertCh:   nil,
		compacting: false,
	}
}
// Store appends data to the archive under id. The write is queued on the
// worker goroutine and performed asynchronously, so Store itself always
// returns nil; a failure to write the data or its index record panics on the
// worker goroutine.
//
// data is retained until the queued write has run and must not be modified by
// the caller in the meantime. logger may be nil.
//
// Storing an id that is already present supersedes the earlier resource, whose
// bytes are counted as waste. Once size and three times waste both exceed
// compactionSize, and no compaction is already running, a compaction is
// started.
func (s *compactingArchive) Store(id binary.ID, _ binary.Object, data []byte, logger log.Logger) (err error) {
	if logger != nil {
		logger = logger.Enter("Archive.Store")
		logger.Info("id: %s size: %d", id, len(data))
	}
	s.worker <- func() {
		if prevRecord, ok := s.records[id]; ok {
			// Archive contains multiple records with the same id.
			s.waste += prevRecord.size
		}
		offset, err := s.data.Seek(0, os.SEEK_END)
		if err != nil {
			panic(err)
		}
		n, err := s.data.Write(data)
		if n != len(data) {
			panic(fmt.Errorf("Byte count written was not as expected (%d/%d), error: %v.", n, len(data), err))
		}
		if err != nil {
			panic(err)
		}
		// Keep the running total of bytes in the data file up to date so that
		// the compaction threshold below is tested against the real size.
		s.size += int64(len(data))
		rec := record{
			id: id,
			span: span{
				offset: offset,
				size:   int64(len(data)),
			},
		}
		// A lost index record would silently drop the resource on the next
		// start, so treat it like a failed data write.
		if err := rec.encode(binary.NewEncoder(s.index)); err != nil {
			panic(err)
		}
		s.records[id] = rec.span
		if s.size > s.compactionSize && !s.compacting && s.waste*3 > s.compactionSize {
			if logger != nil {
				logger.Info("Archive starting a compaction. Size %v Waste %v",
					s.size, s.waste)
			}
			if err := s.compaction(logger); err != nil && logger != nil {
				logger.Error("Archive failed to start a compaction: %v", err)
			}
		}
		// If compaction is in-flight send the record to the compaction goroutine.
		if s.insertCh != nil {
			kv := keyValue{id, data}
			s.insertCh <- kv
			if logger != nil {
				logger.Info("Insert sent to compactor. Size %v", len(data))
			}
		}
	}
	return nil
}
// Load reads the resource stored under id and decodes it into out. The read
// runs on the worker goroutine and Load blocks until it has finished.
//
// It returns the number of bytes read and any error from reading or decoding.
// An unknown id yields a "not found" error. logger may be nil; when it is not,
// the outcome is logged and a panic raised in the calling goroutine is logged
// before being re-raised.
func (s *compactingArchive) Load(id binary.ID, logger log.Logger, out binary.Object) (size int, err error) {
	if logger != nil {
		logger = logger.Enter("Archive.Load")
		logger.Info("Loading from archive: %s", id)
		defer func() {
			logger.Info("↪ size: %d, err: %v", size, err)
			if p := recover(); p != nil {
				logger.Error("Panic when loading %v: %v", id, p)
				panic(p)
			}
		}()
	}
	finished := make(chan struct{})
	read := func() {
		defer close(finished)
		where, known := s.records[id]
		if !known {
			size, err = 0, fmt.Errorf("Resource %v not found", id)
			return
		}
		buf := make([]byte, where.size)
		n, readErr := s.data.ReadAt(buf, where.offset)
		if n != len(buf) {
			size, err = 0, readErr
			return
		}
		dec := binary.NewDecoder(bytes.NewBuffer(buf))
		size, err = len(buf), out.Decode(dec)
	}
	s.worker <- read
	<-finished
	return size, err
}
// Contains reports whether the archive currently holds a record for id. The
// lookup runs on the worker goroutine and Contains blocks until it is done.
func (s *compactingArchive) Contains(id binary.ID) (found bool) {
	answered := make(chan struct{})
	lookup := func() {
		_, present := s.records[id]
		found = present
		close(answered)
	}
	s.worker <- lookup
	<-answered
	return found
}
// Close closes the index and data files and then shuts down the worker
// goroutine by closing both of its channels. It blocks until the files have
// been closed; errors from closing them are not reported. The archive must
// not be used after Close.
func (s *compactingArchive) Close() {
	filesClosed := make(chan struct{})
	shutdown := func() {
		s.index.Close()
		s.data.Close()
		// Release the caller before closing the channels that feed the
		// goroutine running this function.
		close(filesClosed)
		close(s.worker)
		close(s.workerSync)
	}
	s.worker <- shutdown
	<-filesClosed
}
// Delete removes the resource stored under id, if there is one. The removal
// runs on the worker goroutine and Delete blocks until it has finished.
//
// The resource's bytes are counted as waste and a zero-sized tombstone record
// is appended to the index. If a compaction is in flight the deletion is also
// forwarded to the compactor. A failure to write the tombstone panics on the
// worker goroutine. logger may be nil.
func (s *compactingArchive) Delete(id binary.ID, logger log.Logger) {
	done := make(chan struct{})
	s.worker <- func() {
		if prev, found := s.records[id]; found {
			delete(s.records, id)
			s.waste += prev.size
			// A record with a zero span marks the id as deleted in the index.
			tombstone := record{id: id, span: span{offset: 0, size: 0}}
			// If the tombstone is not written the resource would come back on
			// the next start, so treat a failure like a failed data write.
			if err := tombstone.encode(binary.NewEncoder(s.index)); err != nil {
				panic(err)
			}
			// If compaction is in-flight send the record to the compaction goroutine.
			if s.insertCh != nil {
				// An empty buffer tells the compactor to write a tombstone.
				s.insertCh <- keyValue{id, make([]byte, 0)}
				if logger != nil {
					logger.Info("Delete sent to compactor.")
				}
			}
		}
		close(done)
	}
	<-done
}
// compaction starts rewriting the archive into new data and index files that
// contain only the current records, and arranges for those files to replace
// the originals once the rewrite has caught up with concurrent updates.
//
// It must be called on the worker goroutine. It returns an error, without
// starting anything, if a compaction is already running or the compacting
// files can not be created. Otherwise it returns nil as soon as the compactor
// goroutine has been started; I/O failures after that point panic. l may be
// nil.
func (s *compactingArchive) compaction(l log.Logger) error {
	if s.compacting {
		return fmt.Errorf("compaction() call when compaction in progress")
	}
	s.compacting = true
	cdFn := filepath.Join(s.path, compactingDataFilename)
	ciFn := filepath.Join(s.path, compactingIndexFilename)
	dcompacting, err := os.OpenFile(cdFn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		s.compacting = false
		return err
	}
	icompacting, err := os.OpenFile(ciFn, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		// Do not leak the data file opened above. This is best-effort
		// cleanup; the open failure is the error worth reporting.
		dcompacting.Close()
		os.Remove(cdFn)
		s.compacting = false
		return err
	}
	// We compact the file whilst allowing concurrent updates. Synchronous
	// operations need to happen on the worker and therefore need to be
	// relatively fast, but IO is allowed. The bulk of the compacting IO happens
	// in a separate goroutine. This is what happens:
	// - In worker: create a channel for concurrent updates.
	// - In worker: make an in-memory copy of the index in an array.
	// - In worker: start compactor go routine.
	// - In compactor: load each record from the data file and write to compacting_data.
	// - In compactor: write span to compacting_index.
	// - In compactor: whilst outputting records allow updates from channel.
	// When finished outputting:
	// - In compactor: synchronously send the completion function to the worker.
	// - In compactor: continue to process inserts until synchronous send is possible.
	// - In worker: process any remaining inserts.
	// - In worker: flip: close both files, rename the originals "old_".
	//   Rename the new files to replace them. Get rid of the originals.
	// - In worker: put everything back to normal.

	// Create a channel for concurrent updates.
	insertCh := make(chan keyValue, 64)
	s.insertCh = insertCh
	// Make an in-memory copy of the records. By copying into an array we can manage
	// about 20k records per millisecond. Note archives are expected to have large
	// data files rather than large indexes. Consider maintaining an offset ordered
	// array to avoid the need for this copy.
	beforeCopyRecords := time.Now()
	recordsCopyArray := make([]record, 0, len(s.records))
	for id, sp := range s.records {
		recordsCopyArray = append(recordsCopyArray, record{id: id, span: sp})
	}
	afterCopyRecords := time.Now()
	// Start compactor go routine.
	go func() {
		// Output records using buffered IO.
		iwriter := bufio.NewWriterSize(icompacting, 256<<10)
		dwriter := bufio.NewWriterSize(dcompacting, 256<<10)
		ie := binary.NewEncoder(iwriter)
		size := int64(0)
		waste := int64(0)
		numInserts := 0
		sizeInserts := 0
		beforeWriteRecords := time.Now()
		// written holds the most recent span written to the compacting files
		// for each id, including zero-sized tombstones. It becomes the
		// archive's record table once the new files are flipped into place.
		written := make(map[binary.ID]span, len(recordsCopyArray))
		didFirstInsertPath := false
		didSecondInsertPath := false
		didThirdInsertPath := false
		offset := int64(0)
		// insertFun appends one update to the compacting files. An empty
		// buffer writes a zero-sized tombstone record and no data.
		insertFun := func(insert keyValue) {
			if prev, ok := written[insert.id]; ok {
				// As new records can be added we need to keep track of waste
				waste += prev.size
			}
			bufSize := int64(len(insert.buffer))
			if bufSize != 0 {
				n, err := dwriter.Write(insert.buffer)
				if err != nil {
					panic(err)
				}
				if n != len(insert.buffer) {
					panic(fmt.Errorf("Byte count written was not as expected (%d/%d), error: %v.", n, len(insert.buffer), err))
				}
			}
			rec := record{
				id: insert.id,
				span: span{
					offset: offset,
					size:   bufSize,
				},
			}
			offset += bufSize
			// Use a local error here: the enclosing function's err is read by
			// the worker when compaction returns.
			if err := rec.encode(ie); err != nil {
				panic(err)
			}
			size += bufSize
			written[insert.id] = rec.span
			numInserts++
			sizeInserts += len(insert.buffer)
		}
		// cleanupFun abandons the compaction, closing and removing the
		// compacting files.
		cleanupFun := func() {
			err := dcompacting.Close()
			if err != nil && l != nil {
				l.Warning("Failed to close compacting data file")
			}
			err = icompacting.Close()
			if err != nil && l != nil {
				l.Warning("Failed to close compacting index file")
			}
			err = os.Remove(cdFn)
			if err != nil && l != nil {
				l.Warning("Failed to remove compacting data file")
			}
			err = os.Remove(ciFn)
			if err != nil && l != nil {
				l.Warning("Failed to remove compacting index file")
			}
		}
		for _, rec := range recordsCopyArray {
			// An id that has already been written arrived through insertCh
			// and is newer than the snapshot. Writing the snapshot copy after
			// it would make the stale version win when the index is replayed.
			if _, superseded := written[rec.id]; !superseded {
				data := make([]byte, rec.span.size)
				if len(data) != 0 {
					if n, err := s.data.ReadAt(data, rec.span.offset); n != len(data) {
						panic(fmt.Errorf("short read of record %v during compaction (%d/%d): %v", rec.id, n, len(data), err))
					}
				}
				insertFun(keyValue{rec.id, data})
			}
			// Whilst outputting records allow updates from channel.
			// In the case of a rapid sustained insertion this loop may never
			// terminate. However, there are non-buffered writes done in the worker
			// thread which should be slower, making this unlikely. Also if there
			// is rapid sustained insertion it will fill the drive at some point
			// regardless of compaction.
		loop:
			for {
				select {
				case insert, ok := <-insertCh:
					if !ok {
						cleanupFun()
						return
					}
					insertFun(insert)
					didFirstInsertPath = true
				default:
					// nothing to insert right now
					break loop
				}
			}
		}
		// All the records are written, but more inserts may still arrive.
		afterWriteRecords := time.Now()
		// completionFun runs on the worker goroutine, so no other archive
		// operation can run while it finishes the compaction.
		completionFun := func() {
			// Process any remaining inserts.
			startSyncPart := time.Now()
			close(insertCh)
			s.insertCh = nil
			// The other end of the channel is now closed, but there may still
			// be messages to process.
			for insert := range insertCh {
				insertFun(insert)
				didSecondInsertPath = true
			}
			beforeFlush := time.Now()
			err := dwriter.Flush()
			if err != nil {
				panic(err)
			}
			err = iwriter.Flush()
			if err != nil {
				panic(err)
			}
			afterFlush := time.Now()
			// Flip: close both files, rename the originals "old_".
			// Rename the new files to replace them. Get rid of the originals.
			err = icompacting.Close()
			if err != nil {
				panic(err)
			}
			err = dcompacting.Close()
			if err != nil {
				panic(err)
			}
			afterCatchup := time.Now()
			// The compactor has finished reading the original data file and
			// nothing else can run on the worker, so release the original
			// handles before the files are renamed and removed.
			if err := s.data.Close(); err != nil && l != nil {
				l.Warning("Failed to close superseded data file")
			}
			if err := s.index.Close(); err != nil && l != nil {
				l.Warning("Failed to close superseded index file")
			}
			dFn := filepath.Join(s.path, dataFilename)
			iFn := filepath.Join(s.path, indexFilename)
			odFn := filepath.Join(s.path, oldDataFilename)
			oiFn := filepath.Join(s.path, oldIndexFilename)
			// The order of these renames is what makes an interrupted flip
			// recoverable when the archive is next opened:
			// If the index file is present then data is in either data or old_data
			// If not then if the data file is present the index is compacting_index
			// If neither exist then the data is in old_index and old_data
			err = os.Rename(dFn, odFn)
			if err != nil {
				panic(err)
			}
			err = os.Rename(iFn, oiFn)
			if err != nil {
				panic(err)
			}
			err = os.Rename(cdFn, dFn)
			if err != nil {
				panic(err)
			}
			err = os.Rename(ciFn, iFn)
			if err != nil {
				panic(err)
			}
			err = os.Remove(odFn)
			if err != nil {
				panic(err)
			}
			err = os.Remove(oiFn)
			if err != nil {
				panic(err)
			}
			data, err := os.OpenFile(dFn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
			if err != nil {
				panic(err)
			}
			index, err := os.OpenFile(iFn, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
			if err != nil {
				panic(err)
			}
			// The compacted data file has a different layout, so the record
			// table must be replaced with the spans written to it. Zero-sized
			// entries are tombstones and are left out, matching how the index
			// is interpreted when it is replayed.
			records := make(map[binary.ID]span, len(written))
			for id, sp := range written {
				if sp.size != 0 {
					records[id] = sp
				}
			}
			// Put everything back to normal.
			s.data = data
			s.index = index
			s.records = records
			s.waste = waste
			s.size = size
			s.compacting = false
			afterAll := time.Now()
			if l != nil {
				l = l.Enter("Compaction")
				l.Info("After compacting Waste %v Size %v", s.waste, s.size)
				l.Info("Write records to disk %v", afterWriteRecords.Sub(beforeWriteRecords))
				l.Info("Num concurrent inserts %v size of inserts %v", numInserts, sizeInserts)
				l.Info("Sync with worker %v", startSyncPart.Sub(afterWriteRecords))
				l.Info("Catchup time %v", afterCatchup.Sub(startSyncPart))
				l.Info("Flush to disk %v", afterFlush.Sub(beforeFlush))
				l.Info("Flip and open %v", afterAll.Sub(afterCatchup))
				l.Info("Total sync part %v", afterAll.Sub(startSyncPart))
				l.Info("Total time %v", afterAll.Sub(beforeWriteRecords))
				l.Info("didFirstInsertPath %v", didFirstInsertPath)
				l.Info("didSecondInsertPath %v", didSecondInsertPath)
				l.Info("didThirdInsertPath %v", didThirdInsertPath)
			}
		}
		// Note this code executes before the completionFun above.
	loop2:
		for {
			select {
			case s.workerSync <- completionFun:
				break loop2
			// Keep draining the insertCh. As the completionFun is run using a
			// synchronous channel we know that no further insertions can be added
			// and that therefore deadlock can not happen.
			case insert, ok := <-insertCh:
				if !ok {
					cleanupFun()
					return
				}
				insertFun(insert)
				didThirdInsertPath = true
			}
		}
	}()
	if l != nil {
		l.Info("Compacting copy %v records: %v", len(recordsCopyArray), afterCopyRecords.Sub(beforeCopyRecords))
	}
	return nil
}