Source code for ogcore.demographics

"""
-------------------------------------------------------------------------------
Functions for generating demographic objects necessary for the OG-USA model. A
list of UN official 3-digit country codes and corresponding 3-character country
abbreviations is available at https://unstats.un.org/unsd/methodology/m49/
-------------------------------------------------------------------------------
"""

# Import packages
import os
import numpy as np
from io import StringIO
import scipy.optimize as opt
import pandas as pd
from ogcore.utils import get_legacy_session
from ogcore import parameter_plots as pp

START_YEAR = 2024
END_YEAR = 2024
UN_COUNTRY_CODE = "840"  # UN code for USA
# create output director for figures
CUR_PATH = os.path.split(os.path.abspath(__file__))[0]
OUTPUT_DIR = os.path.join(CUR_PATH, "..", "data", "OUTPUT", "Demographics")
if os.access(OUTPUT_DIR, os.F_OK) is False:
    os.makedirs(OUTPUT_DIR)


"""
------------------------------------------------------------------------
Define functions
------------------------------------------------------------------------
"""


[docs] def get_un_data( variable_code, country_id=UN_COUNTRY_CODE, start_year=START_YEAR, end_year=END_YEAR, ): """ This function retrieves data from the United Nations Data Portal API for UN population data (see https://population.un.org/dataportal/about/dataapi) Args: variable_code (str): variable code for UN data country_id (str): country id for UN data start_year (int): start year for UN data end_year (int): end year for UN data Returns: df (Pandas DataFrame): DataFrame of UN data """ target = ( "https://population.un.org/dataportalapi/api/v1/data/indicators/" + variable_code + "/locations/" + country_id + "/start/" + str(start_year) + "/end/" + str(end_year) + "?format=csv" ) # Check for a file named "un_api_token.txt" in the current directory if os.path.exists(os.path.join("un_api_token.txt")): with open(os.path.join("un_api_token.txt"), "r") as file: UN_TOKEN = file.read().strip() else: # if file not exist, prompt user for token try: UN_TOKEN = input( "Please enter your UN API token (press return if you do not have one): " ) # write the UN_TOKEN to a file to find in the future with open(os.path.join("un_api_token.txt"), "w") as file: file.write(UN_TOKEN) except EOFError: UN_TOKEN = "" # get data from url payload = {} headers = {"Authorization": "Bearer " + UN_TOKEN} response = get_legacy_session().get(target, headers=headers, data=payload) # Check if the request was successful before processing if response.status_code == 200: csvStringIO = StringIO(response.text) df = pd.read_csv(csvStringIO, sep="|", header=1) # keep just what is needed from data df = df[df.Variant == "Median"] df = df[df.Sex == "Both sexes"][["TimeLabel", "AgeLabel", "Value"]] df.rename( {"TimeLabel": "year", "AgeLabel": "age", "Value": "value"}, axis=1, inplace=True, ) df.loc[df.age == "100+", "age"] = 100 df.age = df.age.astype(int) df.year = df.year.astype(int) df = df[df.age < 100] # need to drop 100+ age category else: # Read from UN GH Repo: print( f"Failed to retrieve population data from UN. Reading " + " from https://github.com/EAPD-DRB/Population-Data " + "instead of UN WPP API" ) country_dict = { "840": "USA", "710": "ZAF", "458": "MYS", "356": "IND", "826": "UK", "360": "IDN", "608": "PHL", "764": "THA", "076": "BRA", "410": "KOR", } un_variable_dict = { "68": "fertility_rates", "80": "mortality_rates", "47": "population", } country = country_dict[country_id] variable = un_variable_dict[variable_code] url = ( "https://raw.githubusercontent.com/EAPD-DRB/" + "Population-Data/main/" + "Data/{c}/UN_{v}_data.csv".format(c=country, v=variable) ) df = pd.read_csv(url) # keep just the years requested df = df[(df.year >= start_year) & (df.year <= end_year)] # Do we still want to keep the status code for failures? # print( # f"Failed to retrieve population data. HTTP status code: {response.status_code}" # ) # assert False return df
[docs] def get_fert( totpers=100, min_age=0, max_age=99, country_id=UN_COUNTRY_CODE, start_year=START_YEAR, end_year=END_YEAR, graph=False, plot_path=None, download_path=None, ): """ This function generates a vector of fertility rates by model period age that corresponds to the fertility rate data by age in years. Args: totpers (int): total number of agent life periods (E+S), >= 3 min_age (int): age in years at which agents are born, >= 0 max_age (int): age in years at which agents die with certainty, >= 4, < 100 (max age in UN data is 99, 100+ i same group) country_id (str): country id for UN data start_year (int): start year for UN data end_year (int): end year for UN data graph (bool): =True if want graphical output plot_path (str): path to save fertility rate plot download_path (str): path to save fertility rate data Returns: fert_rates (Numpy array): fertility rates for each year of data and model age fig (Matplotlib Figure): figure object if graph=True and plot_path=None """ # initialize fert rates array fert_rates_2D = np.zeros((end_year + 1 - start_year, totpers)) # Read UN data df = get_un_data( "68", country_id=country_id, start_year=start_year, end_year=end_year ) # CLean and rebin data for y in range(start_year, end_year + 1): df_y = df[(df.age >= min_age) & (df.age <= max_age) & (df.year == y)] # put in vector fert_rates = df_y.value.values # fill in with zeros for ages < 15 and > 49 # NOTE: this assumes min_year < 15 and max_age > 49 fert_rates = np.append(fert_rates, np.zeros(max_age - 49)) fert_rates = np.append(np.zeros(15 - min_age), fert_rates) # divide by 1000 because fertility rates are number of births per # 1000 woman and we want births per person (might update to account # from fraction men more correctly - below assumes 50/50 men and women) fert_rates = fert_rates / 2000 # Rebin data in the case that model period not equal to one calendar # year fert_rates = pop_rebin(fert_rates, totpers) fert_rates_2D[y - start_year, :] = fert_rates if download_path: np.savetxt( os.path.join(download_path, "fert_rates.csv"), fert_rates_2D, delimiter=",", ) # Create plots if needed if graph: if start_year == end_year: years_to_plot = [start_year] else: years_to_plot = [start_year, end_year] if plot_path is not None: pp.plot_fert_rates( [fert_rates_2D], start_year=start_year, years_to_plot=years_to_plot, path=plot_path, ) return fert_rates_2D else: fig = pp.plot_fert_rates( [fert_rates_2D], start_year=start_year, years_to_plot=years_to_plot, ) return fert_rates_2D, fig else: return fert_rates_2D
[docs] def get_mort( totpers=100, min_age=0, max_age=99, country_id=UN_COUNTRY_CODE, start_year=START_YEAR, end_year=END_YEAR, graph=False, plot_path=None, download_path=None, ): """ This function generates a vector of mortality rates by model period age. Args: totpers (int): total number of agent life periods (E+S), >= 3 min_age (int): age in years at which agents are born, >= 0 max_age (int): age in years at which agents die with certainty, >= 4, < 100 (max age in UN data is 99, 100+ i same group) country_id (str): country id for UN data start_year (int): start year for UN data end_year (int): end year for UN data graph (bool): =True if want graphical output plot_path (str): path to save mortality rate plot download_path (str): path to save mortality rate data Returns: mort_rates (Numpy array) mortality rates for each year of data and model age infmort_rate_vec (Numpy array): infant mortality rates for each fig (Matplotlib Figure): figure object if graph=True and plot_path=None """ mort_rates_2D = np.zeros((end_year + 1 - start_year, totpers)) infmort_rate_vec = np.zeros(end_year + 1 - start_year) # Read UN data df = get_un_data( "80", country_id=country_id, start_year=start_year, end_year=end_year ) # CLean and rebin data for y in range(start_year, end_year + 1): df_y = df[(df.age >= min_age) & (df.age <= max_age) & (df.year == y)] # put in vector mort_rates_data = df_y.value.values # In UN data, mortality rates for 0 year olds are the infant # mortality rates infmort_rate = mort_rates_data[0] # Rebin data in the case that model period not equal to one calendar # year # make mort rates those from age 1-100 and set to 1 for age 100 mort_rates_data = np.append(mort_rates_data[1:], 1.0) mort_rates = pop_rebin(mort_rates_data, totpers) # put in 2D array mort_rates_2D[y - start_year, :] = mort_rates infmort_rate_vec[y - start_year] = infmort_rate if download_path: np.savetxt( os.path.join(download_path, "mort_rates.csv"), mort_rates_2D, delimiter=",", ) np.savetxt( os.path.join(download_path, "infmort_rates.csv"), infmort_rate_vec, delimiter=",", ) # Create plots if needed if graph: if start_year == end_year: years_to_plot = [start_year] else: years_to_plot = [start_year, end_year] if plot_path is not None: pp.plot_mort_rates_data( mort_rates_2D, start_year, years_to_plot, path=plot_path, ) return mort_rates_2D, infmort_rate_vec else: fig = pp.plot_mort_rates_data( mort_rates_2D, start_year, years_to_plot, ) return mort_rates_2D, infmort_rate_vec, fig else: return mort_rates_2D, infmort_rate_vec
[docs] def get_pop( E=20, S=80, min_age=0, max_age=99, infer_pop=False, fert_rates=None, mort_rates=None, infmort_rates=None, imm_rates=None, initial_pop=None, pre_pop_dist=None, country_id=UN_COUNTRY_CODE, start_year=START_YEAR, end_year=END_YEAR, download_path=None, ): """ Retrieves the population distribution data from the UN data API Args: E (int): number of model periods in which agent is not economically active, >= 1 S (int): number of model periods in which agent is economically active, >= 3 min_age (int): age in years at which agents are born, >= 0 max_age (int): age in years at which agents die with certainty, >= 4, < 100 (max age in UN data is 99, 100+ i same group) infer_pop (bool): =True if want to infer the population from the given fertility, mortality, and immigration rates fert_rates (Numpy array): fertility rates for each year of data and model age mort_rates (Numpy array): mortality rates for each year of data and model age infmort_rates (Numpy array): infant mortality rates for each year of data imm_rates (Numpy array): immigration rates for reach year of data and model age initial_pop_data (Pandas DataFrame): initial population data for the first year of model calibration (start_year) pre_pop_dist (Numpy array): population distribution for the year before the initial year for calibration country_id (str): country id for UN data start_year (int): start year data end_year (int): end year for data download_path (str): path to save population distribution data Returns: pop_2D (Numpy array): population distribution over T0 periods pre_pop (Numpy array): population distribution one year before initial year for calibration of omega_S_preTP """ # Generate time path of the nonstationary population distribution # Get path up to end of data year pop_2D = np.zeros((end_year + 2 - start_year, E + S)) if infer_pop: if pre_pop_dist is None: pre_pop_data = get_un_data( "47", country_id=country_id, start_year=start_year - 1, end_year=start_year - 1, ) if download_path: pre_pop_data.to_csv( os.path.join(download_path, "raw_pre_pop_data_UN.csv"), index=False, ) pre_pop_sample = pre_pop_data[ (pre_pop_data["age"] >= min_age) & (pre_pop_data["age"] <= max_age) ] pre_pop = pre_pop_sample.value.values pre_pop_dist = pop_rebin(pre_pop, E + S) else: pre_pop = pre_pop_dist if initial_pop is None: initial_pop_data = get_un_data( "47", country_id=country_id, start_year=start_year, end_year=start_year, ) initial_pop_sample = initial_pop_data[ (initial_pop_data["age"] >= min_age) & (initial_pop_data["age"] <= max_age) ] initial_pop = initial_pop_sample.value.values initial_pop = pop_rebin(initial_pop, E + S) # Check that have all necessary inputs to infer the population # distribution assert not [ x for x in (fert_rates, mort_rates, infmort_rates, imm_rates) if x is None ] len_pop_dist = end_year + 1 - start_year pop_2D = np.zeros((len_pop_dist, E + S)) # set initial population distribution in the counterfactual to # the first year of the user provided distribution pop_2D[0, :] = initial_pop for t in range(1, len_pop_dist): # find newborns next period newborns = np.dot(fert_rates[t - 1, :], pop_2D[t - 1, :]) pop_2D[t, 0] = (1 - infmort_rates[t - 1]) * newborns + imm_rates[ t - 1, 0 ] * pop_2D[t - 1, 0] pop_2D[t, 1:] = ( pop_2D[t - 1, :-1] * (1 - mort_rates[t - 1, :-1]) + pop_2D[t - 1, 1:] * imm_rates[t - 1, 1:] ) else: # Read UN data pop_data = get_un_data( "47", country_id=country_id, start_year=start_year, end_year=end_year + 2, # note go to + 2 because needed to infer immigration for end_year ) # CLean and rebin data for y in range(start_year, end_year + 2): pop_data_sample = pop_data[ (pop_data["age"] >= min_age) & (pop_data["age"] <= max_age) & (pop_data["year"] == y) ] pop = pop_data_sample.value.values # Generate the current population distribution given that E+S might # be less than max_age-min_age+1 # age_per_EpS = np.arange(1, E + S + 1) pop_EpS = pop_rebin(pop, E + S) pop_2D[y - start_year, :] = pop_EpS # get population distribution one year before initial year for # calibration of omega_S_preTP pre_pop_data = get_un_data( "47", country_id=country_id, start_year=start_year - 1, end_year=start_year - 1, ) pre_pop_sample = pre_pop_data[ (pre_pop_data["age"] >= min_age) & (pre_pop_data["age"] <= max_age) ] pre_pop = pre_pop_sample.value.values if download_path: np.savetxt( os.path.join(download_path, "population_distribution.csv"), pop_2D, delimiter=",", ) np.savetxt( os.path.join( download_path, "pre_period_population_distribution.csv" ), pre_pop, delimiter=",", ) return pop_2D, pre_pop
[docs] def pop_rebin(curr_pop_dist, totpers_new): """ For cases in which totpers (E+S) is less than the number of periods in the population distribution data, this function calculates a new population distribution vector with totpers (E+S) elements. Args: curr_pop_dist (Numpy array): population distribution over N periods totpers_new (int): number of periods to which we are transforming the population distribution, >= 3 Returns: curr_pop_new (Numpy array): new population distribution over totpers (E+S) periods that approximates curr_pop_dist """ # Number of periods in original data assert totpers_new >= 3 # Number of periods in original data totpers_orig = len(curr_pop_dist) if int(totpers_new) == totpers_orig: curr_pop_new = curr_pop_dist elif int(totpers_new) < totpers_orig: num_sub_bins = float(10000) curr_pop_sub = np.repeat( np.float64(curr_pop_dist) / num_sub_bins, num_sub_bins ) len_subbins = (np.float64(totpers_orig * num_sub_bins)) / totpers_new curr_pop_new = np.zeros(totpers_new, dtype=np.float64) end_sub_bin = 0 for i in range(totpers_new): beg_sub_bin = int(end_sub_bin) end_sub_bin = int(np.rint((i + 1) * len_subbins)) curr_pop_new[i] = curr_pop_sub[beg_sub_bin:end_sub_bin].sum() # Return curr_pop_new to single precision float (float32) # datatype curr_pop_new = np.float32(curr_pop_new) return curr_pop_new
[docs] def get_imm_rates( totpers=100, min_age=0, max_age=99, fert_rates=None, mort_rates=None, infmort_rates=None, pop_dist=None, country_id=UN_COUNTRY_CODE, start_year=START_YEAR, end_year=END_YEAR, graph=False, plot_path=None, download_path=None, ): """ Calculate immigration rates by age as a residual given population levels in different periods, then output average calculated immigration rate. We have to replace the first mortality rate in this function in order to adjust the first implied immigration rate Args: totpers (int): total number of agent life periods (E+S), >= 3 min_age (int): age in years at which agents are born, >= 0 max_age (int): age in years at which agents die with certainty, >= 4 fert_rates (Numpy array): fertility rates for each year of data and model age mort_rates (Numpy array): mortality rates for each year of data and model age infmort_rates (Numpy array): infant mortality rates for each year of data pop_dist (Numpy array): population distribution over T0+1 periods country_id (str): country id for UN data start_year (int): start year for UN data end_year (int): end year for UN data graph (bool): =True if want graphical output plot_path (str): path to save figure to download_path (str): path to save immigration rate data Returns: imm_rates_2D (Numpy array):immigration rates that correspond to each year of data and period of life, length E+S """ imm_rates_2D = np.zeros((end_year + 1 - start_year, totpers)) if fert_rates is None: # get fert rates from UN data from initial year to data year fert_rates = get_fert( totpers, min_age, max_age, country_id, start_year, end_year ) else: # ensure that user provided fert_rates and mort rates of same size assert fert_rates.shape == mort_rates.shape if mort_rates is None: # get mort rates from UN data from initial year to data year mort_rates, infmort_rates = get_mort( totpers, min_age, max_age, country_id, start_year, end_year ) else: # ensure that user provided fert_rates and mort rates of same size assert fert_rates.shape == mort_rates.shape assert infmort_rates is not None assert infmort_rates.shape[0] == mort_rates.shape[0] if pop_dist is None: # need to read UN population data df = get_un_data( "47", country_id=country_id, start_year=start_year, end_year=end_year + 2, ) pop_dist = np.zeros((end_year + 2 - start_year, totpers)) for y in range(start_year, end_year + 1): pop_t = df[ (df.age < 100) & (df.age >= 0) & (df.year == y) ].value.values pop_t = pop_rebin(pop_t, totpers) pop_dist[y - start_year, :] = pop_t # Make sure shape conforms assert pop_dist.shape[1] == mort_rates.shape[1] assert pop_dist.shape[0] == end_year - start_year + 2 for y in range(start_year, end_year + 1): pop_t = pop_dist[y - start_year, :] pop_tp1 = pop_dist[y + 1 - start_year, :] # initialize imm_rate vector imm_rates = np.zeros(totpers) # back out imm rates by age for each year newborns = np.dot(fert_rates[y - start_year, :], pop_t) # new born imm_rate imm_rates[0] = ( pop_tp1[0] - (1 - infmort_rates[y - start_year]) * newborns ) / pop_t[0] # all other age imm_rates imm_rates[1:] = ( pop_tp1[1:] - (1 - mort_rates[y - start_year, :-1]) * pop_t[:-1] ) / pop_t[1:] imm_rates_2D[y - start_year, :] = imm_rates if download_path: np.savetxt( os.path.join(download_path, "immigration_rates.csv"), imm_rates_2D, delimiter=",", ) # Create plots if needed if graph: if start_year == end_year: years_to_plot = [start_year] else: years_to_plot = [start_year, end_year] if plot_path is not None: pp.plot_imm_rates( imm_rates_2D, start_year, years_to_plot, path=plot_path, ) return imm_rates_2D else: fig = pp.plot_imm_rates( imm_rates_2D, start_year, years_to_plot, ) return imm_rates_2D, fig else: return imm_rates_2D
[docs] def immsolve(imm_rates, *args): """ This function generates a vector of errors representing the difference in two consecutive periods stationary population distributions. This vector of differences is the zero-function objective used to solve for the immigration rates vector, similar to the original immigration rates vector from get_imm_rates(), that sets the steady-state population distribution by age equal to the population distribution in period int(1.5*S) Args: imm_rates (Numpy array):immigration rates that correspond to each period of life, length E+S args (tuple): (fert_rates, mort_rates, infmort_rates, omega_cur, g_n_SS) Returns: omega_errs (Numpy array): difference between omega_new and omega_cur_pct, length E+S """ fert_rates, mort_rates, infmort_rates, omega_cur_lev, g_n_SS = args omega_cur_pct = omega_cur_lev / omega_cur_lev.sum() totpers = len(fert_rates) OMEGA = np.zeros((totpers, totpers)) OMEGA[0, :] = (1 - infmort_rates) * fert_rates + np.hstack( (imm_rates[0], np.zeros(totpers - 1)) ) OMEGA[1:, :-1] += np.diag(1 - mort_rates[:-1]) OMEGA[1:, 1:] += np.diag(imm_rates[1:]) omega_new = np.dot(OMEGA, omega_cur_pct) / (1 + g_n_SS) omega_errs = omega_new - omega_cur_pct return omega_errs
[docs] def get_pop_objs( E=20, S=80, T=320, min_age=0, max_age=99, fert_rates=None, mort_rates=None, infmort_rates=None, imm_rates=None, infer_pop=False, pop_dist=None, pre_pop_dist=None, country_id=UN_COUNTRY_CODE, initial_data_year=START_YEAR - 1, final_data_year=START_YEAR + 2, GraphDiag=True, download_path=None, ): """ This function produces the demographics objects to be used in the OG-USA model package. Args: E (int): number of model periods in which agent is not economically active, >= 1 S (int): number of model periods in which agent is economically active, >= 3 T (int): number of periods to be simulated in TPI, > 2*S min_age (int): age in years at which agents are born, >= 0 max_age (int): age in years at which agents die with certainty, >= 4, < 100 (max age in UN data is 99, 100+ i same group) fert_rates (array_like): user provided fertility rates, dimensions are T0 x E+S mort_rates (array_like): user provided mortality rates, dimensions are T0 x E+S infmort_rates (array_like): user provided infant mortality rates, length T0 imm_rates (array_like): user provided immigration rates, dimensions are T0 x E+S infer_pop (bool): =True if want to infer the population pop_dist (array_like): user provided population distribution, dimensions are T0+1 x E+S pre_pop_dist (array_like): user provided population distribution for the year before the initial year for calibration, length E+S country_id (str): country id for UN data initial_data_year (int): initial year of data to use (not relevant if have user provided data) final_data_year (int): final year of data to use, T0=initial_year-final_year + 1 pop_dist (array_like): user provided population distribution, last dimension is of length E+S GraphDiag (bool): =True if want graphical output and printed diagnostics Returns: pop_dict (dict): includes: omega_path_S (Numpy array), time path of the population distribution from the current state to the steady-state, size T+S x S g_n_SS (scalar): steady-state population growth rate omega_SS (Numpy array): normalized steady-state population distribution, length S surv_rates (Numpy array): survival rates that correspond to each model period of life, length S mort_rates (Numpy array): mortality rates that correspond to each model period of life, length S g_n_path (Numpy array): population growth rates over the time path, length T + S """ # TODO: this function does not generalize with T. # It assumes one model period is equal to one calendar year in the # time dimesion (it does adjust for S, however) T0 = ( final_data_year - initial_data_year + 1 ) # number of periods until constant fertility and mortality rates print( "Demographics data: Initial Data year = ", initial_data_year, ", Final Data year = ", final_data_year, ) assert E + S <= max_age - min_age + 1 assert initial_data_year >= 2011 and initial_data_year <= 2100 - 1 assert final_data_year >= 2011 and final_data_year <= 2100 - 1 # Ensure that the last year of data used is before SS transition assumed # Really, it will need to be well before this assert final_data_year > initial_data_year assert final_data_year < initial_data_year + T assert ( T > 2 * T0 ) # ensure time path 2x as long as allows rates to fluctuate if imm_rates is not None and pop_dist is None: assert ( infer_pop is True ) # if pass immigration rates, need to infer population # Get fertility rates if not provided if fert_rates is None: # get fert rates from UN data from initial year to data year fert_rates = get_fert( E + S, min_age, max_age, country_id, initial_data_year, final_data_year, download_path=download_path, ) else: # ensure that user provided fert_rates are of the correct shape assert fert_rates.shape[0] == T0 assert fert_rates.shape[-1] == E + S # Extrapolate fertility rates for the rest of the transition path # the implicit assumption is that they are constant after the # last year of UN or user provided data fert_rates = np.concatenate( ( fert_rates, np.tile( fert_rates[-1, :].reshape(1, E + S), (T + S - fert_rates.shape[0], 1), ), ), axis=0, ) # Get mortality rates if not provided if mort_rates is None: # get mort rates from UN data from initial year to data year mort_rates, infmort_rates = get_mort( E + S, min_age, max_age, country_id, initial_data_year, final_data_year, download_path=download_path, ) else: # ensure that user provided mort_rates are of the correct shape assert mort_rates.shape[0] == T0 assert mort_rates.shape[-1] == E + S assert infmort_rates is not None assert infmort_rates.shape[0] == mort_rates.shape[0] # Extrapolate mortality rates for the rest of the transition path # the implicit assumption is that they are constant after the # last year of UN or user provided data mort_rates = np.concatenate( ( mort_rates, np.tile( mort_rates[-1, :].reshape(1, E + S), (T + S - mort_rates.shape[0], 1), ), ), axis=0, ) infmort_rates = np.concatenate( ( infmort_rates, np.tile(infmort_rates[-1], (T + S - infmort_rates.shape[0])), ) ) mort_rates_S = mort_rates[:, E:] # Get population distribution if not provided # or if just provide initial pop and infer_pop=True if (pop_dist is None) or (pop_dist is not None and infer_pop is True): if infer_pop: if pop_dist is not None: initial_pop = pop_dist[0, :].reshape(1, pop_dist.shape[-1]) else: initial_pop = None pop_2D, pre_pop = get_pop( E, S, min_age, max_age, infer_pop, fert_rates, mort_rates, infmort_rates, imm_rates, initial_pop, pre_pop_dist, country_id, initial_data_year, final_data_year, download_path=download_path, ) else: pop_2D, pre_pop = get_pop( E, S, min_age, max_age, country_id=country_id, start_year=initial_data_year, end_year=final_data_year, download_path=download_path, ) else: # Check first dims of pop_dist as input by user print("T0 = ", T0) assert pop_dist.shape[0] == T0 + 1 # population needs to be # one year longer in order to find immigration rates assert pop_dist.shape[-1] == E + S # Check that pre_pop specified assert pre_pop_dist is not None assert pre_pop_dist.shape[0] == pop_dist.shape[1] pre_pop = pre_pop_dist # Create 2D array of population distribution pop_2D = np.zeros((T0 + 1, E + S)) for t in range(T0 + 1): pop_EpS = pop_rebin(pop_dist[t, :], E + S) pop_2D[t, :] = pop_EpS # Get percentage distribution for S periods for pre-TP period pre_pop_EpS = pop_rebin(pre_pop, E + S) # Get immigration rates if not provided if imm_rates is None: imm_rates_orig = get_imm_rates( E + S, min_age, max_age, fert_rates, mort_rates, infmort_rates, pop_2D, country_id, initial_data_year, final_data_year, download_path=download_path, ) else: # ensure that user provided imm_rates are of the correct shape assert imm_rates.shape[0] == T0 assert imm_rates.shape[-1] == E + S imm_rates_orig = imm_rates # Extrapolate immigration rates for the rest of the transition path # the implicit assumption is that they are constant after the # last year of UN or user provided data imm_rates_orig = np.concatenate( ( imm_rates_orig, np.tile( imm_rates_orig[-1, :].reshape(1, E + S), (T + S - imm_rates_orig.shape[0], 1), ), ), axis=0, ) # If the population distribution was given, check it for consistency # with the fertility, mortality, and immigration rates # if pop_dist is not None and not infer_pop: # len_pop_dist = pop_dist.shape[0] # pop_counter_2D = np.zeros((len_pop_dist, E + S)) len_pop_dist = pop_2D.shape[0] pop_counter_2D = np.zeros((len_pop_dist, E + S)) # set initial population distribution in the counterfactual to # the first year of the user provided distribution # pop_counter_2D[0, :] = pop_dist[0, :] pop_counter_2D[0, :] = pop_2D[0, :] for t in range(1, len_pop_dist): # find newborns next period # newborns = np.dot(fert_rates[t - 1, :], pop_counter_2D[t - 1, :]) # pop_counter_2D[t, 0] = ( # 1 - infmort_rates[t - 1] # ) * newborns + imm_rates[t - 1, 0] * pop_counter_2D[t - 1, 0] # pop_counter_2D[t, 1:] = ( # pop_counter_2D[t - 1, :-1] * (1 - mort_rates[t - 1, :-1]) # + pop_counter_2D[t - 1, 1:] * imm_rates_orig[t - 1, 1:] # ) newborns = np.dot(fert_rates[t - 1, :], pop_counter_2D[t - 1, :]) pop_counter_2D[t, 0] = ( 1 - infmort_rates[t - 1] ) * newborns + imm_rates_orig[t - 1, 0] * pop_counter_2D[t - 1, 0] pop_counter_2D[t, 1:] = ( pop_counter_2D[t - 1, :-1] * (1 - mort_rates[t - 1, :-1]) + pop_counter_2D[t - 1, 1:] * imm_rates_orig[t - 1, 1:] ) # Check that counterfactual pop dist is close to pop dist given # assert np.allclose(pop_counter_2D, pop_dist) assert np.allclose(pop_counter_2D, pop_2D) """" CHANGE - in OG-Core, we are implicitLy assuming pre-TP rates of mortality, fertility, and immigration are the same as the period 0 rates. So let's just infer the pre-pop_dist from those. """ pop1 = pop_2D[0, :] fert0 = fert_rates[0, :] mort0 = mort_rates[0, :] infmort0 = infmort_rates[0] imm0 = imm_rates_orig[0, :] pre_pop_guess = pop1.copy() # I can't solve this analytically, so set up a system of equation # to solve def pre_pop_solve(pre_pop_guess, pop1, fert0, mort0, infmort0, imm0): pre_pop = pre_pop_guess errors = np.zeros(E + S) errors[0] = pop1[0] - ( (1 - infmort0) * (fert0 * pre_pop).sum() + imm0[0] * pre_pop[0] ) errors[1:] = pop1[1:] - ( pre_pop[:-1] * (1 - mort0[:-1]) + pre_pop[1:] * imm0[1:] ) # print("Max error = ", np.abs(errors).max()) return errors opt_res = opt.root( pre_pop_solve, pre_pop_guess, args=(pop1, fert0, mort0, infmort0, imm0), method="lm", ) pre_pop = opt_res.x print( "Success? ", opt_res.success, ", Max diff = ", np.abs(opt_res.fun).max(), ) pre_pop_EpS = pop_rebin(pre_pop, E + S) # Check result initial_pop_counter = np.zeros(E + S) newborns = (fert_rates[0, :] * pre_pop[:]).sum() initial_pop_counter[0] = ( 1 - infmort_rates[0] ) * newborns + imm_rates_orig[0, 0] * pre_pop[0] initial_pop_counter[1:] = ( pre_pop[:-1] * (1 - mort_rates[0, :-1]) + pre_pop[1:] * imm_rates_orig[0, 1:] ) # Test that using pre pop get to pop in period 1 print("Max diff = ", np.abs(pop_2D[0, :] - initial_pop_counter).max()) # assert np.allclose(initial_pop_counter, pop_2D[0, :]) # Create the transition matrix for the population distribution # from T0 going forward (i.e., past when we have data on forecasts) OMEGA_orig = np.zeros((E + S, E + S)) OMEGA_orig[0, :] = (1 - infmort_rates[-1]) * fert_rates[-1, :] + np.hstack( (imm_rates_orig[-1, 0], np.zeros(E + S - 1)) ) OMEGA_orig[1:, :-1] += np.diag(1 - mort_rates[-1, :-1]) OMEGA_orig[1:, 1:] += np.diag(imm_rates_orig[-1, 1:]) # Solve for steady-state population growth rate and steady-state # population distribution by age using eigenvalue and eigenvector # decomposition eigvalues, eigvectors = np.linalg.eig(OMEGA_orig) g_n_SS = (eigvalues[np.isreal(eigvalues)].real).max() - 1 eigvec_raw = eigvectors[ :, (eigvalues[np.isreal(eigvalues)].real).argmax() ].real omega_SS_orig = eigvec_raw / eigvec_raw.sum() # Generate time path of the population distribution after final # year of data omega_path_lev = np.zeros((T + S, E + S)) pop_curr = pop_2D[T0 - 1, :] omega_path_lev[:T0, :] = pop_2D[:T0, :] for per in range(T0, T + S): pop_next = np.dot(OMEGA_orig, pop_curr) omega_path_lev[per, :] = pop_next.copy() pop_curr = pop_next.copy() # Force the population distribution after 1.5*S periods to be the # steady-state distribution by adjusting immigration rates, holding # constant mortality, fertility, and SS growth rates imm_tol = 1e-14 fixper = int(1.5 * S + T0) omega_SSfx = omega_path_lev[fixper, :] / omega_path_lev[fixper, :].sum() imm_objs = ( fert_rates[fixper, :], mort_rates[fixper, :], infmort_rates[fixper], omega_path_lev[fixper, :], g_n_SS, ) imm_fulloutput = opt.fsolve( immsolve, imm_rates_orig[fixper, :], args=(imm_objs), full_output=True, xtol=imm_tol, ) imm_rates_adj = imm_fulloutput[0] imm_diagdict = imm_fulloutput[1] omega_path_S = omega_path_lev[:, -S:] / ( omega_path_lev[:, -S:].sum(axis=1).reshape((T + S, 1)) ) omega_path_S[fixper:, :] = np.tile( omega_path_S[fixper, :].reshape((1, S)), (T + S - fixper, 1) ) g_n_path = np.zeros(T + S) g_n_path[1:] = ( omega_path_lev[1:, -S:].sum(axis=1) - omega_path_lev[:-1, -S:].sum(axis=1) ) / omega_path_lev[:-1, -S:].sum(axis=1) g_n_path[0] = ( omega_path_lev[0, -S:].sum() - pre_pop_EpS[-S:].sum() ) / pre_pop_EpS[-S:].sum() g_n_path[fixper + 1 :] = g_n_SS omega_S_preTP = pre_pop_EpS[-S:] / pre_pop_EpS[-S:].sum() imm_rates_mat = np.concatenate( ( imm_rates_orig[:fixper, E:], np.tile( imm_rates_adj[E:].reshape(1, S), (T + S - fixper, 1), ), ), axis=0, ) if GraphDiag: # Check whether original SS population distribution is close to # the period-T population distribution omegaSSmaxdif = np.absolute( omega_SS_orig - (omega_path_lev[T, :] / omega_path_lev[T, :].sum()) ).max() if omegaSSmaxdif > 0.0003: print( "POP. WARNING: Max. abs. dist. between original SS " + "pop. dist'n and period-T pop. dist'n is greater than" + " 0.0003. It is " + str(omegaSSmaxdif) + "." ) else: print( "POP. SUCCESS: orig. SS pop. dist is very close to " + "period-T pop. dist'n. The maximum absolute " + "difference is " + str(omegaSSmaxdif) + "." ) # Plot the adjusted steady-state population distribution versus # the original population distribution. The difference should be # small omegaSSvTmaxdiff = np.absolute(omega_SS_orig - omega_SSfx).max() if omegaSSvTmaxdiff > 0.0003: print( "POP. WARNING: The maximum absolute difference " + "between any two corresponding points in the original" + " and adjusted steady-state population " + "distributions is" + str(omegaSSvTmaxdiff) + ", " + "which is greater than 0.0003." ) else: print( "POP. SUCCESS: The maximum absolute difference " + "between any two corresponding points in the original" + " and adjusted steady-state population " + "distributions is " + str(omegaSSvTmaxdiff) ) # Print whether or not the adjusted immigration rates solved the # zero condition immtol_solved = np.absolute(imm_diagdict["fvec"].max()) < imm_tol if immtol_solved: print( "POP. SUCCESS: Adjusted immigration rates solved " + "with maximum absolute error of " + str(np.absolute(imm_diagdict["fvec"].max())) + ", which is less than the tolerance of " + str(imm_tol) ) else: print( "POP. WARNING: Adjusted immigration rates did not " + "solve. Maximum absolute error of " + str(np.absolute(imm_diagdict["fvec"].max())) + " is greater than the tolerance of " + str(imm_tol) ) # Test whether the steady-state growth rates implied by the # adjusted OMEGA matrix equals the steady-state growth rate of # the original OMEGA matrix OMEGA2 = np.zeros((E + S, E + S)) OMEGA2[0, :] = (1 - infmort_rates[-1]) * fert_rates[-1, :] + np.hstack( (imm_rates_adj[0], np.zeros(E + S - 1)) ) OMEGA2[1:, :-1] += np.diag(1 - mort_rates[-1, :-1]) OMEGA2[1:, 1:] += np.diag(imm_rates_adj[1:]) eigvalues2, eigvectors2 = np.linalg.eig(OMEGA2) g_n_SS_adj = (eigvalues[np.isreal(eigvalues2)].real).max() - 1 if np.max(np.absolute(g_n_SS_adj - g_n_SS)) > 10 ** (-8): print( "FAILURE: The steady-state population growth rate" + " from adjusted OMEGA is different (diff is " + str(g_n_SS_adj - g_n_SS) + ") than the steady-" + "state population growth rate from the original" + " OMEGA." ) elif np.max(np.absolute(g_n_SS_adj - g_n_SS)) <= 10 ** (-8): print( "SUCCESS: The steady-state population growth rate" + " from adjusted OMEGA is close to (diff is " + str(g_n_SS_adj - g_n_SS) + ") the steady-" + "state population growth rate from the original" + " OMEGA." ) # Do another test of the adjusted immigration rates. Create the # new OMEGA matrix implied by the new immigration rates. Plug in # the adjusted steady-state population distribution. Hit is with # the new OMEGA transition matrix and it should return the new # steady-state population distribution omega_new = np.dot(OMEGA2, omega_SSfx) omega_errs = np.absolute(omega_new - omega_SSfx) print( "The maximum absolute difference between the adjusted " + "steady-state population distribution and the " + "distribution generated by hitting the adjusted OMEGA " + "transition matrix is " + str(omega_errs.max()) ) # Plot the original immigration rates versus the adjusted # immigration rates immratesmaxdiff = np.absolute(imm_rates_orig - imm_rates_adj).max() print( "The maximum absolute distance between any two points " + "of the original immigration rates and adjusted " + "immigration rates is " + str(immratesmaxdiff) ) # plots age_per_EpS = np.arange(1, E + S + 1) pp.plot_omega_fixed( age_per_EpS, omega_SS_orig, omega_SSfx, E, S, path=OUTPUT_DIR ) pp.plot_imm_fixed( age_per_EpS, imm_rates_orig[fixper - 1, :], imm_rates_adj, E, S, path=OUTPUT_DIR, ) pp.plot_population_path( age_per_EpS, omega_path_lev, omega_SSfx, initial_data_year, initial_data_year, initial_data_year, S, path=OUTPUT_DIR, ) # Return objects in a dictionary pop_dict = { "omega": omega_path_S, "g_n_ss": g_n_SS, "omega_SS": omega_SSfx[-S:] / omega_SSfx[-S:].sum(), "rho": mort_rates_S, "g_n": g_n_path, "imm_rates": imm_rates_mat, "omega_S_preTP": omega_S_preTP, } return pop_dict