Source code for kadlu.geospatial.data_sources.era5

"""
    API for the ERA5 dataset from the Copernicus Climate Data Store (CDS)

    Metadata regarding the dataset can be found here:
        https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-single-levels?tab=overview
"""

import os
import logging
import configparser
from os.path import isfile, dirname
from datetime import datetime, timedelta

import cdsapi
import pygrib
import numpy as np

from kadlu import index
from kadlu.geospatial.data_sources.data_util import (
    database_cfg,
    dt_2_epoch,
    logmsg,
    logmsg_nodata,
    storage_cfg,
    str_def,
)

era5_tables = [
    'significant_height_of_combined_wind_waves_and_swell',
    'mean_wave_direction',
    'mean_wave_period',
    'u_component_of_wind',
    'v_component_of_wind',
    'convective_precipitation',
    'convective_snowfall',
    'normalized_energy_flux_into_ocean',
    'normalized_energy_flux_into_waves',
    'normalized_stress_into_ocean',
    'precipitation_type',
]

logging.getLogger('cdsapi').setLevel(logging.WARNING)


def initdb():
    """ create a table in the local database for each era5 variable """
    conn, db = database_cfg()
    for var in era5_tables:
        db.execute(f'CREATE TABLE IF NOT EXISTS {var}'
                   '( val     REAL    NOT NULL, '
                   '  lat     REAL    NOT NULL, '
                   '  lon     REAL    NOT NULL, '
                   '  time    INT     NOT NULL, '
                   '  source  TEXT    NOT NULL) ')
        db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS '
                   f'idx_{var} on {var}(time, lon, lat, val, source)')
    conn.close()

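# A quick sanity check of the schema created by initdb() (a minimal sketch;
# sqlite_master is standard SQLite, and database_cfg() returns the same
# (connection, cursor) pair used throughout this module):
#
#     initdb()
#     conn, db = database_cfg()
#     db.execute("SELECT name FROM sqlite_master WHERE type='table'")
#     print(sorted(row[0] for row in db.fetchall()))
#     conn.close()
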
def era5_cfg(key=None, url=None):
    """ store CDS API credentials in the config.ini file """
    cfg = configparser.ConfigParser()  # read .ini into dictionary object
    cfgfile = os.path.join(dirname(dirname(dirname(dirname(__file__)))),
                           "config.ini")
    cfg.read(cfgfile)
    if 'cdsapi' not in cfg.sections():
        cfg.add_section('cdsapi')

    if key is not None:
        cfg.set('cdsapi', 'key', key)
    else:
        cfg.set('cdsapi', 'key', '20822:2d1c1841-7d27-4f72-bb8a-9680a073b4c3')

    if url is not None:
        cfg.set('cdsapi', 'url', url)
    else:
        cfg.set('cdsapi', 'url', 'https://cds.climate.copernicus.eu/api/v2')

    with open(cfgfile, 'w') as f:
        cfg.write(f)

    return

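# Configuration sketch: an API token can be obtained from
# https://cds.climate.copernicus.eu/api-how-to and stored with era5_cfg()
# (the key below is a placeholder in 'UID:TOKEN' form):
#
#     import kadlu
#     kadlu.era5_cfg(key='12345:00000000-0000-0000-0000-000000000000',
#                    url='https://cds.climate.copernicus.eu/api/v2')
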
def fetch_era5(var, *, west, east, south, north, start, end, **_):
    """ fetch global era5 data for the specified variable and time range

        args:
            var: string
                the variable short name of the desired wave parameter
                according to the ERA5 documentation. the complete list can
                be found here (table 7 for wave params):
                https://confluence.ecmwf.int/display/CKB/ERA5+data+documentation#ERA5datadocumentation-Temporalfrequency
            kwargs: dict
                keyword arguments passed from the Era5() class as a dictionary

        return:
            True if new data was fetched, else False
    """
    # cleaner stack trace by raising outside of try/except
    err = False
    era5_cfg()
    cfg = configparser.ConfigParser()  # read .ini into dictionary object
    cfgfile = os.path.join(dirname(dirname(dirname(dirname(__file__)))),
                           "config.ini")
    cfg.read(cfgfile)

    try:
        c = cdsapi.Client(url=cfg['cdsapi']['url'], key=cfg['cdsapi']['key'])
    except KeyError:
        try:
            c = cdsapi.Client()
        except Exception:
            err = True

    if err:
        raise KeyError('CDS API has not been configured for the ERA5 module. '
                       'obtain an API token from the following URL and run '
                       'kadlu.era5_cfg(url="URL_HERE", key="TOKEN_HERE"). '
                       'https://cds.climate.copernicus.eu/api-how-to')

    t = datetime(start.year, start.month, start.day, start.hour)
    assert end - start <= timedelta(days=1, hours=1), \
        'use index to bin fetch requests'

    # fetch the data
    fname = f'ERA5_reanalysis_{var}_{t.strftime("%Y-%m-%d")}.grb2'
    fpath = f'{storage_cfg()}{fname}'
    if not isfile(fpath):
        logging.info(f'fetching {fpath}...')
        c.retrieve(
            'reanalysis-era5-single-levels', {
                'product_type': 'reanalysis',
                'format': 'grib',
                'variable': var,
                'year': t.strftime("%Y"),
                'month': t.strftime("%m"),
                'day': t.strftime("%d"),
                'time': [
                    datetime(t.year, t.month, t.day, h).strftime('%H:00')
                    for h in range(24)
                ]
            }, fpath)

    # load the data file and insert it into the database
    assert isfile(fpath)
    grb = pygrib.open(fpath)
    agg = np.array([[], [], [], [], []])
    table = var[4:] if var[0:4] == '10m_' else var

    for msg in grb:
        if msg.validDate < start or msg.validDate > end:
            continue

        # read grib data
        z, y, x = msg.data()
        if np.ma.is_masked(z):
            z2 = z[~z.mask].data
            y2 = y[~z.mask]
            x2 = x[~z.mask]
        else:  # wind data has no mask
            z2 = z.reshape(-1)
            y2 = y.reshape(-1)
            x2 = x.reshape(-1)

        # shift longitudes from [0, 360) to [-180, 180)
        x3 = ((x2 + 180) % 360) - 180

        # index coordinates, select query range subset, aggregate results
        xix = np.logical_and(x3 >= west, x3 <= east)
        yix = np.logical_and(y2 >= south, y2 <= north)
        idx = np.logical_and(xix, yix)
        agg = np.hstack((agg, [
            z2[idx], y2[idx], x3[idx],
            dt_2_epoch([msg.validDate for i in z2[idx]]),
            ['era5' for i in z2[idx]]
        ]))

    # perform the insertion
    initdb()
    conn, db = database_cfg()
    n1 = db.execute(f"SELECT COUNT(*) FROM {table}").fetchall()[0][0]
    db.executemany(
        f"INSERT OR IGNORE INTO {table} "
        f"VALUES (?,?,?,CAST(? AS INT),?)", agg.T)
    n2 = db.execute(f"SELECT COUNT(*) FROM {table}").fetchall()[0][0]
    db.execute("COMMIT")
    conn.commit()
    conn.close()

    #insert_hash(kwargs, f'fetch_era5_{era5_varmap[var]}')
    logmsg('era5', var, (n1, n2),
           **dict(south=south, west=west, north=north, east=east,
                  start=start, end=end))
    return True

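# Fetch sketch: fetch_era5() downloads at most one day of data per call
# (enforced by the assert above); longer ranges are binned into daily calls
# by the index() context manager, as in load_era5(). a minimal example with
# arbitrary bounds:
#
#     from datetime import datetime
#     fetch_era5('mean_wave_period',
#                west=-64.0, east=-62.0, south=44.0, north=46.0,
#                start=datetime(2019, 1, 1), end=datetime(2019, 1, 2))
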
def load_era5(var, *, west, east, south, north, start, end, **_):
    """ load era5 data from the local database

        args:
            var: string
                variable to be loaded
            kwargs: dict
                keyword arguments used for the fetch request. must contain
                south, north, west, east keys as float values, and start,
                end keys as datetimes

        return:
            values: values of the fetched var
            lat: y grid coordinates
            lon: x grid coordinates
            epoch: timestamps in epoch hours since jan 1 2000
    """
    # check for missing data
    with index(storagedir=storage_cfg(),
               west=west, east=east,
               south=south, north=north,
               start=start, end=end) as fetchmap:
        fetchmap(callback=fetch_era5, var=var)

    # load the data
    table = var[4:] if var[0:4] == '10m_' else var  # table name can't start with a digit
    sql = ' AND '.join([
        f"SELECT * FROM {table} WHERE lat >= ?",
        'lat <= ?',
        'lon >= ?',
        'lon <= ?',
        'time >= ?',
        'time <= ?',
    ]) + ' ORDER BY time, lat, lon ASC'
    conn, db = database_cfg()
    db.execute(
        sql,
        tuple(map(str, [south, north, west, east,
                        dt_2_epoch(start), dt_2_epoch(end)])))
    rowdata = np.array(db.fetchall(), dtype=object).T
    conn.close()

    if len(rowdata) == 0:
        logmsg_nodata('era5', var,
                      west=west, east=east, south=south, north=north,
                      start=start, end=end)
        return np.array([[], [], [], []])

    val, lat, lon, epoch, source = rowdata
    return np.array((val, lat, lon, epoch), dtype=float)

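# Load sketch: load_era5() first fetches any missing data, then returns a
# (4, n) float array of values and coordinates. a minimal example with
# arbitrary bounds:
#
#     from datetime import datetime
#     val, lat, lon, epoch = load_era5(
#         'mean_wave_period',
#         west=-64.0, east=-62.0, south=44.0, north=46.0,
#         start=datetime(2019, 1, 1), end=datetime(2019, 1, 2))
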
class Era5():
    """ collection of module functions for fetching and loading """

    def load_windwaveswellheight(self, **kwargs):
        return load_era5('significant_height_of_combined_wind_waves_and_swell',
                         **kwargs)

    def load_wavedirection(self, **kwargs):
        return load_era5('mean_wave_direction', **kwargs)

    def load_waveperiod(self, **kwargs):
        return load_era5('mean_wave_period', **kwargs)

    def load_precipitation(self, **kwargs):
        return load_era5('convective_precipitation', **kwargs)

    def load_snowfall(self, **kwargs):
        return load_era5('convective_snowfall', **kwargs)

    def load_flux_ocean(self, **kwargs):
        return load_era5('normalized_energy_flux_into_ocean', **kwargs)

    def load_flux_waves(self, **kwargs):
        return load_era5('normalized_energy_flux_into_waves', **kwargs)

    def load_stress_ocean(self, **kwargs):
        return load_era5('normalized_stress_into_ocean', **kwargs)

    def load_precip_type(self, **kwargs):
        return load_era5('precipitation_type', **kwargs)

    def load_wind_u(self, **kwargs):
        return load_era5('10m_u_component_of_wind', **kwargs)

    def load_wind_v(self, **kwargs):
        return load_era5('10m_v_component_of_wind', **kwargs)
    def load_wind_uv(self, **kwargs):
        """ load wind speed, computed as sqrt(wind_u^2 + wind_v^2) """
        with index(storagedir=storage_cfg(),
                   west=kwargs['west'], east=kwargs['east'],
                   south=kwargs['south'], north=kwargs['north'],
                   start=kwargs['start'], end=kwargs['end']) as fetchmap:
            fetchmap(callback=fetch_era5, var='10m_u_component_of_wind')
            fetchmap(callback=fetch_era5, var='10m_v_component_of_wind')

        sql = ' AND '.join([
            'SELECT u_component_of_wind.val, u_component_of_wind.lat, '
            'u_component_of_wind.lon, u_component_of_wind.time, '
            'v_component_of_wind.val FROM u_component_of_wind '
            'INNER JOIN v_component_of_wind '
            'ON u_component_of_wind.lat == v_component_of_wind.lat',
            'u_component_of_wind.lon == v_component_of_wind.lon',
            'u_component_of_wind.time == v_component_of_wind.time '
            'WHERE u_component_of_wind.lat >= ?',
            'u_component_of_wind.lat <= ?',
            'u_component_of_wind.lon >= ?',
            'u_component_of_wind.lon <= ?',
            'u_component_of_wind.time >= ?',
            'u_component_of_wind.time <= ?',
        ]) + (' ORDER BY u_component_of_wind.time, '
              'u_component_of_wind.lat, u_component_of_wind.lon ASC')
        conn, db = database_cfg()
        db.execute(
            sql,
            tuple(map(str, [
                kwargs['south'], kwargs['north'],
                kwargs['west'], kwargs['east'],
                dt_2_epoch(kwargs['start']),
                dt_2_epoch(kwargs['end'])
            ])))
        wind_u, lat, lon, epoch, wind_v = np.array(db.fetchall()).T
        val = np.sqrt(np.square(wind_u) + np.square(wind_v))
        conn.close()
        return np.array((val, lat, lon, epoch)).astype(float)
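
    # Usage sketch: load_wind_uv() joins the u and v wind tables and returns
    # the magnitude sqrt(u^2 + v^2). a minimal example with arbitrary bounds:
    #
    #     from datetime import datetime
    #     speed, lat, lon, epoch = Era5().load_wind_uv(
    #         south=44.0, north=46.0, west=-64.0, east=-62.0,
    #         start=datetime(2019, 1, 1), end=datetime(2019, 1, 2))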
    def __str__(self):
        info = '\n'.join([
            "Era5 Global Dataset from the Copernicus Climate Data Store.",
            "Combines model data with observations from across",
            "the world into a globally complete and consistent dataset",
            "\thttps://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-single-levels"
        ])
        args = "(south, north, west, east, start, end)"
        return str_def(self, info, args)