Store

class chronify.store.Store(engine: Engine | None = None, engine_name: str | None = None, file_path: Path | str | None = None, **connect_kwargs: Any)

Data store for time series data.

Construct the Store.

Parameters:
  • engine – Optional, defaults to an engine connected to an in-memory DuckDB database.

  • engine_name – Optional, name of engine to use (‘duckdb’, ‘sqlite’). Mutually exclusive with engine.

  • file_path – Optional, use this file for the database. If the file does not exist, create a new database. If the file exists, load that existing database. Defaults to a new in-memory database.

Examples

>>> from sqlalchemy import create_engine
>>> store1 = Store()
>>> store2 = Store(engine=create_engine("duckdb:///time_series.db"))
>>> store3 = Store(engine=create_engine("sqlite:///time_series.db"))
>>> store4 = Store(engine_name="sqlite")

classmethod create_in_memory_db(engine_name: str = 'duckdb', **connect_kwargs: Any) → Store

Create a Store with an in-memory database.
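
Examples

A minimal sketch showing the default DuckDB backend and the SQLite alternative.

>>> store = Store.create_in_memory_db()
>>> store2 = Store.create_in_memory_db(engine_name="sqlite")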

classmethod create_file_db(file_path: Path | str = 'time_series.db', engine_name: str = 'duckdb', overwrite: bool = False, **connect_kwargs: Any) → Store

Create a Store with a file-based database.
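
Examples

A sketch with an illustrative file name; overwrite=True replaces an existing file.

>>> store = Store.create_file_db("time_series.db")
>>> store = Store.create_file_db("time_series.db", overwrite=True)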

classmethod load_from_file(file_path: Path | str, engine_name: str = 'duckdb', **connect_kwargs: Any) → Store

Load an existing store from a database.
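
Examples

Assumes time_series.db was created earlier, e.g. by create_file_db().

>>> store = Store.load_from_file("time_series.db")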

get_table(name: str) → Table

Return the sqlalchemy Table object.

has_table(name: str) → bool

Return True if the database has a table with the given name.

list_tables() → list[str]

Return a list of user tables in the database.

try_get_table(name: str) → Table | None

Return the sqlalchemy Table object or None if it is not stored.
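
Examples

A sketch assuming a table named "devices" was ingested earlier.

>>> store.has_table("devices")
True
>>> store.list_tables()
['devices']
>>> table = store.get_table("devices")
>>> store.try_get_table("does_not_exist") is None
True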

update_sqlalchemy_metadata() → None

Update the sqlalchemy metadata for the table schemas. Call this method if you add tables through the sqlalchemy engine outside of this class.
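
Examples

A sketch: create a table directly through the engine, then refresh the metadata.

>>> from sqlalchemy import text
>>> with store.engine.connect() as conn:
...     conn.execute(text("CREATE TABLE extra (id INTEGER, value DOUBLE)"))
...     conn.commit()
>>> store.update_sqlalchemy_metadata()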

backup(dst: Path | str, overwrite: bool = False) → None

Copy the database to a new location. Not yet supported for in-memory databases.
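
Examples

File names are illustrative.

>>> store = Store.create_file_db("time_series.db")
>>> store.backup("time_series_backup.db")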

property engine: Engine

Return the sqlalchemy engine.

property metadata: MetaData

Return the sqlalchemy metadata.

create_view_from_parquet(schema: TableSchema, path: Path | str) → None

Create a view in the database from a Parquet file.

Parameters:
  • schema – Defines the schema of the view to create in the database. Must match the input data.

  • path – Path to Parquet file.

Raises:

InvalidTable – Raised if the schema does not match the input data.

Examples

>>> store = Store()
>>> store.create_view_from_parquet(
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
...     "table.parquet",
... )

ingest_from_csv(path: Path | str, src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → None

Ingest data from a CSV file.

Parameters:
  • path – Source data file

  • src_schema – Defines the schema of the source file.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> resolution = timedelta(hours=1)
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=timedelta(hours=1),
... )
>>> store = Store()
>>> store.ingest_from_csv(
...     "data.csv",
...     CsvTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )

See also

ingest_from_csvs

ingest_from_csvs(paths: Iterable[Path | str], src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → None

Ingest data into the table specified by dst_schema. If the table does not exist, create it. This is faster than calling ingest_from_csv() many times. Each file is loaded into memory one at a time. If any error occurs, all added data is removed and the database is restored to its original state.

Parameters:
  • paths – Source data files.

  • src_schema – Defines the schema of the source files.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Raises:

InvalidTable – Raised if the data does not match the schema.
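
Examples

A sketch reusing time_config from the ingest_from_csv() example; the file names are illustrative.

>>> store.ingest_from_csvs(
...     ["data1.csv", "data2.csv"],
...     CsvTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )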

See also

ingest_from_csv

ingest_pivoted_table(data: DataFrame | DuckDBPyRelation, src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → None

Ingest pivoted data into the table specified by dst_schema. If the table does not exist, create it. Chronify will unpivot the data before ingesting it.

Parameters:
  • data – Input data to ingest into the database.

  • src_schema – Defines the schema of the input data.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "device1": np.random.random(8784),
...         "device2": np.random.random(8784),
...         "device3": np.random.random(8784),
...     }
... )
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=timedelta(hours=1),
... )
>>> store = Store()
>>> store.ingest_pivoted_table(
...     df,
...     PivotedTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )

ingest_pivoted_tables(data: Iterable[DataFrame | DuckDBPyRelation], src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → None

Ingest pivoted data into the table specified by dst_schema.

If the table does not exist, create it. Unpivot the data before ingesting it. This is faster than calling ingest_pivoted_table() many times. If any error occurs, all added data is removed and the database is restored to its original state.

Parameters:
  • data – Data to ingest into the database.

  • src_schema – Defines the schema of all input tables.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.
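
Examples

A sketch assuming df1 and df2 are pivoted DataFrames shaped like df in the ingest_pivoted_table() example.

>>> store.ingest_pivoted_tables(
...     [df1, df2],
...     PivotedTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )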

ingest_table(data: DataFrame | DuckDBPyRelation, schema: TableSchema, connection: Connection | None = None) → None

Ingest data into the table specified by schema. If the table does not exist, create it.

Parameters:
  • data – Input data to ingest into the database.

  • schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. If adding many tables at once, it is significantly faster to use one connection. Refer to ingest_tables() for built-in support. If connection is not set, chronify will commit the database changes or perform a rollback on error. If it is set, the caller must perform those actions.

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> store = Store()
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "value": np.random.random(8784),
...     }
... )
>>> df["id"] = 1
>>> store.ingest_table(
...     df,
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
... )

See also

ingest_tables

ingest_tables(data: Iterable[DataFrame | DuckDBPyRelation], schema: TableSchema, connection: Connection | None = None) → None

Ingest multiple input tables to the same database table. All tables must have the same schema. This offers significant performance advantages over calling ingest_table() many times.

Parameters:
  • data – Input tables to ingest into one database table.

  • schema – Defines the destination table.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Raises:

InvalidTable – Raised if the data does not match the schema.
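
Examples

A sketch assuming df1 and df2 match schema, the TableSchema from the ingest_table() example; the caller reuses one connection and commits it.

>>> with store.engine.connect() as conn:
...     store.ingest_tables([df1, df2], schema, connection=conn)
...     conn.commit()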

See also

ingest_table

map_table_time_config(src_name: str, dst_schema: TableSchema) → None

Map the existing table represented by src_name to a new table represented by dst_schema with a different time configuration.

Parameters:
  • src_name – Refers to the table name of the source data.

  • dst_schema – Defines the table to create in the database. Must not already exist.

Raises:

InvalidTable – Raised if the schemas are incompatible.

Examples

>>> store = Store()
>>> hours_per_array = 12 * 7 * 24
>>> num_time_arrays = 3
>>> df = pd.DataFrame(
...     {
...         "id": np.concatenate(
...             [np.repeat(i, hours_per_array) for i in range(1, 1 + num_time_arrays)]
...         ),
...         "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays),
...         "day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays),
...         "hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays),
...         "value": np.random.random(hours_per_array * num_time_arrays),
...     }
... )
>>> schema = TableSchema(
...     name="devices_by_representative_time",
...     value_column="value",
...     time_config=RepresentativePeriodTime(
...         time_format=RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR,
...     ),
...     time_array_id_columns=["id"],
... )
>>> store.ingest_table(df, schema)
>>> store.map_table_time_config(
...     "devices_by_representative_time",
...     TableSchema(
...         name="devices_by_datetime",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
... )

read_query(name: str, query: Selectable | str, params: Any = None, connection: Connection | None = None) → DataFrame

Return the query result as a pandas DataFrame.

Parameters:
  • name – Table or view name

  • query – SQL query expressed as a string or sqlalchemy Selectable.

  • params – Parameters for SQL query if expressed as a string

Examples

>>> df = store.read_query("devices", "SELECT * FROM devices")
>>> df = store.read_query("devices", "SELECT * FROM devices WHERE id = ?", params=(3,))
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> df = store.read_query("devices", select(table).where(table.c.id == 3))

read_table(name: str, connection: Connection | None = None) → DataFrame

Return the table as a pandas DataFrame.
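
Examples

Assumes a table or view named "devices" exists.

>>> df = store.read_table("devices")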

read_raw_query(query: str, params: Any = None, connection: Connection | None = None) → DataFrame

Execute a query directly on the backend database connection, bypassing sqlalchemy, and return the results as a DataFrame.

Note: Unlike read_query(), no conversion of timestamps is performed. Timestamps will be in the format of the underlying database; SQLite backends return strings instead of datetime objects.

Parameters:
  • query – SQL query to execute

  • params – Optional parameters for SQL query

  • connection – Optional sqlalchemy connection returned by Store.engine.connect(). This can improve performance when performing many reads. If used for database modifications, it is the caller’s responsibility to perform a commit and ensure that the connection is closed correctly. Use of sqlalchemy’s context manager is recommended.

Examples

>>> store = Store()
>>> query1 = "SELECT * FROM my_table WHERE column = ?"
>>> params1 = ("value1",)
>>> query2 = "SELECT * FROM my_table WHERE column = ?"
>>> params2 = ("value2",)
>>> df = store.read_raw_query(query1, params=params1)
>>> with store.engine.connect() as conn:
...     df1 = store.read_raw_query(query1, params=params1, connection=conn)
...     df2 = store.read_raw_query(query2, params=params2, connection=conn)

write_query_to_parquet(stmt: Selectable, file_path: Path | str) → None

Write the result of a query to a Parquet file.
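
Examples

A sketch assuming a "devices" table; the output file name is illustrative.

>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.write_query_to_parquet(select(table).where(table.c.id == 3), "filtered.parquet")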

write_table_to_parquet(name: str, file_path: Path | str) → None

Write a table or view to a Parquet file.
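
Examples

The file name is illustrative; assumes a "devices" table exists.

>>> store.write_table_to_parquet("devices", "devices.parquet")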

load_table(path: Path, schema: TableSchema) → None

Load a table into the database.
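
Examples

A sketch assuming table.parquet is a Parquet file whose layout matches schema, a TableSchema like the one in the ingest_table() example.

>>> from pathlib import Path
>>> store.load_table(Path("table.parquet"), schema)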

delete_rows(name: str, time_array_id_values: dict[str, Any], connection: Connection | None = None) → None

Delete all rows matching the time_array_id_values.

Parameters:
  • name – Name of table

  • time_array_id_values – Column-value pairs identifying the time arrays to delete. Keys must match the time_array_id_columns in the schema.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Examples

>>> store.delete_rows("devices", {"id": 47})

drop_table(name: str) → None

Drop a table from the database.
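
Examples

Assumes a table named "devices" exists.

>>> store.drop_table("devices")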

create_view(schema: TableSchema, stmt: Selectable) → None

Create a view in the database.
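
Examples

A sketch; view_schema is a hypothetical TableSchema describing the view, and the statement selects matching columns from an existing table.

>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.create_view(view_schema, select(table).where(table.c.id == 3))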

drop_view(name: str) → None

Drop a view from the database.