Source code for ogusa.calibrate

from ogusa import estimate_beta_j, bequest_transmission
from ogusa import macro_params, transfer_distribution, income
from ogusa import get_micro_data
import os
import numpy as np
from taxcalc import Records
from ogcore import txfunc, demographics
from ogcore.utils import safe_read_pickle, mkdirs


[docs] class Calibration: """OG-USA calibration class""" def __init__( self, p, estimate_tax_functions=False, estimate_beta=False, estimate_chi_n=False, estimate_pop=False, tax_func_path=None, iit_baseline=None, iit_reform={}, guid="", data="cps", gfactors=None, weights=None, records_start_year=Records.CPSCSV_YEAR, client=None, num_workers=1, demographic_data_path=None, output_path=None, ): """ Constructor for the Calibration class. This class is used to find parameter values for the OG-USA model. Args: p (OG-USA Parameters object): parameters object estimate_tax_functions (bool): whether to estimate tax functions estimate_beta (bool): whether to estimate beta estimate_chi_n (bool): whether to estimate chi_n estimate_pop (bool): whether to estimate population tax_func_path (str): path to tax function parameters iit_baseline (dict): baseline policy to use iit_reform (dict): reform tax parameters guid (str): id for tax function parameters data (str or Pandas DataFrame): path or DataFrame with data for Tax-Calculator model gfactors (str or Pandas DataFrame ): path or DataFrame with growth factors for Tax-Calculator model weights (str or Pandas DataFrame): path or DataFrame with weights for Tax-Calculator model records_start_year (int): year micro data begins client (Dask client object): client num_workers (int): number of workers for Dask client output_path (str): path to save output to Returns: Calibration class object instance """ # Create output_path if it doesn't exist if output_path is not None: if not os.path.exists(output_path): os.makedirs(output_path) self.estimate_tax_functions = estimate_tax_functions self.estimate_beta = estimate_beta self.estimate_chi_n = estimate_chi_n self.estimate_pop = estimate_pop if estimate_tax_functions: if tax_func_path is not None: run_micro = False else: run_micro = True self.tax_function_params = self.get_tax_function_parameters( p, iit_baseline, iit_reform, guid, data, gfactors, weights, records_start_year, client, num_workers, run_micro=run_micro, tax_func_path=tax_func_path, ) if self.estimate_beta: self.beta_j = estimate_beta_j.beta_estimate(self) # if estimate_chi_n: # chi_n = self.get_chi_n() # Macro estimation self.macro_params = macro_params.get_macro_params() # eta estimation self.eta = transfer_distribution.get_transfer_matrix( p.J, p.lambdas, output_path=output_path ) # zeta estimation self.zeta = bequest_transmission.get_bequest_matrix( p.J, p.lambdas, output_path=output_path ) # demographics if estimate_pop: self.demographic_params = demographics.get_pop_objs( p.E, p.S, p.T, 0, 99, initial_data_year=p.start_year - 1, final_data_year=p.start_year, GraphDiag=False, download_path=demographic_data_path, ) # demographics for 80 period lives (needed for getting e below) demog80 = demographics.get_pop_objs( 20, 80, p.T, 0, 99, initial_data_year=p.start_year - 1, final_data_year=p.start_year, GraphDiag=False, ) # earnings profiles self.e = income.get_e_interp( p.S, self.demographic_params["omega_SS"], demog80["omega_SS"], p.lambdas, plot_path=output_path, ) else: self.e = income.get_e_interp( p.S, p.omega_SS, p.omega_SS, p.lambdas, plot_path=output_path, ) # Tax Functions
[docs] def get_tax_function_parameters( self, p, iit_baseline=None, iit_reform={}, guid="", data="", gfactors=None, weights=None, records_start_year=Records.CPSCSV_YEAR, client=None, num_workers=1, run_micro=False, tax_func_path=None, ): """ Reads pickle file of tax function parameters or estimates the parameters from microsimulation model output. Args: p (OG-Core Parameters object): parameters object iit_baseline (dict): baseline policy to use iit_reform (dict): reform tax parameters guid (string): id for tax function parameters data (str or Pandas DataFrame): path or DataFrame with data for Tax-Calculator model gfactors (str or Pandas DataFrame ): path or DataFrame with growth factors for Tax-Calculator model weights (str or Pandas DataFrame): path or DataFrame with weights for Tax-Calculator model records_start_year (int): year micro data begins client (Dask client object): client num_workers (int): number of workers for Dask client run_micro (bool): whether to estimate parameters from microsimulation model tax_func_path (string): path where find or save tax function parameter estimates Returns: None """ # set paths if none given if tax_func_path is None: if p.baseline: pckl = "TxFuncEst_baseline{}.pkl".format(guid) tax_func_path = os.path.join(p.output_base, pckl) print("Using baseline tax parameters from ", tax_func_path) else: pckl = "TxFuncEst_policy{}.pkl".format(guid) tax_func_path = os.path.join(p.output_base, pckl) print( "Using reform policy tax parameters from ", tax_func_path ) # create directory for tax function pickles to be saved to mkdirs(os.path.split(tax_func_path)[0]) # If run_micro is false, check to see if parameters file exists # and if it is consistent with Specifications instance if not run_micro: dict_params, run_micro = self.read_tax_func_estimate( p, tax_func_path ) taxcalc_version = "Cached tax parameters, no taxcalc version" if run_micro: micro_data, taxcalc_version = get_micro_data.get_data( baseline=p.baseline, start_year=p.start_year, iit_baseline=iit_baseline, iit_reform=iit_reform, data=data, path=p.output_base, client=client, num_workers=num_workers, ) p.BW = len(micro_data) dict_params = txfunc.tax_func_estimate( # pragma: no cover micro_data, p.BW, p.S, p.starting_age, p.ending_age, start_year=p.start_year, analytical_mtrs=p.analytical_mtrs, tax_func_type=p.tax_func_type, age_specific=p.age_specific, client=client, num_workers=num_workers, tax_func_path=tax_func_path, ) mean_income_data = dict_params["tfunc_avginc"][0] frac_tax_payroll = np.append( dict_params["tfunc_frac_tax_payroll"], np.ones(p.T + p.S - p.BW) * dict_params["tfunc_frac_tax_payroll"][-1], ) # Conduct checks to be sure tax function params are consistent # with the model run params_list = ["etr", "mtrx", "mtry"] BW_in_tax_params = dict_params["BW"] start_year_in_tax_params = dict_params["start_year"] S_in_tax_params = len(dict_params["tfunc_etr_params_S"][0]) # Check that start years are consistent in model and cached tax functions if p.start_year != start_year_in_tax_params: print( "Input Error: There is a discrepancy between the start" + " year of the model and that of the tax functions!!" ) assert False # Check that S is consistent in model and cached tax functions # Note: even if p.age_specific = False, the arrays coming from # ogcore.txfunc_est should be of length S if p.S != S_in_tax_params: print( "Input Error: There is a discrepancy between the ages" + " used in the model and those in the tax functions!!" ) assert False # Extrapolate tax function parameters for years after budget window # list of list: BW x S - either an array of function at that element... etr_params = [[None] * p.S] * p.T mtrx_params = [[None] * p.S] * p.T mtry_params = [[None] * p.S] * p.T for s in range(p.S): for t in range(p.T): if t < p.BW: etr_params[t][s] = dict_params["tfunc_etr_params_S"][t][s] mtrx_params[t][s] = dict_params["tfunc_mtrx_params_S"][t][ s ] mtry_params[t][s] = dict_params["tfunc_mtry_params_S"][t][ s ] else: etr_params[t][s] = dict_params["tfunc_etr_params_S"][-1][s] mtrx_params[t][s] = dict_params["tfunc_mtrx_params_S"][-1][ s ] mtry_params[t][s] = dict_params["tfunc_mtry_params_S"][-1][ s ] if p.constant_rates: print("Using constant rates!") # Make all tax rates equal the average p.tax_func_type = "linear" etr_params = [[None] * p.S] * p.T mtrx_params = [[None] * p.S] * p.T mtry_params = [[None] * p.S] * p.T for s in range(p.S): for t in range(p.T): if t < p.BW: etr_params[t][s] = dict_params["tfunc_avg_etr"][t] mtrx_params[t][s] = dict_params["tfunc_avg_mtrx"][t] mtry_params[t][s] = dict_params["tfunc_avg_mtry"][t] else: etr_params[t][s] = dict_params["tfunc_avg_etr"][-1] mtrx_params[t][s] = dict_params["tfunc_avg_mtrx"][-1] mtry_params[t][s] = dict_params["tfunc_avg_mtry"][-1] if p.zero_taxes: print("Zero taxes!") etr_params = [[0] * p.S] * p.T mtrx_params = [[0] * p.S] * p.T mtry_params = [[0] * p.S] * p.T tax_param_dict = { "etr_params": etr_params, "mtrx_params": mtrx_params, "mtry_params": mtry_params, "taxcalc_version": taxcalc_version, "mean_income_data": mean_income_data, "frac_tax_payroll": frac_tax_payroll, } return tax_param_dict
[docs] def read_tax_func_estimate(self, p, tax_func_path): """ This function reads in tax function parameters from pickle files. Args: tax_func_path (str): path to pickle with tax function parameter estimates Returns: dict_params (dict): dictionary containing arrays of tax function parameters run_micro (bool): whether to estimate tax function parameters """ flag = 0 if os.path.exists(tax_func_path): print("Tax Function Path Exists") dict_params = safe_read_pickle(tax_func_path) # check to see if tax_functions compatible try: if p.start_year != dict_params["start_year"]: print( "Model start year not consistent with tax " + "function parameter estimates" ) flag = 1 except KeyError: pass try: p.BW = dict_params["BW"] # QUICK FIX if p.BW != dict_params["BW"]: print( "Model budget window length is " + str(p.BW) + " but the tax function parameter " + "estimates have a budget window length of " + str(dict_params["BW"]) ) flag = 1 except KeyError: pass try: if p.tax_func_type != dict_params["tax_func_type"]: print( "Model tax function type is not " + "consistent with tax function parameter " + "estimates" ) flag = 1 except KeyError: pass if flag >= 1: raise RuntimeError( "Tax function parameter estimates at given path" + " are not consistent with model parameters" + " specified." ) else: flag = 1 print( "Tax function parameter estimates do not exist at" + " given path. Running new estimation." ) if flag >= 1: dict_params = None run_micro = True else: run_micro = False return dict_params, run_micro
# method to return all newly calibrated parameters in a dictionary def get_dict(self): dict = {} if self.estimate_tax_functions: dict.update(self.tax_function_params) if self.estimate_beta: dict["beta_annual"] = self.beta if self.estimate_chi_n: dict["chi_n"] = self.chi_n dict["eta"] = self.eta dict["zeta"] = self.zeta dict.update(self.macro_params) dict["e"] = self.e if self.estimate_pop: dict.update(self.demographic_params) return dict