Source code for cil.optimisation.operators.Operator

#  Copyright 2019 United Kingdom Research and Innovation
#  Copyright 2019 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt

from numbers import Number
from textwrap import dedent
import numpy
import functools
import logging
import warnings

log = logging.getLogger(__name__)


[docs] class Operator(object): """ Operator that maps from a space X -> Y Parameters ---------- domain_geometry : ImageGeometry or AcquisitionGeometry domain of the operator range_geometry : ImageGeometry or AcquisitionGeometry, optional, default None range of the operator """ def __init__(self, domain_geometry, **kwargs): self._norm = None self._domain_geometry = domain_geometry self._range_geometry = kwargs.get('range_geometry', None)
[docs] def is_linear(self): '''Returns if the operator is linear Returns ------- `Bool` ''' return False
[docs] def is_orthogonal(self): '''Returns if the operator is orthogonal Returns ------- `Bool` ''' return False
[docs] def direct(self, x, out=None): r"""Calls the operator Parameters ---------- x: DataContainer or BlockDataContainer Element in the domain of the Operator out: DataContainer or BlockDataContainer, default None If out is not None the output of the Operator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. """ raise NotImplementedError
[docs] def norm(self, **kwargs): '''Returns the norm of the Operator. On first call the norm will be calculated using the operator's calculate_norm method. Subsequent calls will return the cached norm. Returns ------- norm: positive:`float` ''' if len(kwargs) != 0: warnings.warn(dedent("""\ norm: the norm method does not use any parameters. For LinearOperators you can use PowerMethod to calculate the norm with non-default parameters and use set_norm to set it"""), DeprecationWarning, stacklevel=2) if self._norm is None: self._norm = self.calculate_norm() return self._norm
[docs] def set_norm(self, norm=None): '''Sets the norm of the operator to a custom value. Parameters --------- norm: float, optional Positive real valued number or `None` Note ---- The passed values are cached so that when self.norm() is called, the saved value will be returned and not calculated via the power method. If `None` is passed, the cache is cleared prompting the function to call the power method to calculate the norm the next time self.norm() is called. ''' if norm is not None: if isinstance(norm, Number): if norm <= 0: raise ValueError( "Norm must be a positive real valued number or None, got {}".format(norm)) else: raise TypeError( "Norm must be a number or None, got {} of type {}".format(norm, type(norm))) self._norm = norm
[docs] def calculate_norm(self): '''Returns the norm of the Operator. Note that this gives a NotImplementedError if the SumOperator is not linear. Returns ------- Scalar: the norm of the Operator ''' if self.is_linear(): return LinearOperator.calculate_norm(self) return NotImplementedError
[docs] def range_geometry(self): '''Returns the range of the Operator: Y space''' return self._range_geometry
[docs] def domain_geometry(self): '''Returns the domain of the Operator: X space''' return self._domain_geometry
@property def domain(self): return self.domain_geometry() @property def range(self): return self.range_geometry() def __rmul__(self, scalar): '''Defines the multiplication by a scalar on the left returns a ScaledOperator''' return ScaledOperator(self, scalar) def compose(self, *other, **kwargs): # TODO: check equality of domain and range of operators # if self.operator2.range_geometry != self.operator1.domain_geometry: # raise ValueError('Cannot compose operators, check domain geometry of {} and range geometry of {}'.format(self.operator1,self.operator2)) return CompositionOperator(self, *other, **kwargs) def __add__(self, other): return SumOperator(self, other) def __mul__(self, scalar): return self.__rmul__(scalar) def __neg__(self): """ Return -self """ return -1 * self def __sub__(self, other): """ Returns the subtraction of the operators.""" return self + (-1) * other
[docs] class LinearOperator(Operator): """ Linear operator that maps from a space X <-> Y Parameters ---------- domain_geometry : ImageGeometry or AcquisitionGeometry domain of the operator range_geometry : ImageGeometry or AcquisitionGeometry, optional, default None range of the operator """ def __init__(self, domain_geometry, **kwargs): super(LinearOperator, self).__init__(domain_geometry, **kwargs)
[docs] def is_linear(self): '''Returns if the operator is linear''' return True
[docs] def adjoint(self, x, out=None): '''Returns the adjoint/inverse operation evaluated at the point :math:`x` Parameters ---------- x: DataContainer or BlockDataContainer Element in the domain of the Operator out: DataContainer or BlockDataContainer, default None If out is not None the output of the Operator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. Note ---- Only available to linear operators''' raise NotImplementedError
[docs] @staticmethod def PowerMethod(operator, max_iteration=10, initial=None, tolerance=1e-5, return_all=False, method='auto'): r"""Power method or Power iteration algorithm The Power method computes the largest (dominant) eigenvalue of a matrix in magnitude, e.g., absolute value in the real case and modulus in the complex case. Parameters ---------- operator: LinearOperator max_iteration: positive:`int`, default=10 Number of iterations for the Power method algorithm. initial: DataContainer, default = None Starting point for the Power method. tolerance: positive:`float`, default = 1e-5 Stopping criterion for the Power method. Check if two consecutive eigenvalue evaluations are below the tolerance. return_all: `boolean`, default = False Toggles the verbosity of the return method: `string` one of `"auto"`, `"composed_with_adjoint"` and `"direct_only"`, default = `"auto"` The default `auto` lets the code choose the method, this can be specified with `"direct_only"` or `"composed_with_adjoint"` Returns ------- dominant eigenvalue: positive:`float` number of iterations: positive:`int` Number of iterations run. Only returned if return_all is True. eigenvector: DataContainer Corresponding eigenvector of the dominant eigenvalue. Only returned if return_all is True. list of eigenvalues: :obj:`list` List of eigenvalues. Only returned if return_all is True. convergence: `boolean` Check on wether the difference between the last two iterations is less than tolerance. Only returned if return_all is True. Note ----- The power method contains two different algorithms chosen by the `method` flag. In the case `method="direct_only"`, for operator, :math:`A`, the power method computes the iterations :math:`x_{k+1} = A (x_k/\|x_{k}\|)` initialised with a random vector :math:`x_0` and returning the largest (dominant) eigenvalue in magnitude given by :math:`\|x_k\|`. In the case `method="composed_with_adjoint"`, the algorithm computes the largest (dominant) eigenvalue of :math:`A^{T}A` returning the square root of this value, i.e. the iterations: :math:`x_{k+1} = A^TA (x_k/\|x_{k}\|)` and returning :math:`\sqrt{\|x_k\|}`. The default flag is `method="auto"`, the algorithm checks to see if the `operator.domain_geometry() == operator.range_geometry()` and if so uses the method "direct_only" and if not the method "composed_with_adjoint". Examples -------- >>> M = np.array([[1.,0],[1.,2.]]) >>> Mop = MatrixOperator(M) >>> Mop_norm = Mop.PowerMethod(Mop) >>> Mop_norm 2.0000654846240296 `PowerMethod` is called when we compute the norm of a matrix or a `LinearOperator`. >>> Mop_norm = Mop.norm() 2.0005647295658866 """ allowed_methods = ["auto", "direct_only", "composed_with_adjoint"] if method not in allowed_methods: raise ValueError("The argument 'method' can be set to one of {0} got {1}".format( allowed_methods, method)) apply_adjoint = True if method == "direct_only": apply_adjoint = False if method == "auto": try: geometries_match = operator.domain_geometry() == operator.range_geometry() except AssertionError: # catch AssertionError for SIRF objects https://github.com/SyneRBI/SIRF-SuperBuild/runs/5110228626?check_suite_focus=true#step:8:972 pass else: if geometries_match: apply_adjoint = False if initial is None: x0 = operator.domain_geometry().allocate('random') else: x0 = initial.copy() y_tmp = operator.range_geometry().allocate() # Normalize first eigenvector x0_norm = x0.norm() x0 /= x0_norm # initial guess for dominant eigenvalue eig_old = 1. if return_all: eig_list = [] convergence_check = True diff = numpy.finfo('d').max i = 0 while (i < max_iteration and diff > tolerance): operator.direct(x0, out=y_tmp) if not apply_adjoint: # swap datacontainer references tmp = x0 x0 = y_tmp y_tmp = tmp else: operator.adjoint(y_tmp, out=x0) # Get eigenvalue using Rayleigh quotient: denominator=1, due to normalization x0_norm = x0.norm() if x0_norm < tolerance: log.warning( "The operator has at least one zero eigenvector and is likely to be nilpotent") eig_new = 0. break x0 /= x0_norm eig_new = numpy.abs(x0_norm) if apply_adjoint: eig_new = numpy.sqrt(eig_new) diff = numpy.abs(eig_new - eig_old) if return_all: eig_list.append(eig_new) eig_old = eig_new i += 1 if return_all and i == max_iteration: convergence_check = False if return_all: return eig_new, i, x0, eig_list, convergence_check else: return eig_new
[docs] def calculate_norm(self): r""" Returns the norm of the LinearOperator calculated by the PowerMethod with default values. """ return LinearOperator.PowerMethod(self, method="composed_with_adjoint")
[docs] @staticmethod def dot_test(operator, domain_init=None, range_init=None, tolerance=1e-6, **kwargs): r'''Does a dot linearity test on the operator Evaluates if the following equivalence holds .. math:: Ax\times y = y \times A^Tx Parameters ---------- operator: operator to test the dot_test range_init: optional initialisation container in the operator range domain_init: optional initialisation container in the operator domain seed: int, default = 1 Seed random generator tolerance:float, default 1e-6 Check if the following expression is below the tolerance .. math:: |Ax\times y - y \times A^Tx|/(\|A\|\|x\|\|y\| + 1e-12) < tolerance Returns ------- boolean, True if the test is passed. ''' seed = kwargs.get('seed', 1) if range_init is None: y = operator.range_geometry().allocate('random', seed=seed + 10) else: y = range_init if domain_init is None: x = operator.domain_geometry().allocate('random', seed=seed) else: x = domain_init fx = operator.direct(x) by = operator.adjoint(y) lhs = fx.dot(y) rhs = x.dot(by) # Check relative tolerance but normalised with respect to # operator, x and y norms and avoid zero division error = numpy.abs(lhs - rhs) / (operator.norm()*x.norm()*y.norm() + 1e-12) if error < tolerance: return True else: print('Left hand side {}, \nRight hand side {}'.format(lhs, rhs)) return False
class AdjointOperator(LinearOperator): """ The Adjoint operator :math:`A^{*}: Y^{*}\rightarrow X^{*}` of a linear operator :math:`A: X\rightarrow Y` defined as .. math:: <x, A^* y> = <Ax, y> Parameters ---------- operator : A linear operator Examples -------- This example demonstrates that :math:` LHS:=<Gx, y> =<x, G^* y>=:RHS`, where :math:`G` is the gradient operator. >>> ig = ImageGeometry(2,3) >>> G = GradientOperator(ig) >>> div = AdjointOperator(G) >>> x = G.domain.allocate("random_int") >>> y = G.range.allocate("random_int") >>> lhs = G.direct(x).dot(y) >>> rhs = x.dot(div.direct(y)) >>> lhs == rhs # returns True """ def __init__(self, operator): super(AdjointOperator, self).__init__(domain_geometry=operator.range_geometry(), range_geometry=operator.domain_geometry()) self.operator = operator def direct(self, x, out=None): return self.operator.adjoint(x, out=out) def adjoint(self, x, out=None): return self.operator.direct(x, out=out)
[docs] class ScaledOperator(Operator): '''ScaledOperator A class to represent the scalar multiplication of an Operator with a scalar. It holds an operator and a scalar. Basically it returns the multiplication of the result of direct and adjoint of the operator with the scalar. For the rest it behaves like the operator it holds. Parameters ----------- operator: a `Operator` or `LinearOperator` scalar: Number a scalar multiplier Example -------- The scaled operator behaves like the following: .. code-block:: python sop = ScaledOperator(operator, scalar) sop.direct(x) = scalar * operator.direct(x) sop.adjoint(x) = scalar * operator.adjoint(x) sop.norm() = operator.norm() sop.range_geometry() = operator.range_geometry() sop.domain_geometry() = operator.domain_geometry() ''' def __init__(self, operator, scalar, **kwargs): super(ScaledOperator, self).__init__(domain_geometry=operator.domain_geometry(), range_geometry=operator.range_geometry()) if not isinstance(scalar, Number): raise TypeError('expected scalar: got {}'.format(type(scalar))) self.scalar = scalar self.operator = operator
[docs] def direct(self, x, out=None): '''direct method''' tmp = self.operator.direct(x, out=out) tmp *= self.scalar return tmp
[docs] def adjoint(self, x, out=None): '''adjoint method''' if not self.operator.is_linear(): raise TypeError('Operator is not linear') tmp = self.operator.adjoint(x, out=out) tmp *= self.scalar return tmp
[docs] def norm(self, **kwargs): '''norm of the operator''' return numpy.abs(self.scalar) * self.operator.norm(**kwargs)
[docs] def is_linear(self): '''returns a `boolean` indicating whether the operator is linear ''' return self.operator.is_linear()
############################################################################### ################ SumOperator ########################################### ###############################################################################
[docs] class SumOperator(Operator): """Sums two operators. For example, `SumOperator(left, right).direct(x)` is equivalent to `left.direct(x)+right.direct(x)` Parameters ---------- operator1: `Operator` The first `Operator` in the sum operator2: `Operator` The second `Operator` in the sum Note ---- Both operators must have the same domain and range. """ def __init__(self, operator1, operator2): self.operator1 = operator1 self.operator2 = operator2 # if self.operator1.domain_geometry() != self.operator2.domain_geometry(): # raise ValueError('Domain geometry of {} is not equal with domain geometry of {}'.format(self.operator1.__class__.__name__,self.operator2.__class__.__name__)) # if self.operator1.range_geometry() != self.operator2.range_geometry(): # raise ValueError('Range geometry of {} is not equal with range geometry of {}'.format(self.operator1.__class__.__name__,self.operator2.__class__.__name__)) self.linear_flag = self.operator1.is_linear() and self.operator2.is_linear() super(SumOperator, self).__init__(domain_geometry=self.operator1.domain_geometry(), range_geometry=self.operator1.range_geometry())
[docs] def direct(self, x, out=None): r"""Calls the sum operator Parameters ---------- x: DataContainer or BlockDataContainer Element in the domain of the SumOperator out: DataContainer or BlockDataContainer, default None If out is not None the output of the SumOperator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. """ ret = self.operator1.direct(x, out=out) ret.add(self.operator2.direct(x), out=ret) return ret
[docs] def adjoint(self, x, out=None): r"""Calls the adjoint of the sum operator, evaluated at the point :math:`x`. Parameters ---------- x: DataContainer or BlockDataContainer Element in the range of the SumOperator out: DataContainer or BlockDataContainer, default None If out is not None the output of the adjoint of the SumOperator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. """ if not self.linear_flag: raise ValueError('No adjoint operation with non-linear operators') ret = self.operator1.adjoint(x, out=out) ret.add(self.operator2.adjoint(x), out=ret) return ret
[docs] def is_linear(self): return self.linear_flag
############################################################################### ################ Composition ########################################### ###############################################################################
[docs] class CompositionOperator(Operator): """Composes one or more operators. For example, `CompositionOperator(left, right).direct(x)` is equivalent to `left.direct(right.direct(x))` Parameters ---------- args: `Operator`s Operators to be composed. As in mathematical notation, the operators will be applied right to left """ def __init__(self, *operators, **kwargs): # get a reference to the operators self.operators = operators self.linear_flag = functools.reduce(lambda x, y: x and y.is_linear(), self.operators, True) # self.preallocate = kwargs.get('preallocate', False) self.preallocate = False if self.preallocate: self.tmp_domain = [op.domain_geometry().allocate() for op in self.operators[:-1]] self.tmp_range = [op.range_geometry().allocate() for op in self.operators[1:]] # pass # TODO address the equality of geometries # if self.operator2.range_geometry() != self.operator1.domain_geometry(): # raise ValueError('Domain geometry of {} is not equal with range geometry of {}'.format(self.operator1.__class__.__name__,self.operator2.__class__.__name__)) super(CompositionOperator, self).__init__( domain_geometry=self.operators[-1].domain_geometry(), range_geometry=self.operators[0].range_geometry())
[docs] def direct(self, x, out=None): """Calls the composition operator at the point :math:`x`. Parameters ---------- x: DataContainer or BlockDataContainer Element in the domain of the CompositionOperator out: DataContainer or BlockDataContainer, default None If out is not None the output of the CompositionOperator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. """ if out is None: # return self.operator1.direct(self.operator2.direct(x)) # return functools.reduce(lambda X,operator: operator.direct(X), # self.operators[::-1][1:], # self.operators[-1].direct(x)) if self.preallocate: pass else: for i, operator in enumerate(self.operators[::-1]): if i == 0: step = operator.direct(x) else: step = operator.direct(step) return step else: # tmp = self.operator2.range_geometry().allocate() # self.operator2.direct(x, out = tmp) # self.operator1.direct(tmp, out = out) # out.fill ( # functools.reduce(lambda X,operator: operator.direct(X), # self.operators[::-1][1:], # self.operators[-1].direct(x)) # ) # TODO this is a bit silly but will handle the pre allocation later if self.preallocate: for i, operator in enumerate(self.operators[::-1]): if i == 0: operator.direct(x, out=self.tmp_range[i]) elif i == len(self.operators) - 1: operator.direct(self.tmp_range[i-1], out=out) else: operator.direct( self.tmp_range[i-1], out=self.tmp_range[i]) else: for i, operator in enumerate(self.operators[::-1]): if i == 0: step = operator.direct(x) else: step = operator.direct(step) out.fill(step) return out
[docs] def adjoint(self, x, out=None): """Calls the adjoint of the composition operator at the point :math:`x`. Parameters ---------- x: DataContainer or BlockDataContainer Element in the range of the CompositionOperator out: DataContainer or BlockDataContainer, default None If out is not None the output of the adjoint of the CompositionOperator will be filled in out, otherwise a new object is instantiated and returned. Returns ------- DataContainer or BlockDataContainer containing the result. """ if self.linear_flag: if out is not None: # return self.operator2.adjoint(self.operator1.adjoint(x)) # return functools.reduce(lambda X,operator: operator.adjoint(X), # self.operators[1:], # self.operators[0].adjoint(x)) if self.preallocate: for i, operator in enumerate(self.operators): if i == 0: operator.adjoint(x, out=self.tmp_domain[i]) elif i == len(self.operators) - 1: step = operator.adjoint( self.tmp_domain[i-1], out=out) else: operator.adjoint( self.tmp_domain[i-1], out=self.tmp_domain[i]) return else: for i, operator in enumerate(self.operators): if i == 0: step = operator.adjoint(x) else: step = operator.adjoint(step) out.fill(step) return out else: if self.preallocate: pass else: for i, operator in enumerate(self.operators): if i == 0: step = operator.adjoint(x) else: step = operator.adjoint(step) return step else: raise ValueError('No adjoint operation with non-linear operators')
[docs] def is_linear(self): return self.linear_flag