Source code for covid19_inference.model.utility

# ------------------------------------------------------------------------------ #
# Helper functions that are used by other parts of the modeling
# ------------------------------------------------------------------------------ #

import platform
import logging
import pymc as pm
from aesara import scan
import aesara.tensor as at

from .model import *

log = logging.getLogger(__name__)

# workaround for macos, sufficient to do this once
# if platform.system() == "Darwin":
#    aesara.config.gcc.cxxflags = "-Wno-c++11-narrowing"


def hierarchical_normal(
    pr_mean,
    pr_sigma,
    name_L1="delay_hc_L1",
    name_L2="delay_hc_L2",
    name_sigma="delay_hc_sigma",
    model=None,
    error_fact=2.0,
    error_cauchy=True,
    shape=None,
):
    r"""
    Implements a hierarchical normal model:

    .. math::

        x_\text{L1} &= Normal(\text{pr\_mean}, \text{pr\_sigma})\\
        y_{i, \text{L2}} &= Normal(x_\text{L1}, \sigma_\text{L2})\\
        \sigma_\text{L2} &= HalfCauchy(\text{error\_fact} \cdot \text{pr\_sigma})

    It is however implemented in a non-centered way, such that the second line
    is changed to:

    .. math::

        y_{i, \text{L2}} &= x_\text{L1} + Normal(0, 1) \cdot \sigma_\text{L2}

    See for example https://arxiv.org/pdf/1312.0906.pdf

    Parameters
    ----------
    name_L1 : str
        Name under which :math:`x_\text{L1}` is saved in the trace.
    name_L2 : str
        Name under which :math:`y_\text{L2}` is saved in the trace. The
        non-centered distribution is in addition saved with the suffix ``_raw_``.
    name_sigma : str
        Name under which :math:`\sigma_\text{L2}` is saved in the trace.
    pr_mean : float
        Prior mean of :math:`x_\text{L1}`
    pr_sigma : float
        Prior sigma for :math:`x_\text{L1}` and (multiplied by ``error_fact``)
        for :math:`\sigma_\text{L2}`
    shape : int or tuple of ints
        Shape of :math:`y_\text{L2}`. Defaults to the model's ``shape_of_regions``
        if not given.
    error_fact : float
        Factor by which ``pr_sigma`` is multiplied as prior for
        :math:`\sigma_\text{L2}`
    error_cauchy : bool
        If False, a :math:`HalfNormal` distribution is used for
        :math:`\sigma_\text{L2}` instead of :math:`HalfCauchy`

    Returns
    -------
    y : :class:`~aesara.tensor.TensorVariable`
        The random variable :math:`y_\text{L2}`
    x : :class:`~aesara.tensor.TensorVariable`
        The random variable :math:`x_\text{L1}`
    """
    model = modelcontext(model)

    if shape is None:
        shape = model.shape_of_regions

    if not model.is_hierarchical:
        raise RuntimeError("Model is not hierarchical.")

    if error_cauchy:
        sigma_Y = (
            pm.HalfCauchy(
                name_sigma, beta=1, transform=pm.distributions.transforms.log_exp_m1
            )
            * error_fact
            * pr_sigma
        )
    else:
        sigma_Y = (
            pm.HalfNormal(
                name_sigma,
                sigma=1,
                transform=pm.distributions.transforms.log_exp_m1,
            )
            * error_fact
            * pr_sigma
        )

    X = pm.Normal(name_L1, mu=pr_mean, sigma=pr_sigma)
    phi = pm.Normal(
        name_L2 + "_raw_", mu=0, sigma=1, shape=shape
    )  # (1-w**2)*sigma_X+1*w**2, shape=len_Y)
    Y = X + phi * sigma_Y
    pm.Deterministic(name_L2, Y)

    return Y, X
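
# A minimal usage sketch for ``hierarchical_normal``: it assumes a model that,
# like the package's Cov19Model, carries the ``is_hierarchical`` and
# ``shape_of_regions`` attributes used above. All names and numbers below are
# illustrative placeholders, not part of the package API.
def _example_hierarchical_normal(num_regions=3):
    with pm.Model() as toy_model:
        # attributes that Cov19Model would normally provide
        toy_model.is_hierarchical = True
        toy_model.shape_of_regions = num_regions
        delay_L2, delay_L1 = hierarchical_normal(
            pr_mean=8.0,
            pr_sigma=1.0,
            name_L1="example_delay_L1",
            name_L2="example_delay_L2",
            name_sigma="example_delay_sigma",
            shape=num_regions,
        )
    return delay_L2, delay_L1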

# names still to decide
def hierarchical_beta(name, name_sigma, pr_mean, pr_sigma, len_L2, model=None):
    """
    Beta-distributed analog of :func:`hierarchical_normal`; falls back to a
    single Beta prior if the model is not hierarchical.
    """
    model = modelcontext(model)

    if not model.is_hierarchical:  # not hierarchical
        Y = pm.Beta(name, alpha=pr_mean / pr_sigma, beta=1 / pr_sigma * (1 - pr_mean))
        X = None
    else:
        sigma_Y = pm.HalfCauchy(name_sigma + "_hc_L2", beta=pr_sigma)
        X = pm.Beta(
            name + "_hc_L1", alpha=pr_mean / pr_sigma, beta=1 / pr_sigma * (1 - pr_mean)
        )
        Y = pm.Beta(
            name + "_hc_L2", alpha=X / sigma_Y, beta=1 / sigma_Y * (1 - X), shape=len_L2
        )

    return Y, X
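
# A minimal usage sketch for ``hierarchical_beta``: in a non-hierarchical model
# it reduces to a single Beta prior whose mean is ``pr_mean`` and whose
# concentration grows as ``pr_sigma`` shrinks. The variable names are
# illustrative placeholders.
def _example_hierarchical_beta():
    with pm.Model() as toy_model:
        toy_model.is_hierarchical = False  # attribute normally set by Cov19Model
        frac, _ = hierarchical_beta(
            name="example_fraction",
            name_sigma="example_fraction_sigma",
            pr_mean=0.7,
            pr_sigma=0.1,
            len_L2=None,  # unused in the non-hierarchical branch
        )
    return frac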

def tt_lognormal(x, mu, sigma):
    """
    Calculates a lognormal pdf for integer spaced x input.
    """
    x = at.nnet.relu(x - 1e-12) + 1e-12  # clip values at 1e-12
    distr = 1 / x * at.exp(-((at.log(x) - mu) ** 2) / (2 * sigma ** 2))
    # normalize, add a small offset in case the sum is zero
    return distr / (at.sum(distr, axis=0) + 1e-8)
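
# A quick sanity check for ``tt_lognormal`` (illustrative): evaluating the
# discretized kernel on an integer grid should give weights that sum to
# approximately one along axis 0.
def _example_tt_lognormal():
    import numpy as np

    x = at.arange(0, 30, dtype="float64")
    kernel = tt_lognormal(x, mu=np.log(4.0), sigma=0.4).eval()
    return kernel, kernel.sum()  # sum is ~1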

def tt_gamma(x, mu=None, sigma=None, alpha=None, beta=None):
    """
    Calculates a gamma distribution pdf for integer spaced x input. Parametrized
    similarly to :class:`~pymc.distributions.continuous.Gamma`
    """
    assert (alpha is None and beta is None) != (mu is None and sigma is None)
    if alpha is None and beta is None:
        alpha = mu ** 2 / (sigma ** 2 + 1e-8)
        beta = mu / (sigma ** 2 + 1e-8)
    x = at.nnet.relu(x - 1e-12) + 1e-12  # clip values at 1e-12
    distr = beta ** alpha * x ** (alpha - 1) * at.exp(-beta * x)
    # normalize, add a small offset in case the sum is zero
    return distr / (at.sum(distr, axis=0) + 1e-8)
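
# A quick sanity check for ``tt_gamma`` (illustrative): the (mu, sigma) and
# (alpha, beta) parametrizations describe the same distribution, so the two
# discretized kernels should agree up to the small numerical offsets used above.
def _example_tt_gamma():
    import numpy as np

    x = at.arange(0, 30, dtype="float64")
    mu, sigma = 6.0, 2.0
    alpha, beta = mu ** 2 / sigma ** 2, mu / sigma ** 2
    kernel_from_moments = tt_gamma(x, mu=mu, sigma=sigma).eval()
    kernel_from_shape_rate = tt_gamma(x, alpha=alpha, beta=beta).eval()
    return np.allclose(kernel_from_moments, kernel_from_shape_rate, atol=1e-6)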