Source code for cil.framework.block

#  Copyright 2019 United Kingdom Research and Innovation
#  Copyright 2019 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt
import functools
import warnings
from numbers import Number

import numpy

from ..utilities.multiprocessing import NUM_THREADS
from .labels import FillType


[docs] class BlockGeometry(object): @property def RANDOM(self): warnings.warn("use FillType.RANDOM instead", DeprecationWarning, stacklevel=2) return FillType.RANDOM @property def RANDOM_INT(self): warnings.warn("use FillType.RANDOM_INT instead", DeprecationWarning, stacklevel=2) return FillType.RANDOM_INT @property def dtype(self): return tuple(i.dtype for i in self.geometries) '''Class to hold Geometry as column vector''' #__array_priority__ = 1 def __init__(self, *args, **kwargs): '''''' self.geometries = args self.index = 0 shape = (len(args),1) self.shape = shape n_elements = functools.reduce(lambda x,y: x*y, shape, 1) if len(args) != n_elements: raise ValueError( 'Dimension and size do not match: expected {} got {}' .format(n_elements, len(args)))
[docs] def get_item(self, index): '''returns the Geometry in the BlockGeometry located at position index''' return self.geometries[index]
[docs] def allocate(self, value=0, **kwargs): '''Allocates a BlockDataContainer according to geometries contained in the BlockGeometry''' symmetry = kwargs.get('symmetry',False) containers = [geom.allocate(value, **kwargs) for geom in self.geometries] if symmetry == True: # for 2x2 # [ ig11, ig12\ # ig21, ig22] # Row-wise Order if len(containers)==4: containers[1]=containers[2] # for 3x3 # [ ig11, ig12, ig13\ # ig21, ig22, ig23\ # ig31, ig32, ig33] elif len(containers)==9: containers[1]=containers[3] containers[2]=containers[6] containers[5]=containers[7] # for 4x4 # [ ig11, ig12, ig13, ig14\ # ig21, ig22, ig23, ig24\ c # ig31, ig32, ig33, ig34 # ig41, ig42, ig43, ig44] elif len(containers) == 16: containers[1]=containers[4] containers[2]=containers[8] containers[3]=containers[12] containers[6]=containers[9] containers[7]=containers[10] containers[11]=containers[15] return BlockDataContainer(*containers)
def __iter__(self): '''BlockGeometry is an iterable''' return self def __next__(self): '''BlockGeometry is an iterable''' if self.index < len(self.geometries): result = self.geometries[self.index] self.index += 1 return result else: self.index = 0 raise StopIteration def __eq__(self, value: object) -> bool: if len(self.geometries) != len(value.geometries): return False return functools.reduce(lambda x,y: x and y, \ [sel == vel for sel,vel in zip(self.geometries, value.geometries)], True)
[docs] class BlockDataContainer(object): '''Class to hold DataContainers as column vector Provides basic algebra between BlockDataContainer's, DataContainer's and subclasses and Numbers 1) algebra between `BlockDataContainer`s will be element-wise, only if the shape of the 2 `BlockDataContainer`s is the same, otherwise it will fail 2) algebra between `BlockDataContainer`s and `list` or `numpy array` will work as long as the number of `rows` and element of the arrays match, independently on the fact that the `BlockDataContainer` could be nested 3) algebra between `BlockDataContainer` and one `DataContainer` is possible. It will require all the `DataContainers` in the block to be compatible with the `DataContainer` we want to operate with. 4) algebra between `BlockDataContainer` and a `Number` is possible and it will be done with each element of the `BlockDataContainer` even if nested A = [ [B,C] , D] A * 3 = [ 3 * [B,C] , 3* D] = [ [ 3*B, 3*C] , 3*D ] ''' ADD = 'add' SUBTRACT = 'subtract' MULTIPLY = 'multiply' DIVIDE = 'divide' POWER = 'power' SAPYB = 'sapyb' MAXIMUM = 'maximum' MINIMUM = 'minimum' ABS = 'abs' SIGN = 'sign' SQRT = 'sqrt' CONJUGATE = 'conjugate' __array_priority__ = 1 __container_priority__ = 2 @property def dtype(self): return tuple(i.dtype for i in self.containers) def __init__(self, *args, **kwargs): '''''' self.containers = args self.index = 0 #if len(set([i.shape for i in self.containers])): # self.geometry = self.containers[0].geometry shape = kwargs.get('shape', None) if shape is None: shape = (len(args),1) # shape = (len(args),1) self.shape = shape n_elements = functools.reduce(lambda x,y: x*y, shape, 1) if len(args) != n_elements: raise ValueError( 'Dimension and size do not match: expected {} got {}' .format(n_elements, len(args)))
[docs] def __iter__(self): '''BlockDataContainer is Iterable''' self.index=0 return self
[docs] def next(self): '''python2 backwards compatibility''' return self.__next__()
def __next__(self): try: out = self[self.index] except IndexError as ie: raise StopIteration() self.index+=1 return out
[docs] def is_compatible(self, other): '''basic check if the size of the 2 objects fit''' if isinstance(other, Number): return True elif isinstance(other, (list, tuple, numpy.ndarray)) : for ot in other: if not isinstance(ot, Number): raise ValueError('List/ numpy array can only contain numbers {}'\ .format(type(ot))) return len(self.containers) == len(other) elif isinstance(other, BlockDataContainer): return len(self.containers) == len(other.containers) else: # this should work for other as DataContainers and children ret = True for i, el in enumerate(self.containers): if isinstance(el, BlockDataContainer): a = el.is_compatible(other) else: a = el.shape == other.shape ret = ret and a # probably will raise return ret
def get_item(self, row): if row > self.shape[0]: raise ValueError('Requested row {} > max {}'.format(row, self.shape[0])) return self.containers[row] def __getitem__(self, row): return self.get_item(row)
[docs] def add(self, other, *args, **kwargs): '''Algebra: add method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer :param: out (optional): provides a placehold for the resul. ''' return self.binary_operations(BlockDataContainer.ADD, other, *args, **kwargs)
[docs] def subtract(self, other, *args, **kwargs): '''Algebra: subtract method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.SUBTRACT, other, *args, **kwargs)
[docs] def multiply(self, other, *args, **kwargs): '''Algebra: multiply method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer) :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.MULTIPLY, other, *args, **kwargs)
[docs] def divide(self, other, *args, **kwargs): '''Algebra: divide method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer) :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.DIVIDE, other, *args, **kwargs)
[docs] def power(self, other, *args, **kwargs): '''Algebra: power method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.POWER, other, *args, **kwargs)
[docs] def maximum(self, other, *args, **kwargs): '''Algebra: power method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer) :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.MAXIMUM, other, *args, **kwargs)
[docs] def minimum(self, other, *args, **kwargs): '''Algebra: power method of BlockDataContainer with number/DataContainer or BlockDataContainer :param: other (number, DataContainer or subclasses or BlockDataContainer) :param: out (optional): provides a placeholder for the result. ''' return self.binary_operations(BlockDataContainer.MINIMUM, other, *args, **kwargs)
[docs] def sapyb(self, a, y, b, out, num_threads = NUM_THREADS): r'''performs axpby element-wise on the BlockDataContainer containers Does the operation .. math:: a*x+b*y and stores the result in out, where x is self :param a: scalar :param b: scalar :param y: compatible (Block)DataContainer :param out: (Block)DataContainer to store the result Example: -------- >>> a = 2 >>> b = 3 >>> ig = ImageGeometry(10,11) >>> x = ig.allocate(1) >>> y = ig.allocate(2) >>> bdc1 = BlockDataContainer(2*x, y) >>> bdc2 = BlockDataContainer(x, 2*y) >>> out = bdc1.sapyb(a,bdc2,b) ''' if out is None: raise ValueError("out container cannot be None") kwargs = {'a':a, 'b':b, 'out':out, 'num_threads': NUM_THREADS} self.binary_operations(BlockDataContainer.SAPYB, y, **kwargs)
[docs] def axpby(self, a, b, y, out, dtype=numpy.float32, num_threads = NUM_THREADS): '''Deprecated method. Alias of sapyb''' return self.sapyb(a,y,b,out,num_threads)
[docs] def binary_operations(self, operation, other, *args, **kwargs): '''Algebra: generic method of algebric operation with BlockDataContainer with number/DataContainer or BlockDataContainer Provides commutativity with DataContainer and subclasses, i.e. this class's reverse algebraic methods take precedence w.r.t. direct algebraic methods of DataContainer and subclasses. This method is not to be used directly ''' if not self.is_compatible(other): raise ValueError('Incompatible for operation {}'.format(operation)) out = kwargs.get('out', None) if isinstance(other, Number): # try to do algebra with one DataContainer. Will raise error if not compatible kw = kwargs.copy() res = [] for i,el in enumerate(self.containers): if operation == BlockDataContainer.ADD: op = el.add elif operation == BlockDataContainer.SUBTRACT: op = el.subtract elif operation == BlockDataContainer.MULTIPLY: op = el.multiply elif operation == BlockDataContainer.DIVIDE: op = el.divide elif operation == BlockDataContainer.POWER: op = el.power elif operation == BlockDataContainer.MAXIMUM: op = el.maximum elif operation == BlockDataContainer.MINIMUM: op = el.minimum else: raise ValueError('Unsupported operation', operation) if out is not None: kw['out'] = out.get_item(i) op(other, *args, **kw) else: res.append(op(other, *args, **kw)) if out is not None: return out else: return type(self)(*res, shape=self.shape) elif isinstance(other, (list, tuple, numpy.ndarray, BlockDataContainer)): kw = kwargs.copy() res = [] if isinstance(other, BlockDataContainer): the_other = other.containers else: the_other = other for i,zel in enumerate(zip ( self.containers, the_other) ): el = zel[0] ot = zel[1] if operation == BlockDataContainer.ADD: op = el.add elif operation == BlockDataContainer.SUBTRACT: op = el.subtract elif operation == BlockDataContainer.MULTIPLY: op = el.multiply elif operation == BlockDataContainer.DIVIDE: op = el.divide elif operation == BlockDataContainer.POWER: op = el.power elif operation == BlockDataContainer.MAXIMUM: op = el.maximum elif operation == BlockDataContainer.MINIMUM: op = el.minimum elif operation == BlockDataContainer.SAPYB: if not isinstance(other, BlockDataContainer): raise ValueError("{} cannot handle {}".format(operation, type(other))) op = el.sapyb else: raise ValueError('Unsupported operation', operation) if out is not None: if operation == BlockDataContainer.SAPYB: if isinstance(kw['a'], BlockDataContainer): a = kw['a'].get_item(i) else: a = kw['a'] if isinstance(kw['b'], BlockDataContainer): b = kw['b'].get_item(i) else: b = kw['b'] el.sapyb(a, ot, b, out.get_item(i), num_threads=kw['num_threads']) else: kw['out'] = out.get_item(i) op(ot, *args, **kw) else: res.append(op(ot, *args, **kw)) if out is not None: return out else: return type(self)(*res, shape=self.shape) else: # try to do algebra with one DataContainer. Will raise error if not compatible kw = kwargs.copy() if operation != BlockDataContainer.SAPYB: # remove keyworded argument related to SAPYB for k in ['a','b','y', 'num_threads', 'dtype']: if k in kw.keys(): kw.pop(k) res = [] for i,el in enumerate(self.containers): if operation == BlockDataContainer.ADD: op = el.add elif operation == BlockDataContainer.SUBTRACT: op = el.subtract elif operation == BlockDataContainer.MULTIPLY: op = el.multiply elif operation == BlockDataContainer.DIVIDE: op = el.divide elif operation == BlockDataContainer.POWER: op = el.power elif operation == BlockDataContainer.MAXIMUM: op = el.maximum elif operation == BlockDataContainer.MINIMUM: op = el.minimum elif operation == BlockDataContainer.SAPYB: if isinstance(kw['a'], BlockDataContainer): a = kw['a'].get_item(i) else: a = kw['a'] if isinstance(kw['b'], BlockDataContainer): b = kw['b'].get_item(i) else: b = kw['b'] el.sapyb(a, other, b, out.get_item(i), kw['num_threads']) # As axpyb cannot return anything we `continue` to skip the rest of the code block continue else: raise ValueError('Unsupported operation', operation) if out is not None: kw['out'] = out.get_item(i) op(other, *args, **kw) else: res.append(op(other, *args, **kw)) if out is not None: return out else: return type(self)(*res, shape=self.shape)
## unary operations
[docs] def unary_operations(self, operation, *args, **kwargs ): '''Unary operation on BlockDataContainer: generic method of unary operation with BlockDataContainer: abs, sign, sqrt and conjugate This method is not to be used directly ''' out = kwargs.get('out', None) kw = kwargs.copy() if out is None: res = [] for el in self.containers: if operation == BlockDataContainer.ABS: op = el.abs elif operation == BlockDataContainer.SIGN: op = el.sign elif operation == BlockDataContainer.SQRT: op = el.sqrt elif operation == BlockDataContainer.CONJUGATE: op = el.conjugate res.append(op(*args, **kw)) return BlockDataContainer(*res) else: kw.pop('out') for el,elout in zip(self.containers, out.containers): if operation == BlockDataContainer.ABS: op = el.abs elif operation == BlockDataContainer.SIGN: op = el.sign elif operation == BlockDataContainer.SQRT: op = el.sqrt elif operation == BlockDataContainer.CONJUGATE: op = el.conjugate kw['out'] = elout op(*args, **kw)
def abs(self, *args, **kwargs): return self.unary_operations(BlockDataContainer.ABS, *args, **kwargs) def sign(self, *args, **kwargs): return self.unary_operations(BlockDataContainer.SIGN, *args, **kwargs) def sqrt(self, *args, **kwargs): return self.unary_operations(BlockDataContainer.SQRT, *args, **kwargs) def conjugate(self, *args, **kwargs): return self.unary_operations(BlockDataContainer.CONJUGATE, *args, **kwargs) # def abs(self, *args, **kwargs): # return type(self)(*[ el.abs(*args, **kwargs) for el in self.containers], shape=self.shape) # def sign(self, *args, **kwargs): # return type(self)(*[ el.sign(*args, **kwargs) for el in self.containers], shape=self.shape) # def sqrt(self, *args, **kwargs): # return type(self)(*[ el.sqrt(*args, **kwargs) for el in self.containers], shape=self.shape) # def conjugate(self, out=None): # return type(self)(*[el.conjugate() for el in self.containers], shape=self.shape) ## reductions def sum(self, *args, **kwargs): return numpy.sum([ el.sum(*args, **kwargs) for el in self.containers]) def squared_norm(self): y = numpy.asarray([el.squared_norm() for el in self.containers]) return y.sum() def norm(self): return numpy.sqrt(self.squared_norm()) def pnorm(self, p=2): # See https://github.com/TomographicImaging/CIL/issues/1525#issuecomment-1757413803 if not functools.reduce(lambda x,y: x and y, [el.shape == self.containers[0].shape for el in self.containers], True): raise ValueError('pnorm: Incompatible shapes - each container in the BlockDataContainer must have the same shape in order to calculate the pnorm') if p==1: return sum(self.abs()) elif p==2: tmp = functools.reduce(lambda a,b: a + b.conjugate()*b, self.containers, self.get_item(0) * 0 ).sqrt() return tmp else: return ValueError('Not implemented')
[docs] def copy(self): '''alias of clone''' return self.clone()
def clone(self): return type(self)(*[el.copy() for el in self.containers], shape=self.shape) def fill(self, other): if isinstance (other, BlockDataContainer): if not self.is_compatible(other): raise ValueError('Incompatible containers') for el,ot in zip(self.containers, other.containers): el.fill(ot) else: return ValueError('Cannot fill with object provided {}'.format(type(other))) def __add__(self, other): return self.add( other ) # __radd__ def __sub__(self, other): return self.subtract( other ) # __rsub__ def __mul__(self, other): return self.multiply(other) # __rmul__ def __div__(self, other): return self.divide(other) # __rdiv__ def __truediv__(self, other): return self.divide(other) def __pow__(self, other): return self.power(other) # reverse operand
[docs] def __radd__(self, other): '''Reverse addition to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return self + other
# __radd__
[docs] def __rsub__(self, other): '''Reverse subtraction to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return (-1 * self) + other
# __rsub__
[docs] def __rmul__(self, other): '''Reverse multiplication to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return self * other
# __rmul__
[docs] def __rdiv__(self, other): '''Reverse division to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return pow(self / other, -1)
# __rdiv__
[docs] def __rtruediv__(self, other): '''Reverse truedivision to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return self.__rdiv__(other)
[docs] def __rpow__(self, other): '''Reverse power to make sure that this method is called rather than the __mul__ of a numpy array the class constant __array_priority__ must be set > 0 https://docs.scipy.org/doc/numpy-1.15.1/reference/arrays.classes.html#numpy.class.__array_priority__ ''' return other.power(self)
[docs] def __iadd__(self, other): '''Inline addition''' if isinstance (other, BlockDataContainer): for el,ot in zip(self.containers, other.containers): el += ot elif isinstance(other, Number): for el in self.containers: el += other elif isinstance(other, list) or isinstance(other, numpy.ndarray): if not self.is_compatible(other): raise ValueError('Incompatible for __iadd__') for el,ot in zip(self.containers, other): el += ot return self
# __iadd__
[docs] def __isub__(self, other): '''Inline subtraction''' if isinstance (other, BlockDataContainer): for el,ot in zip(self.containers, other.containers): el -= ot elif isinstance(other, Number): for el in self.containers: el -= other elif isinstance(other, list) or isinstance(other, numpy.ndarray): if not self.is_compatible(other): raise ValueError('Incompatible for __isub__') for el,ot in zip(self.containers, other): el -= ot return self
# __isub__
[docs] def __imul__(self, other): '''Inline multiplication''' if isinstance (other, BlockDataContainer): for el,ot in zip(self.containers, other.containers): el *= ot elif isinstance(other, Number): for el in self.containers: el *= other elif isinstance(other, list) or isinstance(other, numpy.ndarray): if not self.is_compatible(other): raise ValueError('Incompatible for __imul__') for el,ot in zip(self.containers, other): el *= ot return self
# __imul__
[docs] def __idiv__(self, other): '''Inline division''' if isinstance (other, BlockDataContainer): for el,ot in zip(self.containers, other.containers): el /= ot elif isinstance(other, Number): for el in self.containers: el /= other elif isinstance(other, list) or isinstance(other, numpy.ndarray): if not self.is_compatible(other): raise ValueError('Incompatible for __idiv__') for el,ot in zip(self.containers, other): el /= ot return self
# __rdiv__
[docs] def __itruediv__(self, other): '''Inline truedivision''' return self.__idiv__(other)
[docs] def __neg__(self): """ Return - self """ return -1 * self
def dot(self, other): # tmp = [ self.containers[i].dot(other.containers[i]) for i in range(self.shape[0])] return sum(tmp) def __len__(self): return self.shape[0] @property def geometry(self): try: return BlockGeometry(*[el.geometry.copy() for el in self.containers]) except AttributeError: return None