"""
additional tools and utils used across data loading modules
"""
import configparser
import json
import logging
import os
import sqlite3
import sys
import warnings
from contextlib import contextmanager, redirect_stdout, redirect_stderr
from datetime import datetime, timedelta
from functools import reduce, partial
from io import StringIO
from os.path import dirname
import numpy as np
ext = lambda filepath, extensions: isinstance(extensions, tuple) and any(
x.lower() == filepath.lower()[-len(x):] for x in extensions)
[docs]def database_cfg():
""" configure and connect to sqlite database
time is stored as an integer in the database, where each value
is epoch hours since 2000-01-01 00:00
returns:
conn:
database connection object
db:
connection cursor object
"""
conn = sqlite3.connect(storage_cfg() + 'geospatial.db')
db = conn.cursor()
return conn, db
[docs]class storage_cfg(os.PathLike):
""" return filepath containing storage configuration string
first checks the config.ini file in kadlu root folder, then
defaults to kadlu/storage
"""
"""
__file__ = '/home/matt_s/kadlu/kadlu/geospatial/data_sources/data_util.py'
"""
cfg = configparser.ConfigParser() # read .ini into dictionary object
def __init__(self, setdir=None):
self.cfg.read(
os.path.join(dirname(dirname(dirname(dirname(__file__)))),
"config.ini"))
if setdir:
self.__call__(setdir)
[docs] def default_storage(self, msg):
""" helper function for storage_cfg() """
storage_location = os.path.join(os.path.expanduser('~'),
f'kadlu_data{os.path.sep}')
if not os.path.isdir(storage_location):
os.mkdir(storage_location)
warnings.warn(
f'''{msg} storage location will be set to {storage_location}.\n'''
'''use kadlu.storage_cfg('/path/to/new/directory') '''
'''to configure a different storage location''')
return storage_location
def __call__(self, setdir=None) -> str:
if 'storage' not in self.cfg.sections():
self.cfg.add_section('storage')
if setdir is not None:
assert os.path.isdir(setdir), 'directory does not exist'
self.cfg.set('storage', 'storage_location',
os.path.abspath(setdir))
with open(
os.path.join(dirname(dirname(dirname(dirname(__file__)))),
"config.ini"), 'w') as f:
self.cfg.write(f)
try:
storage_location = self.cfg['storage']['storage_location']
except KeyError: # missing config.ini file
return self.default_storage('storage location not configured.')
if storage_location == '': # null value in config.ini
return self.default_storage('null value in config.')
if not os.path.isdir(storage_location): # verify the location exists
return self.default_storage('storage location does not exist.')
if storage_location[-1] != os.path.sep:
return storage_location + os.path.sep
return str(os.path.abspath(storage_location))
def __repr__(self) -> str:
return self.__str__()
def __str__(self) -> str:
return self.__call__()
def __dir__(self) -> str:
return self.__call__()
def __fspath__(self) -> str:
return self.__call__()
def __add__(self, other) -> str:
return ''.join([self.__str__(), other.__str__()])
[docs]def verbosity(set_verbosity=None):
'''
__file__ = '/home/matt/kadlu/kadlu/geospatial/data_sources/data_util.py'
'''
cfg = configparser.ConfigParser()
cfg.read(
os.path.join(dirname(dirname(dirname(dirname(__file__)))),
"config.ini"))
if 'other' not in cfg.sections():
cfg.add_section('other')
cfg.set('other', 'verbosity', str(False))
with open(
os.path.join(dirname(dirname(dirname(dirname(__file__)))),
"config.ini"), 'w') as f:
cfg.write(f)
if set_verbosity is not None:
assert set_verbosity in (True,
False), 'verbosity must be True or False'
cfg.set('other', 'verbosity', str(set_verbosity))
with open(
os.path.join(dirname(dirname(dirname(dirname(__file__)))),
"config.ini"), 'w') as f:
cfg.write(f)
return cfg['other']['verbosity'] == 'True'
[docs]def dt_2_epoch(dt_arr, t0=datetime(2000, 1, 1, 0, 0, 0)):
""" convert datetimes to epoch hours """
delta = lambda dt: (dt - t0).total_seconds() / 60 / 60
if isinstance(dt_arr, (list, np.ndarray)):
return list(map(int, map(delta, dt_arr)))
elif isinstance(dt_arr, (datetime)):
return int(delta(dt_arr))
else:
raise ValueError('input must be datetime or array of datetimes')
[docs]def epoch_2_dt(ep_arr, t0=datetime(2000, 1, 1, 0, 0, 0), unit='hours'):
""" convert epoch hours to datetimes """
delta = lambda ep, unit: t0 + timedelta(**{f'{unit}': ep})
if isinstance(ep_arr, (list, np.ndarray)):
return list(map(partial(delta, unit=unit), ep_arr))
elif isinstance(ep_arr, (float, int)):
return delta(ep_arr, unit=unit)
else:
raise ValueError('input must be integer or array of integers')
[docs]def index_arr(val, sorted_arr):
""" converts value in coordinate array to grid index """
if val > sorted_arr[-1]:
return len(sorted_arr) - 1
return np.nonzero(sorted_arr >= val)[0][0]
[docs]def flatten(cols, frames):
""" dimensional reduction by taking average of time frames """
# assert that frames are of equal size
assert len(cols) == 5
assert reduce(lambda a, b: (a == b) * a, frames[1:] - frames[:-1])
# aggregate time frame splits and reduce them to average
frames_ix = np.append([0], frames)
split_num = range(len(frames_ix) - 1)
val_split = np.array(
[cols[0][frames_ix[f]:frames_ix[f + 1]] for f in split_num])
value_avg = (reduce(np.add, val_split) / len(val_split))
_, y, x, _, z = cols[:, frames_ix[0]:frames_ix[1]]
return value_avg, y, x, z
[docs]def reshape_2D(cols):
return dict(values=cols[0], lats=cols[1], lons=cols[2])
[docs]def reshape_3D(cols):
""" prepare loaded data for interpolation
args:
cols: flattened numpy array of shape (4, n)
cols[0]: values
cols[1]: latitude
cols[2]: longitude
cols[3]: depth
return: gridded
dict(values=gridspace, lats=ygrid, lons=xgrid, depths=zgrid)
"""
if isinstance(cols[0], (float, int)):
return dict(values=cols[0])
# if data is 4D, take average of values at time frame intervals
frames = np.append(
np.nonzero(cols[3][1:] > cols[3][:-1])[0] + 1, len(cols[3]))
if len(np.unique(frames)) > 1:
vals, y, x, z = flatten(cols, frames)
else:
vals, y, x, _, z = cols
rows = np.array((vals, y, x, z)).T
# reshape row data to 3D array
xgrid, ygrid, zgrid = np.unique(x), np.unique(y), np.unique(z)
gridspace = np.full((len(ygrid), len(xgrid), len(zgrid)),
fill_value=None,
dtype=float)
# this could potentially be optimized to avoid an index lookup cost
for row in rows:
x_ix = index_arr(row[2], xgrid)
y_ix = index_arr(row[1], ygrid)
z_ix = index_arr(row[3], zgrid)
gridspace[y_ix, x_ix, z_ix] = row[0]
# remove nulls for interpolation:
# fill missing depth values with last value in each column
for xi in range(0, gridspace.shape[0]):
for yi in range(0, gridspace.shape[1]):
col = gridspace[xi, yi]
if sum(np.isnan(col)) > 0 and sum(np.isnan(col)) < len(col):
col[np.isnan(col)] = col[~np.isnan(col)][-1]
gridspace[xi, yi] = col
# null depth columns are filled with the average value at each depth plane
for zi in range(0, gridspace.shape[2]):
gridspace[:, :, zi][np.isnan(gridspace[:, :, zi])] = np.average(
gridspace[:, :, zi][~np.isnan(gridspace[:, :, zi])])
return dict(values=gridspace, lats=ygrid, lons=xgrid, depths=zgrid)
[docs]def str_def(self, info, args):
""" builds string definition for data source class objects """
fcns = [
fcn for fcn in dir(self) if callable(getattr(self, fcn))
and 'load' in fcn and not fcn.startswith("__")
]
return (
f'{info}\n\nfunction input arguments:\n\t{args}\n\nclass functions:'
'\n\t' + '\n\t'.join(fcns) + '\n')
[docs]def ll_2_regionstr(south, north, west, east, regions, default=[]):
""" convert input bounds to region strings with seperating axis theorem """
if west > east: # recursive function call if query intersects antimeridian
return np.union1d(
ll_2_regionstr(south, north, west, +180, regions, default),
ll_2_regionstr(south, north, -180, east, regions, default))
query = Boundary(south, north, west, east)
matching = [str(reg) for reg in regions if query.intersects(reg)]
if len(matching) == 0:
warnings.warn("No regions matched for query. "
f"Defaulting to {default} ({len(default)} regions)")
return default
return np.unique(matching)
[docs]def fmt_coords(kwargs):
return (
f"{abs(kwargs['south']):.2f}°{'S' if kwargs['south'] <= 0 else 'N'},"
f"{abs(kwargs['north']):.2f}°{'S' if kwargs['north'] <= 0 else 'N'},"
f"{abs(kwargs['west']):.2f}°{'W' if kwargs['west'] <= 0 else 'E'}"
f"{abs(kwargs['east']):.2f}°{'W' if kwargs['east'] <= 0 else 'E'}"
f"{','+str(kwargs['bottom'])+'m' if 'bottom' in kwargs.keys() else ''}"
f"{','+str(kwargs['top'])+'m' if 'top' in kwargs.keys() else ''} : ")
[docs]def logmsg(source, var, ntup=(), **kwargs):
logging.info(f'{source.upper()} {var.lower()} '
f'logged {reduce(np.subtract, ntup[::-1])} values in region\t'
f'{json.dumps(kwargs, default=str)}')
[docs]def logmsg_nodata(source, var, **kwargs):
arguments = ("north", "south", "west", "east", "top", "bottom", "start",
"end")
region_string = {k: v for k, v in kwargs.items() if k in arguments}
logging.warning(
f'{source.upper()} {var.lower()}: no data found, '
'returning empty arrays for region\t'
f'{json.dumps(region_string , default=str, sort_keys=True)}')
[docs]class Boundary():
""" compute intersecting boundaries with separating axis theorem """
def __init__(self, south, north, west, east, fetchvar='', **_):
self.south = south
self.north = north
self.west = west
self.east = east
self.fetchvar = fetchvar
def __str__(self):
return self.fetchvar
[docs] def intersects(self, other):
return not (self.east < other.west or self.west > other.east
or self.north < other.south or self.south > other.north)