/*
* Copyright 2018 The Kythe Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package riegeli

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"github.com/golang/protobuf/proto"
rmpb "kythe.io/third_party/riegeli/records_metadata_go_proto"
)

// fileSignatureChunk is the fixed chunk that begins every Riegeli file. See
// https://github.com/google/riegeli/blob/master/doc/riegeli_records_file_format.md#file-signature
var fileSignatureChunk = &chunk{Header: chunkHeader{ChunkType: fileSignatureChunkType}}
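
// init precomputes the signature chunk header's hash of its (empty) data.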
func init() {
binary.LittleEndian.PutUint64(fileSignatureChunk.Header.DataHash[:], hashBytes(fileSignatureChunk.Data))
}
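
// ensureFileHeader writes the file signature chunk, followed by a
// RecordsMetadata chunk when the Writer's options serialize to a non-empty
// string, the first time it is called. Subsequent calls are no-ops.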
func (w *Writer) ensureFileHeader() error {
if w.fileHeaderWritten {
return nil
}
_, err := fileSignatureChunk.WriteTo(w.w, w.w.pos)
if err != nil {
return err
}
opts := w.opts.String()
if opts != "" {
rw, err := newTransposeChunkWriter(w.opts)
if err != nil {
return err
}
tw := &talliedRecordWriter{recordWriter: rw}
if _, err := tw.PutProto(&rmpb.RecordsMetadata{
// TODO(schroederc): add support for full RecordsMetadata
RecordWriterOptions: proto.String(opts),
}); err != nil {
return err
}
data, err := tw.Encode()
if err != nil {
return err
}
chunk := &chunk{
Header: chunkHeader{
ChunkType: fileMetadataChunkType,
DataSize: uint64(len(data)),
DecodedDataSize: tw.decodedSize,
},
Data: data,
}
if _, err := chunk.WriteTo(w.w, w.w.pos); err != nil {
return err
}
}
w.fileHeaderWritten = true
return nil
}
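
// setupRecordWriter replaces w.recordWriter with a fresh talliedRecordWriter,
// using the transposed chunk encoding when the options request it and the
// simple record encoding otherwise.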
func (w *Writer) setupRecordWriter() error {
var (
rw recordWriter
err error
)
if w.opts.transpose() {
rw, err = newTransposeChunkWriter(w.opts)
} else {
rw, err = newRecordChunkWriter(w.opts)
}
if err != nil {
return err
}
w.recordWriter = &talliedRecordWriter{recordWriter: rw}
return nil
}
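
// flushRecord encodes the buffered records as a single chunk, writes that
// chunk to the underlying blockWriter, and resets the recordWriter for the
// next chunk. Writing an empty chunk is skipped.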
func (w *Writer) flushRecord() error {
if w.recordWriter == nil || w.recordWriter.numRecords == 0 {
// Skip writing empty record chunk.
return nil
}
data, err := w.recordWriter.Encode()
if err != nil {
return fmt.Errorf("encoding record chunk: %v", err)
}
chunkType := recordChunkType
if w.opts.transpose() {
chunkType = transposedChunkType
}
chunk := &chunk{
Header: chunkHeader{
ChunkType: chunkType,
DataSize: uint64(len(data)),
DecodedDataSize: w.recordWriter.decodedSize,
NumRecords: w.recordWriter.numRecords,
},
Data: data,
}
if _, err := chunk.WriteTo(w.w, w.w.pos); err != nil {
return err
}
return w.setupRecordWriter()
}

// A blockWriter interleaves blockHeaders inside chunks of data. Each
// blockHeader interrupts a single chunk, recording that chunk's start and end
// positions relative to the header's own position.
type blockWriter struct {
w io.Writer
pos int
}
// WriteChunk writes a single chunk with interleaving blockHeaders written at
// every 64KiB boundary of the underlying io.Writer.
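//
// For example, assuming blockSize is 64KiB (65536) and blockHeaderSize is 24
// (per the Riegeli format), a 70000-byte chunk written at pos 0 spans two
// blocks and needs two blockHeaders, so chunkEnd = 70048: the header
// {PreviousChunk: 0, NextChunk: 70048} at offset 0, then 65512 bytes of data,
// then {PreviousChunk: 65536, NextChunk: 4512} at offset 65536, then the
// remaining 4488 bytes.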
func (b *blockWriter) WriteChunk(chunk []byte) (n int, err error) {
if len(chunk) == 0 {
return 0, errors.New("zero-sized chunk")
}
nextBlock := ((b.pos + blockSize - 1) / blockSize) * blockSize
remainingBlockSize := nextBlock - b.pos
// The chunk fits entirely within the current block; write it and return.
if remainingBlockSize >= len(chunk) {
n, err = b.w.Write(chunk)
b.pos += n
return
}
blockHeaders := interveningBlockHeaders(b.pos, len(chunk))
chunkStart, chunkEnd := b.pos, b.pos+len(chunk)+blockHeaders*blockHeaderSize
// Fill up the current block with as much of the chunk as possible.
if remainingBlockSize > 0 {
n, err = b.w.Write(chunk[:remainingBlockSize])
b.pos += n
if err != nil {
return
}
}
// For each remaining slice of the chunk of size usableBlockSize, write a
// blockHeader and that slice of data.
for i, blocksLeft := remainingBlockSize, blockHeaders; blocksLeft > 0; i, blocksLeft = i+usableBlockSize, blocksLeft-1 {
// write blockHeader
blockStart := b.pos
n, err := (&blockHeader{
PreviousChunk: uint64(blockStart - chunkStart),
NextChunk: uint64(chunkEnd - blockStart),
}).WriteTo(b.w)
b.pos += n
if err != nil {
return b.pos - chunkStart, err
}
l := len(chunk) - i
if l > usableBlockSize {
// cap the write at the block's usable size
l = usableBlockSize
}
n, err = b.w.Write(chunk[i : i+l])
b.pos += n
if err != nil {
return b.pos - chunkStart, err
}
}
return b.pos - chunkStart, nil
}

// WriteTo writes the blockHeader's binary encoding to w.
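// The 24-byte encoding is the hash of the trailing 16 bytes, followed by the
// little-endian PreviousChunk and NextChunk offsets.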
func (b *blockHeader) WriteTo(w io.Writer) (int, error) {
var buf [blockHeaderSize]byte
binary.LittleEndian.PutUint64(buf[8:], b.PreviousChunk)
binary.LittleEndian.PutUint64(buf[16:], b.NextChunk)
binary.LittleEndian.PutUint64(buf[:], hashBytes(buf[8:]))
return w.Write(buf[:])
}
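
// A talliedRecordWriter wraps a recordWriter, tallying the number of records
// written and their total decoded size for use in the enclosing chunkHeader.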
type talliedRecordWriter struct {
recordWriter
numRecords, decodedSize uint64
}
// Put implements part of the recordWriter interface.
func (t *talliedRecordWriter) Put(rec []byte) error {
if err := t.recordWriter.Put(rec); err != nil {
return err
}
t.numRecords++
t.decodedSize += uint64(len(rec))
return nil
}
// PutProto implements part of the recordWriter interface.
func (t *talliedRecordWriter) PutProto(msg proto.Message) (int, error) {
size, err := t.recordWriter.PutProto(msg)
if err != nil {
return size, err
}
t.numRecords++
t.decodedSize += uint64(size)
return size, nil
}
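
// A recordWriter encodes a sequence of records into the data of a single
// Riegeli chunk.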
type recordWriter interface {
io.Closer
// Put adds a new record to the chunk being written.
Put([]byte) error
// PutProto adds a new proto to the chunk being written.
PutProto(proto.Message) (int, error)
// Encode returns the binary-encoding of the Riegeli record chunk data.
Encode() ([]byte, error)
}
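
// A recordChunkWriter encodes records as a simple record chunk: a compression
// type byte, the varint-prefixed compressed record sizes, and the compressed
// record values (see Encode).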
type recordChunkWriter struct {
compressionType compressionType
sizesCompressor, valsCompressor compressor
}
// Put implements part of the recordWriter interface.
func (r *recordChunkWriter) Put(rec []byte) error {
size := uint64(len(rec))
var buf [binary.MaxVarintLen64]byte
n := binary.PutUvarint(buf[:], size)
if _, err := r.sizesCompressor.Write(buf[:n]); err != nil {
return fmt.Errorf("compressing record size: %v", err)
} else if _, err := r.valsCompressor.Write(rec); err != nil {
return fmt.Errorf("compressing record: %v", err)
}
return nil
}
// PutProto implements part of the recordWriter interface.
func (r *recordChunkWriter) PutProto(msg proto.Message) (int, error) {
rec, err := proto.Marshal(msg)
if err != nil {
return 0, err
}
return len(rec), r.Put(rec)
}
// Close implements part of the recordWriter interface.
func (r *recordChunkWriter) Close() error {
if err := r.sizesCompressor.Close(); err != nil {
return fmt.Errorf("closing record size compressor: %v", err)
} else if err := r.valsCompressor.Close(); err != nil {
return fmt.Errorf("closing record value compressor: %v", err)
}
return nil
}
// Encode implements part of the recordWriter interface.
func (r *recordChunkWriter) Encode() ([]byte, error) {
if err := r.Close(); err != nil {
return nil, err
}
sizesSizePrefix := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(sizesSizePrefix[:], uint64(r.sizesCompressor.Len()))
sizesSizePrefix = sizesSizePrefix[:n]
// TODO(schroederc): reuse buffers
buf := bytes.NewBuffer(make([]byte, 0, 1+len(sizesSizePrefix)+r.sizesCompressor.Len()+r.valsCompressor.Len()))
buf.WriteByte(byte(r.compressionType))
buf.Write(sizesSizePrefix)
r.sizesCompressor.WriteTo(buf)
r.valsCompressor.WriteTo(buf)
return buf.Bytes(), nil
}
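
// newRecordChunkWriter returns a recordChunkWriter that compresses record
// sizes and record values as separate streams per the given options.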
func newRecordChunkWriter(opts *WriterOptions) (*recordChunkWriter, error) {
vals, err := newCompressor(opts)
if err != nil {
return nil, err
}
sizes, err := newCompressor(opts)
if err != nil {
return nil, err
}
return &recordChunkWriter{
compressionType: opts.compressionType(),
valsCompressor: vals,
sizesCompressor: sizes,
}, nil
}

// WriteTo writes the chunkHeader's binary encoding to w.
func (h *chunkHeader) WriteTo(w io.Writer) (int, error) {
// header_hash (8 bytes) — hash of the rest of the header
// data_size (8 bytes) — size of data
// data_hash (8 bytes) — hash of data
// chunk_type (1 byte) — determines how to interpret data
// num_records (7 bytes) — number of records after decoding
// decoded_data_size (8 bytes) — sum of record sizes after decoding
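// (8+8+8+1+7+8 = 40 bytes, the value of chunkHeaderSize)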
var buf [chunkHeaderSize]byte
binary.LittleEndian.PutUint64(buf[8:16], h.DataSize)
copy(buf[16:], h.DataHash[:])
buf[24] = byte(h.ChunkType)
// NumRecords is only 7 bytes, but the binary package requires an 8-byte
// buffer. Write all 8 bytes and let the encoding of decoded_data_size
// overwrite the extra byte below.
binary.LittleEndian.PutUint64(buf[25:33], h.NumRecords) // overwrite buf[32] below
binary.LittleEndian.PutUint64(buf[32:40], h.DecodedDataSize)
hash := hashBytes(buf[8:])
binary.LittleEndian.PutUint64(buf[:8], hash)
return w.Write(buf[:])
}
// WriteTo writes the chunk to w, given its starting position within w.
func (c *chunk) WriteTo(w *blockWriter, pos int) (int, error) {
binary.LittleEndian.PutUint64(c.Header.DataHash[:], hashBytes(c.Data))
// TODO(schroederc): reuse buffers
var buf bytes.Buffer
if _, err := c.Header.WriteTo(&buf); err != nil {
return 0, err
}
buf.Write(c.Data)
padding := paddingSize(pos, &c.Header)
for i := 0; i < padding; i++ {
buf.WriteByte(0)
}
if buf.Len() != chunkHeaderSize+len(c.Data)+padding {
return 0, fmt.Errorf("bad chunk size: %v", buf.Len())
}
return w.WriteChunk(buf.Bytes())
}