Store

class chronify.store.Store(engine: Engine | None = None, engine_name: str | None = None, file_path: Path | str | None = None, **connect_kwargs: Any)

Data store for time series data.

Construct the Store.

Parameters:
  • engine – Optional, defaults to an engine connected to an in-memory DuckDB database.

  • engine_name – Optional, name of engine to use (‘duckdb’, ‘sqlite’). Mutually exclusive with engine.

  • file_path – Optional, use this file for the database. If the file does not exist, create a new database. If the file exists, load that existing database. Defaults to a new in-memory database.

Examples

>>> from sqlalchemy import create_engine
>>> store1 = Store()
>>> store2 = Store(engine=create_engine("duckdb:///time_series.db"))
>>> store3 = Store(engine=create_engine("sqlite:///time_series.db"))
>>> store4 = Store(engine_name="sqlite")
classmethod create_in_memory_db(engine_name: str = 'duckdb', **connect_kwargs: Any) → Store

Create a Store with an in-memory database.
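
Examples

A minimal sketch using only the documented parameters:

>>> store = Store.create_in_memory_db()
>>> store_sqlite = Store.create_in_memory_db(engine_name="sqlite")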

classmethod create_file_db(file_path: Path | str = 'time_series.db', engine_name: str = 'duckdb', overwrite: bool = False, **connect_kwargs: Any) → Store

Create a Store with a file-based database.
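
Examples

A minimal sketch using the documented default path; overwrite=True replaces any existing file:

>>> store = Store.create_file_db("time_series.db", overwrite=True)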

classmethod create_new_hive_store(url: str, drop_schema: bool = True, **connect_kwargs: Any) → Store

Create a new Store in a Hive database. Recommended usage is to create views from Parquet files. Ingesting data into tables from files or DataFrames is not supported.

This has been tested with Apache Spark running an Apache Thrift Server.

Parameters:
  • url – Thrift server URL

  • drop_schema – If True, drop the schema table if it’s already there.

Examples

>>> store = Store.create_new_hive_store("hive://localhost:10000/default")
classmethod load_from_file(file_path: Path | str, engine_name: str = 'duckdb', **connect_kwargs: Any) → Store

Load an existing store from a database.
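
Examples

A minimal sketch, assuming "time_series.db" is an existing database created by this class:

>>> store = Store.load_from_file("time_series.db")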

get_table(name: str) → Table

Return the sqlalchemy Table object.

has_table(name: str) → bool

Return True if the database has a table with the given name.

list_tables() → list[str]

Return a list of user tables in the database.

try_get_table(name: str) → Table | None

Return the sqlalchemy Table object or None if it is not stored.
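
Examples

A brief sketch, assuming a table named "devices" was ingested earlier and is the only user table:

>>> store.has_table("devices")
True
>>> store.list_tables()
['devices']
>>> store.try_get_table("nonexistent") is None
True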

update_metadata() → None

Update the sqlalchemy metadata for the table schemas. Call this method if you add tables through the sqlalchemy engine outside of this class or perform a rollback in the same transaction in which chronify added tables.
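
Examples

A sketch of the rollback scenario described above; df and schema are assumed to be valid inputs as in the ingest_table() example below:

>>> with store.engine.connect() as conn:
...     store.ingest_table(df, schema, connection=conn)
...     conn.rollback()
>>> store.update_metadata()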

backup(dst: Path | str, overwrite: bool = False) → None

Copy the database to a new location. Not yet supported for in-memory databases.
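
Examples

A minimal sketch; "backup.db" is a hypothetical destination path:

>>> store.backup("backup.db", overwrite=True)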

property engine: Engine

Return the sqlalchemy engine.

property metadata: MetaData

Return the sqlalchemy metadata.

property schema_manager: SchemaManager

Return the store’s schema manager.

create_view_from_parquet(path: Path, schema: TableSchema, bypass_checks: bool = False) → None

Create a view in the database from a Parquet file.
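
Examples

A sketch; "data.parquet" is a hypothetical file whose layout matches schema, a TableSchema like those in the ingestion examples below:

>>> store.create_view_from_parquet(Path("data.parquet"), schema)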

ingest_from_csv(path: Path | str, src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → bool

Ingest data from a CSV file.

Parameters:
  • path – Source data file

  • src_schema – Defines the schema of the source file.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Returns:

True if a table was created.

Return type:

bool

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> resolution = timedelta(hours=1)
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_from_csv(
...     "data.csv",
...     CsvTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )

See also

ingest_from_csvs

ingest_from_csvs(paths: Iterable[Path | str], src_schema: CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → bool

Ingest data into the table specified by dst_schema. If the table does not exist, create it. This is faster than calling ingest_from_csv() many times. Each file is loaded into memory one at a time. If any error occurs, all added data will be removed and the database will be restored to its original state.

Parameters:
  • paths – Source data files

  • src_schema – Defines the schema of the source files.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Returns:

True if a table was created.

Return type:

bool

Raises:

InvalidTable – Raised if the data does not match the schema.
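
Examples

A sketch, assuming src_schema and dst_schema are the CsvTableSchema and TableSchema objects shown in the ingest_from_csv() example above; the CSV paths are hypothetical:

>>> store.ingest_from_csvs(["data1.csv", "data2.csv"], src_schema, dst_schema)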

See also

ingest_from_csv

ingest_pivoted_table(data: DataFrame | DuckDBPyRelation, src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → bool

Ingest pivoted data into the table specified by dst_schema. If the table does not exist, create it. Chronify will unpivot the data before ingesting it.

Parameters:
  • data – Input data to ingest into the database.

  • src_schema – Defines the schema of the input data.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Returns:

True if a table was created.

Return type:

bool

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "device1": np.random.random(8784),
...         "device2": np.random.random(8784),
...         "device3": np.random.random(8784),
...     }
... )
>>> time_config = DatetimeRange(
...     time_column="timestamp",
...     start=datetime(2020, 1, 1, 0),
...     length=8784,
...     resolution=resolution,
... )
>>> store = Store()
>>> store.ingest_pivoted_table(
...     df,
...     PivotedTableSchema(
...         time_config=time_config,
...         pivoted_dimension_name="device",
...         value_columns=["device1", "device2", "device3"],
...     ),
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=time_config,
...         time_array_id_columns=["device"],
...     ),
... )
ingest_pivoted_tables(data: Iterable[DataFrame | DuckDBPyRelation], src_schema: PivotedTableSchema | CsvTableSchema, dst_schema: TableSchema, connection: Connection | None = None) → bool

Ingest pivoted data into the table specified by dst_schema.

If the table does not exist, create it. Unpivot the data before ingesting it. This is faster than calling ingest_pivoted_table() many times. If any error occurs, all added data will be removed and the database will be restored to its original state.

Parameters:
  • data – Data to ingest into the database.

  • src_schema – Defines the schema of all input tables.

  • dst_schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Returns:

True if a table was created.

Return type:

bool
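
Examples

A sketch, assuming df1 and df2 are pivoted DataFrames and the schemas are those shown in the ingest_pivoted_table() example above:

>>> store.ingest_pivoted_tables([df1, df2], src_schema, dst_schema)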

ingest_table(data: DataFrame | DuckDBPyRelation, schema: TableSchema, connection: Connection | None = None, **kwargs: Any) → bool

Ingest data into the table specified by schema. If the table does not exist, create it.

Parameters:
  • data – Input data to ingest into the database.

  • schema – Defines the destination table in the database.

  • connection – Optional connection to reuse. If adding many tables at once, it is significantly faster to use one connection. Refer to ingest_tables() for built-in support. If connection is not set, chronify will commit the database changes or perform a rollback on error. If it is set, the caller must perform those actions. If you perform a rollback, you must call rebuild_schema_cache() because the Store caches all table names in memory.

Returns:

True if a table was created.

Return type:

bool

Raises:

InvalidTable – Raised if the data does not match the schema.

Examples

>>> store = Store()
>>> resolution = timedelta(hours=1)
>>> df = pd.DataFrame(
...     {
...         "timestamp": pd.date_range(
...             "2020-01-01", "2020-12-31 23:00:00", freq=resolution
...         ),
...         "value": np.random.random(8784),
...     }
... )
>>> df["id"] = 1
>>> store.ingest_table(
...     df,
...     TableSchema(
...         name="devices",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=resolution,
...         ),
...         time_array_id_columns=["id"],
...     ),
... )

See also

ingest_tables

ingest_tables(data: Iterable[DataFrame | DuckDBPyRelation], schema: TableSchema, connection: Connection | None = None, **kwargs: Any) → bool

Ingest multiple input tables into the same database table. All tables must have the same schema. This offers significant performance advantages over calling ingest_table() many times.

Parameters:
  • data – Input tables to ingest into one database table.

  • schema – Defines the destination table.

  • connection – Optional connection to reuse. Refer to ingest_table() for notes.

Returns:

True if a table was created.

Return type:

bool

Raises:

InvalidTable – Raised if the data does not match the schema.
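
Examples

A sketch, assuming df1 and df2 are DataFrames matching schema, the TableSchema shown in the ingest_table() example above:

>>> store.ingest_tables([df1, df2], schema)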

See also

ingest_table

map_table_time_config(src_name: str, dst_schema: TableSchema, scratch_dir: Path | None = None, output_file: Path | None = None, check_mapped_timestamps: bool = False) → None

Map the existing table represented by src_name to a new table represented by dst_schema with a different time configuration.

Parameters:
  • src_name – Refers to the table name of the source data.

  • dst_schema – Defines the table to create in the database. Must not already exist.

  • scratch_dir – Directory to use for temporary writes. Defaults to the system’s tmp filesystem.

  • check_mapped_timestamps – Perform time checks on the result of the mapping operation. This can be slow and is not required.

Raises:
  • InvalidTable – Raised if the schemas are incompatible.

  • TableAlreadyExists – Raised if the dst_schema name already exists.

Examples

>>> store = Store()
>>> hours_per_year = 12 * 7 * 24
>>> num_time_arrays = 3
>>> df = pd.DataFrame(
...     {
...         "id": np.concat(
...             [np.repeat(i, hours_per_year) for i in range(1, 1 + num_time_arrays)]
...         ),
...         "month": np.tile(np.repeat(range(1, 13), 7 * 24), num_time_arrays),
...         "day_of_week": np.tile(np.tile(np.repeat(range(7), 24), 12), num_time_arrays),
...         "hour": np.tile(np.tile(range(24), 12 * 7), num_time_arrays),
...         "value": np.random.random(hours_per_year * num_time_arrays),
...     }
... )
>>> schema = TableSchema(
...     name="devices_by_representative_time",
...     value_column="value",
...     time_config=RepresentativePeriodTime(
...         time_format=RepresentativePeriodFormat.ONE_WEEK_PER_MONTH_BY_HOUR,
...     ),
...     time_array_id_columns=["id"],
... )
>>> store.ingest_table(df, schema)
>>> store.map_table_time_config(
...     "devices_by_representative_time",
...     TableSchema(
...         name="devices_by_datetime",
...         value_column="value",
...         time_config=DatetimeRange(
...             time_column="timestamp",
...             start=datetime(2020, 1, 1, 0),
...             length=8784,
...             resolution=timedelta(hours=1),
...         ),
...         time_array_id_columns=["id"],
...     ),
... )
read_query(name: str, query: Selectable | str, params: Any = None, connection: Connection | None = None) → DataFrame

Return the query result as a pandas DataFrame.

Parameters:
  • name – Table or view name

  • query – SQL query expressed as a string or sqlalchemy Selectable

  • params – Parameters for SQL query if expressed as a string

Examples

>>> df = store.read_query("SELECT * FROM devices")
>>> df = store.read_query("SELECT * FROM devices WHERE id = ?", params=(3,))
>>> from sqlalchemy import select
>>> table = store.schemas.get_table("devices")
>>> df = store.read_query(select(table).where(table.c.id == 3)
read_table(name: str, connection: Connection | None = None) → DataFrame

Return the table as a pandas DataFrame.
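
Examples

A minimal sketch, assuming a table named "devices" exists:

>>> df = store.read_table("devices")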

read_raw_query(query: str, params: Any = None, connection: Connection | None = None) → DataFrame

Execute a query directly on the backend database connection, bypassing sqlalchemy, and return the results as a DataFrame.

Note: Unlike read_query(), no conversion of timestamps is performed. Timestamps will be in the format of the underlying database. SQLite backends will return strings instead of datetime objects.

Parameters:
  • query – SQL query to execute

  • params – Optional parameters for SQL query

  • connection – Optional sqlalchemy connection returned by Store.engine.connect(). This can improve performance when performing many reads. If used for database modifications, it is the caller’s responsibility to perform a commit and ensure that the connection is closed correctly. Use of sqlalchemy’s context manager is recommended.

Examples

>>> store = Store()
>>> query1 = "SELECT * from my_table WHERE column = ?"
>>> params1 = ("value1",)
>>> query2 = "SELECT * from my_table WHERE column = ?"
>>> params2 = ("value2",)
>>> df = store.read_raw_query(query1, params=params1)
>>> with store.engine.connect() as conn:
...     df1 = store.read_raw_query(query1, params=params1, connection=conn)
...     df2 = store.read_raw_query(query2, params=params2, connection=conn)
write_query_to_parquet(stmt: Selectable, file_path: Path | str, overwrite: bool = False, partition_columns: list[str] | None = None) → None

Write the result of a query to a Parquet file.
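
Examples

A sketch, assuming a "devices" table with an id column; "filtered.parquet" is a hypothetical output path:

>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> store.write_query_to_parquet(
...     select(table).where(table.c.id == 3),
...     "filtered.parquet",
...     overwrite=True,
... )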

write_table_to_parquet(name: str, file_path: Path | str, partition_columns: list[str] | None = None, overwrite: bool = False) → None

Write a table or view to a Parquet file.
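
Examples

A minimal sketch; "devices.parquet" is a hypothetical output path:

>>> store.write_table_to_parquet("devices", "devices.parquet", overwrite=True)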

delete_rows(name: str, time_array_id_values: dict[str, Any], connection: Connection | None = None) → None

Delete all rows matching the time_array_id_values.

Parameters:
  • name – Name of table

  • time_array_id_values – Values of the time array ID columns. Keys must match the columns in the schema.

  • connection – Optional connection to the database. Refer to ingest_table() for notes.

Examples

>>> store.delete_rows("devices", {"id": 47})
drop_table(name: str, connection: Connection | None = None, if_exists: bool = False) → None

Drop a table from the database.
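
Examples

A minimal sketch, assuming a table named "devices" exists:

>>> store.drop_table("devices", if_exists=True)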

create_view(schema: TableSchema, stmt: Selectable) → None

Create a view in the database.
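
Examples

A sketch, assuming the "devices" table and time_config from the ingestion examples above; "device3_view" is a hypothetical view name:

>>> from sqlalchemy import select
>>> table = store.get_table("devices")
>>> view_schema = TableSchema(
...     name="device3_view",
...     value_column="value",
...     time_config=time_config,
...     time_array_id_columns=["device"],
... )
>>> store.create_view(view_schema, select(table).where(table.c.device == "device3"))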

drop_view(name: str, connection: Connection | None = None, if_exists: bool = False) → None

Drop a view from the database.
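
Examples

A minimal sketch, dropping the hypothetical view from the create_view() example:

>>> store.drop_view("device3_view", if_exists=True)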