Module buildstock_query.helpers

Expand source code
from concurrent.futures import Future
from pyathena.sqlalchemy.base import AthenaDialect
from pyathena.pandas.result_set import AthenaPandasResultSet
import datetime
import pickle
import os
import pandas as pd
from typing import Literal, TYPE_CHECKING

if TYPE_CHECKING:
    from buildstock_query.schema.utilities import MappedColumn  # noqa: F401


KWH2MBTU = 0.003412141633127942
MBTU2KWH = 293.0710701722222


class CachedFutureDf(Future):
    """A ``Future`` that is already resolved with an in-memory DataFrame.

    The DataFrame is stored on the instance and also set as the future's
    result at construction time. The state queries are overridden to return
    fixed values.
    """

    def __init__(self, df: pd.DataFrame, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.df = df
        # Resolve right away so the inherited Future machinery sees a result too.
        self.set_result(df)

    def as_pandas(self) -> pd.DataFrame:
        """Return the stored DataFrame."""
        return self.df

    def result(self, timeout=None) -> pd.DataFrame:
        """Return the stored DataFrame; ``timeout`` is accepted but not used."""
        return self.df

    def done(self) -> Literal[True]:
        """Always ``True``."""
        return True

    def running(self) -> Literal[False]:
        """Always ``False``."""
        return False

    def cancelled(self) -> Literal[False]:
        """Always ``False``."""
        return False


class AthenaFutureDf:
    """Wrapper that forwards the future-style interface to a wrapped query future."""

    def __init__(self, db_future: Future) -> None:
        # The wrapped future; every method below delegates to it.
        self.future = db_future

    def cancel(self) -> bool:
        """Forward ``cancel()`` to the wrapped future and return its answer."""
        return self.future.cancel()

    def running(self) -> bool:
        """Forward ``running()`` to the wrapped future."""
        return self.future.running()

    def done(self) -> bool:
        """Forward ``done()`` to the wrapped future."""
        return self.future.done()

    def cancelled(self) -> bool:
        """Forward ``cancelled()`` to the wrapped future."""
        return self.future.cancelled()

    def result(self, timeout=None) -> "AthenaPandasResultSet":
        """Return the wrapped future's result.

        Args:
            timeout: passed through to the wrapped future's ``result``;
                ``None`` means no limit.
        """
        # Forward the timeout: it used to be accepted but dropped, so a caller
        # asking for a bounded wait would block until the query finished.
        return self.future.result(timeout=timeout)

    def as_pandas(self) -> pd.DataFrame:
        """Return ``self.future.as_pandas()``.

        NOTE(review): ``as_pandas`` is not part of ``concurrent.futures.Future``;
        presumably it is attached to the future by the caller - confirm.
        """
        return self.future.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet


class COLOR:
    """Terminal escape sequences used by the colored print helpers."""

    # Color selectors, named after the color they switch to.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    # Appended after colored text to end the coloring.
    END = '\033[0m'


def print_r(text):
    """Print ``text`` wrapped in the red color code followed by the end code."""
    colored = f"{COLOR.RED}{text}{COLOR.END}"
    print(colored)


def print_y(text):
    """Print ``text`` wrapped in the yellow color code followed by the end code."""
    colored = f"{COLOR.YELLOW}{text}{COLOR.END}"
    print(colored)


def print_g(text):
    """Print ``text`` wrapped in the green color code followed by the end code."""
    colored = f"{COLOR.GREEN}{text}{COLOR.END}"
    print(colored)


class UnSupportedTypeException(Exception):
    """Raised when a value's type cannot be rendered as a SQL literal."""


class CustomCompiler(AthenaDialect().statement_compiler):  # type: ignore
    """Athena statement compiler that renders extra Python types as inline SQL literals.

    Datetimes, lists, tuples and ``MappedColumn`` objects are rendered by
    ``render_literal``; everything else goes to the parent compiler.
    """

    @staticmethod
    def render_literal(obj):
        """Return the SQL text for ``obj``.

        Numbers are rendered with ``str``; strings are single-quoted with
        embedded quotes doubled; datetimes become ``timestamp '...'``; lists
        become ``ARRAY[...]``; tuples become a parenthesised list; a
        ``MappedColumn`` becomes ``MAP(keys, values)[index]``.

        Raises:
            UnSupportedTypeException: for any other type.
        """
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        if isinstance(obj, (int, float)):
            return str(obj)

        if isinstance(obj, str):
            escaped = obj.replace("'", "''")
            return f"'{escaped}'"

        if isinstance(obj, (datetime.datetime)):
            stamp = str(obj).replace("'", "''")
            return f"timestamp '{stamp}'"

        if isinstance(obj, list):
            return CustomCompiler.get_array_string(obj)

        if isinstance(obj, tuple):
            members = ', '.join([CustomCompiler.render_literal(member) for member in obj])
            return f"({members})"

        if isinstance(obj, MappedColumn):
            map_keys = list(obj.mapping_dict.keys())
            map_values = list(obj.mapping_dict.values())
            # A tuple key is compiled piece by piece and wrapped in parentheses.
            if isinstance(obj.key, tuple):
                compiled_parts = tuple(obj.bsq._compile(source) for source in obj.key)
                indexing_str = f"({', '.join(compiled_parts)})"
            else:
                indexing_str = obj.bsq._compile(obj.key)

            keys_sql = CustomCompiler.render_literal(map_keys)
            values_sql = CustomCompiler.render_literal(map_values)
            return f"MAP({keys_sql}, {values_sql})[{indexing_str}]"

        raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")

    @staticmethod
    def get_array_string(array):
        """Render ``array`` as ``ARRAY[...]``.

        When there are more than 254 elements, emit several ``ARRAY[...]``
        chunks of at most 254 elements joined by ``CONCAT(...)``.
        """
        def as_array(items):
            rendered = [CustomCompiler.render_literal(item) for item in items]
            return "ARRAY[" + ', '.join(rendered) + "]"

        if len(array) <= 254:
            return as_array(array)

        # 254 is the per-ARRAY element cap used here; presumably an engine
        # limit on constructor arguments - TODO confirm.
        chunks = [as_array(array[start:start + 254]) for start in range(0, len(array), 254)]
        return "CONCAT(" + ', '.join(chunks) + ")"

    def render_literal_value(self, obj, type_):
        """Render ``obj`` via ``render_literal`` when it is a handled type, else defer to the parent."""
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        handled_here = (datetime.datetime, list, tuple, MappedColumn)
        if not isinstance(obj, handled_here):
            return super().render_literal_value(obj, type_)

        return CustomCompiler.render_literal(obj)


class DataExistsException(Exception):
    """Exception that carries optional ``existing_data`` alongside its message."""

    def __init__(self, message, existing_data=None):
        super().__init__(message)
        # Kept on the instance so a handler can read it back.
        self.existing_data = existing_data


def save_pickle(path, obj):
    """Pickle ``obj`` into the file at ``path``, replacing any existing content."""
    with open(path, "wb") as sink:
        pickler = pickle.Pickler(sink)
        pickler.dump(obj)


def load_pickle(path):
    """Load and return the object pickled in the file at ``path``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    # Open directly instead of checking existence first: the check-then-open
    # form races with deletion and costs an extra stat call.
    try:
        source = open(path, "rb")
    except FileNotFoundError as err:
        raise FileNotFoundError(f"File {path} not found for loading table") from err
    with source:
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        return pickle.load(source)


def read_csv(csv_file_path, **kwargs) -> pd.DataFrame:
    """Read a CSV with pandas, keeping the literal string ``"None"`` as data.

    pandas' default NA strings are used, minus ``"None"``. Extra keyword
    arguments are forwarded to ``pd.read_csv``.
    """
    # Start from pandas' own NA strings and drop only "None".
    na_tokens = list(pd._libs.parsers.STR_NA_VALUES - {"None"})
    return pd.read_csv(csv_file_path, na_values=na_tokens, keep_default_na=False, **kwargs)

Functions

def load_pickle(path)
Expand source code
def load_pickle(path):
    """Load and return the object pickled in the file at ``path``.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    # Open directly instead of checking existence first: the check-then-open
    # form races with deletion and costs an extra stat call.
    try:
        source = open(path, "rb")
    except FileNotFoundError as err:
        raise FileNotFoundError(f"File {path} not found for loading table") from err
    with source:
        # NOTE: unpickling can execute arbitrary code; only load trusted files.
        return pickle.load(source)
def print_g(text)
Expand source code
def print_g(text):
    """Print ``text`` wrapped in the green color code followed by the end code."""
    colored = f"{COLOR.GREEN}{text}{COLOR.END}"
    print(colored)
def print_r(text)
Expand source code
def print_r(text):
    """Print ``text`` wrapped in the red color code followed by the end code."""
    colored = f"{COLOR.RED}{text}{COLOR.END}"
    print(colored)
def print_y(text)
Expand source code
def print_y(text):
    """Print ``text`` wrapped in the yellow color code followed by the end code."""
    colored = f"{COLOR.YELLOW}{text}{COLOR.END}"
    print(colored)
def read_csv(csv_file_path, **kwargs) ‑> pandas.core.frame.DataFrame
Expand source code
def read_csv(csv_file_path, **kwargs) -> pd.DataFrame:
    """Read a CSV with pandas, keeping the literal string ``"None"`` as data.

    pandas' default NA strings are used, minus ``"None"``. Extra keyword
    arguments are forwarded to ``pd.read_csv``.
    """
    # Start from pandas' own NA strings and drop only "None".
    na_tokens = list(pd._libs.parsers.STR_NA_VALUES - {"None"})
    return pd.read_csv(csv_file_path, na_values=na_tokens, keep_default_na=False, **kwargs)
def save_pickle(path, obj)
Expand source code
def save_pickle(path, obj):
    """Pickle ``obj`` into the file at ``path``, replacing any existing content."""
    with open(path, "wb") as sink:
        pickler = pickle.Pickler(sink)
        pickler.dump(obj)

Classes

class AthenaFutureDf (db_future: concurrent.futures._base.Future)
Expand source code
class AthenaFutureDf:
    """Wrapper that forwards the future-style interface to a wrapped query future."""

    def __init__(self, db_future: Future) -> None:
        # The wrapped future; every method below delegates to it.
        self.future = db_future

    def cancel(self) -> bool:
        """Forward ``cancel()`` to the wrapped future and return its answer."""
        return self.future.cancel()

    def running(self) -> bool:
        """Forward ``running()`` to the wrapped future."""
        return self.future.running()

    def done(self) -> bool:
        """Forward ``done()`` to the wrapped future."""
        return self.future.done()

    def cancelled(self) -> bool:
        """Forward ``cancelled()`` to the wrapped future."""
        return self.future.cancelled()

    def result(self, timeout=None) -> "AthenaPandasResultSet":
        """Return the wrapped future's result.

        Args:
            timeout: passed through to the wrapped future's ``result``;
                ``None`` means no limit.
        """
        # Forward the timeout: it used to be accepted but dropped, so a caller
        # asking for a bounded wait would block until the query finished.
        return self.future.result(timeout=timeout)

    def as_pandas(self) -> pd.DataFrame:
        """Return ``self.future.as_pandas()``.

        NOTE(review): ``as_pandas`` is not part of ``concurrent.futures.Future``;
        presumably it is attached to the future by the caller - confirm.
        """
        return self.future.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet

Methods

def as_pandas(self) ‑> pandas.core.frame.DataFrame
Expand source code
def as_pandas(self) -> pd.DataFrame:
    """Return whatever the wrapped future's ``as_pandas()`` returns."""
    wrapped = self.future
    return wrapped.as_pandas()  # type: ignore # mypy doesn't know about AthenaPandasResultSet
def cancel(self) ‑> bool
Expand source code
def cancel(self) -> bool:
    """Forward ``cancel()`` to the wrapped future and return its answer."""
    wrapped = self.future
    return wrapped.cancel()
def cancelled(self) ‑> bool
Expand source code
def cancelled(self) -> bool:
    """Forward ``cancelled()`` to the wrapped future."""
    wrapped = self.future
    return wrapped.cancelled()
def done(self) ‑> bool
Expand source code
def done(self) -> bool:
    """Forward ``done()`` to the wrapped future."""
    wrapped = self.future
    return wrapped.done()
def result(self, timeout=None) ‑> pyathena.pandas.result_set.AthenaPandasResultSet
Expand source code
def result(self, timeout=None) -> "AthenaPandasResultSet":
    """Return the wrapped future's result.

    Args:
        timeout: passed through to the wrapped future's ``result``;
            ``None`` means no limit.
    """
    # Forward the timeout: it used to be accepted but dropped, so a caller
    # asking for a bounded wait would block until the query finished.
    return self.future.result(timeout=timeout)
def running(self) ‑> bool
Expand source code
def running(self) -> bool:
    """Forward ``running()`` to the wrapped future."""
    wrapped = self.future
    return wrapped.running()
class COLOR
Expand source code
class COLOR:
    """Terminal escape sequences used by the colored print helpers."""

    # Color selectors, named after the color they switch to.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    # Appended after colored text to end the coloring.
    END = '\033[0m'

Class variables

var END
var GREEN
var RED
var YELLOW
class CachedFutureDf (df: pandas.core.frame.DataFrame, *args, **kwargs)

Represents the result of an asynchronous computation.

Initializes the future. Should not be called by clients.

Expand source code
class CachedFutureDf(Future):
    """A ``Future`` that is already resolved with an in-memory DataFrame.

    The DataFrame is stored on the instance and also set as the future's
    result at construction time. The state queries are overridden to return
    fixed values.
    """

    def __init__(self, df: pd.DataFrame, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.df = df
        # Resolve right away so the inherited Future machinery sees a result too.
        self.set_result(df)

    def as_pandas(self) -> pd.DataFrame:
        """Return the stored DataFrame."""
        return self.df

    def result(self, timeout=None) -> pd.DataFrame:
        """Return the stored DataFrame; ``timeout`` is accepted but not used."""
        return self.df

    def done(self) -> Literal[True]:
        """Always ``True``."""
        return True

    def running(self) -> Literal[False]:
        """Always ``False``."""
        return False

    def cancelled(self) -> Literal[False]:
        """Always ``False``."""
        return False

Ancestors

  • concurrent.futures._base.Future

Methods

def as_pandas(self) ‑> pandas.core.frame.DataFrame
Expand source code
def as_pandas(self) -> pd.DataFrame:
    """Return the DataFrame stored on this instance."""
    cached = self.df
    return cached
def cancelled(self) ‑> Literal[False]

Return True if the future was cancelled.

Expand source code
def cancelled(self) -> Literal[False]:
    """Always return ``False``."""
    return False
def done(self) ‑> Literal[True]

Return True if the future was cancelled or finished executing.

Expand source code
def done(self) -> Literal[True]:
    """Always return ``True``."""
    return True
def result(self, timeout=None) ‑> pandas.core.frame.DataFrame

Return the result of the call that the future represents.

Args

timeout
The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.

Returns

The result of the call that the future represents.

Raises

CancelledError
If the future was cancelled.
TimeoutError
If the future didn't finish executing before the given timeout.
Exception
If the call raised then that exception will be raised.
Expand source code
def result(self, timeout=None) -> pd.DataFrame:
    """Return the stored DataFrame; ``timeout`` is accepted but not used."""
    cached = self.df
    return cached
def running(self) ‑> Literal[False]

Return True if the future is currently executing.

Expand source code
def running(self) -> Literal[False]:
    """Always return ``False``."""
    return False
class CustomCompiler (dialect, statement, cache_key=None, column_keys=None, for_executemany=False, linting=symbol('NO_LINTING'), **kwargs)

Default implementation of `Compiled`.

Compiles `ClauseElement` objects into SQL strings.

Construct a new `SQLCompiler` object.

:param dialect: the `Dialect` to be used

:param statement: the `ClauseElement` to be compiled

:param column_keys: a list of column names to be compiled into an INSERT or UPDATE statement.

:param for_executemany: whether INSERT / UPDATE statements should expect that they are to be invoked in an "executemany" style, which may impact how the statement will be expected to return the values of defaults and autoincrement / sequences and similar. Depending on the backend and driver in use, support for retrieving these values may be disabled which means SQL expressions may be rendered inline, RETURNING may not be rendered, etc.

:param kwargs: additional keyword arguments to be consumed by the superclass.

Expand source code
class CustomCompiler(AthenaDialect().statement_compiler):  # type: ignore
    """Athena statement compiler that renders extra Python types as inline SQL literals.

    Datetimes, lists, tuples and ``MappedColumn`` objects are rendered by
    ``render_literal``; everything else goes to the parent compiler.
    """

    @staticmethod
    def render_literal(obj):
        """Return the SQL text for ``obj``.

        Numbers are rendered with ``str``; strings are single-quoted with
        embedded quotes doubled; datetimes become ``timestamp '...'``; lists
        become ``ARRAY[...]``; tuples become a parenthesised list; a
        ``MappedColumn`` becomes ``MAP(keys, values)[index]``.

        Raises:
            UnSupportedTypeException: for any other type.
        """
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        if isinstance(obj, (int, float)):
            return str(obj)

        if isinstance(obj, str):
            escaped = obj.replace("'", "''")
            return f"'{escaped}'"

        if isinstance(obj, (datetime.datetime)):
            stamp = str(obj).replace("'", "''")
            return f"timestamp '{stamp}'"

        if isinstance(obj, list):
            return CustomCompiler.get_array_string(obj)

        if isinstance(obj, tuple):
            members = ', '.join([CustomCompiler.render_literal(member) for member in obj])
            return f"({members})"

        if isinstance(obj, MappedColumn):
            map_keys = list(obj.mapping_dict.keys())
            map_values = list(obj.mapping_dict.values())
            # A tuple key is compiled piece by piece and wrapped in parentheses.
            if isinstance(obj.key, tuple):
                compiled_parts = tuple(obj.bsq._compile(source) for source in obj.key)
                indexing_str = f"({', '.join(compiled_parts)})"
            else:
                indexing_str = obj.bsq._compile(obj.key)

            keys_sql = CustomCompiler.render_literal(map_keys)
            values_sql = CustomCompiler.render_literal(map_values)
            return f"MAP({keys_sql}, {values_sql})[{indexing_str}]"

        raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")

    @staticmethod
    def get_array_string(array):
        """Render ``array`` as ``ARRAY[...]``.

        When there are more than 254 elements, emit several ``ARRAY[...]``
        chunks of at most 254 elements joined by ``CONCAT(...)``.
        """
        def as_array(items):
            rendered = [CustomCompiler.render_literal(item) for item in items]
            return "ARRAY[" + ', '.join(rendered) + "]"

        if len(array) <= 254:
            return as_array(array)

        # 254 is the per-ARRAY element cap used here; presumably an engine
        # limit on constructor arguments - TODO confirm.
        chunks = [as_array(array[start:start + 254]) for start in range(0, len(array), 254)]
        return "CONCAT(" + ', '.join(chunks) + ")"

    def render_literal_value(self, obj, type_):
        """Render ``obj`` via ``render_literal`` when it is a handled type, else defer to the parent."""
        from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

        handled_here = (datetime.datetime, list, tuple, MappedColumn)
        if not isinstance(obj, handled_here):
            return super().render_literal_value(obj, type_)

        return CustomCompiler.render_literal(obj)

Ancestors

  • pyathena.sqlalchemy.base.AthenaStatementCompiler
  • sqlalchemy.sql.compiler.SQLCompiler
  • sqlalchemy.sql.compiler.Compiled

Static methods

def get_array_string(array)
Expand source code
@staticmethod
def get_array_string(array):
    """Render ``array`` as ``ARRAY[...]``.

    When there are more than 254 elements, emit several ``ARRAY[...]``
    chunks of at most 254 elements joined by ``CONCAT(...)``.
    """
    def as_array(items):
        rendered = [CustomCompiler.render_literal(item) for item in items]
        return "ARRAY[" + ', '.join(rendered) + "]"

    if len(array) <= 254:
        return as_array(array)

    # 254 is the per-ARRAY element cap used here; presumably an engine
    # limit on constructor arguments - TODO confirm.
    chunks = [as_array(array[start:start + 254]) for start in range(0, len(array), 254)]
    return "CONCAT(" + ', '.join(chunks) + ")"
def render_literal(obj)
Expand source code
@staticmethod
def render_literal(obj):
    """Return the SQL text for ``obj``.

    Numbers are rendered with ``str``; strings are single-quoted with
    embedded quotes doubled; datetimes become ``timestamp '...'``; lists
    become ``ARRAY[...]``; tuples become a parenthesised list; a
    ``MappedColumn`` becomes ``MAP(keys, values)[index]``.

    Raises:
        UnSupportedTypeException: for any other type.
    """
    from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

    if isinstance(obj, (int, float)):
        return str(obj)

    if isinstance(obj, str):
        escaped = obj.replace("'", "''")
        return f"'{escaped}'"

    if isinstance(obj, (datetime.datetime)):
        stamp = str(obj).replace("'", "''")
        return f"timestamp '{stamp}'"

    if isinstance(obj, list):
        return CustomCompiler.get_array_string(obj)

    if isinstance(obj, tuple):
        members = ', '.join([CustomCompiler.render_literal(member) for member in obj])
        return f"({members})"

    if isinstance(obj, MappedColumn):
        map_keys = list(obj.mapping_dict.keys())
        map_values = list(obj.mapping_dict.values())
        # A tuple key is compiled piece by piece and wrapped in parentheses.
        if isinstance(obj.key, tuple):
            compiled_parts = tuple(obj.bsq._compile(source) for source in obj.key)
            indexing_str = f"({', '.join(compiled_parts)})"
        else:
            indexing_str = obj.bsq._compile(obj.key)

        keys_sql = CustomCompiler.render_literal(map_keys)
        values_sql = CustomCompiler.render_literal(map_values)
        return f"MAP({keys_sql}, {values_sql})[{indexing_str}]"

    raise UnSupportedTypeException(f"Unsupported type {type(obj)} for literal {obj}")

Methods

def render_literal_value(self, obj, type_)

Render the value of a bind parameter as a quoted literal.

This is used for statement sections that do not accept bind parameters on the target driver/database.

This should be implemented by subclasses using the quoting services of the DBAPI.

Expand source code
def render_literal_value(self, obj, type_):
    """Render ``obj`` via ``render_literal`` when it is a handled type, else defer to the parent."""
    from buildstock_query.schema.utilities import MappedColumn  # noqa: F811

    handled_here = (datetime.datetime, list, tuple, MappedColumn)
    if not isinstance(obj, handled_here):
        return super(CustomCompiler, self).render_literal_value(obj, type_)

    return CustomCompiler.render_literal(obj)
class DataExistsException (message, existing_data=None)

Common base class for all non-exit exceptions.

Expand source code
class DataExistsException(Exception):
    """Exception that carries optional ``existing_data`` alongside its message."""

    def __init__(self, message, existing_data=None):
        super().__init__(message)
        # Kept on the instance so a handler can read it back.
        self.existing_data = existing_data

Ancestors

  • builtins.Exception
  • builtins.BaseException
class UnSupportedTypeException (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UnSupportedTypeException(Exception):
    """Raised when a value's type cannot be rendered as a SQL literal."""

Ancestors

  • builtins.Exception
  • builtins.BaseException