Source code for cil.optimisation.algorithms.Algorithm

# -*- coding: utf-8 -*-
#  Copyright 2019 United Kingdom Research and Innovation
#  Copyright 2019 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt

import time, functools
from numbers import Integral, Number
import logging
import numpy as np

[docs]class Algorithm(object): '''Base class for iterative algorithms provides the minimal infrastructure. Algorithms are iterables so can be easily run in a for loop. They will stop as soon as the stop criterion is met. The user is required to implement the :code:`set_up`, :code:`__init__`, :code:`update` and and :code:`update_objective` methods A courtesy method :code:`run` is available to run :code:`n` iterations. The method accepts a :code:`callback` function that receives the current iteration number and the actual objective value and can be used to trigger print to screens and other user interactions. The :code:`run` method will stop when the stopping criterion is met. '''
[docs] def __init__(self, **kwargs): '''Constructor Set the minimal number of parameters: :param max_iteration: maximum number of iterations :type max_iteration: int, optional, default 0 :param update_objective_interval: the interval every which we would save the current\ objective. 1 means every iteration, 2 every 2 iteration\ and so forth. This is by default 1 and should be increased\ when evaluating the objective is computationally expensive. :type update_objective_interval: int, optional, default 1 :param log_file: log verbose output to file :type log_file: str, optional, default None ''' self.iteration = -1 self.__max_iteration = kwargs.get('max_iteration', 0) self.__loss = [] self.memopt = False self.configured = False self.timing = [] self._iteration = [] self.update_objective_interval = kwargs.get('update_objective_interval', 1) # self.x = None self.iter_string = 'Iter' self.logger = None self.__set_up_logger(kwargs.get('log_file', None))
[docs] def set_up(self, *args, **kwargs): '''Set up the algorithm''' raise NotImplementedError()
[docs] def update(self): '''A single iteration of the algorithm''' raise NotImplementedError()
[docs] def should_stop(self): '''default stopping criterion: number of iterations The user can change this in concrete implementation of iterative algorithms.''' return self.max_iteration_stop_criterion()
def __set_up_logger(self, fname): """Set up the logger if desired""" if fname: print("Will output results to: " + fname) handler = logging.FileHandler(fname) self.logger = logging.getLogger("obj_fn") self.logger.setLevel(logging.INFO) self.logger.addHandler(handler)
[docs] def max_iteration_stop_criterion(self): '''default stop criterion for iterative algorithm: max_iteration reached''' return self.iteration > self.max_iteration
[docs] def __iter__(self): '''Algorithm is an iterable''' return self
[docs] def next(self): '''Algorithm is an iterable python2 backwards compatibility''' return self.__next__()
[docs] def __next__(self): '''Algorithm is an iterable calling this method triggers update and update_objective ''' if self.should_stop(): raise StopIteration() else: if self.iteration == -1 and self.update_objective_interval > 0: self._iteration.append(self.iteration) self.update_objective() self.iteration += 1 return time0 = time.time() if not self.configured: raise ValueError('Algorithm not configured correctly. Please run set_up.') self.update() self.timing.append( time.time() - time0 ) self.iteration += 1 self._update_previous_solution() if self.iteration >= 0 and self.update_objective_interval > 0 and\ self.iteration % self.update_objective_interval == 0: self._iteration.append(self.iteration) self.update_objective()
[docs] def _update_previous_solution(self): """ Update the previous solution with the current one The concrete algorithm calls update_previous_solution. Normally this would entail the swapping of pointers: .. highlight:: python .. code-block:: python tmp = self.x_old self.x_old = self.x self.x = tmp """ pass
[docs] def get_output(self): " Returns the current solution. " return self.x
def _provable_convergence_condition(self): raise NotImplementedError(" Convergence criterion is not implemented for this algorithm. ")
[docs] def is_provably_convergent(self): """ Check if the algorithm is convergent based on the provable convergence criterion. """ return self._provable_convergence_condition()
@property def solution(self): return self.get_output()
[docs] def get_last_loss(self, **kwargs): '''Returns the last stored value of the loss function if update_objective_interval is 1 it is the value of the objective at the current iteration. If update_objective_interval > 1 it is the last stored value. ''' return_all = kwargs.get('return_all', False) try: objective = self.__loss[-1] except IndexError as ie: objective = [np.nan, np.nan, np.nan] if return_all else np.nan if isinstance (objective, list): if return_all: return objective else: return objective[0] else: if return_all: return [ objective, np.nan, np.nan] else: return objective
[docs] def get_last_objective(self, **kwargs): '''alias to get_last_loss''' return self.get_last_loss(**kwargs)
[docs] def update_objective(self): '''calculates the objective with the current solution''' raise NotImplementedError()
@property def iterations(self): '''returns the iterations at which the objective has been evaluated''' return self._iteration @property def loss(self): '''returns the list of the values of the objective during the iteration The length of this list may be shorter than the number of iterations run when the update_objective_interval > 1 ''' return self.__loss @property def objective(self): '''alias of loss''' return self.loss @property def max_iteration(self): '''gets the maximum number of iterations''' return self.__max_iteration @max_iteration.setter def max_iteration(self, value): '''sets the maximum number of iterations''' assert isinstance(value, int) self.__max_iteration = value @property def update_objective_interval(self): return self.__update_objective_interval @update_objective_interval.setter def update_objective_interval(self, value): if isinstance(value, Integral): if value >= 0: self.__update_objective_interval = value else: raise ValueError('Update objective interval must be an integer >= 0') else: raise ValueError('Update objective interval must be an integer >= 0')
[docs] def run(self, iterations=None, verbose=1, callback=None, **kwargs): '''run n iterations and update the user with the callback if specified :param iterations: number of iterations to run. If not set the algorithm will run until max_iteration or until stop criterion is reached :param verbose: sets the verbosity output to screen, 0 no verbose, 1 medium, 2 highly verbose :param callback: is a function that receives: current iteration number, last objective function value and the current solution and gets executed at each update_objective_interval :param print_interval: integer, controls every how many iteration there's a print to screen. Notice that printing will not evaluate the objective function and so the print might be out of sync wrt the calculation of the objective. In such cases nan will be printed. ''' print_interval = kwargs.get('print_interval', self.update_objective_interval) if print_interval > self.update_objective_interval: print_interval = self.update_objective_interval if verbose == 0: verbose = False very_verbose = False elif verbose == 1: verbose = True very_verbose = False elif verbose == 2: verbose = True very_verbose = True else: raise ValueError("verbose should be 0, 1 or 2. Got {}".format (verbose)) if self.should_stop(): print ("Stop criterion has been reached.") if iterations is None : iterations = self.max_iteration if verbose: print (self.verbose_header(very_verbose)) if self.iteration == -1 and self.update_objective_interval>0: iterations+=1 else: print (self.verbose_output(very_verbose)) for i in range(iterations): try: self.__next__() except StopIteration: break if self.update_objective_interval > 0 and\ self.iteration % self.update_objective_interval == 0: if callback is not None: callback(self.iteration, self.get_last_objective(return_all=very_verbose), self.x) if verbose: if (print_interval != 0 and self.iteration % print_interval == 0) or \ ( self.update_objective_interval != 0 and self.iteration % self.update_objective_interval == 0): print (self.verbose_output(very_verbose)) if verbose: start = 3 # I don't understand why this bars = ['-' for i in range(start+9+10+13+20)] if (very_verbose): bars = ['-' for i in range(start+9+10+13+13+13+15)] # print a nice ---- with proper length at the end # print (functools.reduce(lambda x,y: x+y, bars, '')) out = "{}\n{}\n{}\n".format(functools.reduce(lambda x,y: x+y, bars, '') , self.verbose_output(very_verbose), "Stop criterion has been reached.") print (out) # Print to log file if desired if self.logger: self.logger.info(out)
[docs] def verbose_output(self, verbose=False): '''Creates a nice tabulated output''' timing = self.timing if len (timing) == 0: t = 0 else: t = sum(timing)/len(timing) out = "{:>9} {:>10} {:>13} {}".format( self.iteration, self.max_iteration, "{:.3f}".format(t), self.objective_to_string(verbose) ) # Print to log file if desired if self.logger: self.logger.info(out) return out
def objective_to_string(self, verbose=False): el = self.get_last_objective(return_all=verbose) if self.update_objective_interval == 0 or \ self.iteration % self.update_objective_interval != 0: el = [ np.nan, np.nan, np.nan] if verbose else np.nan if isinstance (el, list): if np.isnan(el[0]): string = functools.reduce(lambda x,y: x+' {:>13s}'.format(''), el[:-1],'') elif not np.isnan(el[0]) and np.isnan(el[1]): string = ' {:>13.5e}'.format(el[0]) string += ' {:>13s}'.format('') else: string = functools.reduce(lambda x,y: x+' {:>13.5e}'.format(y), el[:-1],'') if np.isnan(el[-1]): string += '{:>15s}'.format('') else: string += '{:>15.5e}'.format(el[-1]) else: if np.isnan(el): string = '{:>20s}'.format('') else: string = "{:>20.5e}".format(el) return string def verbose_header(self, verbose=False): el = self.get_last_objective(return_all=verbose) if type(el) == list: out = "{:>9} {:>10} {:>13} {:>13} {:>13} {:>15}\n".format(self.iter_string, 'Max {}'.format(self.iter_string), 'Time/{}'.format(self.iter_string), 'Primal' , 'Dual', 'Primal-Dual') out += "{:>9} {:>10} {:>13} {:>13} {:>13} {:>15}".format('', '', '[s]', 'Objective' , 'Objective', 'Gap') else: out = "{:>9} {:>10} {:>13} {:>20}\n".format(self.iter_string, 'Max {}'.format(self.iter_string), 'Time/{}'.format(self.iter_string), 'Objective') out += "{:>9} {:>10} {:>13} {:>20}".format('', '', '[s]', '') # Print to log file if desired if self.logger: self.logger.info(out) return out