Source code for kadlu.geospatial.data_sources.data_util

    additional tools and utils used across data loading modules

import configparser
import json
import logging
import os
import sqlite3
import sys
import warnings
from contextlib import contextmanager, redirect_stdout, redirect_stderr
from datetime import datetime, timedelta
from functools import reduce, partial
from io import StringIO
from os.path import dirname

import numpy as np

ext = lambda filepath, extensions: isinstance(extensions, tuple) and any(
    x.lower() == filepath.lower()[-len(x):] for x in extensions)

[docs]def database_cfg(): """ configure and connect to sqlite database time is stored as an integer in the database, where each value is epoch hours since 2000-01-01 00:00 returns: conn: database connection object db: connection cursor object """ conn = sqlite3.connect(storage_cfg() + 'geospatial.db') db = conn.cursor() return conn, db
[docs]class storage_cfg(os.PathLike): """ return filepath containing storage configuration string first checks the config.ini file in kadlu root folder, then defaults to kadlu/storage """ """ __file__ = '/home/matt_s/kadlu/kadlu/geospatial/data_sources/' """ cfg = configparser.ConfigParser() # read .ini into dictionary object def __init__(self, setdir=None): os.path.join(dirname(dirname(dirname(dirname(__file__)))), "config.ini")) if setdir: self.__call__(setdir)
[docs] def default_storage(self, msg): """ helper function for storage_cfg() """ storage_location = os.path.join(os.path.expanduser('~'), f'kadlu_data{os.path.sep}') if not os.path.isdir(storage_location): os.mkdir(storage_location) warnings.warn( f'''{msg} storage location will be set to {storage_location}.\n''' '''use kadlu.storage_cfg('/path/to/new/directory') ''' '''to configure a different storage location''') return storage_location
def __call__(self, setdir=None) -> str: if 'storage' not in self.cfg.sections(): self.cfg.add_section('storage') if setdir is not None: assert os.path.isdir(setdir), 'directory does not exist' self.cfg.set('storage', 'storage_location', os.path.abspath(setdir)) with open( os.path.join(dirname(dirname(dirname(dirname(__file__)))), "config.ini"), 'w') as f: self.cfg.write(f) try: storage_location = self.cfg['storage']['storage_location'] except KeyError: # missing config.ini file return self.default_storage('storage location not configured.') if storage_location == '': # null value in config.ini return self.default_storage('null value in config.') if not os.path.isdir(storage_location): # verify the location exists return self.default_storage('storage location does not exist.') if storage_location[-1] != os.path.sep: return storage_location + os.path.sep return str(os.path.abspath(storage_location)) def __repr__(self) -> str: return self.__str__() def __str__(self) -> str: return self.__call__() def __dir__(self) -> str: return self.__call__() def __fspath__(self) -> str: return self.__call__() def __add__(self, other) -> str: return ''.join([self.__str__(), other.__str__()])
[docs]def verbosity(set_verbosity=None): ''' __file__ = '/home/matt/kadlu/kadlu/geospatial/data_sources/' ''' cfg = configparser.ConfigParser() os.path.join(dirname(dirname(dirname(dirname(__file__)))), "config.ini")) if 'other' not in cfg.sections(): cfg.add_section('other') cfg.set('other', 'verbosity', str(False)) with open( os.path.join(dirname(dirname(dirname(dirname(__file__)))), "config.ini"), 'w') as f: cfg.write(f) if set_verbosity is not None: assert set_verbosity in (True, False), 'verbosity must be True or False' cfg.set('other', 'verbosity', str(set_verbosity)) with open( os.path.join(dirname(dirname(dirname(dirname(__file__)))), "config.ini"), 'w') as f: cfg.write(f) return cfg['other']['verbosity'] == 'True'
[docs]def dt_2_epoch(dt_arr, t0=datetime(2000, 1, 1, 0, 0, 0)): """ convert datetimes to epoch hours """ delta = lambda dt: (dt - t0).total_seconds() / 60 / 60 if isinstance(dt_arr, (list, np.ndarray)): return list(map(int, map(delta, dt_arr))) elif isinstance(dt_arr, (datetime)): return int(delta(dt_arr)) else: raise ValueError('input must be datetime or array of datetimes')
[docs]def epoch_2_dt(ep_arr, t0=datetime(2000, 1, 1, 0, 0, 0), unit='hours'): """ convert epoch hours to datetimes """ delta = lambda ep, unit: t0 + timedelta(**{f'{unit}': ep}) if isinstance(ep_arr, (list, np.ndarray)): return list(map(partial(delta, unit=unit), ep_arr)) elif isinstance(ep_arr, (float, int)): return delta(ep_arr, unit=unit) else: raise ValueError('input must be integer or array of integers')
[docs]def index_arr(val, sorted_arr): """ converts value in coordinate array to grid index """ if val > sorted_arr[-1]: return len(sorted_arr) - 1 return np.nonzero(sorted_arr >= val)[0][0]
[docs]def flatten(cols, frames): """ dimensional reduction by taking average of time frames """ # assert that frames are of equal size assert len(cols) == 5 assert reduce(lambda a, b: (a == b) * a, frames[1:] - frames[:-1]) # aggregate time frame splits and reduce them to average frames_ix = np.append([0], frames) split_num = range(len(frames_ix) - 1) val_split = np.array( [cols[0][frames_ix[f]:frames_ix[f + 1]] for f in split_num]) value_avg = (reduce(np.add, val_split) / len(val_split)) _, y, x, _, z = cols[:, frames_ix[0]:frames_ix[1]] return value_avg, y, x, z
[docs]def reshape_2D(cols): return dict(values=cols[0], lats=cols[1], lons=cols[2])
[docs]def reshape_3D(cols): """ prepare loaded data for interpolation args: cols: flattened numpy array of shape (4, n) cols[0]: values cols[1]: latitude cols[2]: longitude cols[3]: depth return: gridded dict(values=gridspace, lats=ygrid, lons=xgrid, depths=zgrid) """ if isinstance(cols[0], (float, int)): return dict(values=cols[0]) # if data is 4D, take average of values at time frame intervals frames = np.append( np.nonzero(cols[3][1:] > cols[3][:-1])[0] + 1, len(cols[3])) if len(np.unique(frames)) > 1: vals, y, x, z = flatten(cols, frames) else: vals, y, x, _, z = cols rows = np.array((vals, y, x, z)).T # reshape row data to 3D array xgrid, ygrid, zgrid = np.unique(x), np.unique(y), np.unique(z) gridspace = np.full((len(ygrid), len(xgrid), len(zgrid)), fill_value=None, dtype=float) # this could potentially be optimized to avoid an index lookup cost for row in rows: x_ix = index_arr(row[2], xgrid) y_ix = index_arr(row[1], ygrid) z_ix = index_arr(row[3], zgrid) gridspace[y_ix, x_ix, z_ix] = row[0] # remove nulls for interpolation: # fill missing depth values with last value in each column for xi in range(0, gridspace.shape[0]): for yi in range(0, gridspace.shape[1]): col = gridspace[xi, yi] if sum(np.isnan(col)) > 0 and sum(np.isnan(col)) < len(col): col[np.isnan(col)] = col[~np.isnan(col)][-1] gridspace[xi, yi] = col # null depth columns are filled with the average value at each depth plane for zi in range(0, gridspace.shape[2]): gridspace[:, :, zi][np.isnan(gridspace[:, :, zi])] = np.average( gridspace[:, :, zi][~np.isnan(gridspace[:, :, zi])]) return dict(values=gridspace, lats=ygrid, lons=xgrid, depths=zgrid)
[docs]def str_def(self, info, args): """ builds string definition for data source class objects """ fcns = [ fcn for fcn in dir(self) if callable(getattr(self, fcn)) and 'load' in fcn and not fcn.startswith("__") ] return ( f'{info}\n\nfunction input arguments:\n\t{args}\n\nclass functions:' '\n\t' + '\n\t'.join(fcns) + '\n')
[docs]def ll_2_regionstr(south, north, west, east, regions, default=[]): """ convert input bounds to region strings with seperating axis theorem """ if west > east: # recursive function call if query intersects antimeridian return np.union1d( ll_2_regionstr(south, north, west, +180, regions, default), ll_2_regionstr(south, north, -180, east, regions, default)) query = Boundary(south, north, west, east) matching = [str(reg) for reg in regions if query.intersects(reg)] if len(matching) == 0: warnings.warn("No regions matched for query. " f"Defaulting to {default} ({len(default)} regions)") return default return np.unique(matching)
[docs]def fmt_coords(kwargs): return ( f"{abs(kwargs['south']):.2f}°{'S' if kwargs['south'] <= 0 else 'N'}," f"{abs(kwargs['north']):.2f}°{'S' if kwargs['north'] <= 0 else 'N'}," f"{abs(kwargs['west']):.2f}°{'W' if kwargs['west'] <= 0 else 'E'}" f"{abs(kwargs['east']):.2f}°{'W' if kwargs['east'] <= 0 else 'E'}" f"{','+str(kwargs['bottom'])+'m' if 'bottom' in kwargs.keys() else ''}" f"{','+str(kwargs['top'])+'m' if 'top' in kwargs.keys() else ''} : ")
[docs]def logmsg(source, var, ntup=(), **kwargs):'{source.upper()} {var.lower()} ' f'logged {reduce(np.subtract, ntup[::-1])} values in region\t' f'{json.dumps(kwargs, default=str)}')
[docs]def logmsg_nodata(source, var, **kwargs): arguments = ("north", "south", "west", "east", "top", "bottom", "start", "end") region_string = {k: v for k, v in kwargs.items() if k in arguments} logging.warning( f'{source.upper()} {var.lower()}: no data found, ' 'returning empty arrays for region\t' f'{json.dumps(region_string , default=str, sort_keys=True)}')
[docs]class Boundary(): """ compute intersecting boundaries with separating axis theorem """ def __init__(self, south, north, west, east, fetchvar='', **_): self.south = south self.north = north self.west = west self.east = east self.fetchvar = fetchvar def __str__(self): return self.fetchvar
[docs] def intersects(self, other): return not (self.east < other.west or self.west > other.east or self.north < other.south or self.south > other.north)