Source code for kadlu.geospatial.data_sources.wwiii

"""
    Kadlu API for the NOAA WaveWatch III Datastore

    User guides:
        https://github.com/NOAA-EMC/WW3/wiki/WAVEWATCH-III-User-Guide

    Data model description (boundary definitions, map visualizations, etc)
        https://polar.ncep.noaa.gov/waves/implementations.php
"""

import shutil
import logging
import requests
from datetime import datetime, timedelta
from os.path import isfile
from hashlib import md5

import numpy as np
import pygrib

from kadlu import index
from kadlu.geospatial.data_sources.data_util import (
    Boundary,
    database_cfg,
    dt_2_epoch,
    logmsg,
    logmsg_nodata,
    storage_cfg,
    str_def,
)

wwiii_tables = ['hs', 'dp', 'tp', 'windU', 'windV']

wwiii_src = "https://data.nodc.noaa.gov/ncep/nww3/"

# region boundaries as defined in WWIII docs:
#    https://polar.ncep.noaa.gov/waves/implementations.php
wwiii_global = Boundary(-90, 90, -180, 180, 'glo_30m')  # global
wwiii_regions = [
    Boundary(15, 47, -99, -60, 'at_4m'),  # atlantic
    Boundary(15, 50, -165, -116, 'wc_4m'),  # US west
    Boundary(48, 74, 140, 180, 'ak_4m'),  # alaska (west)
    Boundary(48, 74, -180, -120, 'ak_4m'),  # alaska (east)
    Boundary(65, 84, -180, 180, 'ao_30m'),  # arctic ocean
    Boundary(-20, 30, 130, 180, 'ep_10m'),  # pacific (west)
    Boundary(-20, 30, -180, -145, 'ep_10m')
]  # pacific (east)


[docs]def initdb(): conn, db = database_cfg() for var in wwiii_tables: db.execute(f'CREATE TABLE IF NOT EXISTS {var}' '( val REAL NOT NULL, ' ' lat REAL NOT NULL, ' ' lon REAL NOT NULL, ' ' time INT NOT NULL, ' ' source TEXT NOT NULL) ') db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS ' f'idx_{var} on {var}(time, lon, lat, val, source)') conn.close()
[docs]def insert(table, agg): """ insert parsed data into local database """ conn, db = database_cfg() db.executemany( f"INSERT OR IGNORE INTO {table} VALUES (?,?,?,CAST(? AS INT),?)", agg.T) conn.commit() conn.close()
[docs]def fetch_wwiii(var, **kwargs): """ download wwiii data and return associated filepaths args: var: string the variable name of desired parameter according to WWIII docs the complete list of variables can be found at the following URL under 'model output' https://polar.ncep.noaa.gov/waves/implementations.php south, north: float ymin, ymax coordinate boundaries (latitude). range: -90, 90 west, east: float xmin, xmax coordinate boundaries (longitude). range: -180, 180 start: datetime the start of the desired time range end: datetime the end of the desired time range return: True if new data was fetched, else False """ assert 6 == sum( map(lambda kw: kw in kwargs.keys(), ['south', 'north', 'west', 'east', 'start', 'end' ])), 'malformed query' assert var in ('hs', 'tp', 'dp', 'wind'), 'invalid fetch var' assert kwargs['end'].month <= kwargs['start'].month+1 and kwargs['start'].year <= kwargs['end'].year+1, \ f'fetch function can only load one month at a time. got start: {kwargs["start"]} end: {kwargs["end"]}' #print("WWIII NOTICE: resolution selection not implemented yet. defaulting to 0.5°") regions = ['glo_30m'] assert regions == ['glo_30m'], 'invalid region string' reg = regions[0] t = datetime(kwargs['start'].year, kwargs['start'].month, 1) fname = f"multi_1.{reg}.{var}.{t.strftime('%Y%m')}.grb2" fetchfile = f"{storage_cfg()}{fname}" # if file hasnt been downloaded, fetch it if not isfile(fetchfile): # and kwargs['start'].day == 1: logging.info(f'WWIII {kwargs["start"].date().isoformat()} {var}: ' f'downloading {fname} from NOAA WaveWatch III...') if reg == 'glo_30m' and not 'wind' in var and t.year >= 2018: fetchurl = f"{wwiii_src}{t.strftime('%Y/%m')}/gribs/{fname}" else: fetchurl = f"{wwiii_src}{t.strftime('%Y/%m')}/{reg}/{fname}" with requests.get(fetchurl, stream=True, timeout=300) as payload: assert payload.status_code == 200, f'couldn\'t retrieve file at {fetchurl}' with open(fetchfile, 'wb') as f: shutil.copyfileobj(payload.raw, f) # open the file, parse data, insert values grib = pygrib.open(fetchfile) assert grib.messages > 0, f'problem opening {fetchfile}' null = 0 agg = np.array([[], [], [], [], []]) grbvar = grib[1]['name'] table = f'{var}{grbvar[0]}' if var == 'wind' else var initdb() conn, db = database_cfg() n1 = db.execute(f"SELECT COUNT(*) FROM {table}").fetchall()[0][0] for msg, num in zip(grib, range(1, grib.messages)): if (msg['name'] != grbvar and md5(agg.tobytes()).hexdigest() != md5( np.array([[], [], [], [], []]).tobytes()).hexdigest()): insert(table, agg) table = f'{var}{msg["name"][0]}' if var == 'wind' else var agg = np.array([[], [], [], [], []]) grbvar = msg['name'] null = 0 if msg.validDate < kwargs['start']: continue if msg.validDate >= kwargs['end']: continue z, y, x = msg.data() src = np.array(['wwiii' for each in z[~z.mask].data]) grid = np.vstack( (z[~z.mask].data, y[~z.mask], ((x[~z.mask] + 180) % 360) - 180, dt_2_epoch([msg.validDate for each in z[~z.mask].data]), src)).astype(object) agg = np.hstack((agg, grid)) null += sum(sum(z.mask)) insert(table, agg) n2 = db.execute(f"SELECT COUNT(*) FROM {table}").fetchall()[0][0] conn.close() logmsg('wwiii', var, (n1, n2), **kwargs) return True
[docs]def load_wwiii(var, kwargs): """ return downloaded wwiii data for specified wavevar according to given time, lat, lon boundaries args: var: string the variable short name of desired wave parameter according to WWIII docs the complete list of variable short names can be found here (under 'model output') https://polar.ncep.noaa.gov/waves/implementations.php south, north: float ymin, ymax coordinate boundaries (latitude). range: -90, 90 west, east: float xmin, xmax coordinate boundaries (longitude). range: -180, 180 start: datetime the start of the desired time range end: datetime the end of the desired time range return: val, lat, lon, epoch as np arrays of floats """ if 'time' in kwargs.keys() and 'start' not in kwargs.keys(): kwargs['start'] = kwargs['time'] del kwargs['time'] if 'end' not in kwargs.keys(): kwargs['end'] = kwargs['start'] + timedelta(hours=3) assert 6 == sum( map(lambda kw: kw in kwargs.keys(), ['south', 'north', 'west', 'east', 'start', 'end' ])), 'malformed query' assert var in ('hs', 'tp', 'dp', 'windU', 'windV'), 'invalid load var' with index(dx=180, dy=90, dz=5000, dt=timedelta(days=1), storagedir=storage_cfg(), **{ k: v for k, v in kwargs.items() if k in ['south', 'north', 'west', 'east', 'start', 'end'] }) as fetchmap: fetchvar = var if var not in ('windU', 'windV') else 'wind' fetchmap(callback=fetch_wwiii, var=fetchvar) conn, db = database_cfg() db.execute( ' AND '.join([ f'SELECT * FROM {var} WHERE lat >= ?', 'lat <= ?', 'lon >= ?', 'lon <= ?', 'time >= ?', 'time <= ? ' ]) + ' ORDER BY time, lat, lon ASC', tuple( map(str, [ kwargs['south'], kwargs['north'], kwargs['west'], kwargs['east'], dt_2_epoch(kwargs['start']), dt_2_epoch(kwargs['end']) ]))) slices = np.array(db.fetchall(), dtype=object).T if len(slices) == 0: logmsg_nodata('wwiii', 'wind', **kwargs) return np.array([[], [], [], []]) conn.close() val, lat, lon, epoch, source = slices return np.array((val, lat, lon, epoch), dtype=float)
[docs]class Wwiii(): """ collection of module functions for fetching and loading """
[docs] def load_wavedirection(self, **kwargs): return load_wwiii('dp', kwargs)
[docs] def load_waveperiod(self, **kwargs): return load_wwiii('tp', kwargs)
[docs] def load_windwaveheight(self, **kwargs): return load_wwiii('hs', kwargs)
[docs] def load_wind_u(self, **kwargs): return load_wwiii('windU', kwargs)
[docs] def load_wind_v(self, **kwargs): return load_wwiii('windV', kwargs)
[docs] def load_wind_uv(self, **kwargs): with index(dx=180, dy=90, dz=5000, dt=timedelta(days=1), storagedir=storage_cfg(), **kwargs) as fetchmap: fetchmap(callback=fetch_wwiii, var='wind') sql = ' AND '.join(['SELECT windU.val, windU.lat, windU.lon, windU.time, windV.val FROM windU '\ 'INNER JOIN windV '\ 'ON windU.lat == windV.lat', 'windU.lon == windV.lon', 'windU.time == windV.time '\ 'WHERE windU.lat >= ?', 'windU.lat <= ?', 'windU.lon >= ?', 'windU.lon <= ?', 'windU.time >= ?', 'windU.time <= ?']) + ' ORDER BY windU.time, windU.lat, windU.lon ASC' conn, db = database_cfg() db.execute( sql, tuple( map(str, [ kwargs['south'], kwargs['north'], kwargs['west'], kwargs['east'], dt_2_epoch(kwargs['start']), dt_2_epoch(kwargs['end']) ]))) qry = np.array(db.fetchall()).T conn.close() if len(qry) == 0: logmsg_nodata('wwiii', 'wind', **kwargs) return np.array([[], [], [], [], []]) wind_u, lat, lon, epoch, wind_v = qry val = np.sqrt(np.square(wind_u) + np.square(wind_v)) return np.array((val, lat, lon, epoch)).astype(float)
def __str__(self): info = '\n'.join([ "WAVEWATCH III: a third generation wave height,", "water depth, and current hindcasting modeling framework.", "Developed for the community at NOAA/NCEP. About WWIII:", "\thttps://github.com/NOAA-EMC/WW3/wiki/About-WW3", "Model descriptions:", "\thttps://polar.ncep.noaa.gov/waves/implementations.php" ]) args = "(south, north, west, east, start, end)" return str_def(self, info, args)