Source code for fied.geocoder.geo_tools


import pandas as pd
import math
import requests
import os
import concurrent.futures
import logging

logging.basicConfig(level=logging.INFO)


def fix_county_fips(df):
    """
    County FIPS should be strings. Use geoID or censusBlock to replace
    existing county FIPS.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with either geoID or censusBlock in the columns

    Returns
    -------
    df : pandas.DataFrame
        DataFrame with updated countyFIPS
    """

    # Use whichever geo column is present.
    for c in ['geoID', 'censusBlock']:
        if c in df.columns:
            geo_column = c
            break

    # The first five digits of a block geoID are the county FIPS.
    df.countyFIPS.update(
        df.dropna(subset=[geo_column])[geo_column].astype(str).apply(
            lambda x: x[0:5]
            )
        )

    missing = df[df[geo_column].isnull()]
    missing_str = missing.countyFIPS.dropna().astype(int).astype(str)

    # Five-digit FIPS are already complete.
    df.countyFIPS.update(
        missing_str[missing_str.apply(lambda x: len(x) == 5)]
        )

    # Four-digit FIPS have lost a leading zero from the state code.
    df.countyFIPS.update(
        missing_str[missing_str.apply(lambda x: len(x) == 4)].apply(
            lambda x: f'0{x}'
            )
        )

    # Assume that countyFIPS with len < 4 are missing the state FIPS.
    state_fips = pd.read_csv(
        'https://www2.census.gov/geo/docs/reference/state.txt',
        usecols=[0, 1],
        names=['statefips', 'stateCode'],
        sep='|',
        header=0,
        dtype=str,
        index_col=['stateCode']
        )

    for n in range(1, 4):
        data = missing_str[missing_str.apply(lambda x: len(x) == n)]
        data = df.loc[data.index, 'stateCode'].map(
            state_fips.to_dict()['statefips']
            ) + data.apply(lambda x: f'{(3 - n) * "0"}{x}')
        data.name = 'countyFIPS'
        df.countyFIPS.update(data)

    return df
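
# A minimal usage sketch for fix_county_fips (the rows below are
# illustrative, not taken from the FIED data; the call also downloads the
# Census state-FIPS file). A long block geoID is truncated to its first
# five digits, and a short numeric countyFIPS is re-padded with the
# state FIPS:
#
# >>> frs = pd.DataFrame({
# ...     'geoID': ['482011000001234', None],
# ...     'countyFIPS': [None, 201],
# ...     'stateCode': ['TX', 'TX'],
# ...     })
# >>> fix_county_fips(frs).countyFIPS.tolist()
# ['48201', '48201']

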
def find_missing_congress(df):
    """
    Update Congressional Districts to the 118th Congress, for 2020.
    Updates the FRS column legislativeDistrictNumber.
    """

    cd = pd.read_csv(
        'https://www2.census.gov/geo/docs/maps-data/data/rel2020/cd-sld/tab20_cd11820_county20_natl.txt',
        sep='|',
        header=0,
        usecols=[1, 8],
        dtype='str',
        names=['congressionalDistrictNumber', 'countyFIPS']
        )

    cd = dict(cd[['countyFIPS', 'congressionalDistrictNumber']].values)

    df.loc[:, 'legislativeDistrictNumber'] = df.countyFIPS.map(cd)

    # Merging created duplicate entries and was slower than mapping
    # with a dict:
    # df = pd.merge(
    #     df, cd,
    #     on='countyFIPS',
    #     how='left'
    #     )

    return df
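
# A minimal usage sketch for find_missing_congress (the county FIPS is
# illustrative; the call downloads the Census relationship file). Note
# that counties spanning several districts appear on multiple rows of
# that file, so dict() keeps only the last district listed for each
# countyFIPS:
#
# >>> frs = pd.DataFrame({'countyFIPS': ['48201']})
# >>> frs = find_missing_congress(frs)
# >>> frs.legislativeDistrictNumber.iloc[0]  # a 118th-Congress district code

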
def fcc_block_api(lat_lon, census_year=2020):
    """
    Call FCC's Area API (https://geo.fcc.gov/api/census/) with lat, lon
    coordinates to return the corresponding Census block.

    Parameters
    ----------
    lat_lon : list
        List of lat, lon coordinates

    census_year : int; default is 2020
        Census vintage to use for block definitions.

    Returns
    -------
    block : str or None
        Census block FIPS. Returns None if there is no corresponding
        block (e.g., offshore oil platform).
    """

    url = 'https://geo.fcc.gov/api/census/block/find?'

    params = {
        'latitude': lat_lon[0],
        'longitude': lat_lon[1],
        'censusYear': census_year,
        'showall': True,
        'format': 'json'
        }

    try:
        r = requests.get(url, params=params, timeout=(1, 3))
        logging.info(f'{lat_lon[0]}, {lat_lon[1]}')

    except requests.exceptions.ConnectionError:
        logging.error(
            f'ConnectionError: latitude ({lat_lon[0]}), '
            f'longitude ({lat_lon[1]})'
            )
        block = None

    except requests.exceptions.ReadTimeout:
        logging.error(
            f'ReadTimeout: latitude ({lat_lon[0]}), '
            f'longitude ({lat_lon[1]})'
            )
        block = None

    else:
        try:
            block = r.json()['Block']['FIPS']
        except (KeyError, ValueError):
            # No block in the response, or a non-JSON response body.
            block = None

    return block
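
# A minimal usage sketch for fcc_block_api (the coordinates are
# illustrative): the returned 15-digit block FIPS string concatenates
# state (2), county (3), tract (6), and block (4) codes, so the county
# FIPS is the first five characters.
#
# >>> block = fcc_block_api([29.7604, -95.3698])
# >>> county = block[:5] if block else None

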
def get_blocks_parallelized(df):
    """
    Parallelizes FCC API calls. The final industrial data have ~360,000
    unique lat, lon coordinates.

    Parameters
    ----------
    df : pandas.DataFrame
        Final foundational energy dataframe

    Returns
    -------
    df : pandas.DataFrame
        Final foundational energy dataframe with a new column for
        censusBlock
    """

    all_latlon = pd.DataFrame(
        df.drop_duplicates(['latitude', 'longitude'])[['latitude', 'longitude']]
        ).values

    results = []

    executor = concurrent.futures.ThreadPoolExecutor()  # max_workers=65

    for result in executor.map(fcc_block_api, all_latlon):
        results.append(result)

    latlon_block = pd.DataFrame(all_latlon, columns=['latitude', 'longitude'])
    latlon_block.loc[:, 'censusBlock'] = results

    df = pd.merge(df, latlon_block, on=['latitude', 'longitude'], how='left')

    return df
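
# A minimal usage sketch for get_blocks_parallelized (the rows are
# illustrative): the API is called once per unique coordinate pair, so
# facilities sharing a site add no extra requests. max_workers is left
# unset, so the executor uses its default thread count.
#
# >>> fied = pd.DataFrame({
# ...     'registryID': ['A', 'B'],
# ...     'latitude': [29.7604, 29.7604],
# ...     'longitude': [-95.3698, -95.3698],
# ...     })
# >>> fied = get_blocks_parallelized(fied)  # adds a censusBlock column

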
# Abandoned geocoder approach
# def create_geocode_batch(df, benchmark="2020", vintage="2020"):
#     """
#     Creates CSVs to submit to the Census Geocoder
#     (https://www.census.gov/programs-surveys/geography/technical-documentation/complete-technical-documentation/census-geocoder.html).
#     Format is registryID, locationAddress, cityName, stateCode, postalCode.
#     Each CSV is limited to 10,000 records.
#
#     Parameters
#     ----------
#     df : pandas.DataFrame
#         Final data dataframe.
#
#     benchmark : str; default is "2020"
#         Other available benchmarks:
#         https://geocoding.geo.census.gov/geocoder/benchmarks
#
#     vintage : str; default is "2020"
#         Vintages depend on the benchmark; see
#         https://geocoding.geo.census.gov/geocoder/vintages?benchmark=benchmarkId
#
#     Returns
#     -------
#     df : pandas.DataFrame
#     """
#
#     url = 'http://geocoding.geo.census.gov/geocoder/geographies/addressbatch'
#
#     params = {
#         'returntype': "geographies",
#         'benchmark': benchmark,
#         'vintage': vintage,
#         }
#
#     geoinfo = pd.DataFrame()
#
#     cols = ['registryID', 'locationAddress', 'cityName', 'stateCode',
#             'postalCode']
#
#     cols_i = []
#
#     for c in cols:
#         cols_i.append(df.columns.to_list().index(c))
#
#     geo_df = pd.DataFrame(df.drop_duplicates(subset=cols))
#     geo_df.reset_index(drop=True, inplace=True)
#
#     chunksize = 10000
#     n_chunks = math.ceil(len(geo_df) / chunksize)
#
#     for n in range(n_chunks):
#         file_path_name = os.path.abspath('./geocoder/Addresses.csv')
#         # file_path_name = os.path.abspath(f'./geocoder/data_to_geocoder_{n}.csv')
#
#         geo_df.iloc[n*chunksize:(n+1)*chunksize, cols_i].to_csv(
#             file_path_name, index=False, header=False
#             )
#
#         files = {'addressFile': (file_path_name, open(file_path_name, 'rb'), 'text/csv')}
#
#         r = requests.post(url, files=files, data=params)