Source code for fied.frs.frs_extraction

import requests
import re
import json
import gzip
import logging
import os
import zipfile
import urllib.error
import time
import pandas as pd
from collections import OrderedDict


logging.basicConfig(level=logging.INFO)


class FRS:
    """
    Class for extracting relevant facility-level data from EPA's
    Facility Registry Service (FRS) data.

    Documentation of FRS data fields:
    https://www.epa.gov/sites/default/files/2015-09/documents/frs_data_dictionary.pdf
    """

    def __init__(self):

        self._frs_data_path = os.path.abspath('./data/FRS')

        # Names of relevant FRS data files and columns.
        self._names_columns = OrderedDict({
            'PROGRAM': ['REGISTRY_ID', 'SMALL_BUS_IND', 'ENV_JUSTICE_CODE',
                        'PGM_SYS_ACRNM', 'PGM_SYS_ID', 'SENSITIVE_IND',
                        'STD_NAME', 'STD_LOC_ADDRESS', 'STD_COUNTY_FIPS',
                        'STD_CITY_NAME', 'STD_COUNTY_NAME', 'STD_STATE_CODE',
                        'STD_POSTAL_CODE', 'LEGISLATIVE_DIST_NUM',
                        'HUC_CODE_8', 'SITE_TYPE_NAME'],
            'FACILITY': ['REGISTRY_ID', 'EPA_REGION_CODE', 'LATITUDE83',
                         'LONGITUDE83'],
            'NAICS': ['REGISTRY_ID', 'NAICS_CODE'],
            'single': ['REGISTRY_ID', 'PRIMARY_NAME', 'LOCATION_ADDRESS',
                       'SUPPLEMENTAL_LOCATION', 'CITY_NAME', 'COUNTY_NAME',
                       'FIPS_CODE', 'STATE_CODE', 'STATE_NAME', 'POSTAL_CODE',
                       'TRIBAL_LAND_CODE', 'CONGRESSIONAL_DIST_NUM',
                       'CENSUS_BLOCK_CODE', 'HUC_CODE', 'SITE_TYPE_NAME',
                       'US_MEXICO_BORDER_IND', 'LATITUDE83', 'LONGITUDE83']
            # 'ORGANIZATION': [
            #     'REGISTRY_ID', 'PGM_SYS_ACRNM', 'PGM_SYS_ID', 'EIN',
            #     'DUNS_NUMBER', 'ORG_NAME', 'ORG_TYPE'
            # ],
            # TODO The ORGANIZATION csv contains useful data, but has many
            # ORG_NAME, etc. for a given REGISTRY_ID.
            })

        # Dictionary of relevant data categories (keys) and variables (values)
        self._json_format = OrderedDict({
            'site': [
                'CITY_NAME', 'COUNTY_NAME', 'FIPS_CODE', 'STATE_CODE',
                'POSTAL_CODE', 'TRIBAL_LAND_CODE', 'CONGRESSIONAL_DIST_NUM',
                'CENSUS_BLOCK_CODE', 'HUC_CODE', 'EPA_REGION_CODE'
            ],
            'facility': [
                'PRIMARY_NAME', 'LATITUDE83', 'LONGITUDE83',
                'LOCATION_ADDRESS', 'SMALL_BUS_IND', 'ENV_JUSTICE_CODE',
                'NAICS_CODE_additional', 'NAICS_CODE', 'SITE_TYPE_NAME',
                'SENSITIVE_IND', 'ENERGY_EST_SOURCE'
            ]
            })

    def call_all_fips(self):
        """
        Uses the Census API to call all state and county FIPS codes.
        Excludes U.S. territories and outlying areas. Combines the results
        with files on state abbreviations and ZIP codes.

        Returns
        -------
        all_fips : json
        """

        fips_url = 'https://api.census.gov/data/2010/dec/sf1?'
        fips_params = {'get': 'NAME', 'for': 'county:*'}

        state_abbr_url = \
            'https://www2.census.gov/geo/docs/reference/state.txt'

        zip_code_url = \
            'https://postalpro.usps.com/mnt/glusterfs/2022-12/ZIP_Locale_Detail.xls'

        try:
            r = requests.get(fips_url, params=fips_params)

        except requests.HTTPError as e:
            logging.error(f'{e}')

        else:
            all_fips_df = pd.DataFrame(r.json())
            all_fips_df.columns = all_fips_df.loc[0, :]
            all_fips_df.drop(0, axis=0, inplace=True)
            all_fips_df.loc[:, 'state_abbr'] = all_fips_df.state.astype('int')

        try:
            state_abbr = pd.read_csv(state_abbr_url, sep='|')

        except urllib.error.HTTPError as e:
            logging.error(f'Error with fips csv: {e}')

        else:
            state_abbr.columns = [c.lower() for c in state_abbr.columns]
            state_abbr.rename(columns={'state': 'state_abbr'}, inplace=True)

        try:
            zip_codes = pd.read_excel(zip_code_url)

        except urllib.error.HTTPError as e:
            logging.error(f'Error with zip code xls: {e}')

        else:
            zip_codes.columns = [
                x.lower().replace(' ', '_') for x in zip_codes.columns
                ]
            zip_codes.rename(columns={'physical_state': 'state_abbr'},
                             inplace=True)

        all_fips_df = pd.merge(
            all_fips_df, state_abbr, on='state_abbr', how='left'
            )

        all_fips_df = pd.merge(
            all_fips_df, zip_codes[['physical_zip', 'state_abbr']],
            on='state_abbr', how='left'
            )

        all_fips = all_fips_df.to_json(orient='records')
        all_fips = json.loads(all_fips)

        return all_fips
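
    # Illustrative sketch of a single record returned by call_all_fips().
    # Values and exact field names are hypothetical; they depend on the
    # Census SF1 API response and the Census/USPS reference files:
    #
    #   >>> frs = FRS()
    #   >>> fips = frs.call_all_fips()
    #   >>> fips[0]
    #   {'NAME': 'Autauga County, Alabama', 'state': '01', 'county': '001',
    #    'state_abbr': 1, 'stusab': 'AL', 'physical_zip': '36003', ...}
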
    def download_unzip_frs_data(self, combined=True):
        """
        Download bulk FRS data files from EPA.
        """

        if combined:
            name = 'combined'
        else:
            name = 'single'

        # Combined file is ~732 MB as of December 2022
        frs_url = \
            f"https://ordsext.epa.gov/FLA/www3/state_files/national_{name}.zip"

        zip_path = os.path.abspath(
            os.path.join(self._frs_data_path, f"national_{name}.zip")
            )

        if not os.path.exists(os.path.abspath(self._frs_data_path)):
            os.makedirs(os.path.abspath(self._frs_data_path))

        if os.path.exists(zip_path):
            logging.info(f"FRS {name.capitalize()} zip file exists.")

        else:
            logging.info(
                f"FRS {name.capitalize()} zip file does not exist. "
                "Downloading..."
                )

            r = requests.get(frs_url)

            try:
                r.raise_for_status()
            except requests.exceptions.HTTPError as e:
                logging.error(f'{e}')

            with open(zip_path, "wb") as f:
                f.write(r.content)

        # Unzip with zipfile
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall(os.path.abspath(self._frs_data_path))

        logging.info(f"FRS {name.capitalize()} file unzipped.")

        return
    @staticmethod
    def fix_code(code):
        """
        Fix codes that should be int, not float or str.
        """
        try:
            code_fixed = int(code)
        except ValueError:
            return code
        else:
            return code_fixed
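
    # Example behavior of fix_code() (illustrative values): numeric floats
    # and strings are coerced to int; values that int() rejects, such as
    # alphanumeric codes or NaN, are returned unchanged.
    #
    #   >>> FRS.fix_code(48201.0)
    #   48201
    #   >>> FRS.fix_code('06')
    #   6
    #   >>> FRS.fix_code('R9')
    #   'R9'
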
    def format_program_csv(self, data, programs):
        """
        Builds dataframe from the FRS PROGRAM dataset.

        Parameters
        ----------
        data : pandas.DataFrame
            Initial imported DataFrame.

        programs : list
            List of program system acronyms to extract.

        Returns
        -------
        data : pandas.DataFrame
            Formatted FRS data.
        """

        data_dict = {}

        for a in programs:
            data_dict[a] = pd.DataFrame(
                data.query("PGM_SYS_ACRNM==@a")[
                    ['REGISTRY_ID', 'PGM_SYS_ID', 'PGM_SYS_ACRNM']
                    ]
                )

            data_dict[a].loc[:, f'PGM_SYS_ID_{a}'] = data_dict[a].PGM_SYS_ID

            # Facilities may have >1 program system ID, which leads to
            # duplicate entries when indexing by REGISTRY_ID. Keep the
            # first ID and collect the rest in an "_additional" column.
            dups = data_dict[a][
                data_dict[a].REGISTRY_ID.duplicated()
                ].REGISTRY_ID.unique()

            if len(dups) > 0:
                data_dict[a].loc[:, f'PGM_SYS_ID_{a}_additional'] = None

                for d in dups:
                    ids = data_dict[a][data_dict[a].REGISTRY_ID == d]

                    # Index label of the single first-kept row for this ID
                    use_index = ids.drop_duplicates(
                        subset=['REGISTRY_ID'], keep='first'
                        ).index[0]

                    data_dict[a].at[use_index, f'PGM_SYS_ID_{a}_additional'] = \
                        ', '.join(ids[1:][f'PGM_SYS_ID_{a}'].to_list())

                data_dict[a].drop_duplicates(
                    subset=['REGISTRY_ID'], keep='first', inplace=True
                    )

            data_dict[a].set_index('REGISTRY_ID', inplace=True)

            data_dict[a].drop(
                ['PGM_SYS_ACRNM', 'PGM_SYS_ID'], axis=1, inplace=True
                )

            data_dict[a].replace({'N': False, 'Y': True}, inplace=True)

        data.drop(['PGM_SYS_ACRNM', 'PGM_SYS_ID'], axis=1, inplace=True)
        data.drop_duplicates(subset=['REGISTRY_ID'], inplace=True)
        data.replace({'N': False, 'Y': True}, inplace=True)
        data.set_index('REGISTRY_ID', inplace=True)

        pgm_data = pd.concat(
            [data_dict[k] for k in data_dict.keys()], axis=1
            )

        data = data.join(pgm_data)
        data.reset_index(inplace=True)

        return data
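
    # Sketch of the PROGRAM transformation (REGISTRY_ID and program IDs
    # below are hypothetical). Input rows
    #
    #   REGISTRY_ID   PGM_SYS_ACRNM  PGM_SYS_ID
    #   110000000001  EIS            111
    #   110000000001  EIS            222
    #
    # collapse to a single row per REGISTRY_ID with
    # PGM_SYS_ID_EIS = '111' and PGM_SYS_ID_EIS_additional = '222'.
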
    def format_naics_csv(self, data):
        """
        Builds dataframe from the FRS NAICS dataset.

        Parameters
        ----------
        data : pandas.DataFrame
            Initial imported DataFrame.

        Returns
        -------
        data : pandas.DataFrame
            Formatted FRS data.
        """

        # Facilities may have duplicate NAICS codes. Keep the first and
        # move the remaining codes to NAICS_CODE_additional.
        # Assume that a facility with any industrial NAICS code
        # (i.e., 11, 21, 23, 31-33) is an industrial facility.
        all_naics = pd.DataFrame(
            data.NAICS_CODE.unique(), columns=['NAICS_CODE']
            )

        all_naics.loc[:, 'ind'] = all_naics.NAICS_CODE.apply(
            lambda x: int(str(x)[0:2]) in [11, 21, 23, 31, 32, 33]
            )

        data = pd.merge(data, all_naics, on='NAICS_CODE', how='left')
        data = data.query("ind==True")

        data_unique = data.drop_duplicates(
            subset=['REGISTRY_ID'], keep='first'
            )

        dups = data[~data.index.isin(data_unique.index)]
        dups = dups.groupby('REGISTRY_ID').apply(
            lambda x: list(x['NAICS_CODE'].unique())
            )
        dups.name = 'NAICS_CODE_additional'

        data = pd.merge(data_unique, dups, on='REGISTRY_ID', how='left')
        data.drop(['ind'], axis=1, inplace=True)

        return data
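
    # Sketch of the NAICS transformation (values are hypothetical): a
    # facility reporting NAICS 325110 first and 324110 second keeps
    # NAICS_CODE = 325110 and gets NAICS_CODE_additional = [324110]; a
    # facility whose only code falls outside sectors 11, 21, 23, or 31-33
    # (e.g., 541330) is dropped entirely.
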
    def read_frs_csv(self, name, columns, programs=['EIS', 'E-GGRT']):
        """
        Builds dataframe based on FRS datasets.

        Parameters
        ----------
        name : str
            Name of the FRS csv file. All csv files extracted from
            national_combined.zip are named according to
            "NATIONAL_{name}_FILE.CSV".

        columns : list
            List of columns to extract from the csv.

        programs : list; default is ['EIS', 'E-GGRT']
            List of program system acronyms to extract from
            NATIONAL_PROGRAM_FILE.CSV.

        Returns
        -------
        data : pandas.DataFrame
            Formatted FRS data, based on the FACILITY, ORGANIZATION,
            NAICS, and PROGRAM datasets.
        """

        if name == 'single':
            file = f'NATIONAL_{name.upper()}.CSV'
        else:
            file = f'NATIONAL_{name}_FILE.CSV'

        file_path = os.path.abspath(os.path.join(self._frs_data_path, file))

        data = pd.read_csv(file_path, usecols=columns, low_memory=False)

        if name == 'PROGRAM':
            data = self.format_program_csv(data, programs)

        elif name == 'NAICS':
            data = self.format_naics_csv(data)

        elif name == 'FACILITY':
            pass

        return data
    def build_frs_json(self, frs_data_df, save_path=None, ret=False):
        """
        Parameters
        ----------
        frs_data_df : pandas.DataFrame
            Dataframe from formatted FRS csv datasets.

        save_path : str; default is None
            Directory in which to save FRS data as a json file. Must be
            specified to save.

        ret : bool; default is False
            Returns FRS data in json format.

        Returns
        -------
        frs_json : json, optional
            Dictionary of facility data extracted from FRS in JSON format.
        """

        # Fix formatting
        fix_codes = ['NAICS_CODE', 'POSTAL_CODE', 'CONGRESSIONAL_DIST_NUM',
                     'CENSUS_BLOCK_CODE', 'HUC_CODE', 'EPA_REGION_CODE']

        for code in fix_codes:
            frs_data_df.loc[:, code] = frs_data_df[code].apply(
                lambda x: FRS.fix_code(x)
                )

        if frs_data_df.index.name != 'REGISTRY_ID':
            frs_data_df.set_index('REGISTRY_ID', inplace=True)

        # Must first transpose the DataFrame
        frs_data_df = frs_data_df.T
        frs_data_df.index.name = 'VARIABLE'
        frs_data_df.reset_index(inplace=True)
        frs_data_df.loc[:, 'CATEGORY'] = None

        for i in frs_data_df.index:
            for cat, v in self._json_format.items():
                if frs_data_df.at[i, 'VARIABLE'] in v:
                    frs_data_df.at[i, 'CATEGORY'] = cat
                else:
                    continue

        frs_data_df.set_index(['CATEGORY', 'VARIABLE'], inplace=True)

        # Nested dict, e.g., {'site': {1000: {'NAICS_CODE': 2111}}}.
        # A previous approach used dict.fromkeys, which didn't work for
        # setting values from val_dict.
        val_dict = \
            {k: frs_data_df.xs(k).to_dict() for k in self._json_format.keys()}

        frs_dict = {
            k: [{c: v[k]} for c, v in val_dict.items()]
            for k in frs_data_df.columns
            }

        if save_path:
            with gzip.open(os.path.join(save_path, 'found_ind_data.json.gz'),
                           'wt', encoding="ascii") as f:
                json.dump(frs_dict, f, sort_keys=True, indent=4)

        if ret:
            frs_json = json.dumps(frs_dict)
            return frs_json
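
    # Shape of the dictionary written/returned by build_frs_json()
    # (registry ID and values are hypothetical):
    #
    #   {1000: [{'site': {'CITY_NAME': 'HOUSTON', 'FIPS_CODE': 48201, ...}},
    #           {'facility': {'NAICS_CODE': 325110, 'LATITUDE83': 29.7, ...}}]}
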
    def add_frs_columns_json(self, frs_data_df):
        """
        Add columns that capture multiple program IDs.
        """

        logging.info(f'Starting fields: {self._json_format["facility"]}')

        for c in frs_data_df.columns:
            if 'PGM_SYS_ID' in c:
                self._json_format['facility'].append(c)
            else:
                continue

        logging.info(f'Ending fields: {self._json_format["facility"]}')

        return
    def import_format_frs(self, combined=True):
        """
        Import and format downloaded FRS files.

        Parameters
        ----------
        combined : bool; default is True
            Indicate whether the data set is constructed using the EPA FRS
            single file or combined files.

        Returns
        -------
        final_data : pandas.DataFrame
            DataFrame indexed by registryID, containing relevant site and
            facility data from EPA FRS.
        """

        # Reminder that self._names_columns is an ordered dict
        pgm_data = self.read_frs_csv(
            name='PROGRAM', columns=self._names_columns['PROGRAM']
            )

        naics_data = self.read_frs_csv(
            name='NAICS', columns=self._names_columns['NAICS']
            )

        if combined:
            fac_data = self.read_frs_csv(
                name='FACILITY', columns=self._names_columns['FACILITY']
                )

            final_data = pd.merge(
                pgm_data, fac_data, on='REGISTRY_ID', how='right'
                )

        else:
            fac_data = self.read_frs_csv(
                name='single', columns=self._names_columns['single']
                )

            final_data = pd.merge(
                fac_data, pgm_data, on='REGISTRY_ID', how='left'
                )

        final_data = pd.merge(
            final_data, naics_data, on='REGISTRY_ID', how='left'
            )

        # All dataframes except the NAICS dataframe include non-industrial
        # facilities. Drop facilities that don't have NAICS codes after
        # merging.
        final_data.dropna(subset=['NAICS_CODE'], inplace=True)

        final_data.rename(columns={
            'REGISTRY_ID': 'registryID',
            'LEGISLATIVE_DIST_NUM': 'legislativeDistrictNumber',
            'HUC_CODE_8': 'hucCode8',
            'SITE_TYPE_NAME': 'siteTypeName',
            'STD_NAME': 'name',
            'STD_LOC_ADDRESS': 'locationAddress',
            'STD_POSTAL_CODE': 'postalCode',
            'STD_CITY_NAME': 'cityName',
            'STD_COUNTY_NAME': 'countyName',
            'STD_STATE_CODE': 'stateCode',
            'STD_COUNTY_FIPS': 'countyFIPS',
            'SENSITIVE_IND': 'sensitiveInd',
            'SMALL_BUS_IND': 'smallBusInd',
            'ENV_JUSTICE_CODE': 'envJusticeCode',
            'PGM_SYS_ID_EIS': 'eisFacilityID',
            'PGM_SYS_ID_EIS_additional': 'eisFacilityIDAdditional',
            'PGM_SYS_ID_E-GGRT': 'ghgrpID',
            'PGM_SYS_ID_E-GGRT_additional': 'ghgrpIDAdditional',
            'EPA_REGION_CODE': 'epaRegionCode',
            'LATITUDE83': 'latitude',
            'LONGITUDE83': 'longitude',
            'NAICS_CODE': 'naicsCode',
            'NAICS_CODE_additional': 'naicsCodeAdditional'
            }, inplace=True)

        logging.info(f'Final len: {len(final_data)}')

        final_data.set_index('registryID', inplace=True)

        return final_data
# TODO
    @staticmethod
    def load_foundational_json(found_json_file):
        """
        Load json file of foundational energy data.
        """
        with gzip.open(found_json_file, mode='rb') as gzfile:
            json_data = pd.DataFrame.from_dict(
                json.load(gzfile), orient='index'
                )

        frs_data = pd.DataFrame(index=json_data.index)

        # Each cell holds a one-key dict (e.g., {'site': {...}});
        # expand each column of dicts into a DataFrame of variables.
        for c in json_data.columns:
            column_data = pd.concat(
                [pd.DataFrame.from_dict(json_data[c].iloc[i]).T
                 for i in range(0, len(json_data))],
                axis=0
                )

            column_data.index = frs_data.index

            frs_data = pd.concat([frs_data, column_data], axis=1)

        return frs_data
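
    # Hypothetical usage of load_foundational_json(), assuming a file
    # previously written by build_frs_json() to the path shown:
    #
    #   >>> frs_df = FRS.load_foundational_json(
    #   ...     './data/FRS/found_ind_data.json.gz'
    #   ...     )
    #   >>> frs_df.columns  # site and facility variables, flattened
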
    @staticmethod
    def find_eis(acrnm):
        """
        Pull out the EIS ID from a program system field.

        Parameters
        ----------
        acrnm : str
            String of program system names and IDs.

        Returns
        -------
        eis : str or None
            Returns a string if EIS is in the program system field;
            None if not.
        """
        eis = re.search(r'(?<=EIS:)\w+', acrnm)

        try:
            eis = eis.group()
        except AttributeError:
            eis = None

        return eis
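
    # Example of find_eis() on a hypothetical program system string:
    #
    #   >>> FRS.find_eis('E-GGRT:1001234, EIS:5678')
    #   '5678'
    #   >>> FRS.find_eis('E-GGRT:1001234') is None
    #   True
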
if __name__ == '__main__':

    # t_start = time.perf_counter()

    combined = True

    frs_methods = FRS()
    frs_methods.download_unzip_frs_data(combined=combined)
    frs_data_df = frs_methods.import_format_frs(combined=combined)
    frs_data_df.to_csv('./data/FRS/frs_data_formatted.csv')

    # t_stop = time.perf_counter()
    # logging.info(f'Program time: {t_stop - t_start:0.2f} seconds')