# Source code for ketos.utils

# ================================================================================ #
#   Authors: Fabio Frazao and Oliver Kirsebom                                      #
#   Contact: fsfrazao@dal.ca, oliver.kirsebom@dal.ca                               #
#   Organization: MERIDIAN (https://meridian.cs.dal.ca/)                           #
#   Team: Data Analytics                                                           #
#   Project: ketos                                                                 #
#   Project goal: The ketos library provides functionalities for handling          #
#   and processing acoustic data and applying deep neural networks to sound        #
#   detection and classification tasks.                                            #
#                                                                                  #
#   License: GNU GPLv3                                                             #
#                                                                                  #
#       This program is free software: you can redistribute it and/or modify       #
#       it under the terms of the GNU General Public License as published by       #
#       the Free Software Foundation, either version 3 of the License, or          #
#       (at your option) any later version.                                        #
#                                                                                  #
#       This program is distributed in the hope that it will be useful,            #
#       but WITHOUT ANY WARRANTY; without even the implied warranty of             #
#       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              #
#       GNU General Public License for more details.                               # 
#                                                                                  #
#       You should have received a copy of the GNU General Public License          #
#       along with this program.  If not, see <https://www.gnu.org/licenses/>.     #
# ================================================================================ #

""" Utilities module within the ketos library

    This module provides a number of auxiliary methods.
"""

import os
import numpy as np
import pandas as pd
from scipy.signal import find_peaks
from functools import reduce


def user_format_warning(message, category, filename, lineno, line=None):
    ''' Format a warning as a short message aimed at end users.

        Only the name of the warning category and the message itself are 
        included in the output; the source location arguments are accepted 
        (so that the function can be assigned to warnings.formatwarning) 
        but are not used.

        https://docs.python.org/3/library/warnings.html

        Args:
            message: str
                Warning message
            category: warnings.Warning
                Warning category.
            filename: str
                Path to the source code file. Not used.
            lineno: int
                Line in the source code that triggered the warning. Not used.
            line: str
                Line of source code to be included in the warning message. 
                Not used.

        Returns:
            : str
                Formatted warning message, terminated by a newline

        Example:
            >>> import warnings
            >>> from ketos.utils import user_format_warning
            >>> warnings.formatwarning = user_format_warning #switch format
            >>> warnings.warn("This is a warning intended for users") #print a warning
    '''
    category_name = category.__name__
    return '{}: {}\n'.format(category_name, message)


def dev_format_warning(message, category, filename, lineno, line=None):
    ''' Format a warning as a message aimed at developers.

        In addition to the warning category and the message, the output 
        contains the file name and line number where the warning was 
        triggered. The signature allows the function to be assigned to 
        warnings.formatwarning.

        https://docs.python.org/3/library/warnings.html

        Args:
            message: str
                Warning message
            category: warnings.Warning
                Warning category.
            filename: str
                Path to the source code file.
            lineno: int
                Line in the source code that triggered the warning.
            line: str
                Line of source code to be included in the warning message. 
                Not used.

        Returns:
            : str
                Formatted warning message, terminated by a newline

        Example:
            >>> import warnings
            >>> from ketos.utils import dev_format_warning
            >>> warnings.formatwarning = dev_format_warning #switch format
            >>> warnings.warn("This is a warning intended for developers") #print a warning    
    '''
    category_name = category.__name__
    return '{}:{}: {}: {}\n'.format(filename, lineno, category_name, message)


def fractional_overlap(a, b):
    ''' Compute the fractional overlap of two intervals, defined as
        (length of overlap) / (length of the shortest interval of the two).
        For example, if a=(3,7) and b=(5.5,10), the overlap interval is 
        (5.5,7) which has length 1.5. Interval a has length 4 while b has 
        length 4.5. Therefore, the fractional overlap is 1.5/4 = 0.375 = 37.5%.

        Intervals that do not overlap (or only touch at an end point) have 
        a fractional overlap of 0. If either interval has zero length the 
        fractional overlap is also defined to be 0.

        Args:
            a: tuple
                One interval, given as (start, end).
            b: tuple
                Another interval, given as (start, end).

        Returns:
            : float
                The fractional overlap, a number between 0 and 1 for 
                intervals with start <= end.
    '''
    len_a = a[1] - a[0]
    len_b = b[1] - b[0]
    if len_a == 0 or len_b == 0:
        # avoid division by zero for degenerate intervals
        return 0.0

    overlap = min(a[1], b[1]) - max(a[0], b[0])
    if overlap <= 0:
        # disjoint intervals: the difference above is the (negative) gap 
        # between them, not an overlap
        return 0.0

    return overlap / min(len_a, len_b)

def factors(n):
    """ Find all divisors of the integer n.

        Candidate divisors are only tried up to the square root of n; 
        every candidate i that divides n contributes both i and its 
        partner n // i.

        Args:
            n: int
                Integer number

        Returns:
            s: set
                Set of all divisors of n
    """
    upper = int(n**0.5) + 1
    divisor_pairs = [[i, n // i] for i in range(1, upper) if n % i == 0]
    # concatenate the [i, n // i] pairs into a single flat list
    flat = reduce(list.__add__, divisor_pairs)
    return set(flat)
def ensure_dir(file_path):
    """ Ensure that destination directory exists.

        If the directory does not exist, it is created (including any 
        missing parent directories). If it already exists, nothing happens.
        Nothing is done either when the path has no directory component 
        or when the directory component is ".".

        Args:
            file_path: str
                Full path to destination. Only the directory part of the 
                path is used; the file itself is not created.
    """
    directory = os.path.dirname(file_path)
    if directory in ("", "."):
        return
    if not os.path.exists(directory):
        # exist_ok=True guards against another process/thread creating the 
        # directory between the existence check and the call to makedirs
        os.makedirs(directory, exist_ok=True)
def random_floats(size=1, low=0, high=1, seed=1):
    """ Returns a random number or numpy array of random numbers uniformly 
        distributed in the half-open interval [low, high)

        Note that the global numpy random number generator is re-seeded 
        with @seed on every call, so repeated calls with the same 
        arguments return the same numbers.

        Args:
            size: int
                Number of random numbers to be generated
            low: float
                Lower value
            high: float
                Upper value (not included)
            seed: int
                Seed for the random number generator

        Returns:
            res: float or numpy array
                Generated random number(s). A single number is returned 
                if size is 1, otherwise a numpy array. If low and high 
                are equal, that value is returned instead of random numbers.

        Example:
            >>> from ketos.utils import random_floats
            >>> 
            >>> x = random_floats(3, 0.4, 7.2)
            >>> print(x)
            [3.23574963 5.29820656 0.40077775]
    """
    assert high >= low, "Upper limit must be greater than or equal to lower limit"
    assert size >= 1, "Size must be an int greater than or equal to 1"

    np.random.seed(seed)

    if high == low:
        # degenerate interval: every "random" number equals the limit
        if size == 1:
            return high
        return np.ones(size, dtype=float) * high

    res = low + (high - low) * np.random.random_sample(size)
    if len(res) == 1:
        # convert the single element to a plain Python float; the np.float 
        # alias used previously no longer exists in recent NumPy versions
        res = float(res[0])

    return res
def ndim(a):
    """ Returns the number of dimensions of a list/tuple/array.

        The number of dimensions is determined by following the first 
        element at each nesting level. Anything that is not a list, tuple 
        or numpy array counts as having 0 dimensions, as does a 
        zero-dimensional numpy array. An empty list/tuple/array counts 
        as having 1 dimension.

        Args:
            a: list, tuple or numpy array
                Object that we wish to obtain the dimension of 

        Returns:
            n: int
                Number of dimensions

        Example:
            >>> from ketos.utils import ndim
            >>> 
            >>> x = [[0, 1, 2, 3],(4, 5)]
            >>> print(ndim(x))
            2
    """
    # The exact type is compared on purpose (rather than using isinstance) 
    # so that subclasses of list/tuple/ndarray keep being treated as scalars
    if type(a) not in (list, tuple, np.ndarray):
        return 0

    # a zero-dimensional array has no length; len() would raise TypeError
    if type(a) is np.ndarray and a.ndim == 0:
        return 0

    if len(a) == 0:
        return 1

    return 1 + ndim(a[0])
def tostring(box, decimals=None):
    """ Convert an array, tuple or list into a string.

        The string contains no spaces, and tuples are written with 
        square brackets just like lists.

        Args:
            box: array, tuple or list
                Array, tuple or list that will be converted into a string.
                If None, an empty string is returned.
            decimals: int
                Number of decimals that will be kept in the conversion to string.
                If None (default), the values are not rounded.

        Returns:
            s: str
                String representation of array/tuple/list.

        Example:
            >>> from ketos.utils import tostring
            >>> 
            >>> y = [[0, 1, 2, 3],(4, 5)]
            >>> print(tostring(y))
            [[0,1,2,3],[4,5]]
    """
    if box is None:
        return ''

    try:
        arr = np.array(box)
    except ValueError:
        # recent NumPy versions refuse to build an array from ragged 
        # (inhomogeneous) input unless dtype=object is requested explicitly
        arr = np.array(box, dtype=object)

    if decimals is not None:
        arr = np.around(arr, decimals=int(decimals))

    s = str(arr.tolist())
    # strip whitespace and write tuples with square brackets
    for old, new in ((' ', ''), ('(', '['), (')', ']')):
        s = s.replace(old, new)

    return s
def octave_bands(band_min=-1, band_max=9):
    """ Compute the min, central, and max frequency value 
        of the specified octave bands, using the following 
        formulas,

            f_centre = 10^3 * 2^p ,
            f_min = f_centre / sqrt(2) ,
            f_max = f_centre * sqrt(2) ,

        where p = band_number - 5

        Both band_min and band_max are included in the output.

        Args:
            band_min: int
                Lower octave band
            band_max: int
                Upper octave band

        Returns:
            fcentre: numpy array
                Central frequency of each band (in Hz)
            flow: numpy array
                Minimum frequency of each band (in Hz)
            fhigh: numpy array
                Maximum frequency of each band (in Hz)

        Example:
            >>> from ketos.utils import octave_bands
            >>> 
            >>> fc, fmin, fmax = octave_bands(1, 3)
            >>> print(fc)
            [ 62.5 125.  250. ]
    """
    # one exponent per band, running from band_min-5 up to and including band_max-5
    exponents = np.arange(band_min - 5., band_max - 4.)
    centre = 1e3 * 2. ** exponents
    half_octave = np.sqrt(2.)
    lower = centre / half_octave
    upper = centre * half_octave
    return centre, lower, upper
def octave_bands_json(band_min, band_max):
    """ Produce a string of the specified octave bands
        in json format

        The string consists of a "frequency_bands" key followed by a 
        list with one entry per band; each entry has a "name" (central 
        frequency) and a "range" (minimum and maximum frequency).

        Args:
            band_min: int
                Lower octave band
            band_max: int
                Upper octave band

        Returns:
            s: str
                json format string

        Example:
            >>> from ketos.utils import octave_bands_json
            >>> 
            >>> s = octave_bands_json(1, 2)
    """
    centres, lows, highs = octave_bands(band_min, band_max)

    entries = [
        '\t{\n'
        + '\t\t"name": "{0:.0f}Hz",\n'.format(centre)
        + '\t\t"range": ["{0:.1f}Hz", "{1:.1f}Hz"]'.format(low, high)
        + '\n\t}'
        for centre, low, high in zip(centres, lows, highs)
    ]

    # entries are separated by a comma; the last entry is followed by a 
    # newline only
    body = ',\n'.join(entries)
    if entries:
        body += '\n'

    return '"frequency_bands": [\n' + body + ']'
def morlet_func(time, frequency, width, displacement, norm=True, dfdt=0):
    """ Compute Morlet wavelet function

        The function is implemented as in Eq. (15) in John Ashmead, "Morlet Wavelets in Quantum Mechanics",
        Quanta 2012; 1: 58-70, with the replacement f -> 2*pi*f*s, to allow f to be identified with the 
        physical frequency.

        Only the real part of the wavelet function is returned.

        Args:
            time: float or numpy array
                Time in seconds at which the function is to be evaluated
            frequency: float
                Wavelet frequency in Hz
            width: float
                Wavelet width in seconds (1-sigma width of the Gaussian envelope function)
            displacement: float
                Wavelet centroid in seconds
            norm: bool
                Apply the normalization factor 
                [s * sqrt(pi) * (1 + exp(-w^2) - 2*exp(-0.75*w^2))]^-1/2, 
                where s is the width and w = 2*pi*frequency*width.
            dfdt: float
                Rate of change in frequency as a function of time in Hz per second.
                If dfdt is non-zero, the frequency is computed as 
                    
                    f = frequency + (time - displacement) * dfdt 

        Returns:
            y: float or numpy array
                Value of Morlet wavelet function at time t

        Example:
            >>> from ketos.utils import morlet_func
            >>> 
            >>> time = np.array([-1., 0., 0.5])
            >>> f = morlet_func(time=time, frequency=10, width=3, displacement=0)
            >>> print(f)
            [0.41022718 0.43366254 0.42768108]
    """
    if dfdt != 0:
        # create a new object instead of using "+=", which would modify a 
        # numpy array passed in by the caller
        frequency = frequency + (time - displacement) * dfdt

    assert np.all(frequency > 0), "Frequency must be a strictly positive float"
    assert width > 0, "Width must be a strictly positive float"

    # dimensionless frequency and time
    w = 2 * np.pi * frequency * width
    x = (time - displacement) / width

    # oscillation (with correction term) multiplied by Gaussian envelope
    y = (np.exp(1j * w * x) - np.exp(-0.5 * (w**2))) * np.exp(-0.5 * (x**2))

    if norm:
        y = y * (width * np.sqrt(np.pi) * (1 + np.exp(-w**2) - 2 * np.exp(-0.75 * w**2)))**-0.5

    return np.real(y)
def nearest_values(x, i, n):
    """ Returns the n values nearest to index i from the array x.

        Here, nearest refers to the position in the array, not 
        the value.

        If a window of n values centered on i fits inside the array, 
        that window is returned in array order (for even n, the window 
        contains one more value to the left of i than to the right). 
        Otherwise, the values are collected starting from x[i] and 
        moving outwards, alternating between the left and the right 
        side and skipping positions outside the array. If n is greater 
        than or equal to the length of x, x itself is returned.

        Args:
            x: numpy array
                Input values
            i: int
                Index
            n: int
                Number of neighboring values

        Returns:
            y: numpy array
                n values nearest to index i from the array x

        Example:
            >>> from ketos.utils import nearest_values
            >>> 
            >>> x = np.array([1.0, 4.0, 5.1, 6.0, 0.2, 0.3, 10.0])
            >>> y = nearest_values(x=x, i=3, n=3)
            >>> print(y)
            [5.1 6.  0.2]
    """
    size = x.shape[0]
    if n >= size:
        return x

    # first and last index of a window of n values centered on i
    if n % 2 == 0:
        first, last = int(i - n/2), int(i + n/2 - 1)
    else:
        first, last = int(i - (n-1)/2), int(i + (n-1)/2)

    if first >= 0 and last < size:
        return x[first:last+1]

    # the window extends beyond the array: gather values outwards from i
    picked = [x[i]]
    step = 1
    while len(picked) < n:
        for j in (int(i - step), int(i + step)):
            if len(picked) < n and 0 <= j < size:
                picked.append(x[j])
        step += 1

    return np.array(picked)
def detect_peaks(df, distance=1, multiplicity=1, prominence=1.0, height=None, threshold=None):
    """ Detect peaks in time-series data.

        The time-series data is provided in the form of a Pandas DataFrame object, where each 
        column contains a different time series.

        This is essentially a wrapper around SciPy's find_peaks method:

            https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html

        Peaks are searched for in each time series separately. A position is reported as a 
        detection if a peak was found at that position in at least @multiplicity of the time series.

        Args:
            df: Pandas DataFrame
                Data frame containing the input data.
            distance: int
                Minimum distance between adjacent peaks
            multiplicity: int
                Number of time series in which peaks must appear to be counted.
            prominence: float
                Required prominence of the peaks, expressed in units of the median absolute 
                deviation of the time series. If zero or negative, no prominence requirement 
                is applied. The prominence of a peak measures how much a peak stands out from 
                the surrounding baseline of the signal and is defined as the vertical distance 
                between the peak and its lowest contour line. See also 
                https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.peak_prominences.html#scipy.signal.peak_prominences
            height: float
                Required absolute height of the peaks.
            threshold: float
                Required threshold of peaks (the vertical distance to its neighbouring samples).

        Returns:
            res: numpy array
                Array of 0s (no detection) and 1s (detection) with one entry per row in the 
                input data frame.

        Example:
            >>> from ketos.utils import detect_peaks
            >>> import pandas as pd
            >>> 
            >>> # create a two time series, where only the first contains a peak
            >>> d = {'series1' : pd.Series([1.0, 2.3, 22.0, 2.2, 1.5]), 'series2': pd.Series([1.0, 2.3, 1.8, 2.2, 1.5])}
            >>> df = pd.DataFrame(d)
            >>> 
            >>> # detect peaks with multiplicity 1 and prominence of at least 2.0
            >>> peaks = detect_peaks(df=df, multiplicity=1, prominence=2.0)
            >>> print(peaks)
            [0 0 1 0 0]
            >>> 
            >>> # try again, but this time require multiplicity 2
            >>> peaks = detect_peaks(df=df, multiplicity=2, prominence=2.0)
            >>> print(peaks)
            [0 0 0 0 0]
    """
    # number of time series with a peak at each position
    hits = np.zeros(len(df))

    for name in df:
        series = df[name]

        # the prominence requirement is scaled by the median absolute deviation
        mad = np.median(np.abs(series - np.median(series)))
        min_prominence = mad * prominence if prominence > 0 else None

        positions, _ = find_peaks(series, height=height, threshold=threshold,
                                  distance=distance, prominence=(min_prominence, None))
        hits[positions] += 1

    # 1 where enough time series agree on a peak, 0 elsewhere
    return (hits >= multiplicity).astype(int)
def complex_value(mag, angle):
    """ Computes complex value from magnitude and phase angle.

        The complex value is computed as mag * (cos(angle) + i * sin(angle)).

        Args:
            mag: numpy array
                Magnitude
            angle: float or numpy array
                Phase angle in radians

        Returns:
            c: numpy array
                Complex value
    """
    real_part = np.cos(angle)
    imag_part = np.sin(angle)
    return mag * (real_part + 1.j * imag_part)
def get_member(cls, member_name):
    """ Query class member by name.

        The members are looked up in the __members__ attribute of the class.

        Args:
            cls: Class
                Class
            member_name: str
                Member name

        Returns:
            member: 
                The class member

        Raises:
            ValueError: if the class does not contain a member by that name. 
                The error message lists the available member names.
    """
    members = cls.__members__

    matches = [member for name, member in members.items() if member_name == name]
    if matches:
        return matches[0]

    # iterating over the mapping yields the member names
    options = ", ".join(members)
    raise ValueError("Unknown value \'{0}\'. Select between: {1}".format(member_name, options))
def str_is_int(s, signed=True):
    """ Check if a given string represents a (signed) integer.

        A string is considered to represent an integer if it is non-empty 
        and consists of decimal digits only, optionally preceded by a 
        single '+' or '-' sign if @signed is True.

        Args:
            s: str
                Input string.
            signed: bool
                Check if string represents a signed integer (default) or unsigned.

        Returns:
            res: bool
                Result of check
    """
    if signed and s.startswith(('-', '+')):
        # drop a single leading sign
        s = s[1:]

    # isdecimal() is used rather than isdigit() because the latter also 
    # accepts characters such as superscript digits, which int() cannot parse
    return s.isdecimal()
def signif(x, p):
    """ Round to a specified number of significant digits.

        Args:
            x: array-like
                Values to be rounded
            p: int
                Number of significant digits

        Returns:
            : array-like
                Rounded values
    """
    values = np.asarray(x)

    # zeros and non-finite values have no well-defined order of magnitude; 
    # a placeholder is substituted for them so that log10 can be evaluated
    usable = np.isfinite(values) & (values != 0)
    magnitude = np.where(usable, np.abs(values), 10**(p-1))

    # scale factor that moves the p-th significant digit to the ones place
    scale = 10 ** (p - 1 - np.floor(np.log10(magnitude)))

    return np.round(values * scale) / scale
def ceil(a, decimals=0):
    """ Ceil to a given decimal precision instead of to the nearest integer.

        The role of the `decimals` argument is similar to that of the 
        `decimals` argument of `np.round()`.

        Args:
            a: array_like
                Input data.
            decimals: int
                Number of decimal places to round to (default: 0).

        Returns:
            : array_like
                The ceil of a

        Example:
            >>> from ketos.utils import ceil
            >>> ceil(13.84)
            14.0
            >>> ceil(13.36)
            14.0
            >>> ceil(13.84, decimals=1)
            13.9
            >>> ceil(13.844444, decimals=3)
            13.845
    """
    scale = 10**decimals
    # shift the requested decimal to the ones place, ceil, and shift back
    shifted = np.ceil(np.asarray(a) * scale)
    return np.true_divide(shifted, scale)
def floor(a, decimals=0):
    """ Floor to a given decimal precision instead of to the nearest integer.

        The role of the `decimals` argument is similar to that of the 
        `decimals` argument of `np.round()`.

        Args:
            a: array_like
                Input data.
            decimals: int
                Number of decimal places to round to (default: 0).

        Returns:
            : array_like
                The floor of a

        Example:
            >>> from ketos.utils import floor
            >>> floor(13.84)
            13.0
            >>> floor(13.36)
            13.0
            >>> floor(13.36, decimals=1)
            13.3
            >>> floor(13.3669999, decimals=3)
            13.366
    """
    scale = 10**decimals
    # shift the requested decimal to the ones place, floor, and shift back
    shifted = np.floor(np.asarray(a) * scale)
    return np.true_divide(shifted, scale)
def ceil_round_down(a, decimals=6):
    """ Provides a convenient way to use ceil while specifying a decimal precision to floor instead

        This helps deal with imprecision of finite number of floating points arithmetics

        For instance: `2.8/0.2` can be displayed as `14.00000000001` instead of `14`. And this leads to `np.ceil(2.8/0.2) == 15`
        With this function we can specify a decimal point to round down values that are very close to the previous integer.

        For instance with the default `decimals=6`, any number with decimals equal or smaller than .000001 is rounded down 
        to the previous integer otherwise ceil is used. `ceil_round_down(13.000001, decimals=6) == 13` while 
        `ceil_round_down(13.0000011, decimals=6) == 14`

        More examples below.

        Args:
            a: array_like
                Input data. Scalars and arrays of any dimension are accepted.
            decimals: int
                Decimal places. decimals == 0 is the same as `np.ceil()`

        Returns:
            : ndarray or scalar
                The ceil of each element in a. This is a scalar if a is a scalar. Floor is used depending on decimals

        Example:
            >>> from ketos.utils import ceil_round_down
            >>> ceil_round_down(13.000001, decimals=6)
            13.0
            >>> ceil_round_down(13.0000011, decimals=6)
            14.0
            >>> ceil_round_down(13.0000010000001, decimals=6)
            14.0
            >>> ceil_round_down(13.00000000001, decimals=0)
            14.0
            >>> ceil_round_down(13.1, decimals=1)
            13.0
            >>> ceil_round_down(13.10000001, decimals=1)
            14.0
    """
    if decimals == 0:
        return np.ceil(a)

    tolerance = 10**-decimals
    scale = 10**decimals
    scalar_input = np.isscalar(a)

    # at least one dimension, so that scalars and arrays share the same code 
    # path (np.array(..., copy=False) cannot be used here: recent NumPy 
    # versions raise an error when a copy is unavoidable, e.g. for scalars)
    values = np.atleast_1d(np.asarray(a))

    lower = np.floor(values)
    # distance to the previous integer, truncated to the requested decimals
    excess = np.true_divide(np.floor((values - lower) * scale), scale)

    result = np.where(excess < tolerance, lower, np.ceil(values))

    if scalar_input:
        return result.item()
    return result
def floor_round_up(a, decimals=6):
    """ Provides a convenient way to use floor while specifying a decimal precision to ceil instead

        This helps deal with imprecision of finite number of floating points arithmetics

        For instance: `2.8/0.2` can be displayed as `13.99999999998` instead of `14`. And this leads to `np.floor(2.8/0.2) == 13`
        With this function we can specify a decimal point to round up values that are very close to the next integer.

        For instance with the default `decimals=6`, any number with decimals equal or bigger than .999999 is rounded up 
        to the next integer otherwise floor is used. `floor_round_up(13.999999, decimals=6) == 14` while 
        `floor_round_up(13.999998, decimals=6) == 13`

        More examples below.

        Args:
            a: array_like
                Input data. Scalars and arrays of any dimension are accepted.
            decimals: int
                Decimal places. decimals == 0 is the same as `np.floor()`

        Returns:
            : ndarray or scalar
                The floor of each element in a. This is a scalar if a is a scalar. Ceil is used depending on decimals

        Example:
            >>> from ketos.utils import floor_round_up
            >>> floor_round_up(13.999998, decimals=6)
            13.0
            >>> floor_round_up(13.999999, decimals=6)
            14.0
            >>> floor_round_up(13.9999989999, decimals=6)
            13.0
            >>> floor_round_up(13.9999998, decimals=0)
            13.0
            >>> floor_round_up(13.9, decimals=1)
            14.0
            >>> floor_round_up(13.8999999, decimals=1)
            13.0
    """
    if decimals == 0:
        return np.floor(a)

    tolerance = 10**-decimals
    scale = 10**decimals
    scalar_input = np.isscalar(a)

    # at least one dimension, so that scalars and arrays share the same code 
    # path (np.array(..., copy=False) cannot be used here: recent NumPy 
    # versions raise an error when a copy is unavoidable, e.g. for scalars)
    values = np.atleast_1d(np.asarray(a))

    upper = np.ceil(values)
    # distance to the next integer, truncated to the requested decimals
    shortfall = np.true_divide(np.floor((upper - values) * scale), scale)

    result = np.where(shortfall < tolerance, upper, np.floor(values))

    if scalar_input:
        return result.item()
    return result