Source code for atlalign.ml_utils.callbacks

"""Callbacks and aggregation functions."""

"""
    The package atlalign is a tool for registration of 2D images.

    Copyright (C) 2021 EPFL/Blue Brain Project

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

import pathlib

import h5py
import mlflow
import pandas as pd
from tensorflow import keras

from atlalign.data import annotation_volume, segmentation_collapsing_labels
from atlalign.metrics import evaluate_single
from atlalign.ml_utils.io import SupervisedGenerator


[docs]def get_mlflow_artifact_path(start_char=7): """Get path to the MLFlow artifacts of the active run. Stupid implementation. Parameters ---------- start_char : int Since the string will start like "file:///actual/path..." we just slice it. """ return pathlib.Path(mlflow.active_run().info.artifact_uri[start_char:])
[docs]class MLFlowCallback(keras.callbacks.Callback): """Logs metrics into ML. Notes ----- Only runs inside of an mlflow context. Parameters ---------- merged_path : str Path to the master h5 file containing all the data. train_original_ixs_path : str Path to where original training indices are stored. val_original_ixs_path : str Path to where original validation indices are stored. freq : int Reports metrics on every `freq` batch. workers : int Number of workers to be used for each of the evaluations. return_inverse : bool If True, then generators behave differently. starting_step : int Useful when we want to use a checkpointed model and log metrics as of a different step then 1. use_validation : int If True, then the custom metrics are computed on the validation set. Otherwise they will be computed on the training set. """ def __init__( self, merged_path, train_original_ixs_path, val_original_ixs_path, freq=10, workers=1, return_inverse=False, starting_step=0, use_validation=True, ): super().__init__() # Check if inside of an mlflow context if mlflow.active_run() is None: raise ValueError( "To use the MLFlowCallback one needs to be inside of a mlflow.start_run context." ) # mlflow self.root_path = get_mlflow_artifact_path() mlflow.log_params( { "train_original_ixs_path": train_original_ixs_path, "val_original_ixs_path": val_original_ixs_path, "merged_path": merged_path, } ) self.train_original_gen = SupervisedGenerator( merged_path, indexes=train_original_ixs_path, shuffle=False, batch_size=1, return_inverse=return_inverse, ) self.val_original_gen = SupervisedGenerator( merged_path, indexes=val_original_ixs_path, shuffle=False, batch_size=1, return_inverse=return_inverse, ) self.freq = freq self.workers = workers self.overall_batch = starting_step self.use_validation = use_validation
[docs] def on_train_begin(self, logs=None): """Save model architecture.""" arch_path = self.root_path / "architecture" checkpoints_path = self.root_path / "checkpoints" arch_path.mkdir(parents=True, exist_ok=True) checkpoints_path.mkdir(parents=True, exist_ok=True)
[docs] def on_batch_end(self, batch, logs=None): """Log metrics to mlflow. The goal here is two extract 3 types of metrics: - train_merged - extracted from logs (it is a running average over epoch) - train_original - computed via evaluate_generator - val_original - computed via evaluate_generator """ self.overall_batch += 1 if self.overall_batch % self.freq != 0: return model = self.model metric_names = model.metrics_names all_metrics = {} # Keras all_metrics.update( {"{}_train_merged".format(metric): logs[metric] for metric in metric_names} ) eval_train_original = model.evaluate_generator( self.train_original_gen, workers=self.workers ) all_metrics.update( { "{}_train_original".format(metric): value for metric, value in zip(metric_names, eval_train_original) } ) eval_val_original = model.evaluate_generator( self.val_original_gen, workers=self.workers ) all_metrics.update( { "{}_val_original".format(metric): value for metric, value in zip(metric_names, eval_val_original) } ) # Custom print( "\nComputing custom metrics on {} set!".format( "val" if self.use_validation else "train" ) ) gen = self.val_original_gen if self.use_validation else self.train_original_gen external_metrics_df = self.compute_external_metrics(model, gen) stats_dir = self.root_path / str(self.overall_batch) / "stats" stats_dir.mkdir(parents=True, exist_ok=True) external_metrics_df.to_csv(str(stats_dir / "stats.csv")) external_metrics_df.to_html(str(stats_dir / "stats.html")) external_metrics = dict(external_metrics_df.mean()) all_metrics.update(external_metrics) # log into mlflow mlflow.log_metrics(all_metrics, step=self.overall_batch) keras.models.save_model( model, str( self.root_path / "checkpoints" / "model_{}.h5".format(self.overall_batch) ), )
[docs] @staticmethod def compute_external_metrics(model, gen): """Compute external matrics sample by sample. Parameters ---------- model Keras model gen : SupervisedGenerator Generator Returns ------- metrics : dict Various metrics. """ # checks if gen.shuffle: raise ValueError("Shuffling is not allowed for external metrics!") if gen.batch_size != 1: raise ValueError("Batch size has to be 1 for external metrics") # Prepare annotation related stuff (load in RAM, small arrays) indexes = gen.indexes with h5py.File(gen.hdf_path, "r") as f: ps = f["p"][:][indexes] ids = f["image_id"][:][indexes] avol = annotation_volume() collapsing_labels = segmentation_collapsing_labels() external_metrics_per_sample = [] for i, p in enumerate(ps): sample = gen[i] # data[indexes[i]] if gen.return_inverse: img_mov = sample[0][0][0, ..., 1] deltas_true = sample[1][1][0] deltas_true_inv = sample[1][2][0] else: img_mov = sample[0][0, ..., 1] deltas_true = sample[1][1][0] deltas_true_inv = None deltas_pred = model.predict(sample[0])[1][0] deltas_pred_inv = None # we do not use the predicted one # compute external metrics res, images = evaluate_single( deltas_true, deltas_pred, img_mov, ds_f=8, # orig 8 p=p, deltas_true_inv=deltas_true_inv, deltas_pred_inv=deltas_pred_inv, avol=avol, collapsing_labels=collapsing_labels, depths=(0, 1, 2, 3, 4, 5, 6, 7, 8), ) external_metrics_per_sample.append(res) external_metrics_df = pd.DataFrame(external_metrics_per_sample, index=ids) return external_metrics_df