blob: 996b5e5ecc8f3d020087a9d716bcd110b957d828 [file] [log] [blame]
/*
* Copyright 2018 The Kythe Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package riegeli
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"github.com/golang/protobuf/proto"
rmpb "kythe.io/third_party/riegeli/records_metadata_go_proto"
)
// reader decodes records from a sequence of chunks supplied by a chunkReader.
type reader struct {
// r supplies the chunks that are decoded into records.
r *chunkReader
// metadata caches the RecordsMetadata; it is nil until a metadata chunk has
// been decoded or RecordsMetadata has installed an empty default.
metadata *rmpb.RecordsMetadata
// recordReader iterates over the records of the current chunk; it is nil
// when no record chunk is currently loaded.
recordReader recordReader
// chunkSize is the size reported by r.Next for the most recently accepted
// chunk.
chunkSize int64
}
// RecordsMetadata implements part of the Reader interface.
//
// The metadata is cached after the first call. If it has not been seen yet,
// the reader advances until a record chunk is loaded (or the stream ends);
// when no metadata chunk was found along the way, an empty RecordsMetadata is
// cached and returned. Reaching io.EOF while searching is not an error.
func (r *reader) RecordsMetadata() (*rmpb.RecordsMetadata, error) {
	if r.metadata != nil {
		return r.metadata, nil
	}
	if err := r.ensureRecordReader(); err != nil && err != io.EOF {
		return nil, err
	}
	if r.metadata == nil {
		// No metadata chunk was encountered; fall back to an empty message.
		r.metadata = &rmpb.RecordsMetadata{}
	}
	return r.metadata, nil
}
// Next implements part of the Reader interface. It returns the bytes of the
// next record, or the error reported while locating or reading it.
func (r *reader) Next() ([]byte, error) {
	rec, err := r.nextRecord()
	return rec, err
}
// NextProto implements part of the Reader interface. It reads the next record
// and unmarshals it into msg; any error from reading the record is returned
// unchanged without touching msg.
func (r *reader) NextProto(msg proto.Message) error {
	rec, err := r.Next()
	if err == nil {
		err = proto.Unmarshal(rec, msg)
	}
	return err
}
// Position implements part of the Reader interface.
//
// It returns the position of the record that the next call to Next would
// return: the start of the current chunk plus the record index within it. When
// no record chunk is loaded (for example after Next has returned io.EOF, or
// for a file without records) the record index is reported as 0 rather than
// dereferencing a nil record reader.
func (r *reader) Position() (RecordPosition, error) {
	// Verify file before returning a position.
	if _, err := r.RecordsMetadata(); err != nil {
		return RecordPosition{}, fmt.Errorf("error verifying file: %v", err)
	}
	pos := RecordPosition{ChunkBegin: r.r.Position()}
	if r.recordReader != nil {
		pos.RecordIndex = int64(r.recordReader.Index())
	}
	return pos, nil
}
// SeekToRecord implements part of the ReadSeeker interface.
//
// It positions the reader before the record identified by pos. The chunk at
// pos.ChunkBegin is (re)loaded whenever it is not the current chunk or no
// record reader is currently loaded; the latter guards against calling Seek on
// a nil record reader after the stream has been exhausted. Errors from loading
// the chunk, including io.EOF, are returned unchanged.
func (r *reader) SeekToRecord(pos RecordPosition) error {
	// Verify file before seeking.
	if _, err := r.RecordsMetadata(); err != nil {
		return fmt.Errorf("error verifying file: %v", err)
	}

	if r.recordReader == nil || r.r.Position() != pos.ChunkBegin {
		// We're seeking outside of the current chunk (or have none loaded).
		if err := r.r.Seek(pos.ChunkBegin); err != nil {
			return err
		}
		r.recordReader = nil
		if err := r.ensureRecordReader(); err != nil {
			return err
		}
	}

	// ensureRecordReader only returns nil once a record reader is loaded.
	r.recordReader.Seek(int(pos.RecordIndex))
	return nil
}
// Seek implements part of the ReadSeeker interface.
//
// It positions the reader at the record addressed by the numeric position pos.
// If pos lies outside the current chunk, or no record reader is currently
// loaded, the chunk containing pos is located and loaded first. Seeking to the
// end of the file is allowed and returns nil.
func (r *reader) Seek(pos int64) error {
	// Verify file before seeking.
	if _, err := r.RecordsMetadata(); err != nil {
		return fmt.Errorf("error verifying file: %v", err)
	}

	// The nil check matters after the stream has been exhausted: chunkSize then
	// still describes the last chunk, so pos may appear to be "in range" even
	// though there is no record reader to seek within.
	if r.recordReader == nil || pos < r.r.Position() || pos >= r.r.Position()+r.chunkSize {
		// We're seeking outside of the current chunk.
		if err := r.r.SeekToChunkContaining(pos); err != nil && err != io.EOF {
			return fmt.Errorf("failed to seek to enclosing chunk: %v", err)
		}
		r.recordReader = nil
		if err := r.ensureRecordReader(); err == io.EOF {
			// Seeking to the end of the file is allowed.
			return nil
		} else if err != nil {
			return err
		}
	}

	// Within a chunk, the offset from the chunk start is used as the record
	// index; the record reader clamps out-of-range indices.
	recordIndex := int(pos - r.r.Position())
	r.recordReader.Seek(recordIndex)
	return nil
}
// nextRecord loads a chunk with unread records if necessary and returns its
// next record.
func (r *reader) nextRecord() ([]byte, error) {
	err := r.ensureRecordReader()
	if err != nil {
		return nil, err
	}
	return r.recordReader.Next()
}
// ensureRecordReader makes sure r.recordReader refers to a chunk that still has
// unread records, reading further chunks as needed. File signature chunks are
// verified and file metadata chunks are decoded into r.metadata along the way.
// Any error from the chunk reader (including io.EOF) is returned unchanged.
func (r *reader) ensureRecordReader() error {
	// Drop an exhausted reader so the loop below advances to the next chunk.
	if r.recordReader != nil && r.recordReader.Len() == 0 {
		r.recordReader = nil
	}

	for r.recordReader == nil {
		ch, size, err := r.r.Next()
		if err != nil {
			return err
		}

		kind := ch.Header.ChunkType
		fileLevel := kind == fileSignatureChunkType || kind == fileMetadataChunkType
		if ch.Header.NumRecords == 0 && !fileLevel {
			// ignore chunks with no records; even for unknown chunk types
			continue
		}
		r.chunkSize = size

		switch kind {
		case fileSignatureChunkType:
			// TODO(schroederc): verify once at beginning of reader
			if err := verifySignature(ch); err != nil {
				return err
			}

		case fileMetadataChunkType:
			md, err := newTransposedRecordReader(ch)
			if err != nil {
				return fmt.Errorf("bad transpose chunk: %v", err)
			}
			if md.Len() != 1 {
				return fmt.Errorf("didn't find single RecordsMetadata record: found %d", md.Len())
			}
			raw, err := md.Next()
			if err != nil {
				return fmt.Errorf("reading RecordsMetadata: %v", err)
			}
			r.metadata = new(rmpb.RecordsMetadata)
			if err := proto.Unmarshal(raw, r.metadata); err != nil {
				return fmt.Errorf("bad RecordsMetadata: %v", err)
			}

		case transposedChunkType:
			r.recordReader, err = newTransposedRecordReader(ch)
			if err != nil {
				return fmt.Errorf("bad transpose chunk: %v", err)
			}
			if uint64(r.recordReader.Len()) != ch.Header.NumRecords {
				return fmt.Errorf("mismatching number of transposed records: found: %d; expected: %d", r.recordReader.Len(), ch.Header.NumRecords)
			}

		case recordChunkType:
			r.recordReader, err = newRecordChunkReader(ch)
			if err != nil {
				return fmt.Errorf("bad record chunk: %v", err)
			}
			if uint64(r.recordReader.Len()) != ch.Header.NumRecords {
				return fmt.Errorf("mismatching number of records: found: %d; expected: %d", r.recordReader.Len(), ch.Header.NumRecords)
			}

		default:
			return fmt.Errorf("unsupported read of chunk_type: '%s'", []byte{byte(ch.Header.ChunkType)})
		}
	}
	return nil
}
// verifySignature checks that c is exactly the expected file signature chunk:
// its header must equal fileSignatureChunk.Header and it must carry no data.
func verifySignature(c *chunk) error {
	switch {
	case c.Header != fileSignatureChunk.Header:
		return fmt.Errorf("invalid file signature: %+v", c)
	case len(c.Data) != 0:
		return fmt.Errorf("extraneous data with file signature: %q", c.Data)
	}
	return nil
}
// A recordReader reads a finite stream of records.
type recordReader interface {
// Next reads and returns the next record. io.EOF is returned if no further
// records exist.
Next() ([]byte, error)
// Len returns the number of records left to read.
Len() int
// Index returns the 0-based index of the record that will be returned by
// Next; it is never negative.
// NOTE(review): the previous comment bounded Index by Len(), but Len is the
// number of records remaining; the upper bound is presumably the total number
// of records in the chunk -- confirm against the implementations.
Index() int
// Seek positions the reader before the record at the given 0-based index.
Seek(index int)
}
// fixedRecordReader is a recordReader over a list of records held in memory.
type fixedRecordReader struct {
	records [][]byte // all records, in reading order
	index   int      // index into records of the record Next returns next
}

// Len implements part of the recordReader interface. It reports how many
// records have not been returned yet.
func (r *fixedRecordReader) Len() int {
	remaining := len(r.records) - r.index
	return remaining
}

// Next implements part of the recordReader interface. It returns the record at
// the current index and advances, or io.EOF once every record was returned.
func (r *fixedRecordReader) Next() ([]byte, error) {
	if r.index < len(r.records) {
		rec := r.records[r.index]
		r.index++
		return rec, nil
	}
	return nil, io.EOF
}

// Index implements part of the recordReader interface.
func (r *fixedRecordReader) Index() int {
	return r.index
}

// Seek implements part of the recordReader interface. Indices outside the
// valid range are clamped: negative values to 0, and values past the last
// record to the number of records (so that Next returns io.EOF).
func (r *fixedRecordReader) Seek(index int) {
	switch total := len(r.records); {
	case index < 0:
		r.index = 0
	case index >= total:
		r.index = total
	default:
		r.index = index
	}
}
// newRecordChunkReader decodes a simple record chunk into a recordReader that
// holds every record in memory.
//
// The chunk data consists of a table of varint-encoded record sizes followed by
// the concatenated record values, either of which may be compressed. An error
// is returned if the chunk cannot be decoded, if decompression fails, or if the
// sizes and values do not add up exactly to the counts declared in the chunk
// header.
func newRecordChunkReader(c *chunk) (recordReader, error) {
	rec, err := decodeRecordChunk(c)
	if err != nil {
		return nil, fmt.Errorf("decoding record chunk: %v", err)
	}

	sizesBuf := rec.CompressedSizes
	valsBuf := rec.CompressedValues

	// Decode sizes/values, if necessary
	if rec.CompressionType != noCompression {
		sizesDec, err := newDecompressor(bytes.NewReader(rec.CompressedSizes), rec.CompressionType)
		if err != nil {
			return nil, err
		}
		sizesBuf, err = ioutil.ReadAll(sizesDec)
		if err != nil {
			return nil, fmt.Errorf("error decompressing record sizes: %v", err)
		} else if err := sizesDec.Close(); err != nil {
			return nil, fmt.Errorf("error closing record sizes decompressor: %v", err)
		}

		valsDec, err := newDecompressor(bytes.NewReader(rec.CompressedValues), rec.CompressionType)
		if err != nil {
			return nil, err
		}
		valsBuf = make([]byte, c.Header.DecodedDataSize)
		if _, err := io.ReadFull(valsDec, valsBuf); err != nil {
			return nil, fmt.Errorf("error decompressing record values: %v", err)
		} else if b, err := valsDec.ReadByte(); c.Header.DecodedDataSize != 0 && err == nil {
			// A successful extra read means the stream holds more than the header declared.
			return nil, fmt.Errorf("read past end of expected record values buffer: %v %v", b, err)
		} else if err := valsDec.Close(); err != nil {
			return nil, fmt.Errorf("error closing record values decompressor: %v", err)
		}
	} else if uint64(len(valsBuf)) != c.Header.DecodedDataSize {
		return nil, fmt.Errorf("bad uncompressed DecodedDataSize: %d vs %d", len(valsBuf), c.Header.DecodedDataSize)
	}

	// Every record size is a varint occupying at least one byte, so a header
	// that declares more records than there are size bytes is corrupt. Checking
	// up front keeps a bad NumRecords from driving a huge allocation below.
	if c.Header.NumRecords > uint64(len(sizesBuf)) {
		return nil, fmt.Errorf("not enough record size data for %d records; found %d bytes", c.Header.NumRecords, len(sizesBuf))
	}
	numRecords := int(c.Header.NumRecords)

	sizes := bytes.NewReader(sizesBuf)
	records := make([][]byte, 0, numRecords)
	for i := 0; i < numRecords; i++ {
		size, err := binary.ReadUvarint(sizes)
		if err != nil {
			return nil, fmt.Errorf("error reading record size: %v", err)
		} else if size > uint64(len(valsBuf)) {
			return nil, fmt.Errorf("not enough data for record of size %d; found %d bytes", size, len(valsBuf))
		}
		// Records alias the values buffer rather than copying it.
		records = append(records, valsBuf[:size])
		valsBuf = valsBuf[size:]
	}

	// Both buffers must be consumed exactly.
	if b, err := sizes.ReadByte(); err != io.EOF {
		return nil, fmt.Errorf("trailing record size data: 0x%x (err: %v)", b, err)
	} else if len(valsBuf) != 0 {
		return nil, fmt.Errorf("trailing record value data: %v", valsBuf)
	}
	return &fixedRecordReader{records: records}, nil
}
// A blockReader reads chunk data from an underlying stream that is divided
// into blocks, each beginning with a block header.
type blockReader struct {
// r is the underlying stream of blocks.
r io.ReadSeeker
// buf holds the data of the most recently read block, excluding its header.
buf *bytes.Reader
// header is the most recently decoded block header.
header *blockHeader
// position tracks the current offset within r; it is advanced past each
// block header as well as by the bytes handed out through Read.
position int64
}
// Read implements the io.Reader interface by skipping over the interleaved
// block headers and sequentially reading chunk data. A single call never
// crosses a block boundary, so it may return fewer bytes than len(bs).
func (b *blockReader) Read(bs []byte) (int, error) {
	if needBlock := b.buf == nil || b.buf.Len() == 0; needBlock {
		data, err := b.Next()
		if err != nil {
			return 0, err
		}
		b.buf = bytes.NewReader(data)
	}
	read, err := b.buf.Read(bs)
	b.position += int64(read)
	return read, err
}
// Next reads the next block from the underlying stream, decodes and records
// its header, and returns the block's data without the header. A short final
// block is accepted as long as it contains a complete header. io.EOF is
// returned when no bytes remain.
func (b *blockReader) Next() ([]byte, error) {
	var block [blockSize]byte
	n, err := io.ReadFull(b.r, block[:])
	switch {
	case err == io.EOF:
		return nil, io.EOF
	case err != nil && err != io.ErrUnexpectedEOF:
		return nil, fmt.Errorf("reading block: %v", err)
	case n < blockHeaderSize:
		return nil, fmt.Errorf("short read for block header: %d", n)
	}

	hdr, err := decodeBlockHeader(bytes.NewReader(block[:blockHeaderSize]))
	if err != nil {
		// TODO(schroederc): recover to next block on failure here
		return nil, fmt.Errorf("decoding block header: %v", err)
	}
	b.header = hdr
	b.position += blockHeaderSize
	return block[blockHeaderSize:n], nil
}
// Position returns the current position within the underlying ReadSeeker.
//
// Internally only positions outside of block headers are tracked. Riegeli,
// however, treats a chunk that begins immediately after a block header as
// starting at the beginning of the block, so that case is reported as the
// block start.
func (b *blockReader) Position() int64 {
	pos := b.position
	if pos%blockSize == blockHeaderSize {
		pos -= blockHeaderSize
	}
	return pos
}
// Seek seeks to the given position within the underlying ReadSeeker. A
// position at the very start of a block is moved to just after that block's
// header; a position inside a block header is rejected with an error.
func (b *blockReader) Seek(pos int64) error {
	blockStart := (pos / blockSize) * blockSize
	switch within := pos - blockStart; {
	case within == 0:
		pos = blockStart + blockHeaderSize
	case within < blockHeaderSize:
		return fmt.Errorf("attempting to seek into block header: %d", pos)
	}

	if err := b.readBlock(blockStart); err != nil {
		return err
	}
	// buf excludes the block header, hence the adjusted offset.
	dataOffset := pos - (blockStart + blockHeaderSize)
	if _, err := b.buf.Seek(dataOffset, io.SeekStart); err != nil {
		return err
	}
	b.position = pos
	return nil
}
// readBlock makes sure b.buf and b.header hold the block beginning at
// blockStart, reading it from the underlying stream unless it is already
// loaded.
//
// The loaded-block check requires the position to be past the block's header.
// A position exactly at blockStart means the previous block was read to its
// end (or a previous load of this block failed), in which case b.buf does not
// hold this block's data and it must be read.
func (b *blockReader) readBlock(blockStart int64) error {
	if b.buf != nil && b.position >= blockStart+blockHeaderSize && b.position < blockStart+blockSize {
		return nil
	}
	if _, err := b.r.Seek(blockStart, io.SeekStart); err != nil {
		return fmt.Errorf("failed to seek to beginning of block: %v", err)
	}
	// The underlying stream has moved; discard the old block so that a failed
	// read below cannot leave stale data to be served by Read.
	b.buf = nil
	b.position = blockStart
	block, err := b.Next()
	if err != nil {
		return err
	}
	b.buf = bytes.NewReader(block)
	return nil
}
// SeekToNextChunkInBlock seeks to the first chunk starting from the block
// starting at the given offset. Blocks whose header places the next chunk
// beyond the block's usable data are skipped, and the search continues with the
// following block.
func (b *blockReader) SeekToNextChunkInBlock(blockStart int64) error {
	var offset int64
	for ; ; blockStart += blockSize {
		if err := b.readBlock(blockStart); err != nil {
			return err
		}
		// A zero PreviousChunk means the block starts with a chunk; otherwise
		// the block interrupts a chunk and NextChunk locates the following one.
		offset = 0
		if b.header.PreviousChunk != 0 {
			offset = int64(b.header.NextChunk) - blockHeaderSize
		}
		if offset < usableBlockSize {
			break
		}
	}

	if _, err := b.buf.Seek(offset, io.SeekStart); err != nil {
		return err
	}
	b.position = blockStart + blockHeaderSize + offset
	return nil
}
// A chunkReader reads a sequential stream of chunks.
type chunkReader struct {
// r supplies the chunk bytes with block headers removed.
r *blockReader
// position is the starting position of the chunk most recently read or
// located; it is the value returned by Position.
position int64
}
// Next reads the next full chunk.
//
// It returns the decoded chunk together with the total size the chunk occupies
// in the stream: its header, data, trailing padding, and any block headers that
// interrupt it. io.EOF is returned unchanged when no further chunk exists.
//
// If the chunk's data does not match the hash stored in its header, the chunk
// and size are still returned, alongside a non-nil error. The padding is
// skipped regardless, and successfully skipping it no longer clears the
// hash-mismatch error.
func (c *chunkReader) Next() (*chunk, int64, error) {
	c.position = c.r.Position()
	h, err := decodeChunkHeader(c.r)
	if err == io.EOF {
		return nil, 0, io.EOF
	} else if err != nil {
		return nil, 0, fmt.Errorf("reading chunk header: %v", err)
	}

	data := make([]byte, h.DataSize)
	if h.DataSize > 0 {
		if _, err := io.ReadFull(c.r, data); err != nil {
			return nil, 0, fmt.Errorf("reading chunk data: %v", err)
		}
	}
	if hash, expected := hashBytes(data), binary.LittleEndian.Uint64(h.DataHash[:]); hash != expected {
		err = fmt.Errorf("chunk hash mismatch: 0x%x vs 0x%x", hash, expected)
	}

	chunkSize := chunkHeaderSize + int64(len(data))
	if padding := paddingSize(int(c.position), h); padding > 0 {
		// Use a separate variable so a successful discard cannot overwrite a
		// hash-mismatch error recorded above.
		if _, padErr := io.CopyN(ioutil.Discard, c.r, int64(padding)); padErr != nil {
			err = fmt.Errorf("failed to discard padding: %v", padErr)
		}
		chunkSize += int64(padding)
	}
	blockHeaders := blockHeaderSize * int64(interveningBlockHeaders(int(c.position), int(chunkSize)))
	chunkSize += blockHeaders

	return &chunk{Header: *h, Data: data}, chunkSize, err
}
// Seek seeks to the chunk at the given position by repositioning the
// underlying blockReader.
func (c *chunkReader) Seek(pos int64) error {
	err := c.r.Seek(pos)
	return err
}
// SeekToChunkContaining seeks to the start of the chunk that contains the given
// position. It scans chunk headers forward from a chunk boundary near pos and
// returns io.EOF if decoding a chunk header hits the end of the stream first.
func (c *chunkReader) SeekToChunkContaining(pos int64) error {
// Begin at the first chunk found from the block that holds pos.
blockStart := (pos / blockSize) * blockSize
if err := c.r.SeekToNextChunkInBlock(blockStart); err != nil {
return err
}
if pos < c.r.Position() {
// Chunk starts in previous block
// NOTE(review): this only steps back a single block; a chunk that begins
// more than one block before pos is presumably not located by this -- confirm.
blockStart -= blockSize
if err := c.r.SeekToNextChunkInBlock(blockStart); err != nil {
return err
}
}
// Seek through chunks in block.
for {
c.position = c.r.Position()
h, err := decodeChunkHeader(c.r)
if err == io.EOF {
return io.EOF
} else if err != nil {
return fmt.Errorf("reading chunk header at %d: %v", c.position, err)
}
// chunkSize sums the chunk header, its data, the padding reported by
// paddingSize, and the block headers counted by interveningBlockHeaders.
chunkSize := chunkHeaderSize + int64(h.DataSize) + int64(paddingSize(int(c.position), h))
blockHeaders := blockHeaderSize * int64(interveningBlockHeaders(int(c.position), int(chunkSize)))
chunkSize += blockHeaders
nextChunk := c.position + chunkSize
if pos < nextChunk {
// We're at the chunk containing the desired position.
break
} else if err := c.r.Seek(nextChunk); err != nil {
return fmt.Errorf("error seeking to next chunk: %v", err)
}
}
// The chunk's header was consumed above; rewind to the start of the chunk.
return c.r.Seek(c.position)
}
// Position returns the position of the current chunk, i.e. the position at
// which the most recently read or located chunk starts.
func (c *chunkReader) Position() int64 {
	return c.position
}
// decodeRecordChunk splits the data of a simple record chunk into its parts: a
// one-byte compression type, a varint giving the size of the (possibly
// compressed) record sizes table, that table, and the remaining bytes holding
// the (possibly compressed) record values.
//
// The returned slices alias c.Data. An error is returned if the data is too
// short for the compression type, the varint, or the declared sizes table.
func decodeRecordChunk(c *chunk) (*recordChunk, error) {
	r := bytes.NewReader(c.Data)
	ct, err := r.ReadByte()
	if err != nil {
		return nil, fmt.Errorf("error reading compression type: %v", err)
	}
	css, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, fmt.Errorf("error reading CompressedSizesSize: %v", err)
	}

	// Compare as unsigned: converting an oversized css to int could wrap to a
	// negative value, slip past the check, and panic when slicing below.
	leftover := r.Len()
	if uint64(leftover) < css {
		return nil, fmt.Errorf("not enough data for CompressedSizes: %d < %d", leftover, css)
	}
	sizesStart := len(c.Data) - leftover
	valsStart := sizesStart + int(css) // safe: css <= leftover <= len(c.Data)

	rec := &recordChunk{
		CompressionType:  compressionType(ct),
		CompressedSizes:  c.Data[sizesStart:valsStart],
		CompressedValues: c.Data[valsStart:],
	}
	return rec, nil
}
// decodeBlockHeader reads a block header from r: an 8-byte hash followed by the
// little-endian PreviousChunk and NextChunk fields. The hash is checked against
// the hash of the remaining header bytes; on mismatch the decoded header is
// returned together with an error.
func decodeBlockHeader(r io.Reader) (*blockHeader, error) {
	var raw [blockHeaderSize]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return nil, err
	}

	stored, payload := binary.LittleEndian.Uint64(raw[:8]), raw[8:]
	actual := hashBytes(payload)
	h := &blockHeader{
		PreviousChunk: binary.LittleEndian.Uint64(payload[0:8]),
		NextChunk:     binary.LittleEndian.Uint64(payload[8:16]),
	}
	if stored != actual {
		return h, fmt.Errorf("bad blockHeader hash: found %d; expected %d", actual, stored)
	}
	return h, nil
}
// decodeChunkHeader reads a chunk header from r. After the leading 8-byte
// header hash, the layout is: DataSize (8 bytes), DataHash (8 bytes), ChunkType
// (1 byte), NumRecords (7 bytes) and DecodedDataSize (8 bytes), with integers
// stored little-endian. The header hash is checked against the hash of the
// bytes that follow it; on mismatch the decoded header is returned together
// with an error.
func decodeChunkHeader(r io.Reader) (*chunkHeader, error) {
	var raw [chunkHeaderSize]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return nil, err
	}

	stored := binary.LittleEndian.Uint64(raw[:8])
	actual := hashBytes(raw[8:])

	h := new(chunkHeader)
	h.DataSize = binary.LittleEndian.Uint64(raw[8:16])
	copy(h.DataHash[:], raw[16:24])
	h.ChunkType = chunkType(raw[24])
	// NumRecords occupies the 7 bytes after the chunk type; decoding the 8 bytes
	// starting at the type byte and shifting that byte out gives the same value.
	h.NumRecords = binary.LittleEndian.Uint64(raw[24:32]) >> 8
	h.DecodedDataSize = binary.LittleEndian.Uint64(raw[32:40])

	if stored != actual {
		return h, fmt.Errorf("bad chunkHeader hash: found 0x%x; expected %x", actual, stored)
	}
	return h, nil
}