Store¶
- class chronify.store.Store(engine: Engine | None = None, engine_name: str | None = None, file_path: Path | str | None = None, **connect_kwargs: Any)¶
Data store for time series data.
Construct the Store.
- Parameters:
engine – Optional, defaults to an engine connected to an in-memory DuckDB database.
engine_name – Optional, name of engine to use (‘duckdb’, ‘sqlite’). Mutually exclusive with engine.
file_path – Optional, use this file for the database. If the file does not exist, create a new database. If the file exists, load that existing database. Defaults to a new in-memory database.
Examples
>>> from sqlalchemy import create_engine
>>> store1 = Store()
>>> store2 = Store(engine=create_engine("duckdb:///time_series.db"))
>>> store3 = Store(engine=create_engine("sqlite:///time_series.db"))
>>> store4 = Store(engine_name="sqlite")
- classmethod create_in_memory_db(engine_name: str = 'duckdb', **connect_kwargs: Any) Store ¶
Create a Store with an in-memory database.
- classmethod create_file_db(file_path: Path | str = 'time_series.db', engine_name: str = 'duckdb', overwrite: bool = False, **connect_kwargs: Any) Store ¶
Create a Store with a file-based database.
- classmethod load_from_file(file_path: Path | str, engine_name: str = 'duckdb', **connect_kwargs: Any) Store ¶
Load an existing store from a database.
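Examples
A minimal sketch of the three constructors above; the file name is illustrative:
>>> store1 = Store.create_in_memory_db()
>>> store2 = Store.create_file_db("time_series.db", overwrite=True)
>>> store3 = Store.load_from_file("time_series.db")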
- get_table(name: str) Table ¶
Return the sqlalchemy Table object.
- has_table(name: str) bool ¶
Return True if the database has a table with the given name.
- list_tables() list[str] ¶
Return a list of user tables in the database.
- try_get_table(name: str) Table | None ¶
Return the sqlalchemy Table object or None if it is not stored.
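Examples
Assuming a freshly created in-memory store with no user tables:
>>> store = Store()
>>> store.list_tables()
[]
>>> store.has_table("devices")
False
>>> store.try_get_table("devices") is None
True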
- update_sqlalchemy_metadata() None ¶
Update the sqlalchemy metadata for table schema. Call this method if you add tables in the sqlalchemy engine outside of this class.
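Examples
A minimal sketch of the intended workflow; the table name extra is hypothetical:
>>> from sqlalchemy import text
>>> store = Store()
>>> with store.engine.begin() as conn:
...     _ = conn.execute(text("CREATE TABLE extra (id INTEGER)"))  # created outside chronify
>>> store.update_sqlalchemy_metadata()
>>> store.has_table("extra")
True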
- backup(dst: Path | str, overwrite: bool = False) None ¶
Copy the database to a new location. Not yet supported for in-memory databases.
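Examples
The paths are illustrative; this is not yet supported for in-memory databases:
>>> store = Store.create_file_db("time_series.db")
>>> store.backup("time_series_backup.db")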
- property engine: Engine¶
Return the sqlalchemy engine.
- property metadata: MetaData¶
Return the sqlalchemy metadata.
- create_view_from_parquet(schema: TableSchema, path: Path | str) None ¶
Create a view in the database from a Parquet file.
- Parameters:
schema – Defines the schema of the view to create in the database. Must match the input data.
path – Path to Parquet file.
- Raises:
InvalidTable – Raised if the schema does not match the input data.
Examples
>>> store = Store()
>>> store.create_view_from_parquet(
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
...     "table.parquet",
... )
- ingest_from_csv(path: Path | str, src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) None ¶
Ingest data from a CSV file.
- Parameters:
path – Source data file
src_schema – Defines the schema of the source file.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> resolution = timedelta(hours=1)
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_from_csv(
...     "data.csv",
...     CsvTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )
See also ingest_from_csvs()
- ingest_from_csvs(paths: Iterable[Path | str], src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) None ¶
Ingest data into the table specified by schema. If the table does not exist, create it. This is faster than calling ingest_from_csv() many times. Each file is loaded into memory one at a time. If any error occurs, all added data will be removed and the state of the database will be the same as the original state.
- Parameters:
paths – Source data files
src_schema – Defines the schema of the source files.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Raises:
InvalidTable – Raised if the data does not match the schema.
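Examples
A minimal sketch, with src_schema and dst_schema constructed as in the ingest_from_csv() example above; the file names are illustrative:
>>> store = Store()
>>> store.ingest_from_csvs(["data1.csv", "data2.csv"], src_schema, dst_schema)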
See also ingest_from_csv()
- ingest_pivoted_table(data: DataFrame | DuckDBPyRelation, src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) None ¶
Ingest pivoted data into the table specified by schema. If the table does not exist, create it. Chronify will unpivot the data before ingesting it.
- Parameters:
data – Input data to ingest into the database.
src_schema – Defines the schema of the input data.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "device1": np.random.random(8784),
...         "device2": np.random.random(8784),
...         "device3": np.random.random(8784),
...     }
... )
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_pivoted_table(
...     df,
...     PivotedTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )
See also ingest_pivoted_tables()
- ingest_pivoted_tables(data: Iterable[DataFrame | DuckDBPyRelation], src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) None ¶
Ingest pivoted data into the table specified by schema.
If the table does not exist, create it. Unpivot the data before ingesting it. This is faster than calling ingest_pivoted_table() many times. If any error occurs, all added data will be removed and the state of the database will be the same as the original state.
- Parameters:
data – Data to ingest into the database.
src_schema – Defines the schema of all input tables.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
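Examples
A minimal sketch, with src_schema and dst_schema constructed as in the ingest_pivoted_table() example above, and assuming df1 and df2 are pivoted DataFrames with identical columns:
>>> store.ingest_pivoted_tables([df1, df2], src_schema, dst_schema)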
See also ingest_pivoted_table()
- ingest_table(data: DataFrame | DuckDBPyRelation, schema: TableSchema, connection: Connection | None = None) None ¶
Ingest data into the table specified by schema. If the table does not exist, create it.
- Parameters:
data – Input data to ingest into the database.
schema – Defines the destination table in the database.
connection – Optional connection to reuse. If adding many tables at once, it is significantly faster to use one connection. Refer to ingest_tables() for built-in support. If connection is not set, chronify will commit the database changes or perform a rollback on error. If it is set, the caller must perform those actions.
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> store = Store()
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "value": np.random.random(8784),
...     }
... )
>>> df["id"] = 1
>>> store.ingest_table(
...     df,
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=resolution,
...         ),
...         time_array_id_columns=["id"],
...     ),
... )
See also ingest_tables()
- ingest_tables(data: Iterable[DataFrame | DuckDBPyRelation], schema: TableSchema, connection: Connection | None = None) None ¶
Ingest multiple input tables into the same database table. All tables must have the same schema. This offers significant performance advantages over calling ingest_table() many times.
- Parameters:
data – Input tables to ingest into one database table.
schema – Defines the destination table.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Raises:
InvalidTable – Raised if the data does not match the schema.
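Examples
A minimal sketch, reusing the TableSchema from the ingest_table() example above (bound to schema); make_df is a hypothetical helper that builds DataFrames with identical columns:
>>> dfs = [make_df(i) for i in range(3)]  # make_df is hypothetical
>>> store.ingest_tables(dfs, schema)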
See also ingest_table()
- map_table_time_config(src_name: str, dst_schema: TableSchema) None ¶
Map the existing table represented by src_name to a new table represented by dst_schema with a different time configuration.
- Parameters:
src_name – Refers to the table name of the source data.
dst_schema – Defines the table to create in the database. Must not already exist.
- Raises:
InvalidTable – Raised if the schemas are incompatible.
Examples
>>> store = Store()
>>> hours_per_year = 12 * 7 * 24
>>> num_time_arrays = 3
>>> df = pd.DataFrame(
...     {
...         "id": np.concat(
...             [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]
...         ),
...         "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays),
...         "day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays),
...         "hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays),
...         "value": np.random.random(hours_per_year * num_time_arrays),
...     }
... )
>>> schema = TableSchema(
...     name="devices_by_representative_time",
...     value_column="value",
...     time_config=RepresentativePeriodTime(
...         time_format=RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR,
...     ),
...     time_array_id_columns=["id"],
... )
>>> store.ingest_table(df, schema)
>>> store.map_table_time_config(
...     "devices_by_representative_time",
...     TableSchema(
...         name="devices_by_datetime",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
... )
- read_query(name: str, query: Selectable | str, params: Any = None, connection: Connection | None = None) DataFrame ¶
Return the query result as a pandas DataFrame.
- Parameters:
name – Table or view name
query – SQL query expressed as a string or sqlalchemy Selectable
params – Parameters for SQL query if expressed as a string
Examples
>>> df = store.read_query("devices", "SELECT * FROM devices")
>>> df = store.read_query("devices", "SELECT * FROM devices WHERE id = ?", params=(3,))
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> df = store.read_query("devices", select(table).where(table.c.id == 3))
- read_table(name: str, connection: Connection | None = None) DataFrame ¶
Return the table as a pandas DataFrame.
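Examples
Assuming the store contains the devices table from the earlier examples:
>>> df = store.read_table("devices")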
- read_raw_query(query: str, params: Any = None, connection: Connection | None = None) DataFrame ¶
Execute a query directly on the backend database connection, bypassing sqlalchemy, and return the results as a DataFrame.
Note: Unlike read_query(), no conversion of timestamps is performed. Timestamps will be in the format of the underlying database. SQLite backends will return strings instead of datetime objects.
- Parameters:
query – SQL query to execute
params – Optional parameters for SQL query
connection – Optional sqlalchemy connection returned by Store.engine.connect(). This can improve performance when performing many reads. If used for database modifications, it is the caller’s responsibility to perform a commit and ensure that the connection is closed correctly. Use of sqlalchemy’s context manager is recommended.
Examples
>>> store = Store()
>>> query1 = "SELECT * from my_table WHERE column = ?"
>>> params1 = ("value1",)
>>> query2 = "SELECT * from my_table WHERE column = ?"
>>> params2 = ("value2",)
>>> df = store.read_raw_query(query1, params=params1)
>>> with store.engine.connect() as conn:
...     df1 = store.read_raw_query(query1, params=params1, connection=conn)
...     df2 = store.read_raw_query(query2, params=params2, connection=conn)
- write_query_to_parquet(stmt: Selectable, file_path: Path | str) None ¶
Write the result of a query to a Parquet file.
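Examples
A minimal sketch, assuming the devices table from the earlier examples; the output path is illustrative:
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.write_query_to_parquet(
...     select(table).where(table.c.id == 3), "filtered.parquet"
... )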
- write_table_to_parquet(name: str, file_path: Path | str) None ¶
Write a table or view to a Parquet file.
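Examples
The output path is illustrative:
>>> store.write_table_to_parquet("devices", "devices.parquet")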
- load_table(path: Path, schema: TableSchema) None ¶
Load a table into the database.
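Examples
A minimal sketch, assuming path points to a Parquet file whose data matches schema (for example, one written by write_table_to_parquet() above); both names are illustrative:
>>> from pathlib import Path
>>> store.load_table(Path("devices.parquet"), schema)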
- delete_rows(name: str, time_array_id_values: dict[str, Any], connection: Connection | None = None) None ¶
Delete all rows matching the time_array_id_values.
- Parameters:
name – Name of table
time_array_id_values – Column-value pairs identifying the rows to delete. Keys must match the time array ID columns in the schema.
connection – Optional connection to the database. Refer to ingest_table() for notes.
Examples
>>> store.delete_rows("devices", {"id": 47})
- drop_table(name: str) None ¶
Drop a table from the database.
- create_view(schema: TableSchema, stmt: Selectable) None ¶
Create a view in the database.
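Examples
A minimal sketch, assuming the devices table from the earlier examples and a view_schema TableSchema constructed as in create_view_from_parquet() above:
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.create_view(view_schema, select(table).where(table.c.id == 3))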
- drop_view(name: str) None ¶
Drop a view from the database.