Source code for physicool.optimization

"""A module for model calibration and optimization routines."""
from dataclasses import dataclass, field
from pathlib import Path
import platform
import subprocess
import logging
from typing import List, Dict, Optional, Tuple, Union
from distutils.dir_util import copy_tree, remove_tree

import numpy as np

from physicool.updaters import ParamsUpdater
from physicool.processing import (
    OutputProcessor,
    ErrorQuantification,
    compute_mean_squared_error,
    NEW_OUTPUTS_VERSION,
)
from physicool.plotting import SweeperPlot


LOG_FILE = "debug.log"
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
)


[docs]def _create_project_command(project_name: str) -> str: """ Creates a project command based on the current OS. Parameters ----------- project_name: The base name of the PhysiCell executable file. Returns -------- str The full command to be called in order to run the executable in the shell, adapted to the current OS. """ if platform.system() == "Windows": return f"{project_name}.exe" return f"./{project_name}"
[docs]def clean_outputs() -> None: """Removes the files from the output folder and creates it again (make data-cleanup).""" if Path("output").is_dir(): with open(LOG_FILE, "a") as log_file: subprocess.run( "make data-cleanup", shell=True, stdout=log_file, stderr=log_file, text=True, )
[docs]def clean_tmp_files() -> None: """Removes the temp folder if it exists.""" if Path("temp").is_dir(): remove_tree("temp")
[docs]def compile_project() -> None: """Compiles the current project by calling make.""" logging.info("compiling project...") with open(LOG_FILE, "a") as log_file: subprocess.run("make", shell=True, stdout=log_file, stderr=log_file, text=True)
[docs]@dataclass class PhysiCellBlackBox: """ A class to run PhysiCell models through Python as a black box. Models can be run as they are, and it's possible to run replicates. Alternatively, users can decide to include a ParamsUpdater object to change values in the XML file before the model is run and/or a OutputProcessor to extract data from the output files and return a given user-defined metric. Output files can be kept or discarded. If kept, they will be stored inside a new "temp" folder. """ updater: Optional[ParamsUpdater] = None processor: Optional[OutputProcessor] = None project_name: str = "project" project_command: str = field(init=False) version: str = NEW_OUTPUTS_VERSION
[docs] def __post_init__(self): """Create the right command to call the PhysiCell project based on the OS.""" self.project_command = _create_project_command(self.project_name)
[docs] def run( self, params: Optional[Dict[str, float]] = None, number_of_replicates: int = 1, keep_files: bool = True, ) -> Optional[np.ndarray]: """ Runs the black box pipeline. Parameters ---------- params The new parameter values, to be updated in the XML file by the ParamsUpdater class. number_of_replicates The number of simulation replicates to be run. keep_files If the output files should be stored in a tmp folder. Returns ------- Optional[np.ndarray] The output metrics computed by the OutputProcessor class """ # Create a new directory to store the output files if keep_files: storage_folder = "temp" Path(storage_folder).mkdir(exist_ok=True) # Update the XML configuration file with the passed values if (self.updater is not None) and (params is not None): self.updater.update(new_values=params) # Create an array to store the metrics computed by the processor if self.processor: output_metrics = [] # Run the PhysiCell model for each replicate # Create a new directory, run the model and save the files to this # location and compute and store the model output metrics for i in range(number_of_replicates): if (number_of_replicates > 1) & keep_files: storage_folder = f"temp/replicate{i}" Path(storage_folder).mkdir() log_status = f"running project with command {self.project_command}..." logging.info(log_status) with open(LOG_FILE, "a") as log_file: subprocess.run( self.project_command, shell=True, stdout=log_file, stderr=subprocess.PIPE, ) if self.processor: output_metrics.append(self.processor(version=self.version)) if keep_files: copy_tree("output", storage_folder) # Delete the files from the "output" folder clean_outputs() if self.processor: if number_of_replicates == 1: return output_metrics[0] return np.asarray(output_metrics)
[docs]def run_sweep( black_box: PhysiCellBlackBox, name: str, bounds: Tuple[float, float], step: float ) -> np.ndarray: input_values = np.arange(bounds[0], bounds[1], step) output_metrics = [] for value in input_values: output_metrics.append(black_box.run({name: value})) return np.asarray(output_metrics)
[docs]@dataclass class MultiLevelSweep: black_box: PhysiCellBlackBox target_data: np.ndarray n_levels: int points_dir: int percentage_dir: float parameters: List[str] error_estimator: ErrorQuantification = compute_mean_squared_error plotter: SweeperPlot = field(init=False) results: np.ndarray = field(init=False) current_level: int = field(init=False) current_opt_point: Tuple[float, float] = field(init=False) param_bounds: List[Tuple[Union[None, float], Union[None, float]]] = field( init=False )
[docs] def __post_init__(self) -> None: """Sets up everything needed for the sweeper.""" self.results = np.zeros((self.n_levels, self.points_dir, self.points_dir)) self.current_level = 0 self.current_opt_point = (0.0, 0.0) self.param_bounds = [(None, None), (None, None)] self.plotter = SweeperPlot()
[docs] def set_param_bounds( self, param1_bounds: Optional[Tuple[Union[None, float], Union[None, float]]] = None, param2_bounds: Optional[Tuple[Union[None, float], Union[None, float]]] = None, ) -> None: """Defines the parameter bounds to be considered when creating a parameter space.""" if param1_bounds is None: param1_bounds = (None, None) if param2_bounds is None: param2_bounds = (None, None) self.param_bounds = [param1_bounds, param2_bounds]
[docs] def get_parameter_range(self, factor: float, idx: int) -> np.ndarray: """Returns the parameter values to be tested based on the current level and bounds.""" min_limit = self.current_opt_point[idx] - factor * self.current_opt_point[idx] max_limit = self.current_opt_point[idx] + factor * self.current_opt_point[idx] if any(self.param_bounds[idx]): if self.param_bounds[idx][0] is not None: if min_limit < self.param_bounds[idx][0]: min_limit = self.param_bounds[idx][0] if self.param_bounds[idx][1] is not None: if max_limit > self.param_bounds[idx][1]: max_limit = self.param_bounds[idx][1] return np.linspace(min_limit, max_limit, self.points_dir)
[docs] def get_parameter_space(self): """Returns the parameter values to be tested at the current level.""" factor = self.percentage_dir / (self.current_level * 2 + 1) x = self.get_parameter_range(factor=factor, idx=0) y = self.get_parameter_range(factor=factor, idx=1) return x, y
[docs] def get_optimal_idx(self) -> Tuple[int, int]: """Returns the indexes for the smallest error in the current level.""" best_result = np.argmin(self.results[self.current_level]) i = int(np.floor(best_result / self.points_dir)) j = int(best_result - self.points_dir * i) return i, j
[docs] def compute_objective(self, x: np.ndarray, y: np.ndarray) -> None: """ Runs the black box for each cell of the parameter space defined by x and y. Also chooses the best optimal point found in the parameter space. """ for i, x_value in enumerate(x): for j, y_value in enumerate(y): clean_tmp_files() # Select parameters and run the model results = self.black_box.run( {self.parameters[0]: x_value, self.parameters[1]: y_value} ) # Compute error between simulated data and target data self.results[self.current_level][i][j] = self.error_estimator( results, self.target_data ) i, j = self.get_optimal_idx() self.current_opt_point = (x[i], y[j])
[docs] def run_level(self) -> None: """Run a single level of the multilevel sweep.""" # Find the new parameter space for the current level x, y = self.get_parameter_space() # Add the bounds of the parameter space to the plot self.plotter.add_bounds_to_ax(x, y, self.current_level) # RUn the model for each cell of the parameter space and save the results self.compute_objective(x, y) # Plot the results as a heatmap self.plotter.plot_level_results( x, y, self.current_level, self.results[self.current_level] )
[docs] def run_sweep(self, initial_point: Tuple[float, float]) -> Tuple[float, float]: """Runs all sweep levels from an initial value and returns the best values found.""" self.current_opt_point = initial_point self.plotter.set_up_plotter( n_levels=self.n_levels, param_labels=self.parameters ) while self.current_level < self.n_levels: self.run_level() self.current_level += 1 return self.current_opt_point[0], self.current_opt_point[1]