Source code for covid19_inference.model

import datetime
import platform
import logging

import theano
import theano.tensor as tt
import numpy as np
import pymc3 as pm
from pymc3 import Model  # this import is needed to get pymc3-style "with ... as model:"

from . import model_helper as mh

log = logging.getLogger(__name__)

if platform.system() == "Darwin":
    theano.config.gcc.cxxflags = "-Wno-c++11-narrowing"  # workaround for macos


[docs]class Cov19Model(Model): """ Model class used to create a covid-19 propagation dynamics model. Parameters below are passed to the constructor. Attributes (Variables) are available after creation and can be accessed from every instance. Some background: * The simulation starts `diff_data_sim` days before the data. * The data has a certain length, on which the inference is based. This length is given by `new_cases_obs`. * After the inference, a forecast takes of length `fcast_len` takes place, starting on the day after the last data point in `new_cases_obs`. * In total, traces produced by a model run have the length `sim_len = diff_data_sim + data_len + fcast_len` * Date ranges include both boundaries. For example, if `data_begin` is March 1 and `data_end` is March 3 then `data_len` will be 3. Parameters ---------- new_cases_obs : 1 or 2d array If the array is two-dimensional, an hierarchical model will be constructed. First dimension is then time, the second the region/country. data_begin : datatime.datetime Date of the first data point fcast_len : int Number of days the simulations runs longer than the data diff_data_sim : int Number of days the simulation starts earlier than the data. Should be significantly longer than the delay between infection and report of cases. N_population : number or 1d array Number of inhabitance in region, needed for the S(E)IR model. Is ideally 1 dimensional if new_cases_obs is 2 dimensional name : string suffix appended to the name of random variables saved in the trace model : specify a model, if this one should expand another Attributes ---------- new_cases_obs : 1 or 2d array as passed during construction data_begin : datatime.datetime date of the first data point in the data data_end : datatime.datetime date of the last data point in the data sim_begin : datatime.datetime date at which the simulation begins sim_end : datatime.datetime date at which the simulation ends (should match fcast_end) fcast_begin : datatime.datetime date at which the forecast starts (should be one day after data_end) fcast_end : datatime.datetime data at which the forecast ends data_len : int total number of days in the data sim_len : int total number of days in the simulation fcast_len : int total number of days in the forecast diff_data_sim : int difference in days between the simulation begin and the data begin. The simulation starting time is usually earlier than the data begin. Example ------- .. code-block:: with Cov19Model(**params) as model: # Define model here """ def __init__( self, new_cases_obs, data_begin, fcast_len, diff_data_sim, N_population, name="", model=None, ): super().__init__(name=name, model=model) # first dim time, second might be state self.new_cases_obs = np.array(new_cases_obs) self.sim_ndim = new_cases_obs.ndim self.N_population = N_population # these are dates specifying the bounds of data, simulation and forecast. # Jonas Sebastian and Paul agreed to use fully inclusive intervals this makes # calculating ranges a bit harder but function arguments are more intuitive. # 01 Mar, 02 Mar, 03 Mar # data_begin = 01 Mar # data_end = 03 Mar # [data_begin, data_end] # (data_end - data_begin).days = 2 # assign properties self._data_begin = data_begin self._sim_begin = self.data_begin - datetime.timedelta(days=diff_data_sim) self._data_end = self.data_begin + datetime.timedelta( days=len(new_cases_obs) - 1 ) self._sim_end = self.data_end + datetime.timedelta(days=fcast_len) # totel length of simulation, get later via the shape sim_len = len(new_cases_obs) + diff_data_sim + fcast_len if sim_len < len(new_cases_obs) + diff_data_sim: raise RuntimeError( "Simulation ends before the end of the data. Increase num_days_sim." ) # shape and dimension of simulation if self.sim_ndim == 1: self.sim_shape = (sim_len,) elif self.sim_ndim == 2: self.sim_shape = (sim_len, self.new_cases_obs.shape[1]) if self.data_end > datetime.datetime.today(): log.warning( f"Your last data point is in the future ({self.data_end}). " + "Are you traveling faster than light?" ) """ Properties ---------- Useful properties, mainly used by the plot module. """ """ Utility properties """ @property def shape_num_regions(self): # Number of regions as tuple of int return () if self.sim_ndim == 1 else self.sim_shape[1] @property def is_hierarchical(self): return self.new_cases_obs.ndim == 2 """ Forecast properties """ @property def fcast_begin(self): # Returns date on which the forecast starts i.e. the day after the data ends return self.data_end + datetime.timedelta(days=1) @property def fcast_end(self): # Returns date on which the simulation and the forecast end return self.sim_end @property def fcast_len(self): # Returns the length of the forecast in days return (self.sim_end - self.data_end).days """ Data properties """ @property def data_len(self): return self.new_cases_obs.shape[0] @property def data_dim(self): return self.new_cases_obs.shape[1] @property def data_begin(self): return self._data_begin @property def data_end(self): return self._data_end """ Simulation properties """ @property def sim_len(self): return self.sim_shape[0] @property def sim_begin(self): return self._sim_begin @property def sim_end(self): return self._sim_end @property def diff_data_sim(self): return (self.data_begin - self.sim_begin).days
[docs]def modelcontext(model): """ return the given model or try to find it in the context if there was none supplied. """ if model is None: return Cov19Model.get_context() return model
[docs]def student_t_likelihood( new_cases_inferred, pr_beta_sigma_obs=30, nu=4, offset_sigma=1, model=None, data_obs=None, name_student_t="_new_cases_studentT", name_sigma_obs="sigma_obs", ): r""" Set the likelihood to apply to the model observations (`model.new_cases_obs`) We assume a :class:`~pymc3.distributions.continuous.StudentT` distribution because it is robust against outliers [Lange1989]_. The likelihood follows: .. math:: P(\text{data\_obs}) &\sim StudentT(\text{mu} = \text{new\_cases\_inferred}, sigma =\sigma, \text{nu} = \text{nu})\\ \sigma &= \sigma_r \sqrt{\text{new\_cases\_inferred} + \text{offset\_sigma}} The parameter :math:`\sigma_r` follows a :class:`~pymc3.distributions.continuous.HalfCauchy` prior distribution with parameter beta set by ``pr_beta_sigma_obs``. If the input is 2 dimensional, the parameter :math:`\sigma_r` is different for every region. Parameters ---------- new_cases_inferred : :class:`~theano.tensor.TensorVariable` One or two dimensonal array. If 2 dimensional, the first dimension is time and the second are the regions/countries pr_beta_sigma_obs : float The beta of the :class:`~pymc3.distributions.continuous.HalfCauchy` prior distribution of :math:`\sigma_r`. nu : float How flat the tail of the distribution is. Larger nu should make the model more robust to outliers. Defaults to 4 [Lange1989]_. offset_sigma : float An offset added to the sigma, to make the inference procedure robust. Otherwise numbers of ``new_cases_inferred`` would lead to very small errors and diverging likelihoods. Defaults to 1. model: The model on which we want to add the distribution data_obs : array The data that is observed. By default it is ``model.new_cases_ob`` name_student_t : The name under which the studentT distribution is saved in the trace. name_sigma_obs : The name under which the distribution of the observable error is saved in the trace Returns ------- None References ---------- .. [Lange1989] Lange, K., Roderick J. A. Little, & Jeremy M. G. Taylor. (1989). Robust Statistical Modeling Using the t Distribution. Journal of the American Statistical Association, 84(408), 881-896. doi:10.2307/2290063 """ model = modelcontext(model) len_sigma_obs = () if model.sim_ndim == 1 else model.sim_shape[1] sigma_obs = pm.HalfCauchy( name_sigma_obs, beta=pr_beta_sigma_obs, shape=len_sigma_obs ) if data_obs is None: data_obs = model.new_cases_obs pm.StudentT( name=name_student_t, nu=nu, mu=new_cases_inferred[: len(data_obs)], sigma=tt.abs_(new_cases_inferred[: len(data_obs)] + offset_sigma) ** 0.5 * sigma_obs, # offset and tt.abs to avoid nans observed=data_obs, )
[docs]def SIR( lambda_t_log, mu, pr_I_begin=100, model=None, return_all=False, save_all=False, ): r""" Implements the susceptible-infected-recovered model. .. math:: I_{new}(t) &= \lambda_t I(t-1) \frac{S(t-1)}{N} \\ S(t) &= S(t-1) - I_{new}(t) \\ I(t) &= I(t-1) + I_{new}(t) - \mu I(t) The prior distribution of the recovery rate :math:`\mu` is set to :math:`LogNormal(\text{log(pr\_median\_mu)), pr\_sigma\_mu})`. And the prior distribution of :math:`I(0)` to :math:`HalfCauchy(\text{pr\_beta\_I\_begin})` Parameters ---------- lambda_t_log : :class:`~theano.tensor.TensorVariable` time series of the logarithm of the spreading rate, 1 or 2-dimensional. If 2-dimensional the first dimension is time. mu : :class:`~theano.tensor.TensorVariable` the recovery rate :math:`\mu`, typically a random variable. Can be 0 or 1-dimensional. If 1-dimensional, the dimension are the different regions. pr_I_begin : float or array_like or :class:`~theano.tensor.TensorVariable` Prior beta of the Half-Cauchy distribution of :math:`I(0)`. pr_median_mu : float or array_like Prior for the median of the lognormal distrubution of the recovery rate :math:`\mu`. pr_sigma_mu : float or array_like Prior for the sigma of the lognormal distribution of recovery rate :math:`\mu`. model : :class:`Cov19Model` if none, it is retrieved from the context return_all : bool if True, returns ``new_I_t``, ``I_t``, ``S_t`` otherwise returns only ``new_I_t`` save_all : bool if True, saves ``new_I_t``, ``I_t``, ``S_t`` in the trace, otherwise it saves only ``new_I_t`` Returns ------- new_I_t : :class:`~theano.tensor.TensorVariable` time series of the number daily newly infected persons. I_t : :class:`~theano.tensor.TensorVariable` time series of the infected (if return_all set to True) S_t : :class:`~theano.tensor.TensorVariable` time series of the susceptible (if return_all set to True) """ model = modelcontext(model) # Total number of people in population N = model.N_population # Number of regions as tuple of int num_regions = () if model.sim_ndim == 1 else model.sim_shape[1] # Prior distributions of starting populations (infectious, susceptibles) if isinstance(pr_I_begin, tt.TensorVariable): I_begin = pr_I_begin else: I_begin = pm.HalfCauchy(name="I_begin", beta=pr_I_begin, shape=num_regions) S_begin = N - I_begin lambda_t = tt.exp(lambda_t_log) new_I_0 = tt.zeros_like(I_begin) # Runs SIR model: def next_day(lambda_t, S_t, I_t, _, mu, N): new_I_t = lambda_t / N * I_t * S_t S_t = S_t - new_I_t I_t = I_t + new_I_t - mu * I_t I_t = tt.clip(I_t, -1, N) # for stability S_t = tt.clip(S_t, 0, N) return S_t, I_t, new_I_t # theano scan returns two tuples, first one containing a time series of # what we give in outputs_info : S, I, new_I outputs, _ = theano.scan( fn=next_day, sequences=[lambda_t], outputs_info=[S_begin, I_begin, new_I_0], non_sequences=[mu, N], ) S_t, I_t, new_I_t = outputs pm.Deterministic("new_I_t", new_I_t) if save_all: pm.Deterministic("S_t", S_t) pm.Deterministic("I_t", I_t) if return_all: return new_I_t, I_t, S_t else: return new_I_t
[docs]def SEIR( lambda_t_log, pr_beta_I_begin=100, pr_beta_new_E_begin=50, pr_median_mu=1 / 8, pr_mean_median_incubation=4, pr_sigma_median_incubation=1, sigma_incubation=0.4, pr_sigma_mu=0.2, model=None, return_all=False, save_all=False, name_median_incubation="median_incubation", ): r""" Implements a model similar to the susceptible-exposed-infected-recovered model. Instead of a exponential decaying incubation period, the length of the period is lognormal distributed. The complete equation is: .. math:: E_{\text{new}}(t) &= \lambda_t I(t-1) \frac{S(t)}{N} \\ S(t) &= S(t-1) - E_{\text{new}}(t) \\ I_\text{new}(t) &= \sum_{k=1}^{10} \beta(k) E_{\text{new}}(t-k) \\ I(t) &= I(t-1) + I_{\text{new}}(t) - \mu I(t) \\ \beta(k) & = P(k) \sim LogNormal(\text{log}(d_{\text{incubation}})), \text{sigma\_incubation}) The recovery rate :math:`\mu` and the incubation period is the same for all regions and follow respectively: .. math:: P(\mu) &\sim LogNormal(\text{log(pr\_median\_mu)), pr\_sigma\_mu}) \\ P(d_{\text{incubation}}) &\sim Normal(\text{pr\_mean\_median\_incubation, pr\_sigma\_median\_incubation}) The initial number of infected and newly exposed differ for each region and follow prior :class:`~pymc3.distributions.continuous.HalfCauchy` distributions: .. math:: E(t) &\sim HalfCauchy(\text{pr\_beta\_E\_begin}) \:\: \text{ for} \: t \in \{-9, -8, ..., 0\}\\ I(0) &\sim HalfCauchy(\text{pr\_beta\_I\_begin}). Parameters ---------- lambda_t_log : :class:`~theano.tensor.TensorVariable` time series of the logarithm of the spreading rate, 1 or 2-dimensional. If 2-dimensional, the first dimension is time. pr_beta_I_begin : float or array_like Prior beta of the :class:`~pymc3.distributions.continuous.HalfCauchy` distribution of :math:`I(0)`. pr_beta_new_E_begin : float or array_like Prior beta of the :class:`~pymc3.distributions.continuous.HalfCauchy` distribution of :math:`E(0)`. pr_median_mu : float or array_like Prior for the median of the :class:`~pymc3.distributions.continuous.Lognormal` distribution of the recovery rate :math:`\mu`. pr_mean_median_incubation : Prior mean of the :class:`~pymc3.distributions.continuous.Normal` distribution of the median incubation delay :math:`d_{\text{incubation}}`. Defaults to 4 days [Nishiura2020]_, which is the median serial interval (the important measure here is not exactly the incubation period, but the delay until a person becomes infectious which seems to be about 1 day earlier as showing symptoms). pr_sigma_median_incubation : Prior sigma of the :class:`~pymc3.distributions.continuous.Normal` distribution of the median incubation delay :math:`d_{\text{incubation}}`. Default is 1 day. sigma_incubation : Scale parameter of the :class:`~pymc3.distributions.continuous.Lognormal` distribution of the incubation time/ delay until infectiousness. The default is set to 0.4, which is about the scale found in [Nishiura2020]_, [Lauer2020]_. pr_sigma_mu : float or array_like Prior for the sigma of the lognormal distribution of recovery rate :math:`\mu`. model : :class:`Cov19Model` if none, it is retrieved from the context return_all : bool if True, returns ``new_I_t``, ``new_E_t``, ``I_t``, ``S_t`` otherwise returns only ``new_I_t`` save_all : bool if True, saves ``new_I_t``, ``new_E_t``, ``I_t``, ``S_t`` in the trace, otherwise it saves only ``new_I_t`` name_median_incubation : str The name under which the median incubation time is saved in the trace Returns ------- new_I_t : :class:`~theano.tensor.TensorVariable` time series of the number daily newly infected persons. new_E_t : :class:`~theano.tensor.TensorVariable` time series of the number daily newly exposed persons. (if return_all set to True) I_t : :class:`~theano.tensor.TensorVariable` time series of the infected (if return_all set to True) S_t : :class:`~theano.tensor.TensorVariable` time series of the susceptible (if return_all set to True) References ---------- .. [Nishiura2020] Nishiura, H.; Linton, N. M.; Akhmetzhanov, A. R. Serial Interval of Novel Coronavirus (COVID-19) Infections. Int. J. Infect. Dis. 2020, 93, 284–286. https://doi.org/10.1016/j.ijid.2020.02.060. .. [Lauer2020] Lauer, S. A.; Grantz, K. H.; Bi, Q.; Jones, F. K.; Zheng, Q.; Meredith, H. R.; Azman, A. S.; Reich, N. G.; Lessler, J. The Incubation Period of Coronavirus Disease 2019 (COVID-19) From Publicly Reported Confirmed Cases: Estimation and Application. Ann Intern Med 2020. https://doi.org/10.7326/M20-0504. """ model = modelcontext(model) # Build prior distrubutions: # -------------------------- # Prior distribution of recovery rate mu mu = pm.Lognormal(name="mu", mu=np.log(pr_median_mu), sigma=pr_sigma_mu,) # Total number of people in population N = model.N_population # Number of regions as tuple of int num_regions = () if model.sim_ndim == 1 else model.sim_shape[1] # Prior distributions of starting populations (exposed, infectious, susceptibles) # We choose to consider the transitions of newly exposed people of the last 10 days. if num_regions == (): new_E_begin = pm.HalfCauchy( name="new_E_begin", beta=pr_beta_new_E_begin, shape=11 ) else: new_E_begin = pm.HalfCauchy( name="new_E_begin", beta=pr_beta_new_E_begin, shape=(11, num_regions) ) I_begin = pm.HalfCauchy(name="I_begin", beta=pr_beta_I_begin, shape=num_regions) S_begin = N - I_begin - pm.math.sum(new_E_begin, axis=0) lambda_t = tt.exp(lambda_t_log) new_I_0 = tt.zeros_like(I_begin) median_incubation = pm.Normal( name_median_incubation, mu=pr_mean_median_incubation, sigma=pr_sigma_median_incubation, ) # Choose transition rates (E to I) according to incubation period distribution if not num_regions: x = np.arange(1, 11) else: x = np.arange(1, 11)[:, None] beta = mh.tt_lognormal(x, tt.log(median_incubation), sigma_incubation) # Runs SEIR model: def next_day( lambda_t, S_t, nE1, nE2, nE3, nE4, nE5, nE6, nE7, nE8, nE9, nE10, I_t, _, mu, beta, N, ): new_E_t = lambda_t / N * I_t * S_t S_t = S_t - new_E_t new_I_t = ( beta[0] * nE1 + beta[1] * nE2 + beta[2] * nE3 + beta[3] * nE4 + beta[4] * nE5 + beta[5] * nE6 + beta[6] * nE7 + beta[7] * nE8 + beta[8] * nE9 + beta[9] * nE10 ) I_t = I_t + new_I_t - mu * I_t I_t = tt.clip(I_t, -1, N - 1) # for stability S_t = tt.clip(S_t, -1, N) return S_t, new_E_t, I_t, new_I_t # theano scan returns two tuples, first one containing a time series of # what we give in outputs_info : S, E's, I, new_I outputs, _ = theano.scan( fn=next_day, sequences=[lambda_t], outputs_info=[ S_begin, dict(initial=new_E_begin, taps=[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10]), I_begin, new_I_0, ], non_sequences=[mu, beta, N], ) S_t, new_E_t, I_t, new_I_t = outputs pm.Deterministic("new_I_t", new_I_t) if save_all: pm.Deterministic("S_t", S_t) pm.Deterministic("I_t", I_t) pm.Deterministic("new_E_t", new_E_t) if return_all: return new_I_t, new_E_t, I_t, S_t else: return new_I_t
[docs]def delay_cases( new_I_t, pr_median_delay=10, pr_sigma_median_delay=0.2, pr_median_scale_delay=0.3, pr_sigma_scale_delay=None, model=None, save_in_trace=True, name_delay="delay", name_delayed_cases="new_cases_raw", len_input_arr=None, len_output_arr=None, diff_input_output=None, ): r""" Convolves the input by a lognormal distribution, in order to model a delay: .. math:: y_\text{delayed}(t) &= \sum_{\tau=0}^T y_\text{input}(\tau) LogNormal[log(\text{delay}), \text{pr\_median\_scale\_delay}](t - \tau)\\ log(\text{delay}) &= Normal(log(\text{pr\_sigma\_delay}), \text{pr\_sigma\_delay}) For clarification: the :math:`LogNormal` distribution is a function evaluated at :math:`t - \tau`. If the model is 2-dimensional, the :math:`log(\text{delay})` is hierarchically modelled with the :func:`hierarchical_normal` function using the default parameters except that the prior :math:`\sigma` of :math:`\text{delay}_\text{L2}` is HalfNormal distributed (``error_cauchy=False``). Parameters ---------- new_I_t : :class:`~theano.tensor.TensorVariable` The input, typically the number newly infected cases :math:`I_{new}(t)` of from the output of :func:`SIR` or :func:`SEIR`. pr_median_delay : float The mean of the :class:`~pymc3.distributions.continuous.normal` distribution which models the prior median of the :class:`~pymc3.distributions.continuous.LogNormal` delay kernel. pr_sigma_median_delay : float The standart devaiation of :class:`~pymc3.distributions.continuous.normal` distribution which models the prior median of the :class:`~pymc3.distributions.continuous.LogNormal` delay kernel. pr_median_scale_delay : float The scale (width) of the :class:`~pymc3.distributions.continuous.LogNormal` delay kernel. pr_sigma_scale_delay : float If it is not None, the scale is of the delay is kernel follows a prior :class:`~pymc3.distributions.continuous.LogNormal` distribution, with median ``pr_median_scale_delay`` and scale ``pr_sigma_scale_delay``. model : :class:`Cov19Model` if none, it is retrieved from the context save_in_trace : bool whether to save :math:`y_\text{delayed}` in the trace name_delay : str The name under which the delay is saved in the trace, suffixes and prefixes are added depending on which variable is saved. name_delayed_cases : str The name under which the delay is saved in the trace, suffixes and prefixes are added depending on which variable is saved. len_input_arr : Length of ``new_I_t``. By default equal to ``model.sim_len``. Necessary because the shape of theano tensors are not defined at when the graph is built. len_output_arr : int Length of the array returned. By default it set to the length of the cases_obs saved in the model plus the number of days of the forecast. diff_input_output : int Number of days the returned array begins later then the input. Should be significantly larger than the median delay. By default it is set to the ``model.diff_data_sim``. Returns ------- new_cases_inferred : :class:`~theano.tensor.TensorVariable` The delayed input :math:`y_\text{delayed}(t)`, typically the daily number new cases that one expects to measure. """ model = modelcontext(model) if len_output_arr is None: len_output_arr = model.data_len + model.fcast_len if diff_input_output is None: diff_input_output = model.diff_data_sim if len_input_arr is None: len_input_arr = model.sim_len len_delay = () if model.sim_ndim == 1 else model.sim_shape[1] delay_L2_log, delay_L1_log = hierarchical_normal( name_delay + "_log", "sigma_" + name_delay, np.log(pr_median_delay), pr_sigma_median_delay, len_delay, w=0.9, error_cauchy=False, ) if delay_L1_log is not None: pm.Deterministic(f"{name_delay}_L2", np.exp(delay_L2_log)) pm.Deterministic(f"{name_delay}_L1", np.exp(delay_L1_log)) else: pm.Deterministic(f"{name_delay}", np.exp(delay_L2_log)) if pr_sigma_scale_delay is not None: scale_delay_L2_log, scale_delay_L1_log = hierarchical_normal( "scale_" + name_delay, "sigma_scale_" + name_delay, np.log(pr_median_scale_delay), pr_sigma_scale_delay, len_delay, w=0.9, error_cauchy=False, ) if scale_delay_L1_log is not None: pm.Deterministic(f"scale_{name_delay}_L2", tt.exp(scale_delay_L2_log)) pm.Deterministic(f"scale_{name_delay}_L1", tt.exp(scale_delay_L1_log)) else: pm.Deterministic(f"scale_{name_delay}", tt.exp(scale_delay_L2_log)) else: scale_delay_L2_log = np.log(pr_median_scale_delay) new_cases_inferred = mh.delay_cases_lognormal( input_arr=new_I_t, len_input_arr=len_input_arr, len_output_arr=len_output_arr, median_delay=tt.exp(delay_L2_log), scale_delay=tt.exp(scale_delay_L2_log), delay_betw_input_output=diff_input_output, ) if save_in_trace: pm.Deterministic(name_delayed_cases, new_cases_inferred) return new_cases_inferred
[docs]def week_modulation( new_cases_raw, week_modulation_type="abs_sine", pr_mean_weekend_factor=0.3, pr_sigma_weekend_factor=0.5, week_end_days=(6, 7), model=None, save_in_trace=True, ): r""" Adds a weekly modulation of the number of new cases: .. math:: \text{new\_cases} &= \text{new\_cases\_raw} \cdot (1-f(t))\,, \qquad\text{with}\\ f(t) &= f_w \cdot \left(1 - \left|\sin\left(\frac{\pi}{7} t- \frac{1}{2}\Phi_w\right)\right| \right), if ``week_modulation_type`` is ``"abs_sine"`` (the default). If ``week_modulation_type`` is ``"step"``, the new cases are simply multiplied by the weekend factor on the days set by ``week_end_days`` The weekend factor :math:`f_w` follows a Lognormal distribution with median ``pr_mean_weekend_factor`` and sigma ``pr_sigma_weekend_factor``. It is hierarchically constructed if the input is two-dimensional by the function :func:`hierarchical_normal` with default arguments. The offset from Sunday :math:`\Phi_w` follows a flat :class:`~pymc3.distributions.continuous.VonMises` distribution and is the same for all regions. Parameters ---------- new_cases_raw : :class:`~theano.tensor.TensorVariable` The input array, can be one- or two-dimensional week_modulation_type : str The type of modulation, accepts ``"step"`` or ``"abs_sine`` (the default). pr_mean_weekend_factor : float Sets the prior mean of the factor :math:`f_w` by which weekends are counted. pr_sigma_weekend_factor : float Sets the prior sigma of the factor :math:`f_w` by which weekends are counted. week_end_days : tuple of ints The days counted as weekend if ``week_modulation_type`` is ``"step"`` model : :class:`Cov19Model` if none, it is retrieved from the context save_in_trace : bool If True (default) the new_cases are saved in the trace. Returns ------- new_cases : :class:`~theano.tensor.TensorVariable` """ model = modelcontext(model) shape_modulation = list(model.sim_shape) shape_modulation[0] -= model.diff_data_sim len_L2 = () if model.sim_ndim == 1 else model.sim_shape[1] week_end_factor_L2_log, week_end_factor_L1_log = hierarchical_normal( "weekend_factor_log", "sigma_weekend_factor", pr_mean=tt.log(pr_mean_weekend_factor), pr_sigma=pr_sigma_weekend_factor, len_L2=len_L2, ) week_end_factor_L2 = tt.exp(week_end_factor_L2_log) if model.sim_ndim == 1: pm.Deterministic("weekend_factor", week_end_factor_L2) elif model.sim_ndim == 2: week_end_factor_L1 = tt.exp(week_end_factor_L1_log) pm.Deterministic("weekend_factor_L2", week_end_factor_L2) pm.Deterministic("weekend_factor_L1", week_end_factor_L1) if week_modulation_type == "step": modulation = np.zeros(shape_modulation[0]) for i in range(shape_modulation[0]): date_curr = model.data_begin + datetime.timedelta(days=i) if date_curr.isoweekday() in week_end_days: modulation[i] = 1 elif week_modulation_type == "abs_sine": offset_rad = pm.VonMises("offset_modulation_rad", mu=0, kappa=0.01) offset = pm.Deterministic("offset_modulation", offset_rad / (2 * np.pi) * 7) t = np.arange(shape_modulation[0]) - model.data_begin.weekday() # Sunday @ zero modulation = 1 - tt.abs_(tt.sin(t / 7 * np.pi + offset_rad / 2)) if model.sim_ndim == 2: modulation = tt.shape_padaxis(modulation, axis=-1) multiplication_vec = tt.abs_( np.ones(shape_modulation) - week_end_factor_L2 * modulation ) new_cases_inferred_eff = new_cases_raw * multiplication_vec if save_in_trace: pm.Deterministic("new_cases", new_cases_inferred_eff) return new_cases_inferred_eff
[docs]def make_change_point_RVs( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0=1, model=None ): """ Parameters ---------- priors_dict change_points_list model Returns ------- """ default_priors_change_points = dict( pr_median_lambda=pr_median_lambda_0, pr_sigma_lambda=pr_sigma_lambda_0, pr_sigma_date_transient=2, pr_median_transient_len=4, pr_sigma_transient_len=0.5, pr_mean_date_transient=None, relative_to_previous=False, pr_factor_to_previous=1, ) for cp_priors in change_points_list: mh.set_missing_with_default(cp_priors, default_priors_change_points) model = modelcontext(model) len_L2 = () if model.sim_ndim == 1 else model.sim_shape[1] lambda_log_list = [] tr_time_list = [] tr_len_list = [] # lambda_0_L2_log, lambda_0_L1_log = hierarchical_normal( "lambda_0_log", "sigma_lambda_0", np.log(pr_median_lambda_0), pr_sigma_lambda_0, len_L2, w=0.4, error_cauchy=False, ) if lambda_0_L1_log is not None: pm.Deterministic("lambda_0_L2", tt.exp(lambda_0_L2_log)) pm.Deterministic("lambda_0_L1", tt.exp(lambda_0_L1_log)) else: pm.Deterministic("lambda_0", tt.exp(lambda_0_L2_log)) lambda_log_list.append(lambda_0_L2_log) for i, cp in enumerate(change_points_list): if cp["relative_to_previous"]: pr_sigma_lambda = lambda_log_list[-1] + tt.log(cp["pr_factor_to_previous"]) else: pr_sigma_lambda = np.log(cp["pr_median_lambda"]) lambda_cp_L2_log, lambda_cp_L1_log = hierarchical_normal( f"lambda_{i + 1}_log", f"sigma_lambda_{i + 1}", pr_sigma_lambda, cp["pr_sigma_lambda"], len_L2, w=0.7, error_cauchy=False, ) if lambda_cp_L1_log is not None: pm.Deterministic(f"lambda_{i + 1}_L2", tt.exp(lambda_cp_L2_log)) pm.Deterministic(f"lambda_{i + 1}_L1", tt.exp(lambda_cp_L1_log)) else: pm.Deterministic(f"lambda_{i + 1}", tt.exp(lambda_cp_L2_log)) lambda_log_list.append(lambda_cp_L2_log) dt_before = model.sim_begin for i, cp in enumerate(change_points_list): dt_begin_transient = cp["pr_mean_date_transient"] if dt_before is not None and dt_before > dt_begin_transient: raise RuntimeError("Dates of change points are not temporally ordered") prior_mean = (dt_begin_transient - model.sim_begin).days tr_time_L2, _ = hierarchical_normal( f"transient_day_{i + 1}", f"sigma_transient_day_{i + 1}", prior_mean, cp["pr_sigma_date_transient"], len_L2, w=0.5, error_cauchy=False, error_fact=1.0, ) tr_time_list.append(tr_time_L2) dt_before = dt_begin_transient for i, cp in enumerate(change_points_list): # if model.sim_ndim == 1: tr_len_L2_log, tr_len_L1_log = hierarchical_normal( f"transient_len_{i + 1}_log", f"sigma_transient_len_{i + 1}", np.log(cp["pr_median_transient_len"]), cp["pr_sigma_transient_len"], len_L2, w=0.7, error_cauchy=False, ) if tr_len_L1_log is not None: pm.Deterministic(f"transient_len_{i + 1}_L2", tt.exp(tr_len_L2_log)) pm.Deterministic(f"transient_len_{i + 1}_L1", tt.exp(tr_len_L1_log)) else: pm.Deterministic(f"transient_len_{i + 1}", tt.exp(tr_len_L2_log)) tr_len_list.append(tt.exp(tr_len_L2_log)) return lambda_log_list, tr_time_list, tr_len_list
[docs]def lambda_t_with_sigmoids( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0=0.5, model=None ): """ Parameters ---------- change_points_list pr_median_lambda_0 pr_sigma_lambda_0 model : :class:`Cov19Model` if none, it is retrieved from the context Returns ------- """ model = modelcontext(model) model.sim_shape = model.sim_shape lambda_list, tr_time_list, tr_len_list = make_change_point_RVs( change_points_list, pr_median_lambda_0, pr_sigma_lambda_0, model=model ) # model.sim_shape = (time, state) # build the time-dependent spreading rate if len(model.sim_shape) == 2: lambda_t_list = [lambda_list[0] * tt.ones(model.sim_shape)] else: lambda_t_list = [lambda_list[0] * tt.ones(model.sim_shape)] lambda_before = lambda_list[0] for tr_time, tr_len, lambda_after in zip( tr_time_list, tr_len_list, lambda_list[1:] ): t = np.arange(model.sim_shape[0]) tr_len = tr_len + 1e-5 if len(model.sim_shape) == 2: t = np.repeat(t[:, None], model.sim_shape[1], axis=-1) lambda_t = tt.nnet.sigmoid((t - tr_time) / tr_len * 4) * ( lambda_after - lambda_before ) # tr_len*4 because the derivative of the sigmoid at zero is 1/4, we want to set it to 1/tr_len lambda_before = lambda_after lambda_t_list.append(lambda_t) lambda_t_log = sum(lambda_t_list) pm.Deterministic("lambda_t", tt.exp(lambda_t_log)) return lambda_t_log
[docs]def hierarchical_normal( name, name_sigma, pr_mean, pr_sigma, len_L2, w=1.0, error_fact=2.0, error_cauchy=True, ): r""" Implements an hierarchical normal model: .. math:: x_\text{L1} &= Normal(\text{pr\_mean}, \text{pr\_sigma})\\ y_{i, \text{L2}} &= Normal(x_\text{L1}, \sigma_\text{L2})\\ \sigma_\text{L2} &= HalfCauchy(\text{error\_fact} \cdot \text{pr\_sigma}) It is however implemented in a non-centered way, that the second line is changed to: .. math:: y_{i, \text{L2}} &= x_\text{L1} + Normal(0,1) \cdot \sigma_\text{L2} See for example https://arxiv.org/pdf/1312.0906.pdf Parameters ---------- name : str Name under which :math:`x_\text{L1}` and :math:`y_\text{L2}` saved in the trace. ``'_L1'`` and ``'_L2'`` is appended name_sigma : str Name under which :math:`\sigma_\text{L2}` saved in the trace. ``'_L2'`` is appended. pr_mean : float Prior mean of :math:`x_\text{L1}` pr_sigma : float Prior sigma for :math:`x_\text{L1}` and (muliplied by ``error_fact``) for :math:`\sigma_\text{L2}` len_L2 : int length of :math:`y_\text{L2}` error_fact : float Factor by which ``pr_sigma`` is multiplied as prior for `\sigma_\text{L2}` error_cauchy : bool if False, a :math:`HalfNormal` distribution is used for :math:`\sigma_\text{L2}` instead of :math:`HalfCauchy` Returns ------- y : :class:`~theano.tensor.TensorVariable` the random variable :math:`y_\text{L2}` x : :class:`~theano.tensor.TensorVariable` the random variable :math:`x_\text{L1}` """ if not len_L2: # not hierarchical Y = pm.Normal(name, mu=pr_mean, sigma=pr_sigma) X = None else: w = 1.0 if error_cauchy: sigma_Y = pm.HalfCauchy(name_sigma + "_L2", beta=error_fact * pr_sigma) else: sigma_Y = pm.HalfNormal(name_sigma + "_L2", sigma=error_fact * pr_sigma) X = pm.Normal(name + "_L1", mu=pr_mean, sigma=pr_sigma) phi = pm.Normal( name + "_L2_raw", mu=0, sigma=1, shape=len_L2 ) # (1-w**2)*sigma_X+1*w**2, shape=len_Y) Y = w * X + phi * sigma_Y pm.Deterministic(name + "_L2", Y) return Y, X
[docs]def make_prior_I( lambda_t_log, mu, pr_median_delay, pr_sigma_I_begin=2, n_data_points_used=5, model=None, ): """ Builds the prior for I begin by solving the SIR differential from the first data backwards. This decorrelates the I_begin from the lambda_t at the beginning, allowing a more efficient sampling. The example_one_bundesland runs about 30\% faster with this prior, instead of a HalfCauchy. Parameters ---------- lambda_t_log : :class:`~theano.tensor.TensorVariable` mu : :class:`~theano.tensor.TensorVariable` pr_median_delay : float pr_sigma_I_begin : float n_data_points_used : int model : :class:`Cov19Model` if none, it is retrieved from the context Returns ------- I_begin: :class:`~theano.tensor.TensorVariable` """ model = modelcontext(model) num_regions = () if model.sim_ndim == 1 else model.sim_shape[1] lambda_t = tt.exp(lambda_t_log) delay = round(pr_median_delay) num_new_I_ref = np.mean(model.new_cases_obs[:n_data_points_used], axis=0) days_diff = model.diff_data_sim - delay + 3 I_ref = num_new_I_ref / lambda_t[days_diff] I0_ref = I_ref / (1 + lambda_t[days_diff // 2] - mu) ** days_diff I_begin = I0_ref * tt.exp( pm.Normal( name="I_begin_ratio_log", mu=0, sigma=pr_sigma_I_begin, shape=num_regions ) ) pm.Deterministic("I_begin", I_begin) return I_begin
def hierarchical_beta(name, name_sigma, pr_mean, pr_sigma, len_L2): if not len_L2: # not hierarchical Y = pm.Beta(name, alpha=pr_mean / pr_sigma, beta=1 / pr_sigma * (1 - pr_mean)) X = None else: sigma_Y = pm.HalfCauchy(name_sigma + "_L2", beta=pr_sigma) X = pm.Beta( name + "_L1", alpha=pr_mean / pr_sigma, beta=1 / pr_sigma * (1 - pr_mean) ) Y = pm.Beta( name + "_L2", alpha=X / sigma_Y, beta=1 / sigma_Y * (1 - X), shape=len_L2 ) return Y, X