| import os |
| import sqlite3 |
| import sys |
| from pathlib import Path |
| from contextlib import suppress, closing |
| from collections.abc import MutableMapping |
| |
# Schema: a single table mapping a unique BLOB key to a BLOB value.
BUILD_TABLE = """
  CREATE TABLE IF NOT EXISTS Dict (
    key BLOB UNIQUE NOT NULL,
    value BLOB NOT NULL
  )
"""
# Number of stored entries (key is NOT NULL, so this counts every row).
GET_SIZE = "SELECT COUNT (key) FROM Dict"
# Parameters are cast to BLOB so that text and bytes arguments with the same
# encoded content address the same row.
LOOKUP_KEY = "SELECT value FROM Dict WHERE key = CAST(? AS BLOB)"
# REPLACE overwrites an existing row that has the same (unique) key.
STORE_KV = "REPLACE INTO Dict (key, value) VALUES (CAST(? AS BLOB), CAST(? AS BLOB))"
DELETE_KEY = "DELETE FROM Dict WHERE key = CAST(? AS BLOB)"
ITER_KEYS = "SELECT key FROM Dict"
| |
| |
class error(OSError):
    """Exception raised by this module for database-level failures.

    It subclasses OSError, so callers that catch OSError also catch it.
    """
| |
| |
# Message used when a statement is attempted after close().
_ERR_CLOSED = "DBM object has already been closed"
# Message used when __init__ runs on an object that already has a connection
# attribute.
_ERR_REINIT = "DBM object does not support reinitialization"
| |
| |
def _normalize_uri(path):
    """Return *path* as an absolute ``file:`` URI without repeated slashes.

    path -- a str or path-like object; relative paths are resolved against
            the current working directory by Path.absolute().

    The result of Path.as_uri() is post-processed so that every run of
    consecutive slashes becomes a single slash (for example
    ``file:///tmp/x`` becomes ``file:/tmp/x``).
    """
    uri = Path(path).absolute().as_uri()
    # A single replace() pass can leave "//" behind when the run of slashes
    # has odd length, so repeat until none remain.
    while "//" in uri:
        uri = uri.replace("//", "/")
    return uri
| |
| |
| class _Database(MutableMapping): |
| |
| def __init__(self, path, /, *, flag, mode): |
| if hasattr(self, "_cx"): |
| raise error(_ERR_REINIT) |
| |
| path = os.fsdecode(path) |
| match flag: |
| case "r": |
| flag = "ro" |
| case "w": |
| flag = "rw" |
| case "c": |
| flag = "rwc" |
| Path(path).touch(mode=mode, exist_ok=True) |
| case "n": |
| flag = "rwc" |
| Path(path).unlink(missing_ok=True) |
| Path(path).touch(mode=mode) |
| case _: |
| raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n', " |
| f"not {flag!r}") |
| |
| # We use the URI format when opening the database. |
| uri = _normalize_uri(path) |
| uri = f"{uri}?mode={flag}" |
| |
| try: |
| self._cx = sqlite3.connect(uri, autocommit=True, uri=True) |
| except sqlite3.Error as exc: |
| raise error(str(exc)) |
| |
| # This is an optimization only; it's ok if it fails. |
| with suppress(sqlite3.OperationalError): |
| self._cx.execute("PRAGMA journal_mode = wal") |
| |
| if flag == "rwc": |
| self._execute(BUILD_TABLE) |
| |
| def _execute(self, *args, **kwargs): |
| if not self._cx: |
| raise error(_ERR_CLOSED) |
| try: |
| return closing(self._cx.execute(*args, **kwargs)) |
| except sqlite3.Error as exc: |
| raise error(str(exc)) |
| |
| def __len__(self): |
| with self._execute(GET_SIZE) as cu: |
| row = cu.fetchone() |
| return row[0] |
| |
| def __getitem__(self, key): |
| with self._execute(LOOKUP_KEY, (key,)) as cu: |
| row = cu.fetchone() |
| if not row: |
| raise KeyError(key) |
| return row[0] |
| |
| def __setitem__(self, key, value): |
| self._execute(STORE_KV, (key, value)) |
| |
| def __delitem__(self, key): |
| with self._execute(DELETE_KEY, (key,)) as cu: |
| if not cu.rowcount: |
| raise KeyError(key) |
| |
| def __iter__(self): |
| try: |
| with self._execute(ITER_KEYS) as cu: |
| for row in cu: |
| yield row[0] |
| except sqlite3.Error as exc: |
| raise error(str(exc)) |
| |
| def close(self): |
| if self._cx: |
| self._cx.close() |
| self._cx = None |
| |
| def keys(self): |
| return list(super().keys()) |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, *args): |
| self.close() |
| |
| |
def open(filename, /, flag="r", mode=0o666):
    """Open a dbm.sqlite3 database and return the dbm object.

    The 'filename' parameter is the name of the database file.

    The optional 'flag' parameter can be one of:
        'r' (default): open an existing database for read only access
        'w': open an existing database for read/write access
        'c': create a database if it does not exist; open for read/write access
        'n': always create a new, empty database; open for read/write access

    The optional 'mode' parameter is the Unix file access mode of the database;
    only used when creating a new database. Default: 0o666.

    Raises ValueError if 'flag' is not one of the values above, and error if
    the database cannot be opened.
    """
    return _Database(filename, flag=flag, mode=mode)