Source code for python.excel.excel_objects

import openpyxl as xl
from io import StringIO
import numpy as np
import pandas as pd
import datetime

class ExcelError(Exception):
    """Raised when a workbook lookup fails, e.g. a named range does not exist."""
class FancyNamedRange(object):
    """Expose one workbook named range as cells, arrays and a DataFrame.

    On construction the named range is resolved against ``workbook`` and the
    following attributes are populated:

    * ``base`` -- the defined-name object looked up in the workbook
    * ``worksheet`` / ``cell_range`` -- the single destination of the range
    * ``topleft`` / ``bottomright`` -- corner coordinates of ``cell_range``
    * ``cells`` -- the cells read from the worksheet
    * ``cell_array`` -- ``cells`` as a numpy array (at least 1x1)
    * ``rec_array`` -- numpy record array of the cell values, one field per
      column
    * ``data_frame`` -- pandas DataFrame built from ``rec_array``

    Only named ranges with exactly one worksheet and one contiguous cell
    range are supported.

    The helper methods use dunder-style names (``__base__``, ``__cells__``,
    ...). Those names are kept unchanged for backward compatibility; call
    them on an instance, not on the class.
    """

    def __init__(self, workbook, range_name):
        """Resolve ``range_name`` in ``workbook`` and build all derived views.

        Raises:
            ExcelError: if the named range does not exist.
            NotImplementedError: if the range spans several worksheets or
                several non-contiguous cell ranges.
        """
        self.wb = workbook
        self.base = self.__base__(range_name)

        self.worksheets = self.__worksheets__()
        self.count_worksheets = self.__count_destination_components__(self.worksheets)
        if self.count_worksheets > 1:
            raise NotImplementedError(
                "Named Ranges spanning multiple worksheets are not currently supported"
            )
        self.worksheet = self.worksheets[0]

        self.cell_ranges = self.__cell_ranges__()
        self.count_cell_ranges = self.__count_destination_components__(self.cell_ranges)
        if self.count_cell_ranges > 1:
            raise NotImplementedError(
                "Named Ranges spanning multiple, non-contiguous cell ranges are not currently supported"
            )
        self.cell_range = self.cell_ranges[0]

        self.topleft = self.__topleft__()
        self.bottomright = self.__bottomright__()
        self.cells = self.__cells__()
        self.cell_array = self.__cell_array__()
        self.rec_array = self.__rec_array__()
        self.data_frame = self.__data_frame__()

    def __colnames_included__(self):
        """Rebuild ``rec_array`` and ``data_frame`` using the first row as column names."""
        self.rec_array = self.__rec_array__(colnames_included=True)
        self.data_frame = self.__data_frame__(colnames_included=True)

    def __melt__(self):
        """Reshape ``data_frame`` to long format, keyed on its first column.

        The first row of the range is used as column names, then every column
        except the first is melted into ``variable`` / ``value`` pairs.
        """
        self.__colnames_included__()
        # Remember the dtype of the column labels so the melted 'variable'
        # column can be cast back to it.
        dtype = self.data_frame.columns.dtype
        self.data_frame = pd.melt(self.data_frame, self.data_frame.columns[0])
        # Assign the column directly: ``.loc[:, 'variable'] = ...`` may write
        # into the existing column in place and keep its previous dtype.
        self.data_frame['variable'] = self.data_frame['variable'].astype(dtype)

    def __transpose_values__(self):
        """Transpose ``cell_array`` and rebuild ``rec_array`` and ``data_frame``."""
        self.cell_array = self.cell_array.T
        self.rec_array = self.__rec_array__()
        self.data_frame = self.__data_frame__()

    def __base__(self, range_name):
        """Return the workbook's defined-name object for ``range_name``.

        Raises:
            ExcelError: if the workbook has no such named range, whether the
                lookup raises ``KeyError`` or yields ``None``.
        """
        try:
            base = self.wb.defined_names[range_name]
        except KeyError:
            base = None
        if base is None:
            raise ExcelError('{} named range does not exist.'.format(range_name))
        return base

    def __worksheets__(self):
        """Return the worksheet name of every destination of the named range."""
        return [sheet for sheet, _ in self.base.destinations]

    def __cell_ranges__(self):
        """Return the cell-range string of every destination of the named range."""
        return [cell_range for _, cell_range in self.base.destinations]

    def __count_destination_components__(self, destination_component):
        """Return the number of distinct items in ``destination_component``."""
        return len(set(destination_component))

    def __topleft__(self):
        """Return the coordinate before ``:`` in the first cell range.

        A range without ``:`` (a single cell) is returned unchanged.
        """
        first_range = self.cell_ranges[0]
        if ':' in first_range:
            return first_range.split(':')[0]
        return first_range

    def __bottomright__(self):
        """Return the coordinate after ``:`` in the first cell range.

        A range without ``:`` (a single cell) is returned unchanged.
        """
        first_range = self.cell_ranges[0]
        if ':' in first_range:
            return first_range.split(':')[1]
        return first_range

    def contents_to_array(self):
        """Placeholder; not implemented (does nothing and returns ``None``)."""
        pass

    def __columns__(self):
        """Return the pieces of ``topleft`` split on ``$``."""
        return self.topleft.split('$')

    def __cells__(self):
        """Return the worksheet cells between ``topleft`` and ``bottomright`` as a list."""
        return list(self.wb[self.worksheet][self.topleft:self.bottomright])

    def __cell_array__(self):
        """Return ``cells`` as a numpy array, reshaping a 0-d result to 1x1."""
        cell_array = np.array(self.cells)
        if cell_array.shape == ():
            cell_array = cell_array.reshape((1, 1))
        return cell_array

    def __cell_value__(self, cell, floats=True):
        """Return the value of ``cell``, normalised for array construction.

        Args:
            cell: object exposing ``value`` and ``data_type`` attributes.
            floats: when true, values of numeric cells (``data_type == 'n'``)
                are converted to ``float``.

        Returns:
            ``nan`` for an empty cell; a ``float`` for numeric cells when
            ``floats`` is true; otherwise the raw cell value. Date, time,
            datetime and timedelta values are never converted, because
            ``float()`` cannot handle them.
        """
        value = cell.value
        if value is None:
            return np.nan
        temporal = (datetime.datetime, datetime.date, datetime.time, datetime.timedelta)
        if floats and cell.data_type == 'n' and not isinstance(value, temporal):
            return float(value)
        return value

    def __rec_array__(self, colnames_included=False):
        """Build a numpy record array from ``cell_array``, one field per column.

        Args:
            colnames_included: when true, the first row supplies the field
                names (converted with ``str``) and is excluded from the data.

        Returns:
            A record array. Numeric columns whose values are all whole
            numbers are stored as integers.
        """
        cell_values = np.vectorize(self.__cell_value__)
        first_data_row = 1 if colnames_included else 0

        cols = []
        for j in range(self.cell_array.shape[1]):
            col = cell_values(self.cell_array[first_data_row:, j])
            # Downcast to int only for numeric columns where doing so loses
            # nothing; string, bool and object columns are left untouched.
            if col.dtype.kind not in ('S', 'b', 'U', 'O') and np.all(col.astype('int') == col):
                col = col.astype('int')
            cols.append(col)

        rec_array = np.rec.fromarrays(cols)
        if colnames_included:
            # Header cells are read without float conversion so numeric
            # headers keep their original form.
            names = cell_values(self.cell_array[0, :], floats=False)
            rec_array.dtype.names = [str(name) for name in names]
        return rec_array

    def __data_frame__(self, colnames_included=False):
        """Return a DataFrame built from ``rec_array``.

        When ``colnames_included`` is false the columns are relabelled
        ``0 .. ncols - 1``; otherwise the record-array field names are kept.
        """
        df = pd.DataFrame(self.rec_array)
        if not colnames_included:
            df.columns = list(range(df.shape[1]))
        return df

    def first_value(self):
        """Return the value in the first row and first column of ``data_frame``."""
        # Positional access on both axes; a label lookup with ``[0]`` would
        # depend on what the column labels happen to be.
        return self.data_frame.iloc[0].iloc[0]

    def to_stringIO(self, transpose=False, columns=None, index=False, header=False):
        """Write ``data_frame`` as comma-separated text into a ``StringIO``.

        Args:
            transpose: write the transposed frame.
            columns: columns to include; defaults to all columns.
            index: passed to ``DataFrame.to_csv``.
            header: passed to ``DataFrame.to_csv``.

        Returns:
            A ``StringIO`` positioned at its start. The caller closes it.
        """
        buffer = StringIO()
        # ``is None`` rather than ``== None``: comparing an Index or array
        # with ``==`` is element-wise and cannot be used as a condition.
        if columns is None:
            columns = self.data_frame.columns
        out_df = self.data_frame[columns]
        if transpose:
            out_df = out_df.T
        out_df.to_csv(buffer, sep=',', index=index, header=header)
        buffer.seek(0)
        return buffer

    def to_postgres(self, connection, cursor, schema, table, transpose=False,
                    columns=None, create=False, overwrite=True):
        """Copy the range contents into the table ``schema.table``.

        Args:
            connection: database connection; ``commit()`` is called on it.
            cursor: database cursor; ``execute()`` and ``copy_from()`` are
                called on it.
            schema: schema name of the target table.
            table: name of the target table.
            transpose: passed to ``to_stringIO``.
            columns: passed to ``to_stringIO``.
            create: must be false; table creation is not implemented.
            overwrite: when ``True``, all existing rows are deleted (and the
                deletion committed) before the copy.

        Raises:
            NotImplementedError: if ``create`` is true.
        """
        if create:
            raise NotImplementedError('Creation of a new postgres table is not implemented')

        # SECURITY: ``schema`` and ``table`` are interpolated into SQL as-is;
        # they must come from trusted code, never from user input.
        target = '{}.{}'.format(schema, table)

        buffer = self.to_stringIO(transpose, columns)
        try:
            connection.commit()
            # Explicit comparison kept on purpose: only True (or 1) triggers
            # the destructive delete, not an arbitrary truthy value.
            if overwrite == True:  # noqa: E712
                cursor.execute('DELETE FROM {};'.format(target))
                connection.commit()
            # NOTE(review): if ``cursor`` is a psycopg2 cursor, releases from
            # 2.9 on quote the table name given to copy_from, so a
            # schema-qualified name may be rejected -- confirm against the
            # installed driver.
            cursor.copy_from(buffer, target, sep=',', null='')
            connection.commit()
        finally:
            # Release the buffer even when the database calls fail.
            buffer.close()