Source code for ketos.audio.waveform
# ================================================================================ #
# Authors: Fabio Frazao and Oliver Kirsebom #
# Contact: fsfrazao@dal.ca, oliver.kirsebom@dal.ca #
# Organization: MERIDIAN (https://meridian.cs.dal.ca/) #
# Team: Data Analytics #
# Project: ketos #
# Project goal: The ketos library provides functionalities for handling #
# and processing acoustic data and applying deep neural networks to sound #
# detection and classification tasks. #
# #
# License: GNU GPLv3 #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# ================================================================================ #
""" Waveform module within the ketos library
This module provides utilities to work with audio data.
Contents:
Waveform class
"""
import os
import numpy as np
import soundfile as sf
import warnings
import scipy.io.wavfile as wave
import scipy.signal
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from ketos.utils import ensure_dir, morlet_func
from ketos.data_handling.data_handling import read_wave
from ketos.audio.annotation import AnnotationHandler
from ketos.audio.utils.axis import LinearAxis
from ketos.audio.base_audio import BaseAudioTime, segment_data
import ketos.audio.utils.misc as aum
def _validate_wf_args(path, offset, duration):
''' Validate and standardize values
Args:
path: str or list(str)
Path to input audio file(s)
offset: float or list(float)
Start of segment measured in seconds from the start of the file.
duration: float or list(float)
Segment length in seconds.
Returns:
path, offset, duration: list
Validated and standardized values
'''
if np.ndim(path) == 0:
path = [path]
if np.ndim(offset) == 0:
offset = [offset for _ in path]
if np.ndim(duration) == 0:
duration = [duration for _ in path]
assert len(offset) == len(path), "offset and path must have the same length"
assert len(duration) == len(path), "duration and path must have the same length"
return path, offset, duration
def get_sampling_rate(path):
''' Get the (common or lowest) sampling rate of the specified audio segments.
Args:
path: str or list(str)
Path to input audio file(s)
Returns:
: float
Inferred sampling rate in Hz
'''
if np.ndim(path) == 0:
path = [path]
# get the sampling rates of the audio file(s)
rates = []
for p in path:
if p is not None:
with sf.SoundFile(p, "r") as f:
rates.append(f.samplerate)
if len(rates) == 0:
warnings.warn("Sampling rate could not be inferred. This may cause problems.", UserWarning)
return None
elif len(rates) == 1:
return rates[0]
else:
if np.sum(np.diff(rates)) > 0:
warnings.warn("Audio files have different sampling rates. Files with higher sampling rate "\
"will be downsampled to obtain consisten sampling rates as required to stitch the files "\
"together.", UserWarning)
rate = np.min(rates)
return rate
def get_duration(path, offset=0, duration=None):
''' Get the durations of the specified audio file segments.
Args:
path: str or list(str)
Path to input audio file(s)
offset: float or list(float)
Start of segment measured in seconds from the start of the file.
duration: float or list(float)
Segment length in seconds.
Returns:
res: list
Durations in seconds
'''
path, offset, duration = _validate_wf_args(path, offset, duration)
res = []
for i in range(len(path)):
if duration[i] is None:
assert path[i] is not None, "duration must be specified if path is None"
try:
with sf.SoundFile(path[i], "r") as f:
d = f.frames / f.samplerate - offset[i]
except sf.LibsndfileError as e:
# print(f"{e} Skipping File.")
d = 0 # set duration to 0 or any default value for corrupted files
# raise RuntimeError(e)
else:
d = duration[i]
res.append(d)
return res
def merge(waveforms, smooth=0.01):
''' Merge waveforms by stitching them together with the `append` method.
All waveforms must have the same sampling rate. If this is not the case,
an AssertionError is thrown.
Args:
waveforms: list
Waveform instances to be merged
smooth: float
Width in seconds of the smoothing region used for stitching together audio files.
Returns:
wf0: Instance of Waveform
Merged waveforms
'''
if np.ndim(waveforms) == 0:
waveforms = [waveforms]
if len(waveforms) == 1:
return waveforms[0]
wf0 = waveforms[0].deepcopy()
for wf in waveforms[1:]:
n_smooth = int(smooth * wf.rate)
wf0.append(wf, n_smooth=n_smooth)
return wf0
def plot(waveforms, labels="", figsize=(5,4), title="", offset=0, duration=None):
""" Plot one or several waveforms superimposed on one another.
Note: The resulting figure can be shown (fig.show())
or saved (fig.savefig(file_name))
Args:
waveforms: Waveform or list(Waveform)
Waveforms to be plotted
labels: str or list(str)
Labels used to identify the waveforms.
Must have the same length as waveforms.
figsize: tuple
Figure size
title: str
Figure title.
offset, duration: float
Start time and length of the plotted segment in seconds.
If not specified, the full waveform will be plotted.
Returns:
fig: matplotlib.figure.Figure
Figure object.
"""
if isinstance(waveforms, Waveform): waveforms = [waveforms]
if isinstance(labels, str): labels = [labels]
assert len(waveforms) == len(labels), "waveforms and labels must have the same length"
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=figsize)
colors = [f"C{i}" for i in range(6)]
lstyles = ['-','--',':','-.']
for i,wf in enumerate(waveforms):
start = min(offset, wf.duration())
end = wf.duration()
if duration != None: end = min(end, start + duration)
wfc = wf.crop(start=start, end=end, make_copy=True)
col = colors[i%len(colors)]
lsty = lstyles[i%len(lstyles)]
x = np.linspace(start=start, stop=end, num=wfc.data.shape[0])
y = wfc.get_data()
ax.plot(x, y, label=labels[i], color=col, linestyle=lsty)
ax.set_xlabel(wfc.time_ax.label)
ax.set_ylabel('Amplitude')
ax.set_title(title)
if len(waveforms) > 1: ax.legend()
return fig
[docs]class Waveform(BaseAudioTime):
""" Audio signal
Args:
rate: float
Sampling rate in Hz
data: numpy array
Audio data
filename: str
Filename of the original audio file, if available (optional)
offset: float
Position within the original audio file, in seconds
measured from the start of the file. Defaults to 0 if not specified.
label: int
Spectrogram label. Optional
annot: AnnotationHandler
AnnotationHandler object. Optional
transforms: list(dict)
List of dictionaries, where each dictionary specifies the name of
a transformation to be applied to this instance. For example,
{"name":"normalize", "mean":0.5, "std":1.0}
transform_log: list(dict)
List of transforms that have been applied to this instance
Attributes:
rate: float
Sampling rate in Hz
data: 1numpy array
Audio data
time_ax: LinearAxis
Axis object for the time dimension
filename: str
Filename of the original audio file, if available (optional)
offset: float
Position within the original audio file, in seconds
measured from the start of the file. Defaults to 0 if not specified.
label: int
Spectrogram label.
annot: AnnotationHandler
AnnotationHandler object.
transform_log: list(dict)
List of transforms that have been applied to this instance
"""
def __init__(self, data, time_res=None, filename='', offset=0, label=None, annot=None, transforms=None,
transform_log=None, **kwargs):
assert time_res is not None or 'rate' in kwargs, "either time_res or rate must be specified"
if time_res is None:
self.rate = kwargs['rate']
else:
self.rate = 1. / time_res
super().__init__(data=data, time_res=1./self.rate, filename=filename, offset=offset, label=label, annot=annot,
transform_log=transform_log, **kwargs)
self.allowed_transforms.update({'add_gaussian_noise': self.add_gaussian_noise,
'bandpass_filter': self.bandpass_filter})
self.apply_transforms(transforms)
[docs] def get_repres_attrs(self):
""" Get audio representation attributes """
attrs = super().get_repres_attrs()
attrs.update({'rate':self.rate, 'type':self.__class__.__name__})
return attrs
[docs] @classmethod
def from_wav(cls, path, channel=0, rate=None, offset=0, duration=None, resample_method='scipy',
id=None, normalize_wav=False, transforms=None, pad_mode="reflect", smooth=0.01, **kwargs):
""" Load audio data from one or several audio files.
When loading from several audio files, the waveforms are stitched together in
the order in which they are provided using the `append` method. Note that only
the name and offset of the first file are stored in the `filename` and `offset`
attributes.
Note that - despite the misleading name - this method can load other audio formats
than WAV. In particular, it also handles FLAC quite well.
TODO: Rename this function and document in greater detail which formats are supported.
Args:
path: str or list(str)
Path to input wave file(s).
channel: int
In the case of stereo recordings, this argument is used
to specify which channel to read from. Default is 0.
rate: float
Desired sampling rate in Hz. If None, the original sampling rate will be used
offset: float or list(float)
Position within the original audio file, in seconds
measured from the start of the file. Defaults to 0 if not specified.
duration: float or list(float)
Length in seconds.
resample_method: str
Resampling method. Only relevant if `rate` is specified. Options are
* kaiser_best
* kaiser_fast
* scipy (default)
* polyphase
See https://librosa.github.io/librosa/generated/librosa.core.resample.html
for details on the individual methods.
id: str
Unique identifier (optional). If provided, it is stored in the `filename` class attribute
instead of the filename. A common use of the `id` argument is to specify a full or relative
path to the file, including one or several directory levels.
normalize_wav: bool
Normalize the waveform to have a mean of zero (mean=0) and a standard
deviation of unity (std=1). Default is False.
transforms: list(dict)
List of dictionaries, where each dictionary specifies the name of
a transformation to be applied to this instance. For example,
{"name":"normalize", "mean":0.5, "std":1.0}
smooth: float
Width in seconds of the smoothing region used for stitching together audio files.
pad_mode: str
Padding mode. Select between 'reflect' (default) and 'zero'.
Returns:
Instance of Waveform
Audio signal
Example:
>>> from ketos.audio.waveform import Waveform
>>> # read audio signal from wav file
>>> a = Waveform.from_wav('ketos/tests/assets/grunt1.wav')
>>> # show signal
>>> fig = a.plot()
>>> fig.savefig("ketos/tests/assets/tmp/audio_grunt1.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/audio_grunt1.png
"""
path, offset, duration = _validate_wf_args(path, offset, duration)
if rate is None:
rate = get_sampling_rate(path)
waveforms = []
for i in range(len(path)):
wf = cls._from_single_file(path=path[i], channel=channel, rate=rate, offset=offset[i],
duration=duration[i], resample_method=resample_method, id=id, normalize_wav=normalize_wav,
transforms=transforms, pad_mode=pad_mode, **kwargs)
waveforms.append(wf)
wf = merge(waveforms, smooth=smooth)
return wf
@classmethod
def _from_single_file(cls, path, channel=0, rate=None, offset=0, duration=None, resample_method='scipy',
id=None, normalize_wav=False, transforms=None, pad_mode="reflect", **kwargs):
""" Load audio data from a single audio file.
If `duration` (and `offset`) are specified and `offset + duration` exceeds the
length of the audio file, the signal will be padded with its own reflection on
the right to achieve the desired duration. Similarly, if `offset < 0`, the signal
will be padded on the left. In both cases, a RuntimeWarning is issued.
If `offset` exceeds the file duration, an empty waveform is returned and a
RuntimeWarning is issued.
If `path` is None a waveform with length `int(rate * duration)` with purely zero
values will be returned. (Requires that both `rate` and `duration` are specified.)
TODO: If possible, remove librosa dependency
Args:
path: str
Path to input audio file
channel: int
In the case of stereo recordings, this argument is used
to specify which channel to read from. Default is 0.
rate: float
Desired sampling rate in Hz. If None, the original sampling rate will be used
offset: float
Position within the original audio file, in seconds
measured from the start of the file. Defaults to 0 if not specified.
duration: float
Length in seconds.
resample_method: str
Resampling method. Only relevant if `rate` is specified. Options are
* kaiser_best
* kaiser_fast
* scipy (default)
* polyphase
See https://librosa.github.io/librosa/generated/librosa.core.resample.html
for details on the individual methods.
id: str
Unique identifier (optional). If provided, it is stored in the `filename` class attribute
instead of the filename. A common use of the `id` argument is to specify a full or relative
path to the file, including one or several directory levels.
normalize_wav: bool
Normalize the waveform to have a mean of zero (mean=0) and a standard
deviation of unity (std=1). Default is False.
transforms: list(dict)
List of dictionaries, where each dictionary specifies the name of
a transformation to be applied to this instance. For example,
{"name":"normalize", "mean":0.5, "std":1.0}
pad_mode: str
Padding mode. Select between 'reflect' (default) and 'zero'.
Returns:
Instance of Waveform
Audio signal
"""
if path is None:
assert duration is not None, "duration must be specified if path is None"
assert rate is not None, "rate must be specified if path is None"
return cls(rate=rate, data=np.zeros(int(rate*duration)), filename=id, offset=0)
if transforms is None: transforms = []
assert duration is None or duration >= 0, 'duration must be non-negative'
# if 'id' is not specified, use the filename
if id is None: id = os.path.basename(path)
# original sampling rate in Hz
rate_orig = get_sampling_rate(path)
# file duration in seconds
file_duration = get_duration(path)[0]
# if the offset exceeds the file duration, return an empty array
# and issue a warning
if offset >= file_duration:
data = np.array([], dtype=np.float64)
if rate is None: rate = rate_orig
warnings.warn("Offset exceeds file duration. Empty waveform returned", RuntimeWarning)
return cls(rate=rate, data=data, filename=id, offset=offset)
# if the duration is specified to 0, return an empty array
# and issue a warning
if duration is not None and duration == 0:
data = np.array([], dtype=np.float64)
if rate is None: rate = rate_orig
warnings.warn("Duration is zero. Empty waveform returned", RuntimeWarning)
return cls(rate=rate, data=data, filename=id, offset=offset)
# if the offset is negative, pad with zeros on the left
num_pad_left = 0
if offset is not None and offset < 0:
sr = rate_orig if rate is None else rate
if duration is None:
num_pad_left = int(-offset*sr)
else:
num_pad_left = int(min(-offset, duration)*sr)
duration += offset
duration = max(0, duration)
num_pad_left = max(0, num_pad_left)
if duration is not None and duration == 0:
data = np.array([], dtype=np.float64)
if rate is None: rate = rate_orig
warnings.warn("Stop is before file start. Empty waveform returned", RuntimeWarning)
return cls(rate=rate, data=data, filename=id, offset=offset)
# determine start and stop times for reading the wav files
start = aum.num_samples(max(0,offset), rate_orig)
if duration is not None:
stop = aum.num_samples(max(0,offset) + duration, rate_orig)
else:
stop = None
# read data and sampling rate
rate_orig, data = read_wave(file=path, channel=channel, start=start, stop=stop)
# if necessary, re-sample
if rate is not None and rate != rate_orig:
from librosa.core import resample
data = resample(data, orig_sr=rate_orig, target_sr=rate, res_type=resample_method)
else:
rate = rate_orig
# pad on left and/or right to achieve desired duration, if necessary
if duration is not None:
num_pad_right = max(0, int(duration * rate - data.shape[0]))
if num_pad_right > 0 or num_pad_left > 0:
if pad_mode.lower() == 'reflect':
data = aum.pad_reflect(data, pad_left=num_pad_left, pad_right=num_pad_right)
warnings.warn("Waveform padded with its own reflection to achieve required length to compute the stft. {0} samples were padded on the left and {1} samples were padded on the right".format(num_pad_left, num_pad_right), RuntimeWarning)
else:
data = aum.pad_zero(data, pad_left=num_pad_left, pad_right=num_pad_right)
warnings.warn("Waveform padded with zeros to achieve the required length to compute the stft. {0} samples were padded on the left and {1} samples were padded on the right".format(num_pad_left, num_pad_right), RuntimeWarning)
if normalize_wav:
transforms.append({'name':'normalize','mean':0.0,'std':1.0})
return cls(rate=rate, data=data, filename=id, offset=offset, transforms=transforms, **kwargs)
[docs] @classmethod
def gaussian_noise(cls, rate, sigma, samples, filename="gaussian_noise"):
""" Generate Gaussian noise signal
Args:
rate: float
Sampling rate in Hz
sigma: float
Standard deviation of the signal amplitude
samples: int
Length of the audio signal given as the number of samples
filename: str
Meta-data string (optional)
Returns:
Instance of Waveform
Audio signal sampling of Gaussian noise
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create gaussian noise with sampling rate of 10 Hz, standard deviation of 2.0 and 1000 samples
>>> a = Waveform.gaussian_noise(rate=10, sigma=2.0, samples=1000)
>>> # show signal
>>> fig = a.plot()
>>> fig.savefig("ketos/tests/assets/tmp/audio_noise.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/audio_noise.png
"""
assert sigma > 0, "sigma must be strictly positive"
y = np.random.normal(loc=0, scale=sigma, size=samples)
return cls(rate=rate, data=y, filename=filename)
[docs] @classmethod
def morlet(cls, rate, frequency, width, samples=None, height=1, displacement=0, dfdt=0, filename="morlet"):
""" Audio signal with the shape of the Morlet wavelet
Uses :func:`util.morlet_func` to compute the Morlet wavelet.
Args:
rate: float
Sampling rate in Hz
frequency: float
Frequency of the Morlet wavelet in Hz
width: float
Width of the Morlet wavelet in seconds (sigma of the Gaussian envelope)
samples: int
Length of the audio signal given as the number of samples (if no value is given, samples = 6 * width * rate)
height: float
Peak value of the audio signal
displacement: float
Peak position in seconds
dfdt: float
Rate of change in frequency as a function of time in Hz per second.
If dfdt is non-zero, the frequency is computed as
f = frequency + (time - displacement) * dfdt
filename: str
Meta-data string (optional)
Returns:
Instance of Waveform
Audio signal sampling of the Morlet wavelet
Examples:
>>> from ketos.audio.waveform import Waveform
>>> # create a Morlet wavelet with frequency of 3 Hz and 1-sigma width of envelope set to 2.0 seconds
>>> wavelet1 = Waveform.morlet(rate=100., frequency=3., width=2.0)
>>> # show signal
>>> fig = wavelet1.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_standard.png")
.. image:: ../../../ketos/tests/assets/tmp/morlet_standard.png
>>> # create another wavelet, but with frequency increasing linearly with time
>>> wavelet2 = Waveform.morlet(rate=100., frequency=3., width=2.0, dfdt=0.3)
>>> # show signal
>>> fig = wavelet2.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_dfdt.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/morlet_dfdt.png
"""
if samples is None:
samples = int(6 * width * rate)
N = int(samples)
# compute Morlet function at N equally spaced points
dt = 1. / rate
stop = (N-1.)/2. * dt
start = -stop
time = np.linspace(start, stop, N)
y = morlet_func(time=time, frequency=frequency, width=width, displacement=displacement, norm=False, dfdt=dfdt)
y *= height
return cls(rate=rate, data=np.array(y), filename=filename)
[docs] @classmethod
def cosine(cls, rate, frequency, duration=1, height=1, displacement=0, filename="cosine"):
""" Audio signal with the shape of a cosine function
Args:
rate: float
Sampling rate in Hz
frequency: float
Frequency of the Morlet wavelet in Hz
duration: float
Duration of the signal in seconds
height: float
Peak value of the audio signal
displacement: float
Phase offset in fractions of 2*pi
filename: str
Meta-data string (optional)
Returns:
Instance of Waveform
Audio signal sampling of the cosine function
Examples:
>>> from ketos.audio.waveform import Waveform
>>> # create a Cosine wave with frequency of 7 Hz
>>> cos = Waveform.cosine(rate=1000., frequency=7.)
>>> # show signal
>>> fig = cos.plot()
>>> fig.savefig("ketos/tests/assets/tmp/cosine_audio.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/cosine_audio.png
"""
N = int(duration * rate)
# compute cosine function at N equally spaced points
dt = 1. / rate
stop = (N-1.)/2. * dt
start = -stop
time = np.linspace(start, stop, N)
x = (time * frequency + displacement) * 2 * np.pi
y = height * np.cos(x)
return cls(rate=rate, data=np.array(y), filename=filename)
[docs] def to_wav(self, path, auto_loudness=True):
""" Save audio signal to wave file
Args:
path: str
Path to output wave file
auto_loudness: bool
Automatically amplify the signal so that the
maximum amplitude matches the full range of
a 16-bit wav file (32760)
"""
ensure_dir(path)
if auto_loudness:
m = max(1, np.max(np.abs(self.data)))
s = 32760 / m
else:
s = 1
wave.write(filename=path, rate=int(self.rate), data=(s*self.data).astype(dtype=np.int16))
[docs] def plot(self, show_annot=False, figsize=(5,4), label_in_title=True, append_title='', show_envelope=False):
""" Plot the data with proper axes ranges and labels.
Optionally, also display annotations as boxes superimposed on the data.
Note: The resulting figure can be shown (fig.show())
or saved (fig.savefig(file_name))
Args:
show_annot: bool
Display annotations
figsize: tuple
Figure size
label_in_title: bool
Include label (if available) in figure title
append_title: str
Append this string to the title
show_envelope: bool
Display envelope on top of signal
Returns:
fig: matplotlib.figure.Figure
Figure object.
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create a morlet wavelet
>>> a = Waveform.morlet(rate=100, frequency=5, width=1)
>>> # plot the wave form
>>> fig = a.plot()
>>> plt.close(fig)
.. image:: ../_static/morlet.png
"""
fig, ax = super().plot(figsize, label_in_title, append_title)
y = self.get_data()
x = np.linspace(start=0, stop=self.duration(), num=self.data.shape[0])
ax.plot(x, y)
ax.set_ylabel('Amplitude')
# superimpose envelope
if show_envelope:
z = np.abs(scipy.signal.hilbert(y))
ax.plot(x, z, color='C1')
# superimpose annotation boxes
if show_annot: self._draw_annot_boxes(ax)
#fig.tight_layout()
return fig
def _draw_annot_boxes(self, ax):
"""Draws annotations boxes on top of the spectrogram
Args:
ax: matplotlib.axes.Axes
Axes object
"""
annots = self.get_annotations()
if annots is None: return
y1, y2 = ax.get_ylim()
y1 *= 0.95
y2 *= 0.95
for idx,annot in annots.iterrows():
x1 = annot['start']
x2 = annot['end']
box = patches.Rectangle((x1,y1),x2-x1,y2-y1,linewidth=1,edgecolor='C3',facecolor='none')
ax.add_patch(box)
ax.text(x1, y2, int(annot['label']), ha='left', va='bottom', color='C3')
[docs] def append(self, signal, n_smooth=0):
""" Append another audio signal to the present instance.
The two audio signals must have the same samling rate.
If n_smooth > 0, a smooth transition is made between the
two signals by padding the signals with their reflections
to form an overlap region of length n_smooth in which a
linear transition is made using the `_smoothclamp` function.
This is done in manner that ensure that the duration of the
output signal is exactly the sum of the durations of the two
input signals.
Note that the current implementation of the smoothing procedure is
quite slow, so it is advisable to use small value for n_smooth.
Args:
signal: Waveform
Audio signal to be appended.
n_smooth: int
Width of the smoothing/overlap region (number of samples).
Returns:
None
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create a morlet wavelet
>>> mor = Waveform.morlet(rate=100, frequency=5, width=1)
>>> # create a cosine wave
>>> cos = Waveform.cosine(rate=100, frequency=3, duration=4)
>>> # append the cosine wave to the morlet wavelet, using a overlap of 100 bins
>>> mor.append(signal=cos, n_smooth=100)
>>> # show the wave form
>>> fig = mor.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_cosine.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/morlet_cosine.png
"""
assert self.rate == signal.rate, "Cannot merge audio signals with different sampling rates."
# if appending signal to itself, make a copy
if signal is self:
signal = self.deepcopy()
# ensure that overlap region is shorter than either signal
n_smooth = min(n_smooth, len(self.data) - 1)
n_smooth = min(n_smooth, len(signal.data) - 1)
# make sure n_smooth is even
n_smooth += n_smooth % 2
if n_smooth == 0:
self.data = np.concatenate([self.data, signal.data], axis=0)
else:# smoothly join
# extend by own reflections
a = np.concatenate([self.data, self.data[-2:int(-2-n_smooth/2):-1]])
b = np.concatenate([signal.data[n_smooth//2:0:-1], signal.data])
# split into separate and overlap
ao = a[-n_smooth:]
bo = b[:n_smooth]
a = a[:-n_smooth]
b = b[n_smooth:]
# compute values in overlap region
c = np.empty(n_smooth)
for i in range(n_smooth):
w = _smoothclamp(i, 0, n_smooth-1)
c[i] = (1.-w) * ao[i] + w * bo[i]
self.data = np.concatenate([a,c,b], axis=0)
# re-init time axis
length = self.data.shape[0] / self.rate
self.time_ax = LinearAxis(bins=self.data.shape[0], extent=(0., length), label='Time (s)')
[docs] def add_gaussian_noise(self, sigma):
""" Add Gaussian noise to the signal
Args:
sigma: float
Standard deviation of the gaussian noise
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create a morlet wavelet
>>> morlet = Waveform.morlet(rate=100, frequency=2.5, width=1)
>>> morlet_pure = morlet.deepcopy() # make a copy
>>> # add some noise
>>> morlet.add_gaussian_noise(sigma=0.3)
>>> # show the wave form
>>> fig = morlet_pure.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_wo_noise.png")
>>> fig = morlet.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_w_noise.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/morlet_wo_noise.png
.. image:: ../../../ketos/tests/assets/tmp/morlet_w_noise.png
"""
noise = Waveform.gaussian_noise(rate=self.rate, sigma=sigma, samples=len(self.data))
self.add(noise)
self.transform_log.append({'name':'add_gaussian_noise', 'sigma':sigma})
[docs] def bandpass_filter(self, freq_min=None, freq_max=None, N=3):
""" Apply a lowpass, highpass, or bandpass filter to the signal.
Uses SciPy's implementation of an Nth-order digital Butterworth filter.
The critical frequencies, freq_min and freq_max, correspond to the points
at which the gain drops to 1/sqrt(2) that of the passband (the “-3 dB point”).
Args:
freq_min: float
Lower limit of the frequency window in Hz.
(Also sometimes referred to as the highpass frequency).
If None, a lowpass filter is applied.
freq_max: float
Upper limit of the frequency window in Hz.
(Also sometimes referred to as the lowpass frequency)
If None, a highpass filter is applied.
N: int
The order of the filter. The default value is 3.
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create a Cosine waves with frequencies of 7 and 14 Hz
>>> cos = Waveform.cosine(rate=1000., frequency=7.)
>>> cos14 = Waveform.cosine(rate=1000., frequency=14.)
>>> cos.add(cos14)
>>> # show combined signal
>>> fig = cos.plot()
>>> fig.savefig("ketos/tests/assets/tmp/cosine_double_audio.png")
>>> plt.close(fig)
>>> # apply 10 Hz highpass filter
>>> cos.bandpass_filter(freq_max=10)
>>> # show filtered signal
>>> fig = cos.plot()
>>> fig.savefig("ketos/tests/assets/tmp/cosine_double_hp_audio.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/cosine_double_audio.png
.. image:: ../../../ketos/tests/assets/tmp/cosine_double_hp_audio.png
"""
if freq_min is None and freq_max is None: return
if freq_min is None:
Wn = freq_max
btype = 'lowpass'
elif freq_max is None:
Wn = freq_min
btype = 'highpass'
else:
Wn = (freq_min, freq_max)
btype = 'bandpass'
b,a = scipy.signal.butter(N=N, Wn=Wn, btype=btype, fs=self.rate)
self.data = scipy.signal.filtfilt(b, a, self.data)
self.transform_log.append({'name':'bandpass_filter', 'freq_min':freq_min, 'freq_max':freq_max, 'N':N})
[docs] def add(self, signal, offset=0, scale=1):
""" Add the amplitudes of the two audio signals.
The audio signals must have the same sampling rates.
The summed signal always has the same length as the present instance.
If the audio signals have different lengths and/or a non-zero delay is selected,
only the overlap region will be affected by the operation.
If the overlap region is empty, the original signal is unchanged.
Args:
signal: Waveform
Audio signal to be added
offset: float
Shift the audio signal by this many seconds
scale: float
Scaling factor applied to signal that is added
Example:
>>> from ketos.audio.waveform import Waveform
>>> # create a cosine wave
>>> cos = Waveform.cosine(rate=100, frequency=1., duration=4)
>>> # create a morlet wavelet
>>> mor = Waveform.morlet(rate=100, frequency=7., width=0.5)
>>> mor.duration()
3.0
>>> # add the morlet wavelet on top of the cosine, with a shift of 1.5 sec and a scaling factor of 0.5
>>> cos.add(signal=mor, offset=1.5, scale=0.5)
>>> # show the wave form
>>> fig = cos.plot()
>>> fig.savefig("ketos/tests/assets/tmp/morlet_cosine_added.png")
>>> plt.close(fig)
.. image:: ../../../ketos/tests/assets/tmp/morlet_cosine_added.png
"""
assert self.rate == signal.rate, "Cannot add audio signals with different sampling rates."
# if appending signal to itself, make a copy
if signal is self:
signal = self.deepcopy()
# convert to bin numbers
bin_offset = self.time_ax.bin(offset, truncate=True)
bin_start = self.time_ax.bin(-offset, truncate=True)
# crop signal that is being added
length = self.data.shape[0] - bin_offset
signal = signal.crop(start=-offset, length=length)
# add the two signals
b = bin_offset
bins = signal.data.shape[0]
self.data[b:b+bins] = self.data[b:b+bins] + scale * signal.data
[docs] def resample(self, new_rate, resample_method='scipy'):
""" Resample the acoustic signal with an arbitrary sampling rate.
TODO: If possible, remove librosa dependency
Args:
new_rate: int
New sampling rate in Hz
resample_method: str
Resampling method. Only relevant if `rate` is specified. Options are
* kaiser_best
* kaiser_fast
* scipy (default)
* polyphase
See https://librosa.github.io/librosa/generated/librosa.core.resample.html
for details on the individual methods.
"""
import librosa.core
if len(self.data) < 2:
self.rate = new_rate
else:
self.data = librosa.core.resample(self.get_data(), orig_sr=self.rate, target_sr=new_rate, res_type=resample_method)
self.rate = new_rate
self.time_ax = LinearAxis(bins=self.data.shape[0], extent=(0., self.data.shape[0] / self.rate), label='Time (s)') #new time axis
def _smoothclamp(x, mi, mx):
""" Smoothing function
"""
return (lambda t: np.where(t < 0 , 0, np.where( t <= 1 , 3*t**2-2*t**3, 1 ) ) )( (x-mi)/(mx-mi) )