Store¶
- class chronify.store.Store(engine: Engine | None = None, engine_name: str | None = None, file_path: Path | str | None = None, **connect_kwargs: Any)¶
Data store for time series data.
Construct the Store.
- Parameters:
engine – Optional, defaults to an engine connected to an in-memory DuckDB database.
engine_name – Optional, name of engine to use (‘duckdb’, ‘sqlite’). Mutually exclusive with engine.
file_path – Optional, use this file for the database. If the file does not exist, create a new database. If the file exists, load that existing database. Defaults to a new in-memory database.
Examples
>>> from sqlalchemy import create_engine
>>> store1 = Store()
>>> store2 = Store(engine=create_engine("duckdb:///time_series.db"))
>>> store3 = Store(engine=create_engine("sqlite:///time_series.db"))
>>> store4 = Store(engine_name="sqlite")
- classmethod create_in_memory_db(engine_name: str = 'duckdb', **connect_kwargs: Any) Store ¶
Create a Store with an in-memory database.
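Examples
A minimal usage sketch, using only the documented parameters:
>>> store = Store.create_in_memory_db()
>>> sqlite_store = Store.create_in_memory_db(engine_name="sqlite")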
- classmethod create_file_db(file_path: Path | str = 'time_series.db', engine_name: str = 'duckdb', overwrite: bool = False, **connect_kwargs: Any) Store ¶
Create a Store with a file-based database.
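Examples
A minimal usage sketch with the documented default file name:
>>> store = Store.create_file_db("time_series.db", overwrite=True)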
- classmethod create_new_hive_store(url: str, drop_schema: bool = True, **connect_kwargs: Any) Store ¶
Create a new Store in a Hive database. Recommended usage is to create views from Parquet files. Ingesting data into tables from files or DataFrames is not supported.
This has been tested with Apache Spark running an Apache Thrift Server.
- Parameters:
url – Thrift server URL
drop_schema – If True, drop the schema table if it already exists.
Examples
>>> store = Store.create_new_hive_store("hive://localhost:10000/default")
- classmethod load_from_file(file_path: Path | str, engine_name: str = 'duckdb', **connect_kwargs: Any) Store ¶
Load an existing store from a database.
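Examples
A minimal sketch, assuming time_series.db was created earlier with create_file_db():
>>> store = Store.load_from_file("time_series.db")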
- get_table(name: str) Table ¶
Return the sqlalchemy Table object.
- has_table(name: str) bool ¶
Return True if the database has a table with the given name.
- list_tables() list[str] ¶
Return a list of user tables in the database.
- try_get_table(name: str) Table | None ¶
Return the sqlalchemy Table object or None if it is not stored.
- update_metadata() None ¶
Update the sqlalchemy metadata for table schema. Call this method if you add tables in the sqlalchemy engine outside of this class or perform a rollback in the same transaction in which chronify added tables.
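For illustration, a sketch of the out-of-band case described above (the aux table is hypothetical):
>>> from sqlalchemy import text
>>> store = Store()
>>> with store.engine.connect() as conn:
...     conn.execute(text("CREATE TABLE aux (id INTEGER)"))  # created outside chronify
...     conn.commit()
>>> store.update_metadata()  # refresh chronify's view of the schema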
- backup(dst: Path | str, overwrite: bool = False) None ¶
Copy the database to a new location. Not yet supported for in-memory databases.
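Examples
A sketch with a file-backed store, since in-memory databases are not yet supported (paths are illustrative):
>>> store = Store.create_file_db("time_series.db")
>>> store.backup("time_series_backup.db")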
- property engine: Engine¶
Return the sqlalchemy engine.
- property metadata: MetaData¶
Return the sqlalchemy metadata.
- property schema_manager: SchemaManager¶
Return the store’s schema manager.
- create_view_from_parquet(path: Path, schema: TableSchema, bypass_checks: bool = False) None ¶
Create a view in the database from a Parquet file.
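Examples
A sketch, assuming devices.parquet holds timestamp, device, and value columns matching the schema (the file name and columns are illustrative):
>>> from pathlib import Path
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=timedelta(hours=1),
... )
>>> store = Store()
>>> store.create_view_from_parquet(
...     Path("devices.parquet"),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )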
- ingest_from_csv(path: Path | str, src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) bool ¶
Ingest data from a CSV file.
- Parameters:
path – Source data file
src_schema – Defines the schema of the source file.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Returns:
Return True if a table was created.
- Return type:
bool
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> resolution = timedelta(hours=1)
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_from_csv(
...     "data.csv",
...     CsvTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )
- ingest_from_csvs(paths: Iterable[Path | str], src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) bool ¶
Ingest data into the table specified by schema. If the table does not exist, create it. This is faster than calling ingest_from_csv() many times. Each file is loaded into memory one at a time. If any error occurs, all added data will be removed and the state of the database will be the same as the original state.
- Parameters:
paths – Source data files
src_schema – Defines the schema of the source files.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Returns:
Return True if a table was created.
- Return type:
bool
- Raises:
InvalidTable – Raised if the data does not match the schema.
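Examples
A sketch, reusing the CsvTableSchema and TableSchema from the ingest_from_csv() example, bound here to src_schema and dst_schema (file names are illustrative):
>>> store.ingest_from_csvs(["data1.csv", "data2.csv"], src_schema, dst_schema)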
- ingest_pivoted_table(data: DataFrame | DuckDBPyRelation, src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) bool ¶
Ingest pivoted data into the table specified by schema. If the table does not exist, create it. Chronify will unpivot the data before ingesting it.
- Parameters:
data – Input data to ingest into the database.
src_schema – Defines the schema of the input data.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Returns:
Return True if a table was created.
- Return type:
bool
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "device1": np.random.random(8784),
...         "device2": np.random.random(8784),
...         "device3": np.random.random(8784),
...     }
... )
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_pivoted_table(
...     df,
...     PivotedTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )
- ingest_pivoted_tables(data: Iterable[DataFrame | DuckDBPyRelation], src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) bool ¶
Ingest pivoted data into the table specified by schema.
If the table does not exist, create it. Unpivot the data before ingesting it. This is faster than calling ingest_pivoted_table() many times. If any error occurs, all added data will be removed and the state of the database will be the same as the original state.
- Parameters:
data – Data to ingest into the database.
src_schema – Defines the schema of all input tables.
dst_schema – Defines the destination table in the database.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Returns:
Return True if a table was created.
- Return type:
bool
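Examples
A sketch, assuming df1 and df2 are pivoted DataFrames matching the ingest_pivoted_table() example, with its PivotedTableSchema and TableSchema bound to src_schema and dst_schema:
>>> store.ingest_pivoted_tables([df1, df2], src_schema, dst_schema)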
- ingest_table(data: DataFrame | DuckDBPyRelation, schema: TableSchema, connection: Connection | None = None, **kwargs: Any) bool ¶
Ingest data into the table specified by schema. If the table does not exist, create it.
- Parameters:
data – Input data to ingest into the database.
schema – Defines the destination table in the database.
connection – Optional connection to reuse. If adding many tables at once, it is significantly faster to use one connection. Refer to ingest_tables() for built-in support. If connection is not set, chronify will commit the database changes or perform a rollback on error. If it is set, the caller must perform those actions. If you perform a rollback, you must call rebuild_schema_cache() because the Store caches all table names in memory.
- Returns:
Return True if a table was created.
- Return type:
bool
- Raises:
InvalidTable – Raised if the data does not match the schema.
Examples
>>> store = Store()
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "value": np.random.random(8784),
...     }
... )
>>> df["id"] = 1
>>> store.ingest_table(
...     df,
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=resolution,
...         ),
...         time_array_id_columns=["id"],
...     ),
... )
- ingest_tables(data: Iterable[DataFrame | DuckDBPyRelation], schema: TableSchema, connection: Connection | None = None, **kwargs: Any) bool ¶
Ingest multiple input tables into the same database table. All tables must have the same schema. This offers significant performance advantages over calling ingest_table() many times.
- Parameters:
data – Input tables to ingest into one database table.
schema – Defines the destination table.
connection – Optional connection to reuse. Refer to ingest_table() for notes.
- Returns:
Return True if a table was created.
- Return type:
bool
- Raises:
InvalidTable – Raised if the data does not match the schema.
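Examples
A sketch, assuming df1 and df2 both match the TableSchema from the ingest_table() example, bound here to schema:
>>> store.ingest_tables([df1, df2], schema)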
- map_table_time_config(src_name: str, dst_schema: TableSchema, scratch_dir: Path | None = None, output_file: Path | None = None, check_mapped_timestamps: bool = False) None ¶
Map the existing table represented by src_name to a new table represented by dst_schema with a different time configuration.
- Parameters:
src_name – Refers to the table name of the source data.
dst_schema – Defines the table to create in the database. Must not already exist.
scratch_dir – Directory to use for temporary writes. Defaults to the system's temporary directory.
check_mapped_timestamps – Perform time checks on the result of the mapping operation. This can be slow and is not required.
- Raises:
InvalidTable – Raised if the schemas are incompatible.
TableAlreadyExists – Raised if the dst_schema name already exists.
Examples
>>> store = Store()
>>> hours_per_year = 12 * 7 * 24
>>> num_time_arrays = 3
>>> df = pd.DataFrame(
...     {
...         "id": np.concatenate(
...             [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]
...         ),
...         "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays),
...         "day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays),
...         "hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays),
...         "value": np.random.random(hours_per_year * num_time_arrays),
...     }
... )
>>> schema = TableSchema(
...     name="devices_by_representative_time",
...     value_column="value",
...     time_config=RepresentativePeriodTime(
...         time_format=RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR,
...     ),
...     time_array_id_columns=["id"],
... )
>>> store.ingest_table(df, schema)
>>> store.map_table_time_config(
...     "devices_by_representative_time",
...     TableSchema(
...         name="devices_by_datetime",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
... )
- read_query(name: str, query: Selectable | str, params: Any = None, connection: Connection | None = None) DataFrame ¶
Return the query result as a pandas DataFrame.
- Parameters:
name – Table or view name
query – SQL query expressed as a string or sqlalchemy Selectable
params – Parameters for SQL query if expressed as a string
Examples
>>> df = store.read_query("devices", "SELECT * FROM devices")
>>> df = store.read_query("devices", "SELECT * FROM devices WHERE id = ?", params=(3,))
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> df = store.read_query("devices", select(table).where(table.c.id == 3))
- read_table(name: str, connection: Connection | None = None) DataFrame ¶
Return the table as a pandas DataFrame.
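Examples
Assuming the devices table from the ingestion examples:
>>> df = store.read_table("devices")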
- read_raw_query(query: str, params: Any = None, connection: Connection | None = None) DataFrame ¶
Execute a query directly on the backend database connection, bypassing sqlalchemy, and return the results as a DataFrame.
Note: Unlike read_query(), no conversion of timestamps is performed. Timestamps will be in the format of the underlying database. SQLite backends will return strings instead of datetimes.
- Parameters:
query – SQL query to execute
params – Optional parameters for SQL query
connection – Optional sqlalchemy connection returned by Store.engine.connect(). This can improve performance when performing many reads. If used for database modifications, it is the caller's responsibility to perform a commit and ensure that the connection is closed correctly. Use of sqlalchemy's context manager is recommended.
Examples
>>> store = Store()
>>> query1 = "SELECT * FROM my_table WHERE column = ?"
>>> params1 = ("value1",)
>>> query2 = "SELECT * FROM my_table WHERE column = ?"
>>> params2 = ("value2",)
>>> df = store.read_raw_query(query1, params=params1)
>>> with store.engine.connect() as conn:
...     df1 = store.read_raw_query(query1, params=params1, connection=conn)
...     df2 = store.read_raw_query(query2, params=params2, connection=conn)
- write_query_to_parquet(stmt: Selectable, file_path: Path | str, overwrite: bool = False, partition_columns: list[str] | None = None) None ¶
Write the result of a query to a Parquet file.
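Examples
A sketch against the devices table from the earlier examples (the output path is illustrative):
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.write_query_to_parquet(
...     select(table).where(table.c.id == 1),
...     "filtered.parquet",
...     overwrite=True,
... )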
- write_table_to_parquet(name: str, file_path: Path | str, partition_columns: list[str] | None = None, overwrite: bool = False) None ¶
Write a table or view to a Parquet file.
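Examples
Assuming the devices table exists (the output path is illustrative):
>>> store.write_table_to_parquet("devices", "devices.parquet", overwrite=True)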
- delete_rows(name: str, time_array_id_values: dict[str, Any], connection: Connection | None = None) None ¶
Delete all rows matching the time_array_id_values.
- Parameters:
name – Name of table
time_array_id_values – Values of the time array ID columns. Keys must match the columns in the schema.
connection – Optional connection to the database. Refer to ingest_table() for notes.
Examples
>>> store.delete_rows("devices", {"id": 47})
- drop_table(name: str, connection: Connection | None = None, if_exists: bool = False) None ¶
Drop a table from the database.
- create_view(schema: TableSchema, stmt: Selectable) None ¶
Create a view in the database.
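Examples
A sketch, reusing the devices table and time_config from the earlier examples; the schema must describe the result of the statement (the view name is illustrative):
>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.create_view(
...     TableSchema(
...         name="devices_subset",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["id"],
...     ),
...     select(table).where(table.c.id == 1),
... )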
- drop_view(name: str, connection: Connection | None = None, if_exists: bool = False) None ¶
Drop a view from the database.