Module buildstock_query.helpers
Expand source code
from concurrent.futures import Future
from pyathena.sqlalchemy.base import AthenaDialect
from pyathena.pandas.result_set import AthenaPandasResultSet
import datetime
import pickle
import os
import pandas as pd
from typing import Literal, TYPE_CHECKING
if TYPE_CHECKING:
from buildstock_query.schema.utilities import MappedColumn # noqa: F401
# Energy-unit conversion factors. The two values are reciprocals of each other
# (their product is 1 to floating-point precision).
# NOTE(review): the names suggest kWh <-> MBtu (million Btu) - confirm the intended unit.
KWH2MBTU = 0.003412141633127942
MBTU2KWH = 293.0710701722222
class CachedFutureDf(Future):
    """A ``Future`` that is completed at construction time with a DataFrame.

    The DataFrame is stored on ``self.df`` and also set as the future's
    result, so the object reports itself as finished from the start.

    Args:
        df: the DataFrame this future resolves to.
        *args, **kwargs: forwarded unchanged to ``Future.__init__``.
    """

    def __init__(self, df: pd.DataFrame, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.df = df
        # Mark the underlying Future as finished so inherited machinery
        # (callbacks, waiters) sees a completed future.
        self.set_result(df)

    def as_pandas(self) -> pd.DataFrame:
        """Return the stored DataFrame."""
        return self.df

    def result(self, timeout=None) -> pd.DataFrame:
        """Return the stored DataFrame; ``timeout`` is accepted but unused."""
        return self.df

    def done(self) -> Literal[True]:
        """Always ``True``."""
        return True

    def running(self) -> Literal[False]:
        """Always ``False``."""
        return False

    def cancelled(self) -> Literal[False]:
        """Always ``False``."""
        return False
class AthenaFutureDf:
    """Wrapper that forwards the future-style API to an underlying future.

    Args:
        db_future: the future whose state and result this object exposes.
    """

    def __init__(self, db_future: Future) -> None:
        self.future = db_future

    def cancel(self) -> bool:
        """Ask the wrapped future to cancel and return its answer."""
        return self.future.cancel()

    def running(self) -> bool:
        """Return whether the wrapped future is currently running."""
        return self.future.running()

    def done(self) -> bool:
        """Return whether the wrapped future has finished or was cancelled."""
        return self.future.done()

    def cancelled(self) -> bool:
        """Return whether the wrapped future was cancelled."""
        return self.future.cancelled()

    def result(self, timeout=None) -> "AthenaPandasResultSet":
        """Return the wrapped future's result.

        Args:
            timeout: seconds to wait for the result; ``None`` waits without
                limit. It is forwarded to the wrapped future (it used to be
                accepted but dropped, so a timeout was never honoured).

        Raises:
            Whatever the wrapped future's ``result`` raises, including its
            timeout error when ``timeout`` elapses.
        """
        return self.future.result(timeout=timeout)

    def as_pandas(self) -> pd.DataFrame:
        """Return ``self.future.as_pandas()``."""
        # NOTE(review): a plain concurrent.futures.Future has no as_pandas();
        # presumably the creator of the future attaches it - confirm.
        return self.future.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet
class COLOR:
    """ANSI escape sequences used to colour terminal output."""

    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    # Reset sequence appended after coloured text.
    END = '\033[0m'
def print_r(text):  # print in Red
    """Print ``text`` wrapped in ``COLOR.RED`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.RED, text, COLOR.END)
    print(message)
def print_y(text):  # print in Yellow
    """Print ``text`` wrapped in ``COLOR.YELLOW`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.YELLOW, text, COLOR.END)
    print(message)
def print_g(text):  # print in Green
    """Print ``text`` wrapped in ``COLOR.GREEN`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.GREEN, text, COLOR.END)
    print(message)
class UnSupportedTypeException(Exception):
    """Raised when a value's type cannot be rendered as a SQL literal."""
class CustomCompiler(AthenaDialect().statement_compiler):  # type: ignore
    """Athena statement compiler that can inline extra Python types as literals.

    ``render_literal_value`` routes datetimes, lists, tuples and
    ``MappedColumn`` objects through :meth:`render_literal`; every other value
    is handled by the parent compiler.
    """

    @staticmethod
    def render_literal(obj):
        """Return the SQL literal text for ``obj``.

        Handled, in this order: int/float (``str``), str (single-quoted with
        embedded quotes doubled), ``datetime.datetime`` (``timestamp '...'``),
        list (``ARRAY[...]``), tuple (parenthesised list) and ``MappedColumn``
        (``MAP(keys, values)[key]``).

        Raises:
            UnSupportedTypeException: if ``obj`` is none of the above.
        """
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        render = CustomCompiler.render_literal
        if isinstance(obj, (int, float)):
            return str(obj)
        if isinstance(obj, str):
            return "'%s'" % obj.replace("'", "''")
        if isinstance(obj, (datetime.datetime)):
            return "timestamp '%s'" % str(obj).replace("'", "''")
        if isinstance(obj, list):
            return CustomCompiler.get_array_string(obj)
        if isinstance(obj, tuple):
            rendered_items = [render(item) for item in obj]
            return f"({', '.join(rendered_items)})"
        if isinstance(obj, MappedColumn):
            map_keys = list(obj.mapping_dict.keys())
            map_values = list(obj.mapping_dict.values())
            if isinstance(obj.key, tuple):
                # Composite key: compile each source and index with a row.
                compiled_sources = [obj.bsq._compile(source) for source in obj.key]
                indexing_str = f"({', '.join(compiled_sources)})"
            else:
                indexing_str = obj.bsq._compile(obj.key)
            return f"MAP({render(map_keys)}, {render(map_values)})[{indexing_str}]"
        raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")

    @staticmethod
    def get_array_string(array):
        """Render ``array`` as ``ARRAY[...]``, chunked when it is long.

        Up to 254 elements produce a single ``ARRAY[...]``; longer inputs are
        split into 254-element arrays joined with ``CONCAT(...)``.
        """
        # NOTE(review): 254 is presumably an engine limit on ARRAY constructor
        # arguments - confirm against the query engine's documentation.
        chunk_size = 254
        render = CustomCompiler.render_literal
        if len(array) <= chunk_size:
            return f"ARRAY[{', '.join([render(item) for item in array])}]"
        chunks = []
        for start in range(0, len(array), chunk_size):
            rendered = ', '.join([render(item) for item in array[start:start + chunk_size]])
            chunks.append("ARRAY[" + rendered + "]")
        return "CONCAT(" + ', '.join(chunks) + ")"

    def render_literal_value(self, obj, type_):
        """Render ``obj`` as a literal, using ``render_literal`` for custom types."""
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        custom_types = (datetime.datetime, list, tuple, MappedColumn)
        if not isinstance(obj, custom_types):
            return super().render_literal_value(obj, type_)
        return CustomCompiler.render_literal(obj)
class DataExistsException(Exception):
    """Exception that can carry the data that already exists.

    Args:
        message: description passed on to ``Exception``.
        existing_data: optional payload stored on ``self.existing_data``.
    """

    def __init__(self, message, existing_data=None):
        self.existing_data = existing_data
        super().__init__(message)
def save_pickle(path, obj):
    """Pickle ``obj`` into the file at ``path``, overwriting any existing file."""
    with open(path, "wb") as sink:
        pickle.dump(obj, sink)
def load_pickle(path):
    """Load and return the pickled object stored at ``path``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if os.path.exists(path):
        with open(path, "rb") as source:
            return pickle.load(source)
    raise FileNotFoundError(f"File {path} not found for loading table")
def read_csv(csv_file_path, **kwargs) -> pd.DataFrame:
    """Read a CSV with pandas, keeping the literal string "None" as data.

    By default pandas' built-in NA markers are used, minus ``"None"``, so a
    cell containing ``None`` is read as a string instead of a missing value.

    Args:
        csv_file_path: forwarded to ``pd.read_csv``.
        **kwargs: forwarded to ``pd.read_csv``. ``na_values`` and
            ``keep_default_na`` given here take precedence over the defaults
            above (they used to collide with the hard-coded keywords and
            raise ``TypeError``).

    Returns:
        The parsed DataFrame.
    """
    # STR_NA_VALUES is a private pandas attribute holding the default NA strings.
    default_na_values = pd._libs.parsers.STR_NA_VALUES
    kwargs.setdefault("na_values", list(default_na_values - {"None"}))
    kwargs.setdefault("keep_default_na", False)
    return pd.read_csv(csv_file_path, **kwargs)
Functions
def load_pickle(path)
-
Expand source code
def load_pickle(path):
    """Load and return the pickled object stored at ``path``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    # NOTE: unpickling can execute arbitrary code; only load trusted files.
    if os.path.exists(path):
        with open(path, "rb") as source:
            return pickle.load(source)
    raise FileNotFoundError(f"File {path} not found for loading table")
def print_g(text)
-
Expand source code
def print_g(text):  # print in Green
    """Print ``text`` wrapped in ``COLOR.GREEN`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.GREEN, text, COLOR.END)
    print(message)
def print_r(text)
-
Expand source code
def print_r(text):  # print in Red
    """Print ``text`` wrapped in ``COLOR.RED`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.RED, text, COLOR.END)
    print(message)
def print_y(text)
-
Expand source code
def print_y(text):  # print in Yellow
    """Print ``text`` wrapped in ``COLOR.YELLOW`` and the ``COLOR.END`` reset."""
    message = "{}{}{}".format(COLOR.YELLOW, text, COLOR.END)
    print(message)
def read_csv(csv_file_path, **kwargs) ‑> pandas.core.frame.DataFrame
-
Expand source code
def read_csv(csv_file_path, **kwargs) -> pd.DataFrame:
    """Read a CSV with pandas, keeping the literal string "None" as data.

    By default pandas' built-in NA markers are used, minus ``"None"``, so a
    cell containing ``None`` is read as a string instead of a missing value.

    Args:
        csv_file_path: forwarded to ``pd.read_csv``.
        **kwargs: forwarded to ``pd.read_csv``. ``na_values`` and
            ``keep_default_na`` given here take precedence over the defaults
            above (they used to collide with the hard-coded keywords and
            raise ``TypeError``).

    Returns:
        The parsed DataFrame.
    """
    # STR_NA_VALUES is a private pandas attribute holding the default NA strings.
    default_na_values = pd._libs.parsers.STR_NA_VALUES
    kwargs.setdefault("na_values", list(default_na_values - {"None"}))
    kwargs.setdefault("keep_default_na", False)
    return pd.read_csv(csv_file_path, **kwargs)
def save_pickle(path, obj)
-
Expand source code
def save_pickle(path, obj):
    """Pickle ``obj`` into the file at ``path``, overwriting any existing file."""
    with open(path, "wb") as sink:
        pickle.dump(obj, sink)
Classes
class AthenaFutureDf (db_future: concurrent.futures._base.Future)
-
Expand source code
class AthenaFutureDf:
    """Wrapper that forwards the future-style API to an underlying future.

    Args:
        db_future: the future whose state and result this object exposes.
    """

    def __init__(self, db_future: Future) -> None:
        self.future = db_future

    def cancel(self) -> bool:
        """Ask the wrapped future to cancel and return its answer."""
        return self.future.cancel()

    def running(self) -> bool:
        """Return whether the wrapped future is currently running."""
        return self.future.running()

    def done(self) -> bool:
        """Return whether the wrapped future has finished or was cancelled."""
        return self.future.done()

    def cancelled(self) -> bool:
        """Return whether the wrapped future was cancelled."""
        return self.future.cancelled()

    def result(self, timeout=None) -> "AthenaPandasResultSet":
        """Return the wrapped future's result.

        Args:
            timeout: seconds to wait for the result; ``None`` waits without
                limit. It is forwarded to the wrapped future (it used to be
                accepted but dropped, so a timeout was never honoured).
        """
        return self.future.result(timeout=timeout)

    def as_pandas(self) -> pd.DataFrame:
        """Return ``self.future.as_pandas()``."""
        # NOTE(review): a plain concurrent.futures.Future has no as_pandas();
        # presumably the creator of the future attaches it - confirm.
        return self.future.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet
Methods
def as_pandas(self) ‑> pandas.core.frame.DataFrame
-
Expand source code
def as_pandas(self) -> pd.DataFrame:
    """Return ``self.future.as_pandas()``."""
    # NOTE(review): a plain concurrent.futures.Future has no as_pandas();
    # presumably the creator of the future attaches it - confirm.
    wrapped = self.future
    return wrapped.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet
def cancel(self) ‑> bool
-
Expand source code
def cancel(self) -> bool:
    """Ask the wrapped future to cancel and return its answer."""
    was_cancelled = self.future.cancel()
    return was_cancelled
def cancelled(self) ‑> bool
-
Expand source code
def cancelled(self) -> bool:
    """Return whether the wrapped future was cancelled."""
    is_cancelled = self.future.cancelled()
    return is_cancelled
def done(self) ‑> bool
-
Expand source code
def done(self) -> bool:
    """Return whether the wrapped future has finished or was cancelled."""
    is_done = self.future.done()
    return is_done
def result(self, timeout=None) ‑> pyathena.pandas.result_set.AthenaPandasResultSet
-
Expand source code
def result(self, timeout=None) -> "AthenaPandasResultSet":
    """Return the wrapped future's result.

    Args:
        timeout: seconds to wait for the result; ``None`` waits without
            limit. It is forwarded to the wrapped future (it used to be
            accepted but dropped, so a timeout was never honoured).
    """
    return self.future.result(timeout=timeout)
def running(self) ‑> bool
-
Expand source code
def running(self) -> bool:
    """Return whether the wrapped future is currently running."""
    is_running = self.future.running()
    return is_running
class COLOR
-
Expand source code
class COLOR:
    """ANSI escape sequences used to colour terminal output."""

    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    # Reset sequence appended after coloured text.
    END = '\033[0m'
Class variables
var END
var GREEN
var RED
var YELLOW
class CachedFutureDf (df: pandas.core.frame.DataFrame, *args, **kwargs)
-
Represents the result of an asynchronous computation.
Initializes the future. Should not be called by clients.
Expand source code
class CachedFutureDf(Future):
    """A ``Future`` that is completed at construction time with a DataFrame.

    The DataFrame is stored on ``self.df`` and also set as the future's
    result, so the object reports itself as finished from the start.

    Args:
        df: the DataFrame this future resolves to.
        *args, **kwargs: forwarded unchanged to ``Future.__init__``.
    """

    def __init__(self, df: pd.DataFrame, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.df = df
        # Mark the underlying Future as finished so inherited machinery
        # (callbacks, waiters) sees a completed future.
        self.set_result(df)

    def as_pandas(self) -> pd.DataFrame:
        """Return the stored DataFrame."""
        return self.df

    def result(self, timeout=None) -> pd.DataFrame:
        """Return the stored DataFrame; ``timeout`` is accepted but unused."""
        return self.df

    def done(self) -> Literal[True]:
        """Always ``True``."""
        return True

    def running(self) -> Literal[False]:
        """Always ``False``."""
        return False

    def cancelled(self) -> Literal[False]:
        """Always ``False``."""
        return False
Ancestors
- concurrent.futures._base.Future
Methods
def as_pandas(self) ‑> pandas.core.frame.DataFrame
-
Expand source code
def as_pandas(self) -> pd.DataFrame:
    """Return the DataFrame stored on ``self.df``."""
    frame = self.df
    return frame
def cancelled(self) ‑> Literal[False]
-
Return True if the future was cancelled.
Expand source code
def cancelled(self) -> Literal[False]:
    """Always return ``False``, regardless of the future's state."""
    return False
def done(self) ‑> Literal[True]
-
Return True if the future was cancelled or finished executing.
Expand source code
def done(self) -> Literal[True]:
    """Always return ``True``, regardless of the future's state."""
    return True
def result(self, timeout=None) ‑> pandas.core.frame.DataFrame
-
Return the result of the call that the future represents.
Args
timeout
- The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.
Returns
The result of the call that the future represents.
Raises
CancelledError
- If the future was cancelled.
TimeoutError
- If the future didn't finish executing before the given timeout.
Exception
- If the call raised then that exception will be raised.
Expand source code
def result(self, timeout=None) -> pd.DataFrame:
    """Return the DataFrame stored on ``self.df``.

    ``timeout`` is accepted for signature compatibility but is not used.
    """
    frame = self.df
    return frame
def running(self) ‑> Literal[False]
-
Return True if the future is currently executing.
Expand source code
def running(self) -> Literal[False]:
    """Always return ``False``, regardless of the future's state."""
    return False
class CustomCompiler (dialect, statement, cache_key=None, column_keys=None, for_executemany=False, linting=symbol('NO_LINTING'), **kwargs)
-
Default implementation of `.Compiled`.
Compiles `_expression.ClauseElement` objects into SQL strings.
Construct a new `.SQLCompiler` object.
:param dialect: `.Dialect` to be used.
:param statement: `_expression.ClauseElement` to be compiled.
:param column_keys: a list of column names to be compiled into an INSERT or UPDATE statement.
:param for_executemany: whether INSERT / UPDATE statements should expect that they are to be invoked in an "executemany" style, which may impact how the statement will be expected to return the values of defaults and autoincrement / sequences and similar. Depending on the backend and driver in use, support for retrieving these values may be disabled which means SQL expressions may be rendered inline, RETURNING may not be rendered, etc.
:param kwargs: additional keyword arguments to be consumed by the superclass.
Expand source code
class CustomCompiler(AthenaDialect().statement_compiler):  # type: ignore
    """Athena statement compiler that can inline extra Python types as literals.

    ``render_literal_value`` routes datetimes, lists, tuples and
    ``MappedColumn`` objects through :meth:`render_literal`; every other value
    is handled by the parent compiler.
    """

    @staticmethod
    def render_literal(obj):
        """Return the SQL literal text for ``obj``.

        Handled, in this order: int/float (``str``), str (single-quoted with
        embedded quotes doubled), ``datetime.datetime`` (``timestamp '...'``),
        list (``ARRAY[...]``), tuple (parenthesised list) and ``MappedColumn``
        (``MAP(keys, values)[key]``).

        Raises:
            UnSupportedTypeException: if ``obj`` is none of the above.
        """
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        render = CustomCompiler.render_literal
        if isinstance(obj, (int, float)):
            return str(obj)
        if isinstance(obj, str):
            return "'%s'" % obj.replace("'", "''")
        if isinstance(obj, (datetime.datetime)):
            return "timestamp '%s'" % str(obj).replace("'", "''")
        if isinstance(obj, list):
            return CustomCompiler.get_array_string(obj)
        if isinstance(obj, tuple):
            rendered_items = [render(item) for item in obj]
            return f"({', '.join(rendered_items)})"
        if isinstance(obj, MappedColumn):
            map_keys = list(obj.mapping_dict.keys())
            map_values = list(obj.mapping_dict.values())
            if isinstance(obj.key, tuple):
                # Composite key: compile each source and index with a row.
                compiled_sources = [obj.bsq._compile(source) for source in obj.key]
                indexing_str = f"({', '.join(compiled_sources)})"
            else:
                indexing_str = obj.bsq._compile(obj.key)
            return f"MAP({render(map_keys)}, {render(map_values)})[{indexing_str}]"
        raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")

    @staticmethod
    def get_array_string(array):
        """Render ``array`` as ``ARRAY[...]``, chunked when it is long.

        Up to 254 elements produce a single ``ARRAY[...]``; longer inputs are
        split into 254-element arrays joined with ``CONCAT(...)``.
        """
        # NOTE(review): 254 is presumably an engine limit on ARRAY constructor
        # arguments - confirm against the query engine's documentation.
        chunk_size = 254
        render = CustomCompiler.render_literal
        if len(array) <= chunk_size:
            return f"ARRAY[{', '.join([render(item) for item in array])}]"
        chunks = []
        for start in range(0, len(array), chunk_size):
            rendered = ', '.join([render(item) for item in array[start:start + chunk_size]])
            chunks.append("ARRAY[" + rendered + "]")
        return "CONCAT(" + ', '.join(chunks) + ")"

    def render_literal_value(self, obj, type_):
        """Render ``obj`` as a literal, using ``render_literal`` for custom types."""
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        custom_types = (datetime.datetime, list, tuple, MappedColumn)
        if not isinstance(obj, custom_types):
            return super().render_literal_value(obj, type_)
        return CustomCompiler.render_literal(obj)
Ancestors
- pyathena.sqlalchemy.base.AthenaStatementCompiler
- sqlalchemy.sql.compiler.SQLCompiler
- sqlalchemy.sql.compiler.Compiled
Static methods
def get_array_string(array)
-
Expand source code
@staticmethod
def get_array_string(array):
    """Render ``array`` as ``ARRAY[...]``, chunked when it is long.

    Up to 254 elements produce a single ``ARRAY[...]``; longer inputs are
    split into 254-element arrays joined with ``CONCAT(...)``.
    """
    # NOTE(review): 254 is presumably an engine limit on ARRAY constructor
    # arguments - confirm against the query engine's documentation.
    chunk_size = 254
    render = CustomCompiler.render_literal
    if len(array) <= chunk_size:
        return f"ARRAY[{', '.join([render(item) for item in array])}]"
    chunks = []
    for start in range(0, len(array), chunk_size):
        rendered = ', '.join([render(item) for item in array[start:start + chunk_size]])
        chunks.append("ARRAY[" + rendered + "]")
    return "CONCAT(" + ', '.join(chunks) + ")"
def render_literal(obj)
-
Expand source code
@staticmethod
def render_literal(obj):
    """Return the SQL literal text for ``obj``.

    Handled, in this order: int/float (``str``), str (single-quoted with
    embedded quotes doubled), ``datetime.datetime`` (``timestamp '...'``),
    list (``ARRAY[...]``), tuple (parenthesised list) and ``MappedColumn``
    (``MAP(keys, values)[key]``).

    Raises:
        UnSupportedTypeException: if ``obj`` is none of the above.
    """
    from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

    render = CustomCompiler.render_literal
    if isinstance(obj, (int, float)):
        return str(obj)
    if isinstance(obj, str):
        return "'%s'" % obj.replace("'", "''")
    if isinstance(obj, (datetime.datetime)):
        return "timestamp '%s'" % str(obj).replace("'", "''")
    if isinstance(obj, list):
        return CustomCompiler.get_array_string(obj)
    if isinstance(obj, tuple):
        rendered_items = [render(item) for item in obj]
        return f"({', '.join(rendered_items)})"
    if isinstance(obj, MappedColumn):
        map_keys = list(obj.mapping_dict.keys())
        map_values = list(obj.mapping_dict.values())
        if isinstance(obj.key, tuple):
            # Composite key: compile each source and index with a row.
            compiled_sources = [obj.bsq._compile(source) for source in obj.key]
            indexing_str = f"({', '.join(compiled_sources)})"
        else:
            indexing_str = obj.bsq._compile(obj.key)
        return f"MAP({render(map_keys)}, {render(map_values)})[{indexing_str}]"
    raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")
Methods
def render_literal_value(self, obj, type_)
-
Render the value of a bind parameter as a quoted literal.
This is used for statement sections that do not accept bind parameters on the target driver/database.
This should be implemented by subclasses using the quoting services of the DBAPI.
Expand source code
def render_literal_value(self, obj, type_):
    """Render ``obj`` as a literal, using ``render_literal`` for custom types.

    Datetimes, lists, tuples and ``MappedColumn`` objects go through
    ``CustomCompiler.render_literal``; anything else is delegated to the
    parent compiler.
    """
    from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

    custom_types = (datetime.datetime, list, tuple, MappedColumn)
    if not isinstance(obj, custom_types):
        return super(CustomCompiler, self).render_literal_value(obj, type_)
    return CustomCompiler.render_literal(obj)
class DataExistsException (message, existing_data=None)
-
Common base class for all non-exit exceptions.
Expand source code
class DataExistsException(Exception):
    """Exception that can carry the data that already exists.

    Args:
        message: description passed on to ``Exception``.
        existing_data: optional payload stored on ``self.existing_data``.
    """

    def __init__(self, message, existing_data=None):
        self.existing_data = existing_data
        super().__init__(message)
Ancestors
- builtins.Exception
- builtins.BaseException
class UnSupportedTypeException (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class UnSupportedTypeException(Exception):
    """Raised when a value's type cannot be rendered as a SQL literal."""
Ancestors
- builtins.Exception
- builtins.BaseException