Source code for ogcore.household

"""
------------------------------------------------------------------------
Household functions.
------------------------------------------------------------------------
"""

# Packages
import numpy as np
from ogcore import tax, utils

"""
------------------------------------------------------------------------
    Functions
------------------------------------------------------------------------
"""


[docs] def marg_ut_cons(c, sigma): r""" Compute the marginal utility of consumption. .. math:: MU_{c} = c^{-\sigma} Args: c (array_like): household consumption sigma (scalar): coefficient of relative risk aversion Returns: output (array_like): marginal utility of consumption """ if np.ndim(c) == 0: c = np.array([c]) epsilon = 0.003 cvec_cnstr = c < epsilon MU_c = np.zeros(c.shape) MU_c[~cvec_cnstr] = c[~cvec_cnstr] ** (-sigma) b2 = (-sigma * (epsilon ** (-sigma - 1))) / 2 b1 = (epsilon ** (-sigma)) - 2 * b2 * epsilon MU_c[cvec_cnstr] = 2 * b2 * c[cvec_cnstr] + b1 output = MU_c output = np.squeeze(output) return output
[docs] def marg_ut_labor(n, chi_n, p): r""" Compute the marginal disutility of labor. .. math:: MDU_{l} = \chi^n_{s}\biggl(\frac{b}{\tilde{l}}\biggr) \biggl(\frac{n_{j,s,t}}{\tilde{l}}\biggr)^{\upsilon-1} \Biggl[1-\biggl(\frac{n_{j,s,t}}{\tilde{l}}\biggr)^\upsilon \Biggr]^{\frac{1-\upsilon}{\upsilon}} Args: n (array_like): household labor supply chi_n (array_like): utility weights on disutility of labor p (OG-Core Specifications object): model parameters Returns: output (array_like): marginal disutility of labor supply """ nvec = n if np.ndim(nvec) == 0: nvec = np.array([nvec]) eps_low = 0.000001 eps_high = p.ltilde - 0.000001 nvec_low = nvec < eps_low nvec_high = nvec > eps_high nvec_uncstr = np.logical_and(~nvec_low, ~nvec_high) MDU_n = np.zeros(nvec.shape) MDU_n[nvec_uncstr] = ( (p.b_ellipse / p.ltilde) * ((nvec[nvec_uncstr] / p.ltilde) ** (p.upsilon - 1)) * ( (1 - ((nvec[nvec_uncstr] / p.ltilde) ** p.upsilon)) ** ((1 - p.upsilon) / p.upsilon) ) ) b2 = ( 0.5 * p.b_ellipse * (p.ltilde ** (-p.upsilon)) * (p.upsilon - 1) * (eps_low ** (p.upsilon - 2)) * ( (1 - ((eps_low / p.ltilde) ** p.upsilon)) ** ((1 - p.upsilon) / p.upsilon) ) * ( 1 + ((eps_low / p.ltilde) ** p.upsilon) * ((1 - ((eps_low / p.ltilde) ** p.upsilon)) ** (-1)) ) ) b1 = (p.b_ellipse / p.ltilde) * ( (eps_low / p.ltilde) ** (p.upsilon - 1) ) * ( (1 - ((eps_low / p.ltilde) ** p.upsilon)) ** ((1 - p.upsilon) / p.upsilon) ) - ( 2 * b2 * eps_low ) MDU_n[nvec_low] = 2 * b2 * nvec[nvec_low] + b1 d2 = ( 0.5 * p.b_ellipse * (p.ltilde ** (-p.upsilon)) * (p.upsilon - 1) * (eps_high ** (p.upsilon - 2)) * ( (1 - ((eps_high / p.ltilde) ** p.upsilon)) ** ((1 - p.upsilon) / p.upsilon) ) * ( 1 + ((eps_high / p.ltilde) ** p.upsilon) * ((1 - ((eps_high / p.ltilde) ** p.upsilon)) ** (-1)) ) ) d1 = (p.b_ellipse / p.ltilde) * ( (eps_high / p.ltilde) ** (p.upsilon - 1) ) * ( (1 - ((eps_high / p.ltilde) ** p.upsilon)) ** ((1 - p.upsilon) / p.upsilon) ) - ( 2 * d2 * eps_high ) MDU_n[nvec_high] = 2 * d2 * nvec[nvec_high] + d1 output = MDU_n * np.squeeze(chi_n) output = np.squeeze(output) return output
[docs] def get_bq(BQ, j, p, method): r""" Calculate bequests to each household. .. math:: bq_{j,s,t} = \zeta_{j,s}\frac{BQ_{t}}{\lambda_{j}\omega_{s,t}} Args: BQ (array_like): aggregate bequests j (int): index of lifetime ability group p (OG-Core Specifications object): model parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' Returns: bq (array_like): bequests received by each household """ if p.use_zeta: if j is not None: if method == "SS": bq = (p.zeta[:, j] * BQ) / (p.lambdas[j] * p.omega_SS) else: len_T = BQ.shape[0] bq = ( np.reshape(p.zeta[:, j], (1, p.S)) * BQ.reshape((len_T, 1)) ) / (p.lambdas[j] * p.omega[:len_T, :]) else: if method == "SS": bq = (p.zeta * BQ) / ( p.lambdas.reshape((1, p.J)) * p.omega_SS.reshape((p.S, 1)) ) else: len_T = BQ.shape[0] bq = ( np.reshape(p.zeta, (1, p.S, p.J)) * utils.to_timepath_shape(BQ) ) / ( p.lambdas.reshape((1, 1, p.J)) * p.omega[:len_T, :].reshape((len_T, p.S, 1)) ) else: if j is not None: if method == "SS": bq = np.tile(BQ[j], p.S) / p.lambdas[j] if method == "TPI": len_T = BQ.shape[0] bq = np.tile( np.reshape(BQ[:, j] / p.lambdas[j], (len_T, 1)), (1, p.S) ) else: if method == "SS": BQ_per = BQ / np.squeeze(p.lambdas) bq = np.tile(np.reshape(BQ_per, (1, p.J)), (p.S, 1)) if method == "TPI": len_T = BQ.shape[0] BQ_per = BQ / p.lambdas.reshape(1, p.J) bq = np.tile(np.reshape(BQ_per, (len_T, 1, p.J)), (1, p.S, 1)) return bq
[docs] def get_tr(TR, j, p, method): r""" Calculate transfers to each household. .. math:: tr_{j,s,t} = \zeta_{j,s}\frac{TR_{t}}{\lambda_{j}\omega_{s,t}} Args: TR (array_like): aggregate transfers j (int): index of lifetime ability group p (OG-Core Specifications object): model parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' Returns: tr (array_like): transfers received by each household """ if j is not None: if method == "SS": tr = (p.eta[-1, :, j] * TR) / (p.lambdas[j] * p.omega_SS) else: len_T = TR.shape[0] tr = (p.eta[:len_T, :, j] * TR.reshape((len_T, 1))) / ( p.lambdas[j] * p.omega[:len_T, :] ) else: if method == "SS": tr = (p.eta[-1, :, :] * TR) / ( p.lambdas.reshape((1, p.J)) * p.omega_SS.reshape((p.S, 1)) ) else: len_T = TR.shape[0] tr = (p.eta[:len_T, :, :] * utils.to_timepath_shape(TR)) / ( p.lambdas.reshape((1, 1, p.J)) * p.omega[:len_T, :].reshape((len_T, p.S, 1)) ) return tr
[docs] def get_cons(r, w, p_tilde, b, b_splus1, n, bq, net_tax, e, p): r""" Calculate household consumption. .. math:: c_{j,s,t} = \frac{(1 + r_{t})b_{j,s,t} + w_t e_{j,s} n_{j,s,t} + bq_{j,s,t} + tr_{j,s,t} - T_{j,s,t} - e^{g_y}b_{j,s+1,t+1}}{1 - \tau^{c}_{s,t}} Args: r (array_like): the real interest rate w (array_like): the real wage rate p_tilde (array_like): the ratio of real GDP to nominal GDP b (Numpy array): household savings b_splus1 (Numpy array): household savings one period ahead n (Numpy array): household labor supply bq (Numpy array): household bequests received net_tax (Numpy array): household net taxes paid e (Numpy array): effective labor units p (OG-Core Specifications object): model parameters Returns: cons (Numpy array): household consumption """ cons = ( (1 + r) * b + w * e * n + bq - b_splus1 * np.exp(p.g_y) - net_tax ) / p_tilde return cons
def get_ci(c_s, p_i, p_tilde, tau_c, alpha_c, method="SS"): r""" Compute consumption of good i given amount of composite consumption and prices. .. math:: c_{i,j,s,t} = \frac{c_{s,j,t}}{\alpha_{i,j}p_{i,j}} Args: c_s (array_like): composite consumption p_i (array_like): prices for consumption good i p_tilde (array_like): composite good price tau_c (array_like): consumption tax rate alpha_c (array_like): consumption share parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' Returns: c_si (array_like): consumption of good i """ if method == "SS": I = alpha_c.shape[0] S = c_s.shape[0] J = c_s.shape[1] tau_c = tau_c.reshape(I, 1, 1) alpha_c = alpha_c.reshape(I, 1, 1) p_tilde.reshape(1, 1, 1) p_i = p_i.reshape(I, 1, 1) c_s = c_s.reshape(1, S, J) c_si = alpha_c * (((1 + tau_c) * p_i) / p_tilde) ** (-1) * c_s else: # Time path case I = alpha_c.shape[0] T = p_i.shape[0] S = c_s.shape[1] J = c_s.shape[2] tau_c = tau_c.reshape(T, I, 1, 1) alpha_c = alpha_c.reshape(1, I, 1, 1) p_tilde = p_tilde.reshape(T, 1, 1, 1) p_i = p_i.reshape(T, I, 1, 1) c_s = c_s.reshape(T, 1, S, J) c_si = alpha_c * (((1 + tau_c) * p_i) / p_tilde) ** (-1) * c_s return c_si
[docs] def FOC_savings( r, w, p_tilde, b, b_splus1, n, bq, factor, tr, ubi, theta, rho, etr_params, mtry_params, t, j, p, method, ): r""" Computes Euler errors for the FOC for savings in the steady state. This function is usually looped through over J, so it does one lifetime income group at a time. .. math:: \frac{c_{j,s,t}^{-\sigma}}{\tilde{p}_{t}} = e^{-\sigma g_y} \biggl[\chi^b_j\rho_s(b_{j,s+1,t+1})^{-\sigma} + \beta_j\bigl(1 - \rho_s\bigr)\Bigl(\frac{1 + r_{t+1} \bigl[1 - \tau^{mtry}_{s+1,t+1}\bigr]}{\tilde{p}_{t+1}}\Bigr) (c_{j,s+1,t+1})^{-\sigma}\biggr] Args: r (array_like): the real interest rate w (array_like): the real wage rate p_tilde (array_like): composite good price b (Numpy array): household savings b_splus1 (Numpy array): household savings one period ahead b_splus2 (Numpy array): household savings two periods ahead n (Numpy array): household labor supply bq (Numpy array): household bequests received factor (scalar): scaling factor converting model units to dollars tr (Numpy array): government transfers to household ubi (Numpy array): universal basic income payment theta (Numpy array): social security replacement rate for each lifetime income group rho (Numpy array): mortality rates etr_params (list): parameters of the effective tax rate functions mtry_params (list): parameters of the marginal tax rate on capital income functions t (int): model period j (int): index of ability type p (OG-Core Specifications object): model parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' Returns: euler (Numpy array): Euler error from FOC for savings """ if j is not None: chi_b = p.chi_b[j] beta = p.beta[j] if method == "SS": tax_noncompliance = p.capital_income_tax_noncompliance_rate[-1, j] e = np.squeeze(p.e[-1, :, j]) elif method == "TPI_scalar": tax_noncompliance = p.capital_income_tax_noncompliance_rate[0, j] e = np.squeeze(p.e[0, :, j]) else: length = r.shape[0] tax_noncompliance = p.capital_income_tax_noncompliance_rate[ t : t + length, j ] e_long = np.concatenate( ( p.e, np.tile(p.e[-1, :, :].reshape(1, p.S, p.J), (p.S, 1, 1)), ), axis=0, ) e = np.diag(e_long[t : t + p.S, :, j], max(p.S - length, 0)) else: chi_b = p.chi_b beta = p.beta if method == "SS": tax_noncompliance = p.capital_income_tax_noncompliance_rate[-1, :] e = np.squeeze(p.e[-1, :, :]) elif method == "TPI_scalar": tax_noncompliance = p.capital_income_tax_noncompliance_rate[0, :] e = np.squeeze(p.e[0, :, :]) else: length = r.shape[0] tax_noncompliance = p.capital_income_tax_noncompliance_rate[ t : t + length, : ] e_long = np.concatenate( ( p.e, np.tile(p.e[-1, :, :].reshape(1, p.S, p.J), (p.S, 1, 1)), ), axis=0, ) e = np.diag(e_long[t : t + p.S, :, :], max(p.S - length, 0)) e = np.squeeze(e) if method == "SS": h_wealth = p.h_wealth[-1] m_wealth = p.m_wealth[-1] p_wealth = p.p_wealth[-1] p_tilde = np.ones_like(p.rho[-1, :]) * p_tilde elif method == "TPI_scalar": h_wealth = p.h_wealth[0] m_wealth = p.m_wealth[0] p_wealth = p.p_wealth[0] else: h_wealth = p.h_wealth[t] m_wealth = p.m_wealth[t] p_wealth = p.p_wealth[t] taxes = tax.net_taxes( r, w, b, n, bq, factor, tr, ubi, theta, t, j, False, method, e, etr_params, p, ) cons = get_cons(r, w, p_tilde, b, b_splus1, n, bq, taxes, e, p) deriv = ( (1 + r) - ( r * tax.MTR_income( r, w, b, n, factor, True, e, etr_params, mtry_params, tax_noncompliance, p, ) ) - tax.MTR_wealth(b, h_wealth, m_wealth, p_wealth) ) savings_ut = ( rho * np.exp(-p.sigma * p.g_y) * chi_b * b_splus1 ** (-p.sigma) ) euler_error = np.zeros_like(n) if n.shape[0] > 1: euler_error[:-1] = ( marg_ut_cons(cons[:-1], p.sigma) * (1 / p_tilde[:-1]) - beta * (1 - rho[:-1]) * deriv[1:] * marg_ut_cons(cons[1:], p.sigma) * (1 / p_tilde[1:]) * np.exp(-p.sigma * p.g_y) - savings_ut[:-1] ) euler_error[-1] = ( marg_ut_cons(cons[-1], p.sigma) * (1 / p_tilde[-1]) - savings_ut[-1] ) else: euler_error[-1] = ( marg_ut_cons(cons[-1], p.sigma) * (1 / p_tilde[-1]) - savings_ut[-1] ) return euler_error
[docs] def FOC_labor( r, w, p_tilde, b, b_splus1, n, bq, factor, tr, ubi, theta, chi_n, etr_params, mtrx_params, t, j, p, method, ): r""" Computes errors for the FOC for labor supply in the steady state. This function is usually looped through over J, so it does one lifetime income group at a time. .. math:: w_t e_{j,s}\bigl(1 - \tau^{mtrx}_{s,t}\bigr) \frac{(c_{j,s,t})^{-\sigma}}{ \tilde{p}_{t}} = \chi^n_{s} \biggl(\frac{b}{\tilde{l}}\biggr)\biggl(\frac{n_{j,s,t}} {\tilde{l}}\biggr)^{\upsilon-1}\Biggl[1 - \biggl(\frac{n_{j,s,t}}{\tilde{l}}\biggr)^\upsilon\Biggr] ^{\frac{1-\upsilon}{\upsilon}} Args: r (array_like): the real interest rate w (array_like): the real wage rate p_tilde (array_like): composite good price b (Numpy array): household savings b_splus1 (Numpy array): household savings one period ahead n (Numpy array): household labor supply bq (Numpy array): household bequests received factor (scalar): scaling factor converting model units to dollars tr (Numpy array): government transfers to household ubi (Numpy array): universal basic income payment theta (Numpy array): social security replacement rate for each lifetime income group chi_n (Numpy array): utility weight on the disutility of labor supply e (Numpy array): effective labor units etr_params (list): parameters of the effective tax rate functions mtrx_params (list): parameters of the marginal tax rate on labor income functions t (int): model period j (int): index of ability type p (OG-Core Specifications object): model parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' Returns: FOC_error (Numpy array): error from FOC for labor supply """ if method == "SS": tau_payroll = p.tau_payroll[-1] elif method == "TPI_scalar": # for 1st donut ring only tau_payroll = p.tau_payroll[0] else: length = r.shape[0] tau_payroll = p.tau_payroll[t : t + length] if j is not None: if method == "SS": tax_noncompliance = p.labor_income_tax_noncompliance_rate[-1, j] e = np.squeeze(p.e[-1, :, j]) elif method == "TPI_scalar": tax_noncompliance = p.labor_income_tax_noncompliance_rate[0, j] e = np.squeeze(p.e[0, -1, j]) else: tax_noncompliance = p.labor_income_tax_noncompliance_rate[ t : t + length, j ] e_long = np.concatenate( ( p.e, np.tile(p.e[-1, :, :].reshape(1, p.S, p.J), (p.S, 1, 1)), ), axis=0, ) e = np.diag(e_long[t : t + p.S, :, j], max(p.S - length, 0)) else: if method == "SS": tax_noncompliance = p.labor_income_tax_noncompliance_rate[-1, :] e = np.squeeze(p.e[-1, :, :]) elif method == "TPI_scalar": tax_noncompliance = p.labor_income_tax_noncompliance_rate[0, :] e = np.squeeze(p.e[0, -1, :]) else: tax_noncompliance = p.labor_income_tax_noncompliance_rate[ t : t + length, : ] e_long = np.concatenate( ( p.e, np.tile(p.e[-1, :, :].reshape(1, p.S, p.J), (p.S, 1, 1)), ), axis=0, ) e = np.diag(e_long[t : t + p.S, :, j], max(p.S - length, 0)) if method == "SS": tau_payroll = p.tau_payroll[-1] elif method == "TPI_scalar": # for 1st donut ring only tau_payroll = p.tau_payroll[0] else: length = r.shape[0] tau_payroll = p.tau_payroll[t : t + length] if method == "TPI": if b.ndim == 2: r = r.reshape(r.shape[0], 1) w = w.reshape(w.shape[0], 1) tau_payroll = tau_payroll.reshape(tau_payroll.shape[0], 1) taxes = tax.net_taxes( r, w, b, n, bq, factor, tr, ubi, theta, t, j, False, method, e, etr_params, p, ) cons = get_cons(r, w, p_tilde, b, b_splus1, n, bq, taxes, e, p) deriv = ( 1 - tau_payroll - tax.MTR_income( r, w, b, n, factor, False, e, etr_params, mtrx_params, tax_noncompliance, p, ) ) FOC_error = marg_ut_cons(cons, p.sigma) * ( 1 / p_tilde ) * w * deriv * e - marg_ut_labor(n, chi_n, p) return FOC_error
[docs] def get_y(r_p, w, b_s, n, p, method): r""" Compute household income before taxes. .. math:: y_{j,s,t} = r_{p,t}b_{j,s,t} + w_{t}e_{j,s}n_{j,s,t} Args: r_p (array_like): real interest rate on the household portfolio w (array_like): real wage rate b_s (Numpy array): household savings coming into the period n (Numpy array): household labor supply p (OG-Core Specifications object): model parameters method (str): adjusts calculation dimensions based on 'SS' or 'TPI' """ if method == "SS": e = np.squeeze(p.e[-1, :, :]) elif method == "TPI": e = p.e y = r_p * b_s + w * e * n return y
[docs] def constraint_checker_SS(bssmat, nssmat, cssmat, ltilde): """ Checks constraints on consumption, savings, and labor supply in the steady state. Args: bssmat (Numpy array): steady state distribution of capital nssmat (Numpy array): steady state distribution of labor cssmat (Numpy array): steady state distribution of consumption ltilde (scalar): upper bound of household labor supply Returns: None Raises: Warnings: if constraints are violated, warnings printed """ print("Checking constraints on capital, labor, and consumption.") if (bssmat < 0).any(): print("\tWARNING: There is negative capital stock") flag2 = False if (nssmat < 0).any(): print( "\tWARNING: Labor supply violates nonnegativity ", "constraints." ) flag2 = True if (nssmat > ltilde).any(): print("\tWARNING: Labor supply violates the ltilde constraint.") flag2 = True if flag2 is False: print( "\tThere were no violations of the constraints on labor", " supply.", ) if (cssmat < 0).any(): print("\tWARNING: Consumption violates nonnegativity", " constraints.") else: print( "\tThere were no violations of the constraints on", " consumption." )
[docs] def constraint_checker_TPI(b_dist, n_dist, c_dist, t, ltilde): """ Checks constraints on consumption, savings, and labor supply along the transition path. Does this for each period t separately. Args: b_dist (Numpy array): distribution of capital at time t n_dist (Numpy array): distribution of labor at time t c_dist (Numpy array): distribution of consumption at time t t (int): time period ltilde (scalar): upper bound of household labor supply Returns: None Raises: Warnings: if constraints are violated, warnings printed """ if (b_dist <= 0).any(): print( "\tWARNING: Aggregate capital is less than or equal to ", "zero in period %.f." % t, ) if (n_dist < 0).any(): print( "\tWARNING: Labor supply violates nonnegativity", " constraints in period %.f." % t, ) if (n_dist > ltilde).any(): print( "\tWARNING: Labor suppy violates the ltilde constraint", " in period %.f." % t, ) if (c_dist < 0).any(): print( "\tWARNING: Consumption violates nonnegativity", " constraints in period %.f." % t, )