Source code for covid19_inference.model.spreading_rate

# ------------------------------------------------------------------------------ #
# This file implements the change points in the spreading rate
# ------------------------------------------------------------------------------ #

import logging
import pymc3 as pm
import theano
import theano.tensor as tt

from .model import *
from . import utility as ut

log = logging.getLogger(__name__)


"""
    TODO
    ----
    def lambda_t_with_transient
"""


[docs]def lambda_t_with_sigmoids( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0=0.5, model=None, name_lambda_t="lambda_t", hierarchical=None, sigma_lambda_cp=None, sigma_lambda_week_cp=None, prefix_lambdas="", shape=None, ): """ Builds a time dependent spreading rate :math:`\lambda_t` with change points. The change points are marked by a transient with a sigmoidal shape, with at Parameters ---------- change_points_list pr_median_lambda_0 pr_sigma_lambda_0 model : :class:`Cov19Model` if none, it is retrieved from the context Returns ------- lambda_t_log TODO ---- Documentation on this """ log.info("Lambda_t with sigmoids") # Get our default mode context model = modelcontext(model) hierarchical = ( model.is_hierarchical if model.is_hierarchical is None else hierarchical ) # ?Get change points random variable? if shape is None: shape = model.shape_of_regions if isinstance(shape, int): shape = (shape,) lambda_log_list, tr_time_list, tr_len_list = _make_change_point_RVs( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0, model=model, hierarchical=hierarchical, sigma_lambda_cp=sigma_lambda_cp, sigma_lambda_week_cp=sigma_lambda_week_cp, prefix_lambdas=prefix_lambdas, shape=shape, ) # Build the time-dependent spreading rate lambda_log_t_list = [lambda_log_list[0] * tt.ones((model.sim_len,) + shape)] lambda_before = lambda_log_list[0] # Loop over all lambda values and there corresponding transient values for tr_time, tr_len, lambda_after in zip( tr_time_list, tr_len_list, lambda_log_list[1:] ): # Create the right shape for the time array t = np.arange(model.sim_shape[0]) # If the model is hierarchical repeatly add the t array to itself to match the shape if shape: t = np.repeat(t[:, None], shape, axis=-1) # Applies standart sigmoid nonlinearity lambda_t = tt.nnet.sigmoid((t - tr_time) / tr_len * 4) * ( lambda_after - lambda_before ) # tr_len*4 because the derivative of the sigmoid at zero is 1/4, we want to set it to 1/tr_len lambda_before = lambda_after lambda_log_t_list.append(lambda_t) # Sum up all lambda values from the list for i, lambda_t in enumerate(lambda_log_t_list): pm.Deterministic(f"{prefix_lambdas}lambda_t_part_{i}", lambda_t) lambda_t_log = sum(lambda_log_t_list) # Create responding lambda_t pymc3 variable with given name (from parameters) pm.Deterministic(name_lambda_t, tt.exp(lambda_t_log)) return lambda_t_log
def lambda_t_with_linear_interp( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0=0.5, model=None, name_lambda_t="lambda_t", ): """ Parameters ---------- change_points_list pr_median_lambda_0 pr_sigma_lambda_0 model : :class:`Cov19Model` if none, it is retrieved from the context Returns ------- lambda_t_log TODO ---- Documentation on this """ log.info("Lambda_t linear in lin-space") # Get our default mode context model = modelcontext(model) # ?Get change points random variable? lambda_log_list, tr_time_list, tr_len_list = _make_change_point_RVs( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0, model=model ) # Build the time-dependent spreading rate lambda_t_list = [ tt.exp(lambda_log_list[0]) * tt.ones(model.sim_shape) ] # model.sim_shape = (time, state) lambda_log_before = lambda_log_list[0] # Loop over all lambda values and there corresponding transient values for tr_time, tr_len, lambda_log_after in zip( tr_time_list, tr_len_list, lambda_log_list[1:] ): # Create the right shape for the time array t = np.arange(model.sim_shape[0]) # If the model is hierarchical repeatly add the t array to itself to match the shape if model.is_hierarchical: t = np.repeat(t[:, None], model.sim_shape[1], axis=-1) step_t = tt.clip((t - tr_time + tr_len / 2) / tr_len, 0, 1) # create the time series of lambda lambda_t = step_t * (tt.exp(lambda_log_after) - tt.exp(lambda_log_before)) lambda_log_before = lambda_log_after lambda_t_list.append(lambda_t) # Sum up all lambda values from the list lambda_t_log = tt.log(sum(lambda_t_list)) # Create responding lambda_t pymc3 variable with given name (from parameters) pm.Deterministic(name_lambda_t, tt.exp(lambda_t_log)) return lambda_t_log def _make_change_point_RVs( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0=1, model=None, hierarchical=None, sigma_lambda_cp=None, sigma_lambda_week_cp=None, prefix_lambdas="", shape=None, ): """ Parameters ---------- priors_dict change_points_list model Returns ------- TODO ---- Documentation on this Add a way to name the changepoints """ if shape is None: shape = model.shape_of_regions def hierarchical_mod(): lambda_0_hc_L2_log, lambda_0_hc_L1_log = ut.hierarchical_normal( name_L1="lambda_0_hc_L1_log_", name_L2="lambda_0_hc_L2_log", name_sigma="sigma_lambda_0_hc_L1", pr_mean=np.log(pr_median_lambda_0), pr_sigma=pr_sigma_lambda_0, error_cauchy=False, shape=shape, ) lambda_L1_log_list = [] pm.Deterministic("lambda_0_hc_L2", tt.exp(lambda_0_hc_L2_log)) pm.Deterministic("lambda_0_hc_L1", tt.exp(lambda_0_hc_L1_log)) lambda_log_list.append(lambda_0_hc_L2_log) lambda_L1_log_list.append(lambda_0_hc_L1_log) # Create lambda_log list for i, cp in enumerate(change_points_list): if cp["relative_to_previous"]: if sigma_lambda_cp is None: ( factor_lambda_cp_hc_L2_log, factor_lambda_cp_hc_L1_log, ) = ut.hierarchical_normal( name_L1=f"factor_lambda_{i + 1}_hc_L1_log", name_L2=f"factor_lambda_{i + 1}_hc_L2_log", name_sigma=f"sigma_lambda_{i + 1}_hc_L1", pr_mean=tt.log(cp["pr_factor_to_previous"]), pr_sigma=cp["pr_sigma_lambda"], error_cauchy=False, shape=shape, ) lambda_cp_hc_L2_log = ( lambda_log_list[-1] + factor_lambda_cp_hc_L2_log ) lambda_cp_hc_L1_log = ( lambda_L1_log_list[-1] + factor_lambda_cp_hc_L2_log ) else: if sigma_lambda_week_cp is None: raise RuntimeError("sigma_lambda_week_cp needs also to be set") factor_lambda_week_log = ( pm.Normal( name=f"diff_lambda_w_raw_{i+1}", mu=0, sigma=1.0, shape=() ) ) * sigma_lambda_week_cp lambda_cp_hc_L1_log = ( lambda_L1_log_list[-1] + factor_lambda_week_log ) pr_mean_lambda = lambda_log_list[-1] + factor_lambda_week_log lambda_cp_hc_L2_log = ( ( pm.Normal( name=f"diff_lambda_cw_raw_{i + 1}", mu=pr_mean_lambda, sigma=1.0, shape=shape, ) ) - pr_mean_lambda ) * sigma_lambda_cp + pr_mean_lambda else: pr_mean_lambda = np.log(cp["pr_median_lambda"]) lambda_cp_hc_L2_log, lambda_cp_hc_L1_log = ut.hierarchical_normal( name_L1=f"lambda_{i + 1}_hc_L1_log", name_L2=f"lambda_{i + 1}_hc_L2_log", name_sigma=f"sigma_lambda_{i + 1}_hc_L1", pr_mean=pr_mean_lambda, pr_sigma=cp["pr_sigma_lambda"], error_cauchy=False, shape=shape, ) pm.Deterministic(f"lambda_{i + 1}_hc_L2", tt.exp(lambda_cp_hc_L2_log)) pm.Deterministic(f"lambda_{i + 1}_hc_L1", tt.exp(lambda_cp_hc_L1_log)) lambda_log_list.append(lambda_cp_hc_L2_log) lambda_L1_log_list.append(lambda_cp_hc_L1_log) # Create transient time list dt_before = model.sim_begin if hierarchical == "only_lambda": for i, cp in enumerate(change_points_list): dt_begin_transient = cp["pr_mean_date_transient"] if dt_before is not None and dt_before > dt_begin_transient: raise RuntimeError( "Dates of change points are not temporally ordered" ) prior_mean = (dt_begin_transient - model.sim_begin).days tr_time = pm.Normal( name=f"{prefix_lambdas}transient_day_{i + 1}", mu=prior_mean, sigma=cp["pr_sigma_date_transient"], shape=shape, ) tr_time_list.append(tr_time) dt_before = dt_begin_transient # Create transient length list for i, cp in enumerate(change_points_list): tr_len_raw = pm.Normal( name=f"{prefix_lambdas}transient_len_{i + 1}_raw_", mu=cp["pr_median_transient_len"], sigma=cp["pr_sigma_transient_len"], shape=shape, ) pm.Deterministic( f"{prefix_lambdas}transient_len_{i + 1}", tt.nnet.softplus(tr_len_raw), ) tr_len_list.append(tt.nnet.softplus(tr_len_raw)) else: for i, cp in enumerate(change_points_list): dt_begin_transient = cp["pr_mean_date_transient"] if dt_before is not None and dt_before > dt_begin_transient: raise RuntimeError( "Dates of change points are not temporally ordered" ) prior_mean = (dt_begin_transient - model.sim_begin).days tr_time_L2, _ = ut.hierarchical_normal( name_L1=f"transient_day_{i + 1}_hc_L1", name_L2=f"transient_day_{i + 1}_hc_L2", name_sigma=f"sigma_transient_day_{i + 1}_L1", pr_mean=prior_mean, pr_sigma=cp["pr_sigma_date_transient"], error_fact=1.0, error_cauchy=False, shape=shape, ) tr_time_list.append(tr_time_L2) dt_before = dt_begin_transient # Create transient len list for i, cp in enumerate(change_points_list): # if model.sim_ndim == 1: tr_len_L2_raw, tr_len_L1_raw = ut.hierarchical_normal( name_L1=f"transient_len_{i + 1}_hc_L1_raw", name_L2=f"transient_len_{i + 1}_hc_L2_raw", name_sigma=f"sigma_transient_len_{i + 1}", pr_mean=cp["pr_median_transient_len"], pr_sigma=cp["pr_sigma_transient_len"], error_fact=1.0, error_cauchy=False, shape=shape, ) if tr_len_L1_raw is not None: pm.Deterministic( f"transient_len_{i + 1}_hc_L1", tt.nnet.softplus(tr_len_L1_raw) ) pm.Deterministic( f"transient_len_{i + 1}_hc_L2", tt.nnet.softplus(tr_len_L2_raw) ) else: pm.Deterministic( f"transient_len_{i + 1}", tt.nnet.softplus(tr_len_L2_raw) ) tr_len_list.append(tt.nnet.softplus(tr_len_L2_raw)) def non_hierarchical_mod(): lambda_0_log = pm.Normal( name=f"{prefix_lambdas}lambda_0_log_", mu=np.log(pr_median_lambda_0), sigma=pr_sigma_lambda_0, shape=shape, ) pm.Deterministic(f"{prefix_lambdas}lambda_0", tt.exp(lambda_0_log)) lambda_log_list.append(lambda_0_log) # Create lambda_log list for i, cp in enumerate(change_points_list): if cp["relative_to_previous"]: pr_mean_lambda = lambda_log_list[-1] + tt.log( cp["pr_factor_to_previous"] ) if sigma_lambda_cp is not None: lambda_cp_log = ( pm.Normal( name=f"{prefix_lambdas}lambda_{i + 1}_log_", mu=pr_mean_lambda, sigma=1.0, shape=shape, ) - pr_mean_lambda ) * sigma_lambda_cp + pr_mean_lambda else: lambda_cp_log = pm.Normal( name=f"{prefix_lambdas}lambda_{i + 1}_log_", mu=pr_mean_lambda, sigma=cp["pr_sigma_lambda"], shape=shape, ) else: pr_mean_lambda = np.log(cp["pr_median_lambda"]) lambda_cp_log = pm.Normal( name=f"{prefix_lambdas}lambda_{i + 1}_log_", mu=pr_mean_lambda, sigma=cp["pr_sigma_lambda"], shape=shape, ) pm.Deterministic(f"{prefix_lambdas}lambda_{i + 1}", tt.exp(lambda_cp_log)) lambda_log_list.append(lambda_cp_log) # Create transient time list dt_before = model.sim_begin for i, cp in enumerate(change_points_list): dt_begin_transient = cp["pr_mean_date_transient"] if dt_before is not None and dt_before > dt_begin_transient: raise RuntimeError("Dates of change points are not temporally ordered") prior_mean = (dt_begin_transient - model.sim_begin).days tr_time = pm.Normal( name=f"{prefix_lambdas}transient_day_{i + 1}", mu=prior_mean, sigma=cp["pr_sigma_date_transient"], shape=shape, ) tr_time_list.append(tr_time) dt_before = dt_begin_transient # Create transient length list for i, cp in enumerate(change_points_list): tr_len_raw = pm.Normal( name=f"{prefix_lambdas}transient_len_{i + 1}_raw_", mu=cp["pr_median_transient_len"], sigma=cp["pr_sigma_transient_len"], shape=shape, ) pm.Deterministic( f"{prefix_lambdas}transient_len_{i + 1}", tt.nnet.softplus(tr_len_raw), ) tr_len_list.append(tt.nnet.softplus(tr_len_raw)) # ------------------------------------------------------------------------------ # # Start of function body # ------------------------------------------------------------------------------ # default_priors_change_points = dict( pr_median_lambda=pr_median_lambda_0, pr_sigma_lambda=pr_sigma_lambda_0, pr_sigma_date_transient=2, pr_median_transient_len=4, pr_sigma_transient_len=1, pr_mean_date_transient=None, relative_to_previous=False, pr_factor_to_previous=1, ) for cp_priors in change_points_list: set_missing_priors_with_default(cp_priors, default_priors_change_points) model = modelcontext(model) lambda_log_list = [] tr_time_list = [] tr_len_list = [] if hierarchical is None: hierarchical = model.is_hierarchical if hierarchical: hierarchical_mod() else: non_hierarchical_mod() return lambda_log_list, tr_time_list, tr_len_list def _smooth_step_function(start_val, end_val, t_begin, t_end, t_total): """ Instead of going from start_val to end_val in one step, make the change a gradual linear slope. Parameters ---------- start_val : number Starting value end_val : number Target value t_begin : number or array (theano) Time point (inbetween 0 and t_total) where start_val is placed t_end : number or array (theano) Time point (inbetween 0 and t_total) where end_val is placed t_total : integer Total number of time points Returns ------- : theano vector vector of length t_total with the values of the parameterised f(t) """ t = np.arange(t_total) return ( tt.clip((t - t_begin) / (t_end - t_begin), 0, 1) * (end_val - start_val) + start_val )