blob: 9ff71483d7815a14f504589a2a45d060b5b30f37 [file] [log] [blame]
/*
* Copyright 2015 Google Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rtdb
import (
"bytes"
"encoding/gob"
_ "../third_party/go-sqlite3"
"database/sql"
"fmt"
"log"
"reflect"
)
type SQLiteBackend struct {
db *sql.DB
}
// SQLiteBackend implementation

// upgrade brings the database schema up to the latest version (6).
//
// It creates the meta table when missing and records schema version 1 when
// the table holds no row yet. It then applies the migrations one version at
// a time, starting from the stored version. Every step, together with the
// matching update of meta.schemaVersion, runs inside a single transaction,
// so a failure part-way through a step does not leave the schema
// half-migrated.
//
// Any database error aborts the upgrade and is returned unchanged. A
// database whose version is not in the range 1..5 is left untouched.
func (backend *SQLiteBackend) upgrade() error {
	db := backend.db

	if _, err := db.Exec("CREATE TABLE IF NOT EXISTS meta (id INT PRIMARY KEY, schemaVersion INT)"); err != nil {
		return err
	}

	var schemaVersion int
	err := db.QueryRow("SELECT schemaVersion FROM meta").Scan(&schemaVersion)
	if err == sql.ErrNoRows {
		// Initialize schema, if none found in db.
		log.Println("[rtdb] no schema version found in database, create version 1")
		if _, err = db.Exec("INSERT INTO meta (id, schemaVersion) VALUES (0, 1)"); err != nil {
			return err
		}
		schemaVersion = 1
	} else if err != nil {
		// A real query failure must not be mistaken for an empty database.
		return err
	}

	// Ordered by source version; each entry upgrades from `from` to `from+1`.
	// The final statement of every step bumps meta.schemaVersion.
	migrations := []struct {
		from  int
		stmts []string
	}{
		{from: 1, stmts: []string{
			"UPDATE meta SET schemaVersion=2 WHERE schemaVersion=1",
		}},
		{from: 2, stmts: []string{
			"CREATE TABLE objects (id VARCHAR(64) NOT NULL, version INT NOT NULL, value TEXT NOT NULL, ops TEXT NOT NULL, PRIMARY KEY(id, version))",
			"CREATE INDEX objects_id_version ON objects (id, version)",
			"CREATE TABLE version (id INT PRIMARY KEY, version INT NOT NULL)",
			"INSERT INTO version (id, version) VaLUES (0, 1)",
			"UPDATE meta SET schemaVersion=3 WHERE schemaVersion=2",
		}},
		{from: 3, stmts: []string{
			"DROP TABLE objects",
			"CREATE TABLE objects (id VARCHAR(128) NOT NULL, version INT NOT NULL, value BLOB NOT NULL, PRIMARY KEY(id, version))",
			"UPDATE meta SET schemaVersion=4 WHERE schemaVersion=3",
		}},
		{from: 4, stmts: []string{
			"DROP TABLE objects",
			"CREATE TABLE objects (type VARCHAR(64) NOT NULL, id VARCHAR(128) NOT NULL, version INT NOT NULL, value BLOB NOT NULL, PRIMARY KEY(type, id, version))",
			"UPDATE meta SET schemaVersion=5 WHERE schemaVersion=4",
		}},
		{from: 5, stmts: []string{
			"DROP TABLE objects",
			"DROP TABLE version",
			"CREATE TABLE objects (type VARCHAR(64) NOT NULL, id VARCHAR(128) NOT NULL, value BLOB NOT NULL, PRIMARY KEY(type, id))",
			"UPDATE meta SET schemaVersion=6 WHERE schemaVersion=5",
		}},
	}

	for _, m := range migrations {
		if schemaVersion != m.from {
			continue
		}
		log.Printf("[rtdb] update schema to version %d", m.from+1)
		if err := applySchemaMigration(db, m.stmts); err != nil {
			return err
		}
		schemaVersion = m.from + 1
	}
	return nil
}

// applySchemaMigration executes stmts in order inside one transaction and
// commits only when every statement succeeded; otherwise it rolls back and
// returns the first error.
func applySchemaMigration(db *sql.DB, stmts []string) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	for _, stmt := range stmts {
		if _, err := tx.Exec(stmt); err != nil {
			// The Exec error is the one worth reporting; a rollback
			// failure would only hide it.
			tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}
// Backend API

// NewSQLiteBackend opens the SQLite database named by fileName, upgrades its
// schema to the latest version and returns a backend operating on it.
//
// When syncIo is false, "PRAGMA synchronous = OFF" is executed on the
// database before the schema upgrade.
// NOTE(review): SQLite pragmas apply per connection while database/sql pools
// connections, so the pragma may only affect the connection that ran it —
// confirm against the driver in use.
//
// On any failure the database handle is closed again and the error is
// returned together with a nil Backend.
func NewSQLiteBackend(fileName string, syncIo bool) (Backend, error) {
	// Open the SQLite database.
	db, err := sql.Open("sqlite3", fileName)
	if err != nil {
		return nil, err
	}

	backend := &SQLiteBackend{db: db}

	if !syncIo {
		if _, err := db.Exec("PRAGMA synchronous = OFF"); err != nil {
			db.Close()
			return nil, err
		}
	}

	// Upgrade schema to latest version.
	if err := backend.upgrade(); err != nil {
		// Do not leak the handle when the backend is never handed out.
		db.Close()
		return nil, err
	}
	return backend, nil
}
// GetExplicitTypeObject loads the object stored under (objType, objId),
// gob-decodes its value into dst and then calls dst.PostLoad().
//
// It returns the query error (sql.ErrNoRows when no matching row exists) or
// the gob decoding error. In both cases PostLoad is not called, so a
// partially decoded object is never reported as successfully loaded.
func (backend *SQLiteBackend) GetExplicitTypeObject(objType string, objId string, dst ObjectBase) error {
	// Query from database.
	var objValue []byte
	err := backend.db.QueryRow("SELECT value FROM objects WHERE type=? AND id=?", objType, objId).Scan(&objValue)
	if err != nil {
		return err
	}

	// Decode the stored blob into the caller's object.
	if err := gob.NewDecoder(bytes.NewReader(objValue)).Decode(dst); err != nil {
		return err
	}

	// Call object post-load.
	dst.PostLoad()
	return nil
}
// GetObject loads the object with id objId into dst, using the name of the
// struct type dst points to as the stored object type.
//
// dst must be a non-nil pointer to a struct; otherwise an error naming the
// offending type is returned and the database is not queried. Errors from
// GetExplicitTypeObject are passed through.
func (backend *SQLiteBackend) GetObject(objId string, dst ObjectBase) error {
	// Resolve type.
	ptr := reflect.ValueOf(dst)
	if ptr.Kind() != reflect.Ptr {
		// %T reports the dynamic type ("<nil>" for a nil interface) rather
		// than trying to format the value itself.
		return fmt.Errorf("[rtdb] must pass pointer to dst (got %T)", dst)
	}
	if ptr.IsNil() {
		return fmt.Errorf("[rtdb] must pass non-nil pointer to dst (got nil %T)", dst)
	}
	direct := ptr.Elem()
	if direct.Kind() != reflect.Struct {
		return fmt.Errorf("[rtdb] pointer must be to a struct (got %s)", direct.Type())
	}
	return backend.GetExplicitTypeObject(direct.Type().Name(), objId, dst)
}
// WriteObjects stores every entry of writeOps in a single transaction.
//
// Each Value is gob-encoded and written with INSERT OR REPLACE under the key
// (type name, ObjId), where the type name is that of the type Value points
// to. If encoding or any statement fails, the transaction is rolled back, so
// none of the writes are applied, and the error is returned. A failure to
// begin the transaction is returned as an error as well.
//
// reflect.Type.Elem panics when Value is nil or of a kind without an element
// type (anything but pointer, array, chan, map or slice); callers are
// expected to pass pointers.
func (backend *SQLiteBackend) WriteObjects(writeOps []WriteOp) error {
	// Begin transaction.
	tx, err := backend.db.Begin()
	if err != nil {
		return err
	}
	// Releases the transaction on every early return (and on panic). After a
	// successful Commit this only yields sql.ErrTxDone, which is ignored.
	defer tx.Rollback()

	// Enqueue all object writes.
	// \todo [petri] version conflicts are *NOT* handled here!
	for _, writeOp := range writeOps {
		var buffer bytes.Buffer
		if err := gob.NewEncoder(&buffer).Encode(writeOp.Value); err != nil {
			return err
		}

		typeName := reflect.TypeOf(writeOp.Value).Elem().Name()
		if _, err := tx.Exec("INSERT OR REPLACE INTO objects (type, id, value) VALUES (?, ?, ?)", typeName, writeOp.ObjId, buffer.Bytes()); err != nil {
			return err
		}
	}

	// Commit transaction.
	return tx.Commit()
}