Source code for covid19_inference.data_retrieval._RKI

import pandas as pd
import datetime
import logging
import numpy as np

# Import base class
from .retrieval import Retrieval, get_data_dir, _data_dir_fallback

import urllib.request
import json


log = logging.getLogger(__name__)


class RKI(Retrieval):
    """
    This class can be used to retrieve and filter the dataset from the
    `Robert Koch Institute <https://www.rki.de/>`_. The data is retrieved from the
    `arcgis <https://www.arcgis.com/sharing/rest/content/items/f10774f1c63e40168479a1feb6c7ca74/data>`_
    dashboard.

    Features
        - download the full dataset
        - filter by date
        - filter by bundesland
        - filter by recovered, deaths and confirmed cases

    Example
    -------
    .. code-block::

        rki = cov19.data_retrieval.RKI()
        rki.download_all_available_data()

        # Access the data by
        rki.data
        # or
        rki.get_new("confirmed", "Sachsen")
        rki.get_total("confirmed", "Sachsen")
    """
    def __init__(self, auto_download=False):
        """
        On init of this class the base Retrieval class __init__ is called,
        with RKI-specific arguments.

        Parameters
        ----------
        auto_download : bool, optional
            Whether or not to automatically call the download_all_available_data()
            method. One should explicitly call this method for more configuration
            options (default: False).
        """

        # ------------------------------------------------------------------------------ #
        #  Init Retrieval Base Class
        # ------------------------------------------------------------------------------ #
        """
        A name mainly used for the local filename
        """
        name = "Rki"

        """
        The url to the main dataset as csv; if none is supplied the fallback
        routines are used
        """
        url_csv = "https://www.arcgis.com/sharing/rest/content/items/f10774f1c63e40168479a1feb6c7ca74/data"

        """
        Kwargs for pandas read csv
        """
        kwargs = {}  # Suppress warning

        """
        The fallback array can contain anything: a filepath or callable methods
        """
        fallbacks = [
            self.__download_via_rest_api,
            _data_dir_fallback + "/" + name + "_fallback.csv.gz",
        ]

        """
        If the local file is older than the update_interval it gets updated once
        the download-all function is called. Can differ depending on the parent class.
        """
        update_interval = datetime.timedelta(days=1)

        # Init the retrieval base class
        Retrieval.__init__(self, name, url_csv, fallbacks, update_interval, **kwargs)

        self.data = None

        if auto_download:
            self.download_all_available_data()
    def download_all_available_data(self, force_local=False, force_download=False):
        """
        Attempts to download from the main url (self.url_csv) which was given on
        initialization. If this fails, downloads from the fallbacks. It can also be
        specified to use the local files or to force the download. The download
        methods are inherited from the base Retrieval class.

        Parameters
        ----------
        force_local : bool, optional
            If True, forces the local files to be loaded.
        force_download : bool, optional
            If True, forces the download of new files.
        """
        if force_local and force_download:
            raise ValueError("force_local and force_download cannot both be True!")

        # ------------------------------------------------------------------------------ #
        # 1 Download or get local file
        # ------------------------------------------------------------------------------ #
        retrieved_local = False
        if self._timestamp_local_old(force_local) or force_download:
            self._download_helper(**self.kwargs)
        else:
            retrieved_local = self._local_helper()

        # ------------------------------------------------------------------------------ #
        # 2 Save local
        # ------------------------------------------------------------------------------ #
        if not retrieved_local:
            self._save_to_local()

        # ------------------------------------------------------------------------------ #
        # 3 Convert to usable format
        # ------------------------------------------------------------------------------ #
        self._to_iso()
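    # Minimal usage sketch (illustrative, not taken from the original module):
    # how the force_local / force_download flags interact. Only the documented
    # public call above is used.
    #
    #   rki = RKI()
    #   rki.download_all_available_data()                     # local file if fresh enough
    #   rki.download_all_available_data(force_local=True)     # never hit the network
    #   rki.download_all_available_data(force_download=True)  # always re-download
    #   # setting both flags to True raises a ValueError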
    def _to_iso(self):
        df = self.data
        if "Meldedatum" in df.columns:
            df["date"] = df["Meldedatum"].apply(
                lambda x: datetime.datetime.strptime(x, "%Y/%m/%d %H:%M:%S")
            )
            df = df.drop(columns="Meldedatum")
        if "Refdatum" in df.columns:
            df["date_ref"] = df["Refdatum"].apply(
                lambda x: datetime.datetime.strptime(x, "%Y/%m/%d %H:%M:%S")
            )
            df = df.drop(columns="Refdatum")

        # Rename the columns to match the JHU dataset
        if "AnzahlFall" in df.columns:
            df.rename(columns={"AnzahlFall": "confirmed"}, inplace=True)
        if "AnzahlTodesfall" in df.columns:
            df.rename(columns={"AnzahlTodesfall": "deaths"}, inplace=True)
        if "AnzahlGenesen" in df.columns:
            df.rename(columns={"AnzahlGenesen": "recovered"}, inplace=True)

        df["date"] = pd.to_datetime(df["date"])
        df["date_ref"] = pd.to_datetime(df["date_ref"])
        self.data = df

    def __download_via_rest_api(self, try_max=10):
        # Strangely, the Robert Koch Institute defines 412 regions, in contrast
        # to the official 294 rural districts and the 401 administrative districts.
        landkreise_max = 412

        url_id = "https://services7.arcgis.com/mOBPykOjAyBO2ZKk/ArcGIS/rest/services/RKI_COVID19/FeatureServer/0/query?where=0%3D0&objectIds=&time=&resultType=none&outFields=idLandkreis&returnIdsOnly=false&returnUniqueIdsOnly=false&returnCountOnly=false&returnDistinctValues=true&cacheHint=false&orderByFields=&groupByFieldsForStatistics=&outStatistics=&having=&resultOffset=&resultRecordCount=&sqlFormat=none&f=pjson&token="

        url = urllib.request.urlopen(url_id)
        json_data = json.loads(url.read().decode())
        n_data = len(json_data["features"])
        unique_ids = [
            json_data["features"][i]["attributes"]["IdLandkreis"] for i in range(n_data)
        ]

        # If the number of landkreise is smaller than landkreise_max, the local copy
        # is used instead (the query system can behave weirdly during updates)
        if n_data >= landkreise_max:
            log.info(f"Downloading {n_data} unique Landkreise. May take a while.\n")
            df_keys = [
                "IdBundesland",
                "Bundesland",
                "Landkreis",
                "Altersgruppe",
                "Geschlecht",
                "AnzahlFall",
                "AnzahlTodesfall",
                "ObjectId",
                "IdLandkreis",
                "Datenstand",
                "NeuerFall",
                "NeuerTodesfall",
                "NeuGenesen",
                "AnzahlGenesen",
                "date",
                "date_ref",
            ]
            df = pd.DataFrame(columns=df_keys)

            # Fill the dataframe with data from all landkreise
            for idlandkreis in unique_ids:
                url_str = (
                    "https://services7.arcgis.com/mOBPykOjAyBO2ZKk/ArcGIS/rest/services/RKI_COVID19/FeatureServer/0//query?where=IdLandkreis%3D"
                    + idlandkreis
                    + "&objectIds=&time=&resultType=none&outFields=Bundesland%2C+Landkreis%2C+IdBundesland%2C+ObjectId%2C+IdLandkreis%2C+Altersgruppe%2C+Geschlecht%2C+AnzahlFall%2C+AnzahlTodesfall%2C+Meldedatum%2C+NeuerFall%2C+Refdatum%2C+Datenstand%2C+NeuGenesen%2C+AnzahlGenesen&returnIdsOnly=false&returnUniqueIdsOnly=false&returnCountOnly=false&returnDistinctValues=false&cacheHint=false&orderByFields=&groupByFieldsForStatistics=&outStatistics=&having=&resultOffset=&resultRecordCount=&sqlFormat=none&f=pjson&token="
                )

                count_try = 0
                while count_try < try_max:
                    try:
                        with urllib.request.urlopen(url_str) as url:
                            json_data = json.loads(url.read().decode())

                        n_data = len(json_data["features"])

                        if n_data > 5000:
                            raise ValueError("Query limit exceeded")

                        data_flat = [
                            json_data["features"][i]["attributes"] for i in range(n_data)
                        ]

                        break
                    except Exception:
                        count_try += 1

                if count_try == try_max:
                    raise ValueError("Maximum limit of tries exceeded.")

                df_temp = pd.DataFrame(data_flat)

                # Very inefficient, but it will do
                df = pd.concat([df, df_temp], ignore_index=True)

            df["date"] = df["Meldedatum"].apply(
                lambda x: datetime.datetime.fromtimestamp(x / 1e3)
            )
            df["date_ref"] = df["Refdatum"].apply(
                lambda x: datetime.datetime.fromtimestamp(x / 1e3)
            )
            df = df.drop(columns="Meldedatum")
            df = df.drop(columns="Refdatum")

        else:
            raise RuntimeError("Invalid response from REST api")

        return df
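    # Note on the two date formats handled above (illustrative values): the csv
    # export ships dates as strings, while the REST api returns unix epochs in
    # milliseconds. Both end up as datetime objects in "date" / "date_ref".
    #
    #   datetime.datetime.strptime("2020/03/15 00:00:00", "%Y/%m/%d %H:%M:%S")
    #   datetime.datetime.fromtimestamp(1584230400000 / 1e3)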
    def get_total(
        self,
        value="confirmed",
        bundesland: str = None,
        landkreis: str = None,
        data_begin: datetime.datetime = None,
        data_end: datetime.datetime = None,
        date_type: str = "date",
    ):
        """
        Gets the total (cumulative) cases for a region as a DataFrame with date index.
        Can be filtered with multiple arguments.

        Parameters
        ----------
        value : str
            Which data to return, possible values are
            - "confirmed",
            - "recovered",
            - "deaths"
            (default: "confirmed")
        bundesland : str, optional
            If no value is provided, the dataset summed over all of Germany is used.
        landkreis : str, optional
            If no value is provided, the dataset summed over the region (bundesland)
            is used.
        data_begin : datetime.datetime, optional
            Initial date; if no value is provided the first possible date is used.
        data_end : datetime.datetime, optional
            Last date; if no value is provided the most recent possible date is used.
        date_type : str, optional
            Type of date to use: reported date "date" (Meldedatum in the original
            dataset) or symptom date "date_ref" (Refdatum in the original dataset).

        Returns
        -------
        : pandas.DataFrame
        """
        # ------------------------------------------------------------------------------ #
        # Default parameters
        # ------------------------------------------------------------------------------ #
        if value not in ["confirmed", "recovered", "deaths"]:
            raise ValueError(
                'Invalid value. Valid options: "confirmed", "deaths", "recovered"'
            )

        if self.data is None:
            self.download_all_available_data()

        # Note: It should be fine to NOT check the dates here, since this is also
        # done by the filter method.

        # Set the filter level: use bundesland if no landkreis is supplied, else landkreis
        level = None
        filter_value = None
        if bundesland is not None and landkreis is None:
            level = "Bundesland"
            filter_value = bundesland
        elif bundesland is None and landkreis is not None:
            level = "Landkreis"
            filter_value = landkreis
        elif bundesland is not None and landkreis is not None:
            raise ValueError("bundesland and landkreis cannot be set simultaneously.")

        # ------------------------------------------------------------------------------ #
        # Retrieve data and filter it
        # ------------------------------------------------------------------------------ #
        df = self.filter(data_begin, data_end, value, date_type, level, filter_value)
        return df
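    # Minimal usage sketch for get_total() (illustrative only):
    #
    #   rki = RKI()
    #   # Cumulative confirmed cases for all of Germany, on the reported-date axis:
    #   total_germany = rki.get_total(value="confirmed")
    #   # Cumulative deaths for one state, on the symptom-date axis:
    #   total_sachsen = rki.get_total(
    #       value="deaths", bundesland="Sachsen", date_type="date_ref"
    #   )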
    def get_new(
        self,
        value="confirmed",
        bundesland: str = None,
        landkreis: str = None,
        data_begin: datetime.datetime = None,
        data_end: datetime.datetime = None,
        date_type: str = "date",
    ):
        """
        Retrieves all new cases from the Robert Koch Institute dataset as a DataFrame
        with datetime index. Can be filtered by value, bundesland and landkreis;
        if only a country is given, all available states are summed up.

        Parameters
        ----------
        value : str
            Which data to return, possible values are
            - "confirmed",
            - "recovered",
            - "deaths"
            (default: "confirmed")
        bundesland : str, optional
            If no value is provided, the dataset summed over all of Germany is used.
        landkreis : str, optional
            If no value is provided, the dataset summed over the region (bundesland)
            is used.
        data_begin : datetime.datetime, optional
            Initial date for the returned data. Has to lie after the first date in
            the dataset, since computing new cases needs the previous day as reference.
        data_end : datetime.datetime, optional
            Last date for the returned data; if no value is given the most recent
            date in the dataset is used.
        date_type : str, optional
            Type of date to use: reported date "date" (Meldedatum in the original
            dataset) or symptom date "date_ref" (Refdatum in the original dataset).

        Returns
        -------
        : pandas.DataFrame
            Table with daily new cases and the date as index.
        """

        # ------------------------------------------------------------------------------ #
        # Default parameters
        # ------------------------------------------------------------------------------ #
        if value not in ["confirmed", "recovered", "deaths"]:
            raise ValueError(
                'Invalid value. Valid options: "confirmed", "deaths", "recovered"'
            )

        if self.data is None:
            self.download_all_available_data()

        level = None
        filter_value = None
        if bundesland is not None and landkreis is None:
            level = "Bundesland"
            filter_value = bundesland
        elif bundesland is None and landkreis is not None:
            level = "Landkreis"
            filter_value = landkreis
        elif bundesland is not None and landkreis is not None:
            raise ValueError("bundesland and landkreis cannot be set simultaneously.")

        if data_begin is None:
            data_begin = self.data[date_type].iloc[0]
        if data_end is None:
            data_end = self.data[date_type].iloc[-1]
        if data_begin == self.data[date_type].iloc[0]:
            raise ValueError(
                "Date has to be after the first dataset entry. Set a data_begin date!"
            )

        # ------------------------------------------------------------------------------ #
        # Retrieve data and filter it
        # ------------------------------------------------------------------------------ #
        df = self.filter(
            data_begin - datetime.timedelta(days=1),
            data_end,
            value,
            date_type,
            level,
            filter_value,
        )
        # Take the difference to the day before; this also drops the first row
        # and casts the type back to int
        df = df.diff().drop(df.index[0]).astype(int)

        return df.fillna(0)
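    # Minimal usage sketch for get_new() (illustrative only). data_begin must
    # lie after the first date in the dataset, as explained above.
    #
    #   rki = RKI(auto_download=True)
    #   new_cases = rki.get_new(
    #       value="confirmed",
    #       bundesland="Sachsen",
    #       data_begin=datetime.datetime(2020, 3, 15),
    #       data_end=datetime.datetime(2020, 4, 15),
    #   )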
    def filter(
        self,
        data_begin: datetime.datetime = None,
        data_end: datetime.datetime = None,
        variable="confirmed",
        date_type="date",
        level=None,
        value=None,
    ):
        """
        Filters the obtained dataset for a given time period and returns a series
        containing ONLY the desired variable.

        Parameters
        ----------
        data_begin : datetime.datetime, optional
            Initial date; if no value is provided the first possible date is used.
        data_end : datetime.datetime, optional
            Last date; if no value is provided the most recent possible date is used.
        variable : str, optional
            Type of variable to return, possible values are:
            "confirmed" : cases (default)
            "deaths" : deaths
            "recovered" : recovered
        date_type : str, optional
            Type of date to use: reported date "date" (Meldedatum in the original
            dataset) or symptom date "date_ref" (Refdatum in the original dataset).
        level : str, optional
            Possible strings are:
            None : return data for all of Germany (default)
            "Bundesland" : a state
            "Landkreis" : a region
        value : str, optional
            Name of the state/region, e.g. "Sachsen".

        Returns
        -------
        : pd.DataFrame
            Series with ONLY the requested variable, in the requested range
            (one dimensional).
        """
        # Input parsing
        if variable not in ["confirmed", "deaths", "recovered"]:
            raise ValueError(
                'Invalid variable. Valid options: "confirmed", "deaths", "recovered"'
            )

        if level not in ["Landkreis", "Bundesland", None]:
            raise ValueError(
                'Invalid level. Valid options: "Landkreis", "Bundesland", None'
            )

        if date_type not in ["date", "date_ref"]:
            raise ValueError('Invalid date_type. Valid options: "date", "date_ref"')

        df = self.data.sort_values(date_type)
        if data_begin is None:
            data_begin = df[date_type].iloc[0]
        if data_end is None:
            data_end = df[date_type].iloc[-1]

        if not isinstance(data_begin, datetime.datetime) or not isinstance(
            data_end, datetime.datetime
        ):
            raise ValueError(
                "Invalid data_begin, data_end: has to be datetime.datetime object"
            )

        # Keep only the relevant data
        df = self.data
        if level is not None:
            df = df[df[level] == value][[date_type, variable]]

        # Sum over all entries per day and cumulate
        df_series = df.groupby(date_type)[variable].sum().cumsum()
        df_series.index = pd.to_datetime(df_series.index)

        return df_series[data_begin:data_end].fillna(0)
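    # Minimal usage sketch for filter() (illustrative only; the region name is
    # a hypothetical example and has to match the Landkreis column exactly):
    #
    #   rki = RKI(auto_download=True)
    #   series = rki.filter(
    #       variable="recovered", level="Landkreis", value="SK Leipzig"
    #   )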
    def filter_all_bundesland(
        self,
        begin_date: datetime.datetime = None,
        end_date: datetime.datetime = None,
        variable="confirmed",
        date_type="date",
    ):
        """
        Filters the full RKI dataset and returns one column per Bundesland.

        Parameters
        ----------
        begin_date : datetime.datetime, optional
            Initial date to return.
        end_date : datetime.datetime, optional
            Last date to return.
        variable : str, optional
            Type of variable to return: "confirmed" (cases), "deaths" or "recovered".
        date_type : str, optional
            Type of date to use: reported date "date" (Meldedatum in the original
            dataset) or symptom date "date_ref" (Refdatum in the original dataset).

        Returns
        -------
        : pd.DataFrame
            DataFrame with datetime dates as index and all German states
            (Bundesländer) as columns.
        """
        if variable not in ["confirmed", "deaths", "recovered"]:
            raise ValueError(
                'Invalid variable. Valid options: "confirmed", "deaths", "recovered"'
            )

        if date_type not in ["date", "date_ref"]:
            raise ValueError('Invalid date_type. Valid options: "date", "date_ref"')

        if begin_date is None:
            begin_date = self.data[date_type].iloc[0]
        if end_date is None:
            end_date = self.data[date_type].iloc[-1]

        if not isinstance(begin_date, datetime.datetime) or not isinstance(
            end_date, datetime.datetime
        ):
            raise ValueError(
                "Invalid begin_date, end_date: has to be datetime.datetime object"
            )

        # Nifty, if slightly unreadable, one-liner
        df = self.data
        df2 = (
            df.groupby([date_type, "Bundesland"])[variable]
            .sum()
            .reset_index()
            .pivot(index=date_type, columns="Bundesland", values=variable)
            .fillna(0)
        )
        df2.index = pd.to_datetime(df2.index)

        # Returns cumsum of variable
        return df2[begin_date:end_date].cumsum()
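# A hedged end-to-end sketch (illustrative, not part of the original module):
# build the retrieval object, download the dataset and pull a cumulative
# per-state table. Guarded so that importing this module has no side effects.
if __name__ == "__main__":
    rki = RKI(auto_download=True)
    # Cumulative confirmed cases, one column per Bundesland
    per_state = rki.filter_all_bundesland(variable="confirmed")
    print(per_state.tail())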