# Source code for el1xr_opt.Modules.oM_InputDuckDBSource

"""el1xr_opt DuckDB backend — reads a single ``<case>.duckdb`` file.

The file is produced by ``oM_CsvToDuckDB`` and holds one table per input
table plus a small metadata table. Data tables store their (originally unnamed)
index levels in reserved ``__idx0``, ``__idx1``, ... columns; on read those
columns are moved back into a nameless index so the DataFrame matches what the
CSV backend returns.
"""
from __future__ import annotations

from pathlib import Path

import pandas as pd

from .oM_InputSchema import (
    DB_DATA_PREFIX,
    DB_DICT_PREFIX,
    IDX_PREFIX,
    META_KEY_CASE,
    META_TABLE,
    is_idx_col,
)
from .oM_InputSource import InputSource, finalize_data_index

try:
    import duckdb  # noqa: F401
    _HAS_DUCKDB = True
except ImportError:  # pragma: no cover - exercised only in duckdb-free trees
    _HAS_DUCKDB = False


class DuckDBSource(InputSource):
    """Input source that reads every table from one read-only DuckDB file.

    The connection is opened in ``__init__`` and kept until :meth:`close` is
    called. The set of table names and the case name are read once, up front.

    Attributes:
        db_path: ``Path`` of the DuckDB file given to the constructor.
        case_name: Case name read from the metadata table (``META_TABLE``).
    """

    def __init__(self, db_path) -> None:
        """Open ``db_path`` read-only and load the table list and case name.

        Args:
            db_path: Location of the DuckDB file (anything ``Path`` accepts).

        Raises:
            ValueError: If the metadata table has no usable
                ``META_KEY_CASE`` entry (absent, null/NaN or empty).

        Any exception raised after the connection has been opened closes the
        connection before propagating, so a failed construction does not leave
        an open handle on the file.
        """
        # duckdb is imported at module level (guarded by _HAS_DUCKDB); this class
        # is only instantiated when that import succeeded.
        self.db_path = Path(db_path)
        self._con = duckdb.connect(str(self.db_path), read_only=True)
        try:
            # Read all tables once. Identifiers come only from our own schema,
            # and every read below goes through the relational API (con.table)
            # rather than a composed SQL string, so no table name is ever
            # interpolated into SQL.
            names = self._con.execute(
                "SELECT table_name FROM information_schema.tables"
            ).fetchall()
            self._tables = {r[0] for r in names}

            meta = self._con.table(META_TABLE).df()
            match = meta.loc[meta["Key"] == META_KEY_CASE, "Value"]
            value = None if match.empty else match.iloc[0]
            # NaN is truthy, so a null cell surfaced as NaN must be rejected
            # explicitly; otherwise the case name would become "nan".
            if value is None or pd.isna(value) or not value:
                raise ValueError(
                    f"{self.db_path}: metadata table '{META_TABLE}' has no '{META_KEY_CASE}'"
                )
            self.case_name = str(value)
        except BaseException:
            # The caller never receives the instance when __init__ fails, so
            # nobody else can release the connection.
            self.close()
            raise

    @property
    def dir_name(self) -> str:
        """Directory containing the DuckDB file, as a string."""
        return str(self.db_path.parent)

    def close(self) -> None:
        """Close the DuckDB connection; calling it again is a no-op."""
        if self._con is not None:
            self._con.close()
            self._con = None

    def list_data_stems(self) -> set:
        """Return the stems of all data tables in the file.

        A data table is any table whose name starts with ``DB_DATA_PREFIX``;
        its stem is the name with that prefix removed.
        """
        return {
            t[len(DB_DATA_PREFIX):]
            for t in self._tables
            if t.startswith(DB_DATA_PREFIX)
        }

    def read_dict(self, stem: str) -> pd.DataFrame:
        """Return the dictionary table for ``stem``.

        Returns an empty DataFrame when the file has no table named
        ``DB_DICT_PREFIX + stem``.
        """
        table = f"{DB_DICT_PREFIX}{stem}"
        if table not in self._tables:
            return pd.DataFrame()
        return self._con.table(table).df()

    def read_data(self, stem: str) -> pd.DataFrame:
        """Return the data table for ``stem`` after index finalization.

        Args:
            stem: Table name without the ``DB_DATA_PREFIX`` prefix.

        Returns:
            The result of ``finalize_data_index`` applied to the table and its
            reserved index columns (per the module docstring, this moves them
            back into a nameless index).

        Raises:
            FileNotFoundError: If the file has no table named
                ``DB_DATA_PREFIX + stem``.
        """
        table = f"{DB_DATA_PREFIX}{stem}"
        if table not in self._tables:
            raise FileNotFoundError(f"oM_Data_{stem}_*.csv not present in {self.db_path}")
        df = self._con.table(table).df()
        # Order the reserved index columns by their numeric suffix so that
        # e.g. level 10 sorts after level 2 (a plain string sort would not).
        idx_cols = sorted(
            (c for c in df.columns if is_idx_col(c)),
            key=lambda c: int(c[len(IDX_PREFIX):]),
        )
        return finalize_data_index(df, idx_cols)