Module buildstock_query.savings_query

Expand source code
import pandas as pd
import sqlalchemy as sa
from typing import Sequence, Union
from buildstock_query.schema.utilities import AnyColType
from buildstock_query.schema.query_params import SavingsQuery
from buildstock_query.schema.helpers import gather_params
from sqlalchemy.sql import functions as safunc
import buildstock_query.main as main
from pydantic import Field, validate_arguments


class BuildStockSavings:
    """Build and run savings queries (baseline vs. upgrade), both timeseries and annual.

    Table handles, column lookups, restriction/join/group-by helpers, SQL compilation
    and execution are all delegated to the wrapped ``BuildStockQuery`` object.
    """

    def __init__(self, buildstock_query: 'main.BuildStockQuery') -> None:
        """Keep a reference to the parent query object that every method delegates to."""
        self._bsq = buildstock_query

    def _validate_partition_by(self, partition_by: Sequence[str]):
        """Return ``partition_by`` after passing each entry through ``_get_gcol``.

        Args:
            partition_by: Column names to partition unloaded output by. May be empty/None.

        Returns:
            ``[]`` when ``partition_by`` is falsy, otherwise ``partition_by`` unchanged.
        """
        if not partition_by:
            return []
        for col in partition_by:
            # Called only for validation; the result is discarded.
            # Presumably _get_gcol raises for an unknown column -- TODO confirm.
            self._bsq._get_gcol(col)
        return partition_by

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def __get_timeseries_bs_up_table(self,
                                     enduses: Sequence[AnyColType],
                                     upgrade_id: str,
                                     applied_only: bool,
                                     restrict:
                                     Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]
                                              ] = Field(default_factory=list),
                                     ts_group_by: Sequence[Union[AnyColType, tuple[str, str]]
                                                           ] = Field(default_factory=list)):
        """Build the baseline/upgrade timeseries subqueries and the join between them.

        Args:
            enduses: Timeseries columns to carry through both subqueries.
            upgrade_id: Upgrade whose rows form the ``ts_u`` subquery.
            applied_only: If True, inner-join baseline to upgrade (rows without a matching
                upgrade row are dropped); otherwise left-outer-join so they are kept.
            restrict: Extra restrictions applied to both subqueries.
            ts_group_by: Extra timeseries columns to select so they can be grouped on later.

        Returns:
            ``(ts_b, ts_u, tbljoin)``: the baseline subquery (upgrade "0"), the upgrade
            subquery, and ``ts_b`` joined to ``ts_u`` and then to the baseline table.

        Raises:
            ValueError: If no timeseries table is available.
        """
        if self._bsq.ts_table is None:
            raise ValueError("No timeseries table found in database.")

        ts = self._bsq.ts_table
        base = self._bsq.bs_table
        bldg_id = self._bsq.building_id_column_name
        timestamp = self._bsq.timestamp_column_name
        sa_ts_cols = [ts.c[bldg_id], ts.c[timestamp], *ts_group_by, *enduses]
        ucol = self._bsq._ts_upgrade_col
        ts_restrict = list(restrict)
        ts_b = self._bsq._add_restrict(sa.select(sa_ts_cols), [[ucol, "0"]] + ts_restrict).alias("ts_b")
        ts_u = self._bsq._add_restrict(sa.select(sa_ts_cols), [[ucol, upgrade_id]] + ts_restrict).alias("ts_u")

        # Baseline and upgrade rows are matched on building id and timestamp.
        same_row = sa.and_(ts_b.c[bldg_id] == ts_u.c[bldg_id], ts_b.c[timestamp] == ts_u.c[timestamp])
        up_join = ts_b.join(ts_u, same_row) if applied_only else ts_b.outerjoin(ts_u, same_row)
        tbljoin = up_join.join(base, ts_b.c[bldg_id] == base.c[bldg_id])
        return ts_b, ts_u, tbljoin

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def __get_annual_bs_up_table(self, upgrade_id: str, applied_only: bool):
        """Join the annual baseline table to the upgrade table for ``upgrade_id``.

        Args:
            upgrade_id: Upgrade to match in the upgrade table.
            applied_only: If True, use an inner join (only buildings with a matching,
                successful upgrade row remain); otherwise a left outer join.

        Returns:
            ``(bs_table, up_table, tbljoin)``.

        Raises:
            ValueError: If no upgrades table is available.
        """
        if self._bsq.up_table is None:
            raise ValueError("No upgrades table found in database.")

        bs_table = self._bsq.bs_table
        up_table = self._bsq.up_table
        bldg_id = self._bsq.building_id_column_name
        on_clause = sa.and_(bs_table.c[bldg_id] == up_table.c[bldg_id],
                            self._bsq._up_upgrade_col == upgrade_id,
                            self._bsq._up_successful_condition)
        if applied_only:
            tbljoin = bs_table.join(up_table, on_clause)
        else:
            tbljoin = bs_table.outerjoin(up_table, on_clause)

        return bs_table, up_table, tbljoin

    @gather_params(SavingsQuery)
    def savings_shape(
        self, *,
        params: SavingsQuery,
    ) -> Union[pd.DataFrame, str]:
        """Build (and optionally run) a query of weighted baseline and savings per enduse.

        For every enduse column the query selects ``<label>__baseline`` and
        ``<label>__savings`` (weighted sums) and, when ``params.get_quartiles`` is set,
        ``<label>__savings__quartiles``. ``sample_count`` and ``units_count`` are always
        selected; timeseries queries also select the timestamp column unless
        ``params.collapse_ts`` is set, and ``rows_per_sample`` is added when
        ``params.timestamp_grouping_func`` is used.

        Args:
            params: Query parameters. This method reads ``join_list``, ``upgrade_id``,
                ``timestamp_grouping_func``, ``enduses``, ``annual_only``, ``partition_by``,
                ``weights``, ``group_by``, ``applied_only``, ``restrict``, ``get_quartiles``,
                ``collapse_ts``, ``sort``, ``unload_to`` and ``get_query_only``.
                ``params`` itself is not modified.

        Returns:
            The compiled query string when ``params.get_query_only`` is set, otherwise
            whatever ``self._bsq.execute`` returns for that query.

        Raises:
            ValueError: If ``timestamp_grouping_func`` is set to something other than
                'hour', 'day' or 'month', or if a required table is missing.
        """
        for join_spec in params.join_list:
            self._bsq._get_table(join_spec[0])  # ingress all tables in join list

        upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
        if params.timestamp_grouping_func and \
                params.timestamp_grouping_func not in ['hour', 'day', 'month']:
            raise ValueError("timestamp_grouping_func must be one of ['hour', 'day', 'month']")

        enduse_cols = self._bsq._get_enduse_cols(
            params.enduses, table="baseline" if params.annual_only else "timeseries")
        partition_by = self._validate_partition_by(params.partition_by)
        total_weight = self._bsq._get_weight(params.weights)
        group_by_selection = self._bsq._process_groupby_cols(params.group_by, annual_only=params.annual_only)

        if params.annual_only:
            bs_restrict = params.restrict
            ts_b, ts_u, tbljoin = self.__get_annual_bs_up_table(upgrade_id, params.applied_only)
        else:
            # Keep the split result in locals rather than overwriting params.restrict, so a
            # params object supplied by the caller is left untouched.
            bs_restrict, ts_restrict = self._bsq._split_restrict(params.restrict)
            bs_group_by, ts_group_by = self._bsq._split_group_by(group_by_selection)
            ts_b, ts_u, tbljoin = self.__get_timeseries_bs_up_table(enduse_cols, upgrade_id, params.applied_only,
                                                                    ts_restrict, ts_group_by)
            ts_group_by = [ts_b.c[c.name] for c in ts_group_by]  # Refer to the columns using ts_b table
            group_by_selection = bs_group_by + ts_group_by

        query_cols = []
        for col in enduse_cols:
            baseline_val = ts_b.c[col.name]
            if params.annual_only:
                # Use the upgrade value only where the upgrade row satisfies the success
                # condition; otherwise fall back to baseline so the saving is zero.
                upgrade_val = sa.case((self._bsq._get_success_condition(ts_u), ts_u.c[col.name]),
                                      else_=baseline_val)
            else:
                # A NULL upgrade building id means the outer join found no upgrade row;
                # fall back to baseline so the saving is zero.
                upgrade_val = sa.case((ts_u.c[self._bsq.building_id_column_name].is_(None), baseline_val),
                                      else_=ts_u.c[col.name])
            savings_col = safunc.coalesce(baseline_val, 0) - safunc.coalesce(upgrade_val, 0)

            label = self._bsq._simple_label(col.name)
            query_cols.extend(
                [
                    sa.func.sum(baseline_val * total_weight).label(f"{label}__baseline"),
                    sa.func.sum(savings_col * total_weight).label(f"{label}__savings"),
                ]
            )
            if params.get_quartiles:
                query_cols.append(
                    sa.func.approx_percentile(savings_col, [0, 0.02, 0.25, 0.5, 0.75, 0.98, 1]).
                    label(f"{label}__savings__quartiles")
                )

        # Must happen before the timestamp column is appended to group_by_selection below:
        # the timestamp is selected through grouping_metrics_selection instead.
        query_cols.extend(group_by_selection)
        if params.annual_only:  # Use annual tables
            grouping_metrics_selection = [safunc.sum(1).label(
                "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
        elif params.collapse_ts:  # Use timeseries tables but collapse timeseries
            rows_per_building = self._bsq._get_rows_per_building()
            grouping_metrics_selection = [(safunc.sum(1) / rows_per_building).label(
                "sample_count"), safunc.sum(total_weight / rows_per_building).label("units_count")]
        elif params.timestamp_grouping_func:
            colname = self._bsq.timestamp_column_name
            grouping_metrics_selection = [safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)).
                                          label("sample_count"),
                                          (safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)) *
                                           safunc.sum(total_weight) / safunc.sum(1)).label("units_count"),
                                          (safunc.sum(1) / safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column))).
                                          label("rows_per_sample"), ]
            sim_info = self._bsq._get_simulation_info()
            time_col = ts_b.c[self._bsq.timestamp_column_name]
            if sim_info.offset > 0:
                # If timestamps are not period beginning we should make them so for
                # timestamp_grouping_func aggregation.
                new_col = sa.func.date_trunc(params.timestamp_grouping_func,
                                             sa.func.date_add(sim_info.unit, -sim_info.offset, time_col)).label(colname)
            else:
                new_col = sa.func.date_trunc(params.timestamp_grouping_func, time_col).label(colname)
            grouping_metrics_selection.insert(0, new_col)
            group_by_selection.append(new_col)
        else:
            time_col = ts_b.c[self._bsq.timestamp_column_name].label(self._bsq.timestamp_column_name)
            grouping_metrics_selection = [time_col] + [safunc.sum(1).label(
                "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
            group_by_selection.append(time_col)

        query_cols = grouping_metrics_selection + query_cols
        query = sa.select(query_cols).select_from(tbljoin)
        query = self._bsq._add_join(query, params.join_list)
        query = self._bsq._add_restrict(query, bs_restrict)
        if params.annual_only:
            query = query.where(self._bsq._bs_successful_condition)
        query = self._bsq._add_group_by(query, group_by_selection)
        query = self._bsq._add_order_by(query, group_by_selection if params.sort else [])

        compiled_query = self._bsq._compile(query)
        if params.unload_to:
            if partition_by:
                # Spell out the SQL array literal rather than relying on the repr of a Python
                # list, so a tuple (or any other Sequence) also renders as ARRAY['a', 'b'].
                partition_cols = ", ".join(f"'{col}'" for col in partition_by)
                compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                                 f"WITH (format = 'PARQUET', partitioned_by = ARRAY[{partition_cols}])"
            else:
                compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                                 f"WITH (format = 'PARQUET')"

        if params.get_query_only:
            return compiled_query

        return self._bsq.execute(compiled_query)

Classes

class BuildStockSavings (buildstock_query: main.BuildStockQuery)

Class for running savings queries (both timeseries and annual).

Expand source code
class BuildStockSavings:
    """Build and run savings queries (baseline vs. upgrade), both timeseries and annual.

    Table handles, column lookups, restriction/join/group-by helpers, SQL compilation
    and execution are all delegated to the wrapped ``BuildStockQuery`` object.
    """

    def __init__(self, buildstock_query: 'main.BuildStockQuery') -> None:
        """Keep a reference to the parent query object that every method delegates to."""
        self._bsq = buildstock_query

    def _validate_partition_by(self, partition_by: Sequence[str]):
        """Return ``partition_by`` after passing each entry through ``_get_gcol``.

        Args:
            partition_by: Column names to partition unloaded output by. May be empty/None.

        Returns:
            ``[]`` when ``partition_by`` is falsy, otherwise ``partition_by`` unchanged.
        """
        if not partition_by:
            return []
        for col in partition_by:
            # Called only for validation; the result is discarded.
            # Presumably _get_gcol raises for an unknown column -- TODO confirm.
            self._bsq._get_gcol(col)
        return partition_by

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def __get_timeseries_bs_up_table(self,
                                     enduses: Sequence[AnyColType],
                                     upgrade_id: str,
                                     applied_only: bool,
                                     restrict:
                                     Sequence[tuple[AnyColType, Union[str, int, Sequence[Union[int, str]]]]
                                              ] = Field(default_factory=list),
                                     ts_group_by: Sequence[Union[AnyColType, tuple[str, str]]
                                                           ] = Field(default_factory=list)):
        """Build the baseline/upgrade timeseries subqueries and the join between them.

        Args:
            enduses: Timeseries columns to carry through both subqueries.
            upgrade_id: Upgrade whose rows form the ``ts_u`` subquery.
            applied_only: If True, inner-join baseline to upgrade (rows without a matching
                upgrade row are dropped); otherwise left-outer-join so they are kept.
            restrict: Extra restrictions applied to both subqueries.
            ts_group_by: Extra timeseries columns to select so they can be grouped on later.

        Returns:
            ``(ts_b, ts_u, tbljoin)``: the baseline subquery (upgrade "0"), the upgrade
            subquery, and ``ts_b`` joined to ``ts_u`` and then to the baseline table.

        Raises:
            ValueError: If no timeseries table is available.
        """
        if self._bsq.ts_table is None:
            raise ValueError("No timeseries table found in database.")

        ts = self._bsq.ts_table
        base = self._bsq.bs_table
        bldg_id = self._bsq.building_id_column_name
        timestamp = self._bsq.timestamp_column_name
        sa_ts_cols = [ts.c[bldg_id], ts.c[timestamp], *ts_group_by, *enduses]
        ucol = self._bsq._ts_upgrade_col
        ts_restrict = list(restrict)
        ts_b = self._bsq._add_restrict(sa.select(sa_ts_cols), [[ucol, "0"]] + ts_restrict).alias("ts_b")
        ts_u = self._bsq._add_restrict(sa.select(sa_ts_cols), [[ucol, upgrade_id]] + ts_restrict).alias("ts_u")

        # Baseline and upgrade rows are matched on building id and timestamp.
        same_row = sa.and_(ts_b.c[bldg_id] == ts_u.c[bldg_id], ts_b.c[timestamp] == ts_u.c[timestamp])
        up_join = ts_b.join(ts_u, same_row) if applied_only else ts_b.outerjoin(ts_u, same_row)
        tbljoin = up_join.join(base, ts_b.c[bldg_id] == base.c[bldg_id])
        return ts_b, ts_u, tbljoin

    @validate_arguments(config=dict(arbitrary_types_allowed=True, smart_union=True))
    def __get_annual_bs_up_table(self, upgrade_id: str, applied_only: bool):
        """Join the annual baseline table to the upgrade table for ``upgrade_id``.

        Args:
            upgrade_id: Upgrade to match in the upgrade table.
            applied_only: If True, use an inner join (only buildings with a matching,
                successful upgrade row remain); otherwise a left outer join.

        Returns:
            ``(bs_table, up_table, tbljoin)``.

        Raises:
            ValueError: If no upgrades table is available.
        """
        if self._bsq.up_table is None:
            raise ValueError("No upgrades table found in database.")

        bs_table = self._bsq.bs_table
        up_table = self._bsq.up_table
        bldg_id = self._bsq.building_id_column_name
        on_clause = sa.and_(bs_table.c[bldg_id] == up_table.c[bldg_id],
                            self._bsq._up_upgrade_col == upgrade_id,
                            self._bsq._up_successful_condition)
        if applied_only:
            tbljoin = bs_table.join(up_table, on_clause)
        else:
            tbljoin = bs_table.outerjoin(up_table, on_clause)

        return bs_table, up_table, tbljoin

    @gather_params(SavingsQuery)
    def savings_shape(
        self, *,
        params: SavingsQuery,
    ) -> Union[pd.DataFrame, str]:
        """Build (and optionally run) a query of weighted baseline and savings per enduse.

        For every enduse column the query selects ``<label>__baseline`` and
        ``<label>__savings`` (weighted sums) and, when ``params.get_quartiles`` is set,
        ``<label>__savings__quartiles``. ``sample_count`` and ``units_count`` are always
        selected; timeseries queries also select the timestamp column unless
        ``params.collapse_ts`` is set, and ``rows_per_sample`` is added when
        ``params.timestamp_grouping_func`` is used.

        Args:
            params: Query parameters. This method reads ``join_list``, ``upgrade_id``,
                ``timestamp_grouping_func``, ``enduses``, ``annual_only``, ``partition_by``,
                ``weights``, ``group_by``, ``applied_only``, ``restrict``, ``get_quartiles``,
                ``collapse_ts``, ``sort``, ``unload_to`` and ``get_query_only``.
                ``params`` itself is not modified.

        Returns:
            The compiled query string when ``params.get_query_only`` is set, otherwise
            whatever ``self._bsq.execute`` returns for that query.

        Raises:
            ValueError: If ``timestamp_grouping_func`` is set to something other than
                'hour', 'day' or 'month', or if a required table is missing.
        """
        for join_spec in params.join_list:
            self._bsq._get_table(join_spec[0])  # ingress all tables in join list

        upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
        if params.timestamp_grouping_func and \
                params.timestamp_grouping_func not in ['hour', 'day', 'month']:
            raise ValueError("timestamp_grouping_func must be one of ['hour', 'day', 'month']")

        enduse_cols = self._bsq._get_enduse_cols(
            params.enduses, table="baseline" if params.annual_only else "timeseries")
        partition_by = self._validate_partition_by(params.partition_by)
        total_weight = self._bsq._get_weight(params.weights)
        group_by_selection = self._bsq._process_groupby_cols(params.group_by, annual_only=params.annual_only)

        if params.annual_only:
            bs_restrict = params.restrict
            ts_b, ts_u, tbljoin = self.__get_annual_bs_up_table(upgrade_id, params.applied_only)
        else:
            # Keep the split result in locals rather than overwriting params.restrict, so a
            # params object supplied by the caller is left untouched.
            bs_restrict, ts_restrict = self._bsq._split_restrict(params.restrict)
            bs_group_by, ts_group_by = self._bsq._split_group_by(group_by_selection)
            ts_b, ts_u, tbljoin = self.__get_timeseries_bs_up_table(enduse_cols, upgrade_id, params.applied_only,
                                                                    ts_restrict, ts_group_by)
            ts_group_by = [ts_b.c[c.name] for c in ts_group_by]  # Refer to the columns using ts_b table
            group_by_selection = bs_group_by + ts_group_by

        query_cols = []
        for col in enduse_cols:
            baseline_val = ts_b.c[col.name]
            if params.annual_only:
                # Use the upgrade value only where the upgrade row satisfies the success
                # condition; otherwise fall back to baseline so the saving is zero.
                upgrade_val = sa.case((self._bsq._get_success_condition(ts_u), ts_u.c[col.name]),
                                      else_=baseline_val)
            else:
                # A NULL upgrade building id means the outer join found no upgrade row;
                # fall back to baseline so the saving is zero.
                upgrade_val = sa.case((ts_u.c[self._bsq.building_id_column_name].is_(None), baseline_val),
                                      else_=ts_u.c[col.name])
            savings_col = safunc.coalesce(baseline_val, 0) - safunc.coalesce(upgrade_val, 0)

            label = self._bsq._simple_label(col.name)
            query_cols.extend(
                [
                    sa.func.sum(baseline_val * total_weight).label(f"{label}__baseline"),
                    sa.func.sum(savings_col * total_weight).label(f"{label}__savings"),
                ]
            )
            if params.get_quartiles:
                query_cols.append(
                    sa.func.approx_percentile(savings_col, [0, 0.02, 0.25, 0.5, 0.75, 0.98, 1]).
                    label(f"{label}__savings__quartiles")
                )

        # Must happen before the timestamp column is appended to group_by_selection below:
        # the timestamp is selected through grouping_metrics_selection instead.
        query_cols.extend(group_by_selection)
        if params.annual_only:  # Use annual tables
            grouping_metrics_selection = [safunc.sum(1).label(
                "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
        elif params.collapse_ts:  # Use timeseries tables but collapse timeseries
            rows_per_building = self._bsq._get_rows_per_building()
            grouping_metrics_selection = [(safunc.sum(1) / rows_per_building).label(
                "sample_count"), safunc.sum(total_weight / rows_per_building).label("units_count")]
        elif params.timestamp_grouping_func:
            colname = self._bsq.timestamp_column_name
            grouping_metrics_selection = [safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)).
                                          label("sample_count"),
                                          (safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)) *
                                           safunc.sum(total_weight) / safunc.sum(1)).label("units_count"),
                                          (safunc.sum(1) / safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column))).
                                          label("rows_per_sample"), ]
            sim_info = self._bsq._get_simulation_info()
            time_col = ts_b.c[self._bsq.timestamp_column_name]
            if sim_info.offset > 0:
                # If timestamps are not period beginning we should make them so for
                # timestamp_grouping_func aggregation.
                new_col = sa.func.date_trunc(params.timestamp_grouping_func,
                                             sa.func.date_add(sim_info.unit, -sim_info.offset, time_col)).label(colname)
            else:
                new_col = sa.func.date_trunc(params.timestamp_grouping_func, time_col).label(colname)
            grouping_metrics_selection.insert(0, new_col)
            group_by_selection.append(new_col)
        else:
            time_col = ts_b.c[self._bsq.timestamp_column_name].label(self._bsq.timestamp_column_name)
            grouping_metrics_selection = [time_col] + [safunc.sum(1).label(
                "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
            group_by_selection.append(time_col)

        query_cols = grouping_metrics_selection + query_cols
        query = sa.select(query_cols).select_from(tbljoin)
        query = self._bsq._add_join(query, params.join_list)
        query = self._bsq._add_restrict(query, bs_restrict)
        if params.annual_only:
            query = query.where(self._bsq._bs_successful_condition)
        query = self._bsq._add_group_by(query, group_by_selection)
        query = self._bsq._add_order_by(query, group_by_selection if params.sort else [])

        compiled_query = self._bsq._compile(query)
        if params.unload_to:
            if partition_by:
                # Spell out the SQL array literal rather than relying on the repr of a Python
                # list, so a tuple (or any other Sequence) also renders as ARRAY['a', 'b'].
                partition_cols = ", ".join(f"'{col}'" for col in partition_by)
                compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                                 f"WITH (format = 'PARQUET', partitioned_by = ARRAY[{partition_cols}])"
            else:
                compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                                 f"WITH (format = 'PARQUET')"

        if params.get_query_only:
            return compiled_query

        return self._bsq.execute(compiled_query)

Methods

def savings_shape(self, *, params: SavingsQuery) ‑> Union[str, pandas.core.frame.DataFrame]
Expand source code
@gather_params(SavingsQuery)
def savings_shape(
    self, *,
    params: SavingsQuery,
) -> Union[pd.DataFrame, str]:
    """Build (and optionally run) a query of weighted baseline and savings per enduse.

    For every enduse column the query selects ``<label>__baseline`` and
    ``<label>__savings`` (weighted sums) and, when ``params.get_quartiles`` is set,
    ``<label>__savings__quartiles``. ``sample_count`` and ``units_count`` are always
    selected; timeseries queries also select the timestamp column unless
    ``params.collapse_ts`` is set, and ``rows_per_sample`` is added when
    ``params.timestamp_grouping_func`` is used.

    Args:
        params: Query parameters. This method reads ``join_list``, ``upgrade_id``,
            ``timestamp_grouping_func``, ``enduses``, ``annual_only``, ``partition_by``,
            ``weights``, ``group_by``, ``applied_only``, ``restrict``, ``get_quartiles``,
            ``collapse_ts``, ``sort``, ``unload_to`` and ``get_query_only``.
            ``params`` itself is not modified.

    Returns:
        The compiled query string when ``params.get_query_only`` is set, otherwise
        whatever ``self._bsq.execute`` returns for that query.

    Raises:
        ValueError: If ``timestamp_grouping_func`` is set to something other than
            'hour', 'day' or 'month'.
    """
    for join_spec in params.join_list:
        self._bsq._get_table(join_spec[0])  # ingress all tables in join list

    upgrade_id = self._bsq._validate_upgrade(params.upgrade_id)
    if params.timestamp_grouping_func and \
            params.timestamp_grouping_func not in ['hour', 'day', 'month']:
        raise ValueError("timestamp_grouping_func must be one of ['hour', 'day', 'month']")

    enduse_cols = self._bsq._get_enduse_cols(
        params.enduses, table="baseline" if params.annual_only else "timeseries")
    partition_by = self._validate_partition_by(params.partition_by)
    total_weight = self._bsq._get_weight(params.weights)
    group_by_selection = self._bsq._process_groupby_cols(params.group_by, annual_only=params.annual_only)

    if params.annual_only:
        bs_restrict = params.restrict
        ts_b, ts_u, tbljoin = self.__get_annual_bs_up_table(upgrade_id, params.applied_only)
    else:
        # Keep the split result in locals rather than overwriting params.restrict, so a
        # params object supplied by the caller is left untouched.
        bs_restrict, ts_restrict = self._bsq._split_restrict(params.restrict)
        bs_group_by, ts_group_by = self._bsq._split_group_by(group_by_selection)
        ts_b, ts_u, tbljoin = self.__get_timeseries_bs_up_table(enduse_cols, upgrade_id, params.applied_only,
                                                                ts_restrict, ts_group_by)
        ts_group_by = [ts_b.c[c.name] for c in ts_group_by]  # Refer to the columns using ts_b table
        group_by_selection = bs_group_by + ts_group_by

    query_cols = []
    for col in enduse_cols:
        baseline_val = ts_b.c[col.name]
        if params.annual_only:
            # Use the upgrade value only where the upgrade row satisfies the success
            # condition; otherwise fall back to baseline so the saving is zero.
            upgrade_val = sa.case((self._bsq._get_success_condition(ts_u), ts_u.c[col.name]),
                                  else_=baseline_val)
        else:
            # A NULL upgrade building id means the outer join found no upgrade row;
            # fall back to baseline so the saving is zero.
            upgrade_val = sa.case((ts_u.c[self._bsq.building_id_column_name].is_(None), baseline_val),
                                  else_=ts_u.c[col.name])
        savings_col = safunc.coalesce(baseline_val, 0) - safunc.coalesce(upgrade_val, 0)

        label = self._bsq._simple_label(col.name)
        query_cols.extend(
            [
                sa.func.sum(baseline_val * total_weight).label(f"{label}__baseline"),
                sa.func.sum(savings_col * total_weight).label(f"{label}__savings"),
            ]
        )
        if params.get_quartiles:
            query_cols.append(
                sa.func.approx_percentile(savings_col, [0, 0.02, 0.25, 0.5, 0.75, 0.98, 1]).
                label(f"{label}__savings__quartiles")
            )

    # Must happen before the timestamp column is appended to group_by_selection below:
    # the timestamp is selected through grouping_metrics_selection instead.
    query_cols.extend(group_by_selection)
    if params.annual_only:  # Use annual tables
        grouping_metrics_selection = [safunc.sum(1).label(
            "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
    elif params.collapse_ts:  # Use timeseries tables but collapse timeseries
        rows_per_building = self._bsq._get_rows_per_building()
        grouping_metrics_selection = [(safunc.sum(1) / rows_per_building).label(
            "sample_count"), safunc.sum(total_weight / rows_per_building).label("units_count")]
    elif params.timestamp_grouping_func:
        colname = self._bsq.timestamp_column_name
        grouping_metrics_selection = [safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)).
                                      label("sample_count"),
                                      (safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column)) *
                                       safunc.sum(total_weight) / safunc.sum(1)).label("units_count"),
                                      (safunc.sum(1) / safunc.count(sa.func.distinct(self._bsq.bs_bldgid_column))).
                                      label("rows_per_sample"), ]
        sim_info = self._bsq._get_simulation_info()
        time_col = ts_b.c[self._bsq.timestamp_column_name]
        if sim_info.offset > 0:
            # If timestamps are not period beginning we should make them so for
            # timestamp_grouping_func aggregation.
            new_col = sa.func.date_trunc(params.timestamp_grouping_func,
                                         sa.func.date_add(sim_info.unit, -sim_info.offset, time_col)).label(colname)
        else:
            new_col = sa.func.date_trunc(params.timestamp_grouping_func, time_col).label(colname)
        grouping_metrics_selection.insert(0, new_col)
        group_by_selection.append(new_col)
    else:
        time_col = ts_b.c[self._bsq.timestamp_column_name].label(self._bsq.timestamp_column_name)
        grouping_metrics_selection = [time_col] + [safunc.sum(1).label(
            "sample_count"), safunc.sum(1 * total_weight).label("units_count")]
        group_by_selection.append(time_col)

    query_cols = grouping_metrics_selection + query_cols
    query = sa.select(query_cols).select_from(tbljoin)
    query = self._bsq._add_join(query, params.join_list)
    query = self._bsq._add_restrict(query, bs_restrict)
    if params.annual_only:
        query = query.where(self._bsq._bs_successful_condition)
    query = self._bsq._add_group_by(query, group_by_selection)
    query = self._bsq._add_order_by(query, group_by_selection if params.sort else [])

    compiled_query = self._bsq._compile(query)
    if params.unload_to:
        if partition_by:
            # Spell out the SQL array literal rather than relying on the repr of a Python
            # list, so a tuple (or any other Sequence) also renders as ARRAY['a', 'b'].
            partition_cols = ", ".join(f"'{col}'" for col in partition_by)
            compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                             f"WITH (format = 'PARQUET', partitioned_by = ARRAY[{partition_cols}])"
        else:
            compiled_query = f"UNLOAD ({compiled_query}) \n TO 's3://{params.unload_to}' \n "\
                             f"WITH (format = 'PARQUET')"

    if params.get_query_only:
        return compiled_query

    return self._bsq.execute(compiled_query)