
# ================================================================================ #
#   Authors: Fabio Frazao and Oliver Kirsebom                                      #
#   Contact: fsfrazao@dal.ca, oliver.kirsebom@dal.ca                               #
#   Organization: MERIDIAN (https://meridian.cs.dal.ca/)                           #
#   Team: Data Analytics                                                           #
#   Project: ketos                                                                 #
#   Project goal: The ketos library provides functionalities for handling          #
#   and processing acoustic data and applying deep neural networks to sound        #
#   detection and classification tasks.                                            #
#                                                                                  #
#   License: GNU GPLv3                                                             #
#                                                                                  #
#       This program is free software: you can redistribute it and/or modify       #
#       it under the terms of the GNU General Public License as published by       #
#       the Free Software Foundation, either version 3 of the License, or          #
#       (at your option) any later version.                                        #
#                                                                                  #
#       This program is distributed in the hope that it will be useful,            #
#       but WITHOUT ANY WARRANTY; without even the implied warranty of             #
#       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              #
#       GNU General Public License for more details.                               # 
#                                                                                  #
#       You should have received a copy of the GNU General Public License          #
#       along with this program.  If not, see <https://www.gnu.org/licenses/>.     #
# ================================================================================ #

""" 'audio.utils.misc' module within the ketos library

    This module provides utilities to perform various types of 
    operations on audio data, acting either in the time domain 
    (waveform) or in the frequency domain (spectrogram), or 
    both.
"""
import numpy as np
from sys import getsizeof
from psutil import virtual_memory
from scipy.fftpack import dct  # used by mag2mfcc
from ketos.utils import complex_value

def pad_reflect(x, pad_left=0, pad_right=0, invert=False):
    """ Pad array with its own (inverted) reflection along the first axis (0).

        Args:
            x: numpy.array
                The data to be padded.
            pad_left: int
                Amount of padding on the left
            pad_right: int
                Amount of padding on the right
            invert: bool
                Whether to invert the reflection. Default is False.

        Returns:
            x_padded: numpy.array
                Padded array

        Example:
            >>> from ketos.audio.utils.misc import pad_reflect
            >>> arr = np.arange(9) #create a simple array
            >>> print(arr)
            [0 1 2 3 4 5 6 7 8]
            >>> arr = pad_reflect(arr, pad_right=3) #pad on the right
            >>> print(arr)
            [0 1 2 3 4 5 6 7 8 7 6 5]
    """
    if pad_left == 0 and pad_right == 0:
        x_padded = x
    else:
        pad_left_residual = 0
        pad_right_residual = 0
        x_padded = x.copy()
        if pad_left > 0:
            x_pad = np.flip(x[1:pad_left+1], axis=0)
            if invert:
                x_pad = 2 * x[0] - x_pad
            pad_left_residual = max(0, pad_left - x_pad.shape[0])
            x_padded = np.concatenate((x_pad, x_padded))
        if pad_right > 0:
            x_pad = np.flip(x[-pad_right-1:-1], axis=0)
            if invert:
                x_pad = 2 * x[-1] - x_pad
            pad_right_residual = max(0, pad_right - x_pad.shape[0])
            x_padded = np.concatenate((x_padded, x_pad))
        if pad_left_residual + pad_right_residual > 0:
            x_padded = pad_zero(x_padded, pad_left_residual, pad_right_residual)

    return x_padded

def pad_zero(x, pad_left=0, pad_right=0):
    """ Pad array with zeros along the first axis (0).

        Args:
            x: numpy.array
                The data to be padded.
            pad_left: int
                Amount of padding on the left
            pad_right: int
                Amount of padding on the right

        Returns:
            x_padded: numpy.array
                Padded array

        Example:
            >>> from ketos.audio.utils.misc import pad_zero
            >>> arr = np.arange(9) #create a simple array
            >>> print(arr)
            [0 1 2 3 4 5 6 7 8]
            >>> arr = pad_zero(arr, pad_right=3) #pad on the right
            >>> print(arr)
            [0 1 2 3 4 5 6 7 8 0 0 0]
    """
    if pad_left == 0 and pad_right == 0:
        x_padded = x
    else:
        x_padded = x.copy()
        if pad_left > 0:
            pad_shape = tuple([pad_left] + list(x.shape)[1:])
            x_pad = np.zeros(pad_shape, dtype=x.dtype)
            x_padded = np.concatenate((x_pad, x_padded))
        if pad_right > 0:
            pad_shape = tuple([pad_right] + list(x.shape)[1:])
            x_pad = np.zeros(pad_shape, dtype=x.dtype)
            x_padded = np.concatenate((x_padded, x_pad))

    return x_padded

def num_samples(time, rate, even=False):
    """ Convert time interval to number of samples.

        If the time corresponds to a non-integer number of samples,
        round to the nearest larger integer value.

        Args:
            time: float
                Time interval in seconds
            rate: float
                Sampling rate in Hz
            even: bool
                Convert to nearest larger even integer.

        Returns:
            n: int
                Number of samples

        Example:
            >>> from ketos.audio.utils.misc import num_samples
            >>> print(num_samples(rate=1000., time=0.0))
            0
            >>> print(num_samples(rate=1000., time=2.0))
            2000
            >>> print(num_samples(rate=1000., time=2.001))
            2001
            >>> print(num_samples(rate=1000., time=2.001, even=True))
            2002
    """
    e = np.finfo(np.float32).eps #machine precision
    f = time * rate
    n = int(f)
    if f - n > e:
        n = int(np.ceil(f))

    if even and n % 2 == 1:
        n += 1

    return n

def segment_args(rate, offset, window, step, duration):
    """ Compute input arguments for :func:`audio.utils.misc.segment`
        to produce a centered spectrogram with properties as close as
        possible to those specified.

        Args:
            rate: float
                Sampling rate in Hz
            offset: float
                Offset in seconds
            window: float
                Window size in seconds
            step: float
                Step size in seconds
            duration: float
                Duration in seconds

        Returns:
            : dict
                Dictionary with the following keys and values:

                    * win_len: Window size in number of samples (int)
                    * step_len: Step size in number of samples (int)
                    * num_segs: Number of steps (int)
                    * offset_len: Offset in number of samples (int)

        Example:
            >>> from ketos.audio.utils.misc import segment_args
            >>> args = segment_args(rate=1000., duration=3., offset=0., window=0.1, step=0.02)
            >>> for key,value in sorted(args.items()):
            ...     print(key,':',value)
            num_segs : 150
            offset_len : -40
            step_len : 20
            win_len : 100
    """
    win_len = num_samples(window, rate=rate, even=True)
    step_len = num_samples(step, rate=rate, even=True)
    num_segs = num_samples(duration, rate=rate/step_len)
    offset_len = num_samples(offset, rate=rate) - int(win_len/2) + int(step_len/2)
    return {'win_len':win_len, 'step_len':step_len, 'num_segs':num_segs, 'offset_len':offset_len}

def segment(x, win_len, step_len, num_segs=None, offset_len=0, pad_mode='reflect', mem_warning=True):
    """ Divide an array into segments of equal length along its first
        axis (0), each segment being shifted by a fixed amount with
        respect to the previous segment.

        If offset_len is negative, the input array will be padded with
        its own inverted reflection on the left.

        If the combined length of the segments exceeds the length of
        the input array (minus any positive offset), the array will be
        padded with its own inverted reflection on the right.

        Args:
            x: numpy.array
                The data to be segmented
            win_len: int
                Window length in no. of samples
            step_len: int
                Step size in no. of samples
            num_segs: int
                Number of segments. Optional.
            offset_len: int
                Position of the first frame in no. of samples. Defaults to 0, if not specified.
            pad_mode: str
                Padding mode. Select between 'reflect' (default) and 'zero'.
            mem_warning: bool
                Print warning if the size of the array exceeds 10% of the
                available memory.

        Returns:
            segs: numpy.array
                Segmented data, has shape (num_segs, win_len, x.shape[1:])

        Example:
            >>> from ketos.audio.utils.misc import segment
            >>> x = np.arange(10)
            >>> print(x)
            [0 1 2 3 4 5 6 7 8 9]
            >>> y = segment(x, win_len=4, step_len=2, num_segs=3, offset_len=0)
            >>> print(y)
            [[0 1 2 3]
             [2 3 4 5]
             [4 5 6 7]]
            >>> y = segment(x, win_len=4, step_len=2, num_segs=3, offset_len=-3)
            >>> print(y)
            [[3 2 1 0]
             [1 0 1 2]
             [1 2 3 4]]
    """
    mem = virtual_memory() #memory available
    siz = getsizeof(x) * win_len / step_len #estimated size of output array
    if mem_warning and siz > 0.1 * mem.total: #print warning, if output array is very large
        print("Warning: size of output frames exceeds 10% of memory")
        print("Consider reducing the array size and/or increasing the step size and/or reducing the window size")

    # if not specified, compute number of segments so entire array is used
    if num_segs is None:
        num_segs = int(np.ceil((len(x) - offset_len - win_len) / step_len)) + 1

    # pad, if necessary
    pad_left = max(0, -offset_len)
    pad_right = max(0, max(0, offset_len) + num_segs * step_len + win_len - x.shape[0])
    if pad_mode.lower() == 'reflect':
        x_pad = pad_reflect(x, pad_left, pad_right)
    else:
        x_pad = pad_zero(x, pad_left, pad_right)

    # tile
    indices = np.tile(np.arange(0, win_len), (num_segs, 1)) + np.tile(np.arange(0, num_segs * step_len, step_len), (win_len, 1)).T
    segs = x_pad[indices.astype(np.int32, copy=False)]

    return segs

def stft(x, rate, window=None, step=None, seg_args=None, window_func='hamming', decibel=True, compute_phase=False):
    """ Compute Short Time Fourier Transform (STFT).

        Uses :func:`audio.utils.misc.segment_args` to convert the window size
        and step size into an even integer number of samples.

        The number of points used for the Fourier Transform is equal to the
        number of samples in the window.

        Args:
            x: numpy.array
                Audio signal
            rate: float
                Sampling rate in Hz
            window: float
                Window length in seconds
            step: float
                Step size in seconds
            seg_args: dict
                Input arguments for :func:`audio.utils.misc.segment_args`.
                Optional. If specified, the arguments `window` and `step` are ignored.
            window_func: str
                Window function (optional). Select between
                    * bartlett
                    * blackman
                    * hamming (default)
                    * hanning
            decibel: bool
                Convert to dB scale
            compute_phase: bool
                Compute complex phase angle. Default is False.

        Returns:
            img: numpy.array
                Short Time Fourier Transform of the input signal.
            freq_max: float
                Maximum frequency in Hz
            num_fft: int
                Number of points used for the Fourier Transform.
            seg_args: dict
                Input arguments used for evaluating :func:`audio.utils.misc.segment_args`.
            cpx_angle: numpy.array
                Complex phase angle in radians. None unless compute_phase=True.
    """
    if seg_args is None:
        assert window and step, "if seg_args is not specified, window and step must both be specified."
        seg_args = segment_args(rate=rate, duration=len(x)/rate,
            offset=0, window=window, step=step) #compute input arguments for segment method

    segs = segment(x, **seg_args) #divide audio signal into segments

    if window_func:
        segs *= getattr(np, window_func)(segs.shape[1]) #apply window function

    fft = np.fft.rfft(segs) #compute fast fourier transform
    img = np.abs(fft)
    if decibel:
        img = to_decibel(img) #compute magnitude on dB scale

    num_fft = segs.shape[1] #number of points used for the Fourier Transform
    freq_max = rate / 2. #maximum frequency

    if compute_phase:
        cpx_angle = np.angle(fft)
    else:
        cpx_angle = None

    return img, freq_max, num_fft, seg_args, cpx_angle

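# A minimal usage sketch for stft() (illustration only, not part of the
# ketos API). The signal, rate, window, and step values below are
# arbitrary example choices.
def _example_stft():
    x = np.sin(2 * np.pi * 10 * np.arange(1000) / 1000.)  # 10 Hz tone sampled at 1 kHz
    img, freq_max, num_fft, seg_args, _ = stft(x, rate=1000., window=0.2, step=0.05)
    # window=0.2 s at 1 kHz gives num_fft=200, so the spectrogram has
    # num_fft/2 + 1 = 101 frequency bins and freq_max = 500 Hz
    return img.shape, freq_max, num_fft
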
def cqt(x, rate, step, bins_per_oct, freq_min, freq_max=None, window_func='hamming'):
    r""" Compute the CQT spectrogram of an audio signal.

        Uses the librosa implementation,

            https://librosa.github.io/librosa/generated/librosa.core.cqt.html

        To compute the CQT spectrogram, the user must specify the step size,
        the minimum and maximum frequencies, :math:`f_{min}` and :math:`f_{max}`,
        and the number of bins per octave, :math:`m`. While :math:`f_{min}` and
        :math:`m` are fixed to the input values, the step size and :math:`f_{max}`
        are adjusted as detailed below, attempting to match the input values as
        closely as possible.

        The total number of bins is given by :math:`n = k \cdot m` where :math:`k`
        denotes the number of octaves, computed as

        .. math::
            k = \lceil \log_{2}(f_{max}/f_{min}) \rceil

        For example, with :math:`f_{min}=10`, :math:`f_{max}=16000`, and :math:`m = 32`
        the number of octaves is :math:`k = 11` and the total number of bins is
        :math:`n = 352`. The frequency of a given bin, :math:`i`, is given by

        .. math::
            f_{i} = 2^{i / m} \cdot f_{min}

        This implies that the maximum frequency is given by
        :math:`f_{max} = f_{n} = 2^{n/m} \cdot f_{min}`. For the above example,
        we find :math:`f_{max} = 20480` Hz, i.e., somewhat larger than the
        requested maximum value.

        Note that if :math:`f_{max}` exceeds the Nyquist frequency,
        :math:`f_{nyquist} = 0.5 \cdot s`, where :math:`s` is the sampling rate,
        the number of octaves, :math:`k`, is reduced to ensure that
        :math:`f_{max} \leq f_{nyquist}`.

        The CQT algorithm requires the step size to be an integer multiple of
        :math:`2^k`. To ensure that this is the case, the step size is computed
        as follows,

        .. math::
            h = \lceil s \cdot x / 2^k \rceil \cdot 2^k

        where :math:`s` is the sampling rate in Hz, and :math:`x` is the step
        size in seconds as specified via the argument `step`.

        For example, assuming a sampling rate of 32 kHz (:math:`s = 32000`), a
        step size of 0.02 seconds (:math:`x = 0.02`), and the same frequency
        limits as above (:math:`f_{min}=10` and :math:`f_{max}=16000`), the
        actual step size is determined to be :math:`h = 2^{11} = 2048` samples,
        corresponding to a physical step size of
        :math:`t_{res} = 2048 / 32000 = 0.064` seconds, i.e., about three times
        as large as the requested step size.

        TODO: If possible, remove librosa dependency

        Args:
            x: numpy.array
                Audio signal
            rate: float
                Sampling rate in Hz
            step: float
                Step size in seconds
            bins_per_oct: int
                Number of bins per octave
            freq_min: float
                Minimum frequency in Hz
            freq_max: float
                Maximum frequency in Hz. If None, it is set equal to half the sampling rate.
            window_func: str
                Window function (optional). Select between
                    * bartlett
                    * blackman
                    * hamming (default)
                    * hanning

        Returns:
            img: numpy.array
                Resulting CQT spectrogram image.
            step: float
                Adjusted step size in seconds.
    """
    from librosa.core import cqt

    f_nyquist = 0.5 * rate
    k_nyquist = int(np.floor(np.log2(f_nyquist / freq_min)))

    if freq_max is None:
        k = k_nyquist
    else:
        k = int(np.ceil(np.log2(freq_max / freq_min)))
        k = min(k, k_nyquist)

    h0 = int(2**k)
    b = bins_per_oct
    bins = k * b

    h = rate * step
    r = int(np.ceil(h / h0))
    h = int(r * h0)

    img = cqt(y=x, sr=rate, hop_length=h, fmin=freq_min, n_bins=bins, bins_per_octave=b, window=window_func)
    img = to_decibel(np.abs(img))
    img = np.swapaxes(img, 0, 1)

    step = h / rate

    return img, step

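# A worked sketch of the step-size adjustment performed by cqt(), using
# the numbers from the docstring above (rate=32000 Hz, step=0.02 s,
# freq_min=10 Hz, freq_max=16000 Hz). Illustration only.
def _example_cqt_step():
    rate, step, freq_min, freq_max = 32000., 0.02, 10., 16000.
    k = int(np.ceil(np.log2(freq_max / freq_min)))  # number of octaves: 11
    h0 = int(2**k)                                  # step size must be a multiple of 2^k = 2048
    h = int(np.ceil(rate * step / h0)) * h0         # ceil(640 / 2048) * 2048 = 2048
    return h / rate                                 # adjusted step size: 0.064 s
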
def to_decibel(x):
    r""" Convert any data array, :math:`y`, typically a spectrogram, from linear scale
        to decibel scale by applying the operation :math:`20\log_{10}(y)`.

        Args:
            x : numpy array
                Input array

        Returns:
            y : numpy array
                Converted array

        Example:
            >>> import numpy as np
            >>> from ketos.audio.utils.misc import to_decibel
            >>> img = np.array([[10., 20.],[30., 40.]])
            >>> img_db = to_decibel(img)
            >>> img_db = np.around(img_db, decimals=2) # only keep up to two decimals
            >>> print(img_db)
            [[20.0 26.02]
             [29.54 32.04]]
    """
    y = 20 * np.ma.log10(x)
    return y

def from_decibel(y):
    """ Convert any data array, :math:`y`, typically a spectrogram, from decibel scale
        to linear scale by applying the operation :math:`10^{y/20}`.

        Args:
            y : numpy array
                Input array

        Returns:
            x : numpy array
                Converted array

        Example:
            >>> import numpy as np
            >>> from ketos.audio.utils.misc import from_decibel
            >>> img = np.array([[10., 20.],[30., 40.]])
            >>> img_lin = from_decibel(img)
            >>> img_lin = np.around(img_lin, decimals=2) # only keep up to two decimals
            >>> print(img_lin)
            [[  3.16  10.  ]
             [ 31.62 100.  ]]
    """
    x = np.power(10., y / 20.)
    return x

def mag2pow(img, num_fft):
    """ Convert a Magnitude spectrogram to a Power spectrogram.

        Args:
            img: numpy.array
                Magnitude spectrogram image, linear axis
            num_fft: int
                Number of points used for the FFT.

        Returns:
            : numpy.array
                Power spectrogram image
    """
    return (1.0 / num_fft) * (img ** 2)

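# A minimal usage sketch for mag2pow() (illustration only). The toy
# spectrogram shape assumes num_fft=256, i.e. 129 frequency bins.
def _example_mag2pow():
    mag = np.ones((10, 129))              # toy magnitude spectrogram, linear scale
    pow_spec = mag2pow(mag, num_fft=256)  # each entry becomes 1^2 / 256
    return pow_spec.shape                 # shape is preserved: (10, 129)
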
def hz_to_mel(freq):
    """ Convert frequency to position on Mel scale

        Args:
            freq: array-like
                Frequency in Hz

        Returns:
            mel: array-like
                Mel scale position
    """
    return 2595 * np.log10(1 + freq / 700)

def mel_to_hz(mel):
    """ Convert position on Mel scale to frequency

        Args:
            mel: array-like
                Mel scale position

        Returns:
            freq: array-like
                Frequency in Hz
    """
    return 700 * (10**(mel / 2595) - 1)

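# A small round-trip sketch for hz_to_mel() and mel_to_hz() (illustration
# only): the two conversions are exact inverses of each other.
def _example_mel_round_trip():
    freqs = np.array([0., 700., 8000.])        # 700 Hz maps to 2595*log10(2) ~ 781.2 Mel
    mels = hz_to_mel(freqs)
    assert np.allclose(mel_to_hz(mels), freqs) # recover the original frequencies
    return mels
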
def mel_filter_bank(num_fft, rate, num_filters):
    """ Compute Mel-scale filter bank

        Args:
            num_fft: int
                Number of points used for the FFT.
            rate: float
                Sampling rate in Hz.
            num_filters: int
                The number of filters in the filter bank.

        Returns:
            fbank: numpy.array
                Mel filter bank, has shape (num_filters, int(np.floor(num_fft / 2 + 1)))
    """
    low_freq_mel = 0
    high_freq_mel = hz_to_mel(rate / 2)  # convert Hz to Mel
    mel_points = np.linspace(low_freq_mel, high_freq_mel, num_filters + 2)  # equally spaced in Mel scale
    hz_points = mel_to_hz(mel_points)  # convert Mel to Hz
    bin = np.floor((num_fft + 1) * hz_points / rate)

    bin_edges = hz_points[1:] - 0.5 * np.concatenate([np.diff(hz_points[1:]), [0]])
    bin_edges[0] = 0

    fbank = np.zeros((num_filters, int(np.floor(num_fft / 2 + 1))))
    for m in range(1, num_filters + 1):
        f_m_minus = int(bin[m - 1])  # left
        f_m = int(bin[m])            # center
        f_m_plus = int(bin[m + 1])   # right

        for k in range(f_m_minus, f_m):
            fbank[m - 1, k] = (k - bin[m - 1]) / (bin[m] - bin[m - 1])
        for k in range(f_m, f_m_plus):
            fbank[m - 1, k] = (bin[m + 1] - k) / (bin[m + 1] - bin[m])

    return fbank

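# A minimal usage sketch for mel_filter_bank() (illustration only).
# With num_fft=512 the bank has int(512 / 2 + 1) = 257 frequency bins.
def _example_mel_filter_bank():
    fbank = mel_filter_bank(num_fft=512, rate=16000., num_filters=40)
    return fbank.shape  # (40, 257)
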
def mag2mel(img, num_fft, rate, num_filters):
    """ Convert a Magnitude spectrogram to a Mel spectrogram.

        Args:
            img: numpy.array
                Magnitude spectrogram image, linear axis
            num_fft: int
                Number of points used for the FFT.
            rate: float
                Sampling rate in Hz.
            num_filters: int
                The number of filters in the filter bank.

        Returns:
            mel_spec: numpy.array
                Mel spectrogram image
    """
    power_spec = mag2pow(img, num_fft)
    fbank = mel_filter_bank(num_fft, rate, num_filters)
    mel_spec = np.dot(power_spec, fbank.T)
    return mel_spec

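# A minimal sketch chaining mag2pow() and mel_filter_bank() via mag2mel()
# (illustration only): a toy (frames, bins) magnitude spectrogram is
# mapped onto 40 Mel bands.
def _example_mag2mel():
    mag = np.random.rand(10, 129)  # toy magnitude spectrogram for num_fft=256
    mel = mag2mel(mag, num_fft=256, rate=16000., num_filters=40)
    return mel.shape  # (10, 40)
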
def mag2mfcc(img, num_fft, rate, num_filters, num_ceps, cep_lifter):
    """ Convert a Magnitude spectrogram to a Mel-frequency cepstrum.

        TODO: Check that the formulas used to compute the MFCCs are correct and
        consistent with those used in librosa:
        https://librosa.org/doc/main/generated/librosa.feature.mfcc.html#

        Args:
            img: numpy.array
                Magnitude spectrogram image, linear axis
            num_fft: int
                Number of points used for the FFT.
            rate: float
                Sampling rate in Hz.
            num_filters: int
                The number of filters in the filter bank.
            num_ceps: int
                The number of Mel-frequency cepstral coefficients to keep.
            cep_lifter: int
                The number of cepstrum filters.

        Returns:
            mfcc: numpy.array
                Mel-frequency cepstral coefficients
    """
    img = mag2mel(img, num_fft, rate, num_filters)
    img = np.where(img == 0, np.finfo(float).eps, img)  # numerical stability
    img = to_decibel(img)  # dB
    mfcc = dct(img, type=2, axis=1, norm='ortho')[:, 1 : (num_ceps + 1)]  # keep coefficients 1 to num_ceps

    # apply sinusoidal liftering to de-emphasize the higher coefficients
    (nframes, ncoeff) = mfcc.shape
    n = np.arange(ncoeff)
    lift = 1 + (cep_lifter / 2) * np.sin(np.pi * n / cep_lifter)
    mfcc *= lift

    return mfcc

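# A minimal usage sketch for mag2mfcc() (illustration only). The toy input
# and parameter values are arbitrary; num_ceps=12 keeps the conventional
# first 12 cepstral coefficients (excluding the 0th).
def _example_mag2mfcc():
    mag = np.random.rand(10, 129) + 1.  # toy magnitude spectrogram for num_fft=256
    mfcc = mag2mfcc(mag, num_fft=256, rate=16000., num_filters=40, num_ceps=12, cep_lifter=22)
    return mfcc.shape  # (10, 12)
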
def spec2wave(image, phase_angle, num_fft, step_len, num_iters, window_func):
    """ Estimate audio signal from magnitude spectrogram.

        Implements the algorithm described in

            D. W. Griffin and J. S. Lim, "Signal estimation from modified short-time Fourier transform,"
            IEEE Trans. ASSP, vol.32, no.2, pp.236-243, Apr. 1984.

        Follows closely the implementation of

            https://github.com/tensorflow/magenta/blob/master/magenta/models/nsynth/utils.py

        TODO: If possible, remove librosa dependency

        Args:
            image: 2d numpy array
                Magnitude spectrogram, linear scale
            phase_angle: float or 2d numpy array
                Initial condition for the phase in radians
            num_fft: int
                Number of points used for the Fast-Fourier Transform. Same as window size.
            step_len: int
                Step size.
            num_iters: int
                Number of iterations to perform.
            window_func: string, tuple, number, function, np.ndarray [shape=(num_fft,)]
                - a window specification (string, tuple, or number);
                  see `scipy.signal.get_window`
                - a window function, such as `scipy.signal.hamming`
                - a user-specified window vector of length `num_fft`

        Returns:
            audio: 1d numpy array
                Audio signal

        Example:
            >>> #Create a simple sinusoidal audio signal with frequency of 10 Hz
            >>> import numpy as np
            >>> x = np.arange(1000)
            >>> audio = 32600 * np.sin(2 * np.pi * 10 * x / 1000)
            >>> #Compute the Short Time Fourier Transform of the audio signal
            >>> #using a window size of 200, step size of 40, and a Hamming window,
            >>> from ketos.audio.utils.misc import stft
            >>> win_fun = 'hamming'
            >>> mag, freq_max, num_fft, _, _ = stft(x=audio, rate=1000, seg_args={'win_len':200, 'step_len':40}, window_func=win_fun)
            >>> #Estimate the original audio signal
            >>> from ketos.audio.utils.misc import spec2wave
            >>> audio_est = spec2wave(image=mag, phase_angle=0, num_fft=num_fft, step_len=40, num_iters=25, window_func=win_fun)
            >>> #plot the original and the estimated audio signal
            >>> import matplotlib.pyplot as plt
            >>> plt.clf()
            >>> _ = plt.plot(audio)
            >>> plt.savefig("ketos/tests/assets/tmp/sig_orig.png")
            >>> _ = plt.plot(audio_est)
            >>> plt.savefig("ketos/tests/assets/tmp/sig_est.png")

            .. image:: ../../../ketos/tests/assets/tmp/sig_est.png
    """
    from librosa import istft, magphase, stft

    # swap axes to conform with librosa's (frequency, time) convention
    image = np.swapaxes(image, 0, 1)
    if np.ndim(phase_angle) == 2:
        phase_angle = np.swapaxes(phase_angle, 0, 1)

    # settings for FFT and inverse FFT
    fft_config = dict(n_fft=num_fft, win_length=num_fft, hop_length=step_len, center=False, window=window_func)
    ifft_config = dict(win_length=num_fft, hop_length=step_len, center=False, window=window_func)

    # initial spectrogram for iterative algorithm
    complex_specgram = complex_value(image, phase_angle)

    # Griffin-Lim iterative algorithm
    for i in range(num_iters):
        audio = istft(complex_specgram, **ifft_config)
        if i != num_iters - 1:
            complex_specgram = stft(audio, **fft_config)
            _, phase = magphase(complex_specgram)
            angle = np.angle(phase)
            complex_specgram = complex_value(image, angle)

    return audio