Source code for kadlu.geospatial.data_sources.hycom

"""
    data source:
        https://www.hycom.org/data/glbv0pt08

    web interface for manual hycom data retrieval:
        https://tds.hycom.org/thredds/dodsC/GLBv0.08/expt_53.X/data/2015.html
"""

import logging
import requests
from functools import reduce
from datetime import datetime, timedelta

import numpy as np

from kadlu import index
from kadlu.geospatial.data_sources.data_util import (
    database_cfg,
    dt_2_epoch,
    fmt_coords,
    index_arr,
    logmsg,
    logmsg_nodata,
    storage_cfg,
    str_def,
)

hycom_src = "https://tds.hycom.org/thredds/dodsC/GLBv0.08/expt_53.X/data"
hycom_tables = [
    'hycom_salinity', 'hycom_water_temp', 'hycom_water_u', 'hycom_water_v'
]

def slices_str(var, slices, steps=(1, 1, 1, 1)):
    """ build the variable/index slice portion of a thredds ascii query """
    return var + ''.join(f'[{s0}:{step}:{s1}]'
                         for (s0, s1), step in zip(slices, steps))


def initdb():
    conn, db = database_cfg()
    for var in hycom_tables:
        db.execute(f'CREATE TABLE IF NOT EXISTS {var}'
                   '( val REAL NOT NULL,'
                   ' lat REAL NOT NULL,'
                   ' lon REAL NOT NULL,'
                   ' time INT NOT NULL,'
                   ' depth INT NOT NULL,'
                   ' source TEXT NOT NULL )')
        db.execute(f'CREATE UNIQUE INDEX IF NOT EXISTS '
                   f'idx_{var} on {var}(time, lon, lat, depth, val, source)')
    conn.close()

def fetch_grid(**_):
    """ download lat/lon/time/depth arrays for grid indexing.
        times are formatted as epoch hours since 2000-01-01 00:00
    """
    import time

    logging.info("HYCOM fetching coordinate index...")
    url = f"{hycom_src}/2015.ascii?lat%5B0:1:3250%5D,lon%5B0:1:4499%5D"
    grid_netcdf = requests.get(url, timeout=450)
    assert grid_netcdf.status_code == 200
    meta, data = grid_netcdf.text.split(
        "---------------------------------------------\n")
    lat_csv, lon_csv = data.split("\n\n")[:-1]
    lat = np.array(lat_csv.split("\n")[1].split(", "), dtype=float)
    lon = np.array(lon_csv.split("\n")[1].split(", "), dtype=float)

    epoch = {}
    for year in map(str, range(1994, 2016)):
        logging.info(f"HYCOM fetching time index for {year}...")
        url = f"{hycom_src}/{year}.ascii?time"
        time_netcdf = requests.get(url, timeout=450)
        assert time_netcdf.status_code == 200
        meta, data = time_netcdf.text.split(
            "---------------------------------------------\n")
        csv = data.split("\n\n")[:-1][0]
        epoch[year] = np.array(csv.split("\n")[1].split(', ')[1:],
                               dtype=float)
        time.sleep(1)

    logging.info("HYCOM fetching depth index...")
    depth = np.array([
        0.0, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 15.0, 20.0, 25.0, 30.0, 35.0,
        40.0, 45.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 125.0, 150.0, 200.0,
        250.0, 300.0, 350.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0,
        1250.0, 1500.0, 2000.0, 2500.0, 3000.0, 4000.0, 5000.0
    ])

    return lat, lon, epoch, depth

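# note: fetch_grid retrieves one time index per year from 1994 through 2015;
# epoch['2015'][i] gives the hours elapsed since 2000-01-01 00:00 at timestep
# i of the 2015 dataset, matching the epoch format stored in the database
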
def load_grid():
    """ put spatial grid into memory """
    with index(storagedir=storage_cfg(), bins=False, store=True) as fetchmap:
        return fetchmap(callback=fetch_grid, seed='hycom grids')[0]

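# the coordinate arrays (3251 lats x 4500 lons, 40 z-levels) only need to be
# downloaded once: the index context manager with store=True is assumed to
# cache the fetch_grid result under the 'hycom grids' seed, so subsequent
# calls read from local storage
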
class Hycom():
    """ collection of module functions for fetching and loading

        attributes:
            lat, lon: arrays
                spatial grid arrays. used to convert grid index to
                coordinate point
            epoch: dict
                dictionary containing yearly time indexes.
                times formatted as epoch hours since 2000-01-01 00:00
            depth: array
                depth scale. used to convert depth index to metres
    """

    def __init__(self):
        self.ygrid, self.xgrid, self.epoch, self.depth = (None, None, None,
                                                          None)

    def load_salinity(self, **kwargs):
        return self.load_hycom('salinity', kwargs)

    def load_temp(self, **kwargs):
        return self.load_hycom('water_temp', kwargs)

    def load_water_u(self, **kwargs):
        return self.load_hycom('water_u', kwargs)

    def load_water_v(self, **kwargs):
        return self.load_hycom('water_v', kwargs)

    def callback(self, var, **kwargs):
        """ build indices for query, fetch from hycom, insert into database

            args:
                var: string
                    variable to be fetched. complete list of variables here
                    https://tds.hycom.org/thredds/dodsC/GLBv0.08/expt_53.X/data/2015.html
                kwargs: dict
                    boundaries as keyword arguments
        """
        # build request indexes
        year = str(kwargs['start'].year)
        haystack = np.array(
            [self.epoch[year], self.depth, self.ygrid, self.xgrid],
            dtype=object)
        needles1 = np.array([
            dt_2_epoch(kwargs['start']), kwargs['top'], kwargs['south'],
            kwargs['west']
        ])
        needles2 = np.array([
            dt_2_epoch(kwargs['end']), kwargs['bottom'], kwargs['north'],
            kwargs['east']
        ])
        slices = list(
            zip(map(index_arr, needles1, haystack),
                map(index_arr, needles2, haystack)))
        assert reduce(
            np.multiply, map(lambda s: s[1] - s[0] + 1, slices)
        ) > 0, f"0 records available within query boundaries: {kwargs}"

        # generate request
        url = f"{hycom_src}/{year}.ascii?{slices_str(var, slices)}"
        with requests.get(url, stream=True, timeout=450) as payload_netcdf:
            assert payload_netcdf.status_code == 200, "couldn't access hycom"
            meta, data = payload_netcdf.text.split(
                "---------------------------------------------\n")

        # parse response into numpy array
        arrs = data.split("\n\n")[:-1]
        shape_str, payload = arrs[0].split("\n", 1)
        shape = tuple(
            [int(x) for x in shape_str.split("[", 1)[1][:-1].split("][")])
        cube = np.ndarray(shape, dtype=float)
        for arr in payload.split("\n"):
            ix_str, row_csv = arr.split(", ", 1)
            a, b, c = [int(x) for x in ix_str[1:-1].split("][")]
            cube[a][b][c] = np.array(row_csv.split(", "), dtype=int)

        # build coordinate grid, populate values, adjust scaling, remove nulls
        flatten = reduce(np.multiply, map(lambda s: s[1] - s[0] + 1, slices))
        add_offset = 20 if 'salinity' in var or 'water_temp' in var else 0
        null_value = -10 if 'salinity' in var or 'water_temp' in var else -30
        grid = np.array([
            (None, y, x, t, d, 'hycom')
            for t in self.epoch[year][slices[0][0]:slices[0][1] + 1]
            for d in self.depth[slices[1][0]:slices[1][1] + 1]
            for y in self.ygrid[slices[2][0]:slices[2][1] + 1]
            for x in self.xgrid[slices[3][0]:slices[3][1] + 1]
        ])
        grid[:, 0] = np.reshape(cube, flatten) * 0.001 + add_offset
        grid = grid[grid[:, 0] != null_value]

        # insert into db
        initdb()
        conn, db = database_cfg()
        n1 = db.execute(f"SELECT COUNT(*) FROM hycom_{var}").fetchall()[0][0]
        db.executemany(
            f"INSERT OR IGNORE INTO hycom_{var} VALUES "
            "(?, ?, ?, CAST(? AS INT), CAST(? AS INT), ?)", grid)
        n2 = db.execute(f"SELECT COUNT(*) FROM hycom_{var}").fetchall()[0][0]
        db.execute("COMMIT")
        conn.commit()
        conn.close()

        logmsg('hycom', var, (n1, n2), **kwargs)
        return

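    # e.g. a request generated by callback() resembles the following, where
    # the slice indexes are illustrative:
    #   https://tds.hycom.org/thredds/dodsC/GLBv0.08/expt_53.X/data/2015.ascii?salinity[0:1:2][0:1:3][800:1:840][900:1:1000]
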
    def fetch_hycom(self, var, kwargs):
        """ convert user query to grid index slices, handle edge cases """
        assert kwargs['start'] <= kwargs['end']
        assert kwargs['start'] > datetime(1994, 1, 1), \
            'data not available in this range'
        assert kwargs['end'] < datetime(2016, 1, 1), \
            'data not available in this range'
        assert kwargs['south'] <= kwargs['north']
        assert kwargs['top'] <= kwargs['bottom']

        logging.info(f'HYCOM fetching data: {var} {kwargs}')

        if self.ygrid is None:
            self.ygrid, self.xgrid, self.epoch, self.depth = load_grid()

        # if query spans antimeridian, make two separate fetch requests
        if kwargs['west'] > kwargs['east']:
            logging.debug('splitting request')
            argsets = [kwargs.copy(), kwargs.copy()]
            argsets[0]['east'] = self.xgrid[-1]
            argsets[1]['west'] = self.xgrid[0]
        else:
            argsets = [kwargs]

        for argset in argsets:
            args = {
                k: v
                for k, v in argset.items() if k in [
                    'west', 'east', 'south', 'north', 'top', 'bottom',
                    'start', 'end'
                ]
            }
            with index(storagedir=storage_cfg(),
                       bins=True,
                       dx=1,
                       dy=1,
                       dz=5000,
                       dt=timedelta(hours=24),
                       **args) as fetchmap:
                fetchmap(callback=self.callback, var=var)

        return True

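    # e.g. a query with west=170, east=-170 spans the antimeridian, so it is
    # split into two fetches: (west=170, east=xgrid[-1]) and
    # (west=xgrid[0], east=-170)
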
    def load_hycom(self, var, kwargs):
        """ load hycom data from local database

            args:
                var: string
                    variable to be fetched. complete list of variables here
                    https://tds.hycom.org/thredds/dodsC/GLBv0.08/expt_53.X/data/2015.html
                kwargs: dict (boundaries as keyword arguments)
                    south, north: float
                        ymin, ymax coordinate values. range: -90, 90
                    west, east: float
                        xmin, xmax coordinate values. range: -180, 180
                    start, end: datetime
                        temporal boundaries in datetime format

            return:
                values: array
                    values of the fetched variable
                lat: array
                    y grid coordinates
                lon: array
                    x grid coordinates
                epoch: array
                    timestamps in epoch hours since jan 1 2000
                depth: array
                    measured in meters
        """
        # check if grids are initialized
        if self.ygrid is None:
            self.ygrid, self.xgrid, self.epoch, self.depth = load_grid()

        # recursive function call for queries spanning antimeridian
        if kwargs['west'] > kwargs['east']:
            kwargs1 = kwargs.copy()
            kwargs2 = kwargs.copy()
            kwargs1['west'] = self.xgrid[0]
            kwargs2['east'] = self.xgrid[-1]
            return np.hstack((self.load_hycom(var, kwargs1),
                              self.load_hycom(var, kwargs2)))

        # check for missing data
        self.fetch_hycom(var, kwargs)

        # validate and execute query
        assert 8 == sum(
            map(lambda kw: kw in kwargs.keys(), [
                'south', 'north', 'west', 'east', 'start', 'end', 'top',
                'bottom'
            ])), 'malformed query'
        assert kwargs['start'] <= kwargs['end']

        sql = ' AND '.join([
            f"SELECT * FROM hycom_{var} WHERE lat >= ?",
            'lat <= ?',
            'lon >= ?',
            'lon <= ?',
            'time >= ?',
            'time <= ?',
            'depth >= ?',
            'depth <= ?',
            "source == 'hycom' ",
        ]) + 'ORDER BY time, depth, lat, lon ASC'
        conn, db = database_cfg()
        db.execute(
            sql,
            tuple(
                map(str, [
                    kwargs['south'], kwargs['north'], kwargs['west'],
                    kwargs['east'],
                    dt_2_epoch(kwargs['start']),
                    dt_2_epoch(kwargs['end']), kwargs['top'],
                    kwargs['bottom']
                ])))
        rowdata = np.array(db.fetchall(), dtype=object).T
        conn.close()

        if len(rowdata) == 0:
            logmsg_nodata('hycom', var, **kwargs)
            return np.array([[], [], [], [], []])

        return rowdata[0:5].astype(float)

    def load_water_uv(self, **kwargs):
        """ load water_u and water_v from the database and return the flow
            rate sqrt(water_u^2 + water_v^2) along with its coordinates
        """
        self.fetch_hycom('water_u', kwargs)
        self.fetch_hycom('water_v', kwargs)

        # note: the join pairs u and v components at the same depth as well
        # as the same lat/lon/time, so that components from different
        # z-levels are never combined
        sql = ' AND '.join([
            'SELECT hycom_water_u.val, hycom_water_u.lat, hycom_water_u.lon,'
            ' hycom_water_u.time, hycom_water_u.depth, hycom_water_v.val'
            ' FROM hycom_water_u'
            ' INNER JOIN hycom_water_v'
            ' ON hycom_water_u.lat == hycom_water_v.lat',
            'hycom_water_u.lon == hycom_water_v.lon',
            'hycom_water_u.depth == hycom_water_v.depth',
            'hycom_water_u.time == hycom_water_v.time'
            ' WHERE hycom_water_u.lat >= ?',
            'hycom_water_u.lat <= ?',
            'hycom_water_u.lon >= ?',
            'hycom_water_u.lon <= ?',
            'hycom_water_u.time >= ?',
            'hycom_water_u.time <= ?',
            'hycom_water_u.depth >= ?',
            'hycom_water_u.depth <= ?',
        ]) + (' ORDER BY hycom_water_u.time, hycom_water_u.lat,'
              ' hycom_water_u.lon ASC')
        conn, db = database_cfg()
        db.execute(
            sql,
            tuple(
                map(str, [
                    kwargs['south'], kwargs['north'], kwargs['west'],
                    kwargs['east'],
                    dt_2_epoch(kwargs['start']),
                    dt_2_epoch(kwargs['end']), kwargs['top'],
                    kwargs['bottom'],
                ])))
        qry = np.array(db.fetchall()).T
        conn.close()

        if len(qry) == 0:
            logging.warning('HYCOM water_uv: no data found in region '
                            f'{fmt_coords(kwargs)}, returning empty arrays')
            return np.array([[], [], [], [], []])

        water_u, lat, lon, epoch, depth, water_v = qry
        val = np.sqrt(np.square(water_u) + np.square(water_v))
        return np.array((val, lat, lon, epoch, depth)).astype(float)

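    # usage sketch (illustrative boundaries): load the current magnitude
    #
    #   speed, lat, lon, epoch, depth = Hycom().load_water_uv(
    #           south=44, north=45, west=-60, east=-59, top=0, bottom=100,
    #           start=datetime(2015, 1, 1), end=datetime(2015, 1, 2))
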
    def __str__(self):
        info = '\n'.join([
            "Native hycom .[ab] data converted to NetCDF at the Naval",
            "Research Laboratory, interpolated to a 0.08° grid between",
            "40°S-40°N (0.04° poleward) containing 40 z-levels.",
            "Availability: 1994 to 2015",
            "\thttps://www.hycom.org/data/glbv0pt08"
        ])
        args = "(south, north, west, east, start, end, top, bottom)"
        return str_def(self, info, args)