# ================================================================================ #
# Authors: Fabio Frazao and Oliver Kirsebom #
# Contact: fsfrazao@dal.ca, oliver.kirsebom@dal.ca #
# Organization: MERIDIAN (https://meridian.cs.dal.ca/) #
# Team: Data Analytics #
# Project: ketos #
# Project goal: The ketos library provides functionalities for handling #
# and processing acoustic data and applying deep neural networks to sound #
# detection and classification tasks. #
# #
# License: GNU GPLv3 #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# ================================================================================ #
""" 'neural_networks.dev_utils.export' module within the ketos library
This module contains utilities for saving ketos models in various formats.
"""
import ketos.data_handling.parsing as pa
from tensorflow.saved_model import save as save_pb
# from ketos.audio.audio_loader import audio_repres_dict
from ketos.utils import ensure_dir
from zipfile import ZipFile
import warnings
import json
import shutil
import os
import inspect
import numpy as np
from pathlib import Path, PurePath
def get_export_function(output_name):
    r""" Get the appropriate export function for the chosen format.

        The output format is detected from the filename extension
        (the comparison is case-insensitive),

            * '.pb': protobuf
            * '.kt': ketos
            * '.ktpb': ketos-protobuf (compatible with PAMGuard)

        For any other extension, the ketos export function is used.

        Args:
            output_name: str or os.PathLike
                The name of the exported model.

        Returns:
            : func
                The export function
    """
    # os.fspath returns str unchanged and converts pathlib paths to str,
    # so that both kinds of input can be sliced below
    name = os.fspath(output_name)

    if name[-3:].lower() == '.pb':
        return export_to_protobuf

    if name[-5:].lower() == '.ktpb':
        return export_to_ketos_protobuf

    # '.kt' and every unrecognized extension map to the ketos format
    return export_to_ketos
[docs]
def export_to_ketos_protobuf(model, output_name, audio_repr, input_shape=None,
                             tmp_folder="tmp_export_folder", overwrite=True,
                             duration=None, backward_compat=True, **kwargs):
    r""" Export a ketos model to Ketos-Protobuf format.

        Saving your ketos model in Ketos-Protobuf format makes it easier to share it with
        collaborators and use it with other software applications.

        In particular, the output file generated by this function can be loaded directly into
        PAMGuard, an open-source and widely adopted application for passive acoustic monitoring (PAM).

        If the output directory does not already exist, it is automatically created.

        The function generates a zipped archive containing,

            * the tensorflow model in protobuf format (model/model.pb)
            * the audio representation (audio_repr.json)
            * the ketos model recipe (recipe.json)

        The user is free to specify the extension of the output file, but we recommend using \*.ktpb
        as this will allow the file to be recognized and loaded into PAMGuard.

        A warning will be issued if the method is unable to infer values for the parameters
        `duration` and `step`. The model will be saved, but it will not be possible to load
        it into PAMGuard.

        Args:
            model:
                The ketos model to be exported. Usually created by one of the Interface classes found
                in ketos.neural_networks (e.g.: ResNetInterface)
            output_name: str
                The name of the exported model. Must have the extension \*.ktpb to ensure that it can
                be loaded into PAMGuard.
            audio_repr: dict or str
                Audio representation. For example,

                >>> audio_repr = {"type": "MagSpectrogram",
                ...               "rate": "1000 Hz",
                ...               "window": "0.256 s",
                ...               "step": "0.032 s",
                ...               "freq_min": "0 Hz",
                ...               "freq_max": "500 Hz",
                ...               "window_func": "hamming",
                ...               "transforms": [{"name":"normalize"}]
                ...              }

                It is also possible to specify the path to a json file containing the audio representation.
                Note that a dictionary passed here is modified by this function.
            input_shape: list or tuple.
                The input shape expected by the model. It can be represented by a tuple or list of four
                elements: (number of instances, width, height, number of channels). The number of instances
                and number of channels are commonly 1, and the width and height are usually the number of
                time and frequency bins in a spectrogram, respectively. This, however, can vary with
                the model in question.
            tmp_folder: str
                The name of a temporary folder created during the model conversion. It will be deleted
                upon successful execution. If the folder already exists, a FileExistsError will be thrown,
                unless 'overwrite' is set to True.
            overwrite: bool
                If True and the folder specified in 'tmp_folder' exists, the folder will be overwritten.
            duration: float
                Duration in seconds of the input sample. If not specified, the duration is extracted from
                the audio representation, or, if not available there, it is computed as `step * input_shape[0]`,
                provided that `input_shape` has been specified.
            backward_compat: bool
                Ensure backward compatibility with ketos versions 4.2.1 and older
            kwargs:
                Additional keyword arguments are accepted but not used.

        Raises:
            AssertionError if the input shape cannot be inferred.
            FileExistsError if `tmp_folder` exists and `overwrite` is False.
    """
    # identity tests (rather than `!= None`) so that array-like shapes do not
    # trigger an elementwise comparison
    if input_shape is not None and len(input_shape) == 4 and input_shape[0] == 1 and backward_compat:
        print("Warning: ketos is using the following interpretation of the input_shape:"
              + f"{input_shape} = (number of intances, width, height, number of channels). "
              + "If this is not the correct interpretation, you should call the "
              + "export_to_ketos_protobuf function with backward_compat=False.")
        # drop the leading 'number of instances' dimension
        input_shape = input_shape[1:]

    # if the path to a json file containing the audio representation(s)
    # has been specified, load its contents
    if isinstance(audio_repr, (str, PurePath)):
        audio_repr = pa.load_audio_representation(audio_repr, return_unparsed=True)

    if os.path.exists(tmp_folder):
        if not overwrite:
            # adjacent literals instead of a backslash continuation inside the
            # string, which used to leak the source indentation into the message
            raise FileExistsError(f"{tmp_folder} already exists. If you want to overwrite it set the "
                                  "'overwrite' argument to True.")
        shutil.rmtree(tmp_folder)

    # ensure that the audio representation is specified as a nested dictionary
    if 'type' in audio_repr:
        audio_repr = {'spectrogram': audio_repr}

    # check that one of the representations is named 'spectrogram'
    # if this is not the case, issue a warning
    if 'spectrogram' not in audio_repr:
        warnings.warn("audio representation named `spectrogram` not found. You will not be "
                      "able to load the saved model into PAMGuard.", category=UserWarning)

    # if the output directory does not already exist, create it
    ensure_dir(output_name)

    # loop over audio representations
    for name, params in audio_repr.items():

        # parse audio representation, if it is not already parsed
        if pa.is_encoded(params):
            params = pa.parse_audio_representation(params)

        # if the user did not specify the sample duration, attempt to infer it
        if duration is None:
            duration = _infer_duration(params, input_shape)
            if duration is not None:
                print(f'Inferred sample duration of {duration}s')

        if duration is None:
            warnings.warn("ketos was unable to infer the sample duration. You will not be "
                          "able to load the saved model into PAMGuard. If you intend to load your model "
                          "into PAMGuard, you must specify the sample duration.", category=UserWarning)
        else:
            # stored before the shape inference below, which receives `params`
            params['duration'] = duration

        # if the user did not specify an input shape, attempt to infer it
        if input_shape is None:
            input_shape = _infer_shape(params)
            if input_shape is not None:
                print(f'Inferred input shape of {input_shape}')

        # raised explicitly (same exception type as before) so that the check
        # is not stripped when Python runs with -O
        if input_shape is None:
            raise AssertionError('ketos was unable to infer the input shape. Use the input_shape '
                                 'argument to specify the input shape expected by the model.')

        if 'step' not in params:
            warnings.warn("ketos was unable to infer the step size. You will not be "
                          "able to load the saved model into PAMGuard. If you intend to load your model "
                          "into PAMGuard, you must specify the step size.", category=UserWarning)

        # if model has not been built, build it
        X = model._transform_input(np.ones(shape=input_shape))
        if not model.model.built:
            model.run_on_instance(X[0])

        params['dtype'] = model.model.dtype
        params['input_ndims'] = model.model.layers[0].input_spec.min_ndim
        params['input_shape'] = X.shape

        # encode audio representation parameters before saving to file, and
        # store the result back so that the json dump below does not depend on
        # the parsing helpers modifying the dictionaries in place
        audio_repr[name] = pa.encode_audio_representation(params)

    # create tmp folder
    os.makedirs(tmp_folder)

    # save recipe to tmp folder
    recipe_path = os.path.join(tmp_folder, 'recipe.json')
    model.save_recipe_file(recipe_path)

    # save model to tmp folder
    model_path = os.path.join(tmp_folder, 'model')
    save_pb(obj=model.model, export_dir=model_path)

    # save audio representation to tmp folder
    audio_repr_path = os.path.join(tmp_folder, "audio_repr.json")
    with open(audio_repr_path, 'w') as json_repr:
        json.dump(audio_repr, json_repr)

    # now, archive the contents of the tmp folder
    with ZipFile(output_name, 'w') as archive:
        archive.write(model_path, "model")
        for root, dirs, files in os.walk(model_path):
            # only the leading tmp-folder prefix is swapped for 'model'
            renamed_root = root.replace(model_path, "model", 1)
            for entry in dirs + files:
                archive.write(os.path.join(root, entry), os.path.join(renamed_root, entry))

        archive.write(recipe_path, "recipe.json")
        archive.write(audio_repr_path, "audio_repr.json")

    # tidy up
    shutil.rmtree(tmp_folder)
def _infer_duration(params, input_shape):
""" Helper function for :func:`export_to_ketos_protobuf`.
The function first searches the `params` dictionary for an item
with key `duration`. If found, the value of this item is returned.
If not found, the function searches for the key `step` and, if found,
returns the product `params['step'] * input_shape[0]`.
If the function is unable to infer the duration through one of
these two approaches, it returns a None value.
Args:
params: dict
Parsed audio representation.
input_shape: tuple or list
Shape of an input sample.
Returns:
: float
Duration in seconds.
"""
if 'duration' in params.keys():
return params['duration']
elif input_shape != None and 'step' in params.keys():
return params['step'] * input_shape[0]
else:
return None
def _infer_shape(params):
""" Helper function for :func:`export_to_ketos_protobuf`.
Args:
params: dict
Parsed audio representation.
Returns:
: tuple
Inferred shape. If the shape could not be inferred,
a None value is returned.
"""
return params['type'].infer_shape(**params)
[docs]
def export_to_protobuf(model, output_name, **kwargs):
    r""" Export a ketos model to Protobuf format (\*.pb).

        See also the related function :func:`export_to_ketos_protobuf`.

        Note that the model must be built before it can be saved.

        Args:
            model:
                The ketos model to be exported. Usually created by one of the Interface
                classes found in ketos.neural_networks (e.g.: ResNetInterface)
            output_name: str
                Folder where the exported model will be saved
            kwargs:
                Additional keyword arguments are accepted but not used.

        Raises:
            AssertionError if the model has not been built.
    """
    # raised explicitly (same exception type as before) so that the check
    # is not stripped when Python runs with -O
    if not model.model.built:
        raise AssertionError("The model must be built. Call model.run_on_instance() on a sample input")

    # if the output directory does not already exist, create it
    ensure_dir(output_name)

    save_pb(obj=model.model, export_dir=output_name)
def export_to_ketos(model, output_name, checkpoint_name="cp-0000", audio_repr=None, custom_module=None,
                    tmp_folder="tmp_export_folder", overwrite=True, metadata=None, extra=None, **kwargs):
    r""" Export a ketos model to ketos format (\*.kt).

        If the output directory does not already exist, it is automatically created.

        The function generates a zipped archive containing,

            * the model weights (\*.ckpt)
            * the audio representation (audio_repr.json)
            * the ketos model recipe (recipe.json)

        Args:
            model:
                The ketos model to be exported. Usually created by one of the Interface classes found
                in ketos.neural_networks (e.g.: ResNetInterface)
            output_name: str
                The name of the exported model. Should have the extension \*.kt
            checkpoint_name: str
                The name of the checkpoint file that the model weights will be saved to. Optional.
            audio_repr: dict or str
                Optional audio representation dictionary. If passed, it will be added to the \*.kt file.
                For example,

                >>> audio_repr = {"spectrogram": {
                ...                  "type": "MagSpectrogram",
                ...                  "rate": "1000 Hz",
                ...                  "window": "0.256 s",
                ...                  "step": "0.032 s",
                ...                  "freq_min": "0 Hz",
                ...                  "freq_max": "500 Hz",
                ...                  "window_func": "hamming",
                ...                  "transforms": [{"name":"normalize"}]}
                ...              }

                It is also possible to specify the path (str or pathlib path) to a json file
                containing the audio representation.
            custom_module: str
                Path to a custom module. The custom module can contain any number of files. Common files are
                user defined nn architecture, audio representation, input transform function and output
                transform function.
                All files inside the module will be saved to the ketos output file and can be later accessed
                to build the model for re-training or inference.
            tmp_folder: str
                The name of a temporary folder created during the model conversion. It will be deleted
                upon successful execution. If the folder already exists, a FileExistsError will be thrown,
                unless 'overwrite' is set to True.
            overwrite: bool
                If True and the folder specified in 'tmp_folder' exists, the folder will be overwritten.
            metadata: dict
                Optional metadata dictionary. If passed, it will be added to the \*.kt output archive file.
            extra: str, list(str) or tuple(str)
                Full path to one or several additional files to be saved to the output \*.kt archive file.
            kwargs:
                Additional keyword arguments are accepted but not used.

        Raises:
            FileExistsError if `tmp_folder` exists and `overwrite` is False.
    """
    # a single extra file may be passed on its own; wrap it so that it can be
    # looped over. Lists and tuples are used as they are.
    if extra is not None and not isinstance(extra, (list, tuple)):
        extra = [extra]

    tmp_folder = Path(tmp_folder)

    if tmp_folder.exists():
        if not overwrite:
            # adjacent literals instead of a backslash continuation inside the
            # string, which used to leak the source indentation into the message
            raise FileExistsError(f"{tmp_folder} already exists. If you want to overwrite it set the "
                                  "'overwrite' argument to True.")
        shutil.rmtree(tmp_folder)

    recipe = model._extract_recipe_dict()

    # bring the audio representation into its encoded (json-friendly) form
    if audio_repr is not None:
        if isinstance(audio_repr, (str, PurePath)) and Path(audio_repr).suffix == ".json":
            audio_repr = pa.load_audio_representation(audio_repr, return_unparsed=False)

        if isinstance(audio_repr, dict) and pa.is_encoded(audio_repr):
            audio_repr = pa.parse_audio_representation(audio_repr)

        audio_repr = pa.encode_audio_representation(audio_repr)

    if custom_module is not None:
        custom_module = Path(custom_module).resolve()

    # save checkpoints to tmp folder
    # Unfortunately, the current implementation of save_weights and the ZipFile library
    # does not allow saving the model weights directly to the zipfile
    weights_path = tmp_folder / "checkpoints" / checkpoint_name
    weights_path.parent.mkdir(parents=True, exist_ok=True)
    model.model.save_weights(weights_path)

    # if the output directory does not already exist, create it
    Path(output_name).parent.mkdir(parents=True, exist_ok=True)

    # save to zip archive
    with ZipFile(output_name, 'w') as archive:
        # save recipe
        archive.writestr('recipe.json', json.dumps(recipe))

        # save audio representation
        if audio_repr is not None:
            archive.writestr('audio_repr.json', json.dumps(audio_repr))

        # save metadata
        if metadata is not None:
            archive.writestr("metadata.json", json.dumps(metadata))

        # save checkpoints
        for checkpoint in weights_path.parent.glob('*'):
            archive.write(checkpoint, Path("checkpoints", checkpoint.name))

        # save custom module to the archive
        # NOTE(review): only the file name is kept, so sub-folders of the custom
        # module are flattened and equally named files from different sub-folders
        # get the same archive name - confirm that this is what the loader expects
        if custom_module is not None:
            for file in custom_module.rglob('*'):
                archive.write(file, "custom/" + file.name)

        # save any extra contents
        if extra is not None:
            for extra_file in extra:
                archive.write(extra_file, Path(extra_file).name)

    # tidy up
    shutil.rmtree(tmp_folder)