# -*- coding: utf-8 -*-
# Copyright 2018 United Kingdom Research and Innovation
# Copyright 2018 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt
import copy
import numpy
import warnings
from functools import reduce
from numbers import Number
import ctypes, platform
from ctypes import util
import math
import weakref
import logging
from cil.utilities.multiprocessing import NUM_THREADS
# check for the extension
if platform.system() == 'Linux':
dll = 'libcilacc.so'
elif platform.system() == 'Windows':
dll_file = 'cilacc.dll'
dll = util.find_library(dll_file)
elif platform.system() == 'Darwin':
dll = 'libcilacc.dylib'
else:
raise ValueError('Not supported platform, ', platform.system())
cilacc = ctypes.cdll.LoadLibrary(dll)
from cil.framework.BlockGeometry import BlockGeometry
class Partitioner(object):
'''Interface for AcquisitionData to be able to partition itself in a number of batches.
This class, by multiple inheritance with AcquisitionData, allows the user to partition the data,
by using the method ``partition``.
The partitioning will generate a ``BlockDataContainer`` with appropriate ``AcquisitionData``.
'''
# modes of partitioning
SEQUENTIAL = 'sequential'
STAGGERED = 'staggered'
RANDOM_PERMUTATION = 'random_permutation'
def _partition_indices(self, num_batches, indices, stagger=False):
"""Partition a list of indices into num_batches of indices.
Parameters
----------
num_batches : int
The number of batches to partition the indices into.
indices : list of int, int
The indices to partition. If passed a list, this list will be partitioned in ``num_batches``
partitions. If passed an int the indices will be generated automatically using ``range(indices)``.
stagger : bool, default False
If True, the indices will be staggered across the batches.
Returns
--------
list of list of int
A list of batches of indices.
"""
# Partition the indices into batches.
if isinstance(indices, int):
indices = list(range(indices))
num_indices = len(indices)
# sanity check
if num_indices < num_batches:
raise ValueError(
'The number of batches must be less than or equal to the number of indices.'
)
if stagger:
batches = [indices[i::num_batches] for i in range(num_batches)]
else:
# we split the indices with floor(N/M)+1 indices in N%M groups
# and floor(N/M) indices in the remaining M - N%M groups.
# rename num_indices to N for brevity
N = num_indices
# rename num_batches to M for brevity
M = num_batches
batches = [
indices[j:j + math.floor(N / M) + 1] for j in range(N % M)
]
offset = N % M * (math.floor(N / M) + 1)
for i in range(M - N % M):
start = offset + i * math.floor(N / M)
end = start + math.floor(N / M)
batches.append(indices[start:end])
return batches
def _construct_BlockGeometry_from_indices(self, indices):
'''Convert a list of boolean masks to a list of BlockGeometry.
Parameters
----------
indices : list of lists of indices
Returns
-------
BlockGeometry
'''
ags = []
for mask in indices:
ag = self.geometry.copy()
ag.config.angles.angle_data = numpy.take(self.geometry.angles, mask, axis=0)
ags.append(ag)
return BlockGeometry(*ags)
def partition(self, num_batches, mode, seed=None):
'''Partition the data into ``num_batches`` batches using the specified ``mode``.
The modes are
1. ``sequential`` - The data will be partitioned into ``num_batches`` batches of sequential indices.
2. ``staggered`` - The data will be partitioned into ``num_batches`` batches of sequential indices, with stride equal to ``num_batches``.
3. ``random_permutation`` - The data will be partitioned into ``num_batches`` batches of random indices.
Parameters
----------
num_batches : int
The number of batches to partition the data into.
mode : str
The mode to use for partitioning. Must be one of ``sequential``, ``staggered`` or ``random_permutation``.
seed : int, optional
The seed to use for the random permutation. If not specified, the random number
generator will not be seeded.
Returns
-------
BlockDataContainer
Block of `AcquisitionData` objects containing the data requested in each batch
Example
-------
Partitioning a list of ints [0, 1, 2, 3, 4, 5, 6, 7, 8] into 4 batches will return:
1. [[0, 1, 2], [3, 4], [5, 6], [7, 8]] with ``sequential``
2. [[0, 4, 8], [1, 5], [2, 6], [3, 7]] with ``staggered``
3. [[8, 2, 6], [7, 1], [0, 4], [3, 5]] with ``random_permutation`` and seed 1
'''
if mode == Partitioner.SEQUENTIAL:
return self._partition_deterministic(num_batches, stagger=False)
elif mode == Partitioner.STAGGERED:
return self._partition_deterministic(num_batches, stagger=True)
elif mode == Partitioner.RANDOM_PERMUTATION:
return self._partition_random_permutation(num_batches, seed=seed)
else:
raise ValueError('Unknown partition mode {}'.format(mode))
def _partition_deterministic(self, num_batches, stagger=False, indices=None):
'''Partition the data into ``num_batches`` batches.
Parameters
----------
num_batches : int
The number of batches to partition the data into.
stagger : bool, optional
If ``True``, the batches will be staggered. Default is ``False``.
indices : list of int, optional
The indices to partition. If not specified, the indices will be generated from the number of projections.
'''
if indices is None:
indices = self.geometry.num_projections
partition_indices = self._partition_indices(num_batches, indices, stagger)
blk_geo = self._construct_BlockGeometry_from_indices(partition_indices)
# copy data
out = blk_geo.allocate(None)
out.geometry = blk_geo
axis = self.dimension_labels.index('angle')
for i in range(num_batches):
out[i].fill(
numpy.squeeze(
numpy.take(self.array, partition_indices[i], axis=axis)
)
)
return out
def _partition_random_permutation(self, num_batches, seed=None):
'''Partition the data into ``num_batches`` batches using a random permutation.
Parameters
----------
num_batches : int
The number of batches to partition the data into.
seed : int, optional
The seed to use for the random permutation. If not specified, the random number generator
will not be seeded.
'''
if seed is not None:
numpy.random.seed(seed)
indices = numpy.arange(self.geometry.num_projections)
numpy.random.shuffle(indices)
indices = list(indices)
return self._partition_deterministic(num_batches, stagger=False, indices=indices)
def find_key(dic, val):
"""return the key of dictionary dic given the value"""
return [k for k, v in dic.items() if v == val][0]
def message(cls, msg, *args):
msg = "{0}: " + msg
for i in range(len(args)):
msg += " {%d}" %(i+1)
args = list(args)
args.insert(0, cls.__name__ )
return msg.format(*args )
[docs]class ImageGeometry(object):
RANDOM = 'random'
RANDOM_INT = 'random_int'
CHANNEL = 'channel'
VERTICAL = 'vertical'
HORIZONTAL_X = 'horizontal_x'
HORIZONTAL_Y = 'horizontal_y'
@property
def shape(self):
shape_dict = {ImageGeometry.CHANNEL: self.channels,
ImageGeometry.VERTICAL: self.voxel_num_z,
ImageGeometry.HORIZONTAL_Y: self.voxel_num_y,
ImageGeometry.HORIZONTAL_X: self.voxel_num_x}
shape = []
for label in self.dimension_labels:
shape.append(shape_dict[label])
return tuple(shape)
@shape.setter
def shape(self, val):
print("Deprecated - shape will be set automatically")
@property
def spacing(self):
spacing_dict = {ImageGeometry.CHANNEL: self.channel_spacing,
ImageGeometry.VERTICAL: self.voxel_size_z,
ImageGeometry.HORIZONTAL_Y: self.voxel_size_y,
ImageGeometry.HORIZONTAL_X: self.voxel_size_x}
spacing = []
for label in self.dimension_labels:
spacing.append(spacing_dict[label])
return tuple(spacing)
@property
def length(self):
return len(self.dimension_labels)
@property
def ndim(self):
return len(self.dimension_labels)
@property
def dimension_labels(self):
labels_default = DataOrder.CIL_IG_LABELS
shape_default = [ self.channels,
self.voxel_num_z,
self.voxel_num_y,
self.voxel_num_x]
try:
labels = list(self._dimension_labels)
except AttributeError:
labels = labels_default.copy()
for i, x in enumerate(shape_default):
if x == 0 or x==1:
try:
labels.remove(labels_default[i])
except ValueError:
pass #if not in custom list carry on
return tuple(labels)
@dimension_labels.setter
def dimension_labels(self, val):
self.set_labels(val)
def set_labels(self, labels):
labels_default = DataOrder.CIL_IG_LABELS
#check input and store. This value is not used directly
if labels is not None:
for x in labels:
if x not in labels_default:
raise ValueError('Requested axis are not possible. Accepted label names {},\ngot {}'\
.format(labels_default,labels))
self._dimension_labels = tuple(labels)
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.voxel_num_x == other.voxel_num_x \
and self.voxel_num_y == other.voxel_num_y \
and self.voxel_num_z == other.voxel_num_z \
and self.voxel_size_x == other.voxel_size_x \
and self.voxel_size_y == other.voxel_size_y \
and self.voxel_size_z == other.voxel_size_z \
and self.center_x == other.center_x \
and self.center_y == other.center_y \
and self.center_z == other.center_z \
and self.channels == other.channels \
and self.channel_spacing == other.channel_spacing \
and self.dimension_labels == other.dimension_labels:
return True
return False
@property
def dtype(self):
return self._dtype
@dtype.setter
def dtype(self, val):
self._dtype = val
def __init__(self,
voxel_num_x=0,
voxel_num_y=0,
voxel_num_z=0,
voxel_size_x=1,
voxel_size_y=1,
voxel_size_z=1,
center_x=0,
center_y=0,
center_z=0,
channels=1,
**kwargs):
self.voxel_num_x = int(voxel_num_x)
self.voxel_num_y = int(voxel_num_y)
self.voxel_num_z = int(voxel_num_z)
self.voxel_size_x = float(voxel_size_x)
self.voxel_size_y = float(voxel_size_y)
self.voxel_size_z = float(voxel_size_z)
self.center_x = center_x
self.center_y = center_y
self.center_z = center_z
self.channels = channels
self.channel_labels = None
self.channel_spacing = 1.0
self.dimension_labels = kwargs.get('dimension_labels', None)
self.dtype = kwargs.get('dtype', numpy.float32)
[docs] def get_slice(self,channel=None, vertical=None, horizontal_x=None, horizontal_y=None):
'''
Returns a new ImageGeometry of a single slice of in the requested direction.
'''
geometry_new = self.copy()
if channel is not None:
geometry_new.channels = 1
try:
geometry_new.channel_labels = [self.channel_labels[channel]]
except:
geometry_new.channel_labels = None
if vertical is not None:
geometry_new.voxel_num_z = 0
if horizontal_y is not None:
geometry_new.voxel_num_y = 0
if horizontal_x is not None:
geometry_new.voxel_num_x = 0
return geometry_new
def get_order_by_label(self, dimension_labels, default_dimension_labels):
order = []
for i, el in enumerate(default_dimension_labels):
for j, ek in enumerate(dimension_labels):
if el == ek:
order.append(j)
break
return order
def get_min_x(self):
return self.center_x - 0.5*self.voxel_num_x*self.voxel_size_x
def get_max_x(self):
return self.center_x + 0.5*self.voxel_num_x*self.voxel_size_x
def get_min_y(self):
return self.center_y - 0.5*self.voxel_num_y*self.voxel_size_y
def get_max_y(self):
return self.center_y + 0.5*self.voxel_num_y*self.voxel_size_y
def get_min_z(self):
if not self.voxel_num_z == 0:
return self.center_z - 0.5*self.voxel_num_z*self.voxel_size_z
else:
return 0
def get_max_z(self):
if not self.voxel_num_z == 0:
return self.center_z + 0.5*self.voxel_num_z*self.voxel_size_z
else:
return 0
[docs] def clone(self):
'''returns a copy of the ImageGeometry'''
return copy.deepcopy(self)
[docs] def copy(self):
'''alias of clone'''
return self.clone()
def __str__ (self):
repres = ""
repres += "Number of channels: {0}\n".format(self.channels)
repres += "channel_spacing: {0}\n".format(self.channel_spacing)
if self.voxel_num_z > 0:
repres += "voxel_num : x{0},y{1},z{2}\n".format(self.voxel_num_x, self.voxel_num_y, self.voxel_num_z)
repres += "voxel_size : x{0},y{1},z{2}\n".format(self.voxel_size_x, self.voxel_size_y, self.voxel_size_z)
repres += "center : x{0},y{1},z{2}\n".format(self.center_x, self.center_y, self.center_z)
else:
repres += "voxel_num : x{0},y{1}\n".format(self.voxel_num_x, self.voxel_num_y)
repres += "voxel_size : x{0},y{1}\n".format(self.voxel_size_x, self.voxel_size_y)
repres += "center : x{0},y{1}\n".format(self.center_x, self.center_y)
return repres
[docs] def allocate(self, value=0, **kwargs):
'''allocates an ImageData according to the size expressed in the instance
:param value: accepts numbers to allocate an uniform array, or a string as 'random' or 'random_int' to create a random array or None.
:type value: number or string, default None allocates empty memory block, default 0
:param dtype: numerical type to allocate
:type dtype: numpy type, default numpy.float32
'''
dtype = kwargs.get('dtype', self.dtype)
if kwargs.get('dimension_labels', None) is not None:
raise ValueError("Deprecated: 'dimension_labels' cannot be set with 'allocate()'. Use 'geometry.set_labels()' to modify the geometry before using allocate.")
out = ImageData(geometry=self.copy(),
dtype=dtype,
suppress_warning=True)
if isinstance(value, Number):
# it's created empty, so we make it 0
out.array.fill(value)
else:
if value == ImageGeometry.RANDOM:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
if numpy.iscomplexobj(out.array):
r = numpy.random.random_sample(self.shape) + 1j * numpy.random.random_sample(self.shape)
out.fill(r)
else:
out.fill(numpy.random.random_sample(self.shape))
elif value == ImageGeometry.RANDOM_INT:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
max_value = kwargs.get('max_value', 100)
r = numpy.random.randint(max_value,size=self.shape, dtype=numpy.int32)
out.fill(numpy.asarray(r, dtype=self.dtype))
elif value is None:
pass
else:
raise ValueError('Value {} unknown'.format(value))
return out
class ComponentDescription(object):
r'''This class enables the creation of vectors and unit vectors used to describe the components of a tomography system
'''
def __init__ (self, dof):
self._dof = dof
@staticmethod
def create_vector(val):
try:
vec = numpy.array(val, dtype=numpy.float64).reshape(len(val))
except:
raise ValueError("Can't convert to numpy array")
return vec
@staticmethod
def create_unit_vector(val):
vec = ComponentDescription.create_vector(val)
dot_product = vec.dot(vec)
if abs(dot_product)>1e-8:
vec = (vec/numpy.sqrt(dot_product))
else:
raise ValueError("Can't return a unit vector of a zero magnitude vector")
return vec
def length_check(self, val):
try:
val_length = len(val)
except:
raise ValueError("Vectors for {0}D geometries must have length = {0}. Got {1}".format(self._dof,val))
if val_length != self._dof:
raise ValueError("Vectors for {0}D geometries must have length = {0}. Got {1}".format(self._dof,val))
@staticmethod
def test_perpendicular(vector1, vector2):
dor_prod = vector1.dot(vector2)
if abs(dor_prod) <1e-10:
return True
return False
@staticmethod
def test_parallel(vector1, vector2):
'''For unit vectors only. Returns true if directions are opposite'''
dor_prod = vector1.dot(vector2)
if 1- abs(dor_prod) <1e-10:
return True
return False
class PositionVector(ComponentDescription):
r'''This class creates a component of a tomography system with a position attribute
'''
@property
def position(self):
try:
return self._position
except:
raise AttributeError
@position.setter
def position(self, val):
self.length_check(val)
self._position = ComponentDescription.create_vector(val)
class DirectionVector(ComponentDescription):
r'''This class creates a component of a tomography system with a direction attribute
'''
@property
def direction(self):
try:
return self._direction
except:
raise AttributeError
@direction.setter
def direction(self, val):
self.length_check(val)
self._direction = ComponentDescription.create_unit_vector(val)
class PositionDirectionVector(PositionVector, DirectionVector):
r'''This class creates a component of a tomography system with position and direction attributes
'''
pass
class Detector1D(PositionVector):
r'''This class creates a component of a tomography system with position and direction_x attributes used for 1D panels
'''
@property
def direction_x(self):
try:
return self._direction_x
except:
raise AttributeError
@direction_x.setter
def direction_x(self, val):
self.length_check(val)
self._direction_x = ComponentDescription.create_unit_vector(val)
@property
def normal(self):
try:
return ComponentDescription.create_unit_vector([self._direction_x[1], -self._direction_x[0]])
except:
raise AttributeError
class Detector2D(PositionVector):
r'''This class creates a component of a tomography system with position, direction_x and direction_y attributes used for 2D panels
'''
@property
def direction_x(self):
try:
return self._direction_x
except:
raise AttributeError
@property
def direction_y(self):
try:
return self._direction_y
except:
raise AttributeError
@property
def normal(self):
try:
return numpy.cross(self._direction_x, self._direction_y)
except:
raise AttributeError
def set_direction(self, x, y):
self.length_check(x)
x = ComponentDescription.create_unit_vector(x)
self.length_check(y)
y = ComponentDescription.create_unit_vector(y)
dot_product = x.dot(y)
if not numpy.isclose(dot_product, 0):
raise ValueError("vectors detector.direction_x and detector.direction_y must be orthogonal")
self._direction_y = y
self._direction_x = x
class SystemConfiguration(object):
r'''This is a generic class to hold the description of a tomography system
'''
SYSTEM_SIMPLE = 'simple'
SYSTEM_OFFSET = 'offset'
SYSTEM_ADVANCED = 'advanced'
@property
def dimension(self):
if self._dimension == 2:
return '2D'
else:
return '3D'
@dimension.setter
def dimension(self,val):
if val != 2 and val != 3:
raise ValueError('Can set up 2D and 3D systems only. got {0}D'.format(val))
else:
self._dimension = val
@property
def geometry(self):
return self._geometry
@geometry.setter
def geometry(self,val):
if val != AcquisitionGeometry.CONE and val != AcquisitionGeometry.PARALLEL:
raise ValueError('geom_type = {} not recognised please specify \'cone\' or \'parallel\''.format(val))
else:
self._geometry = val
def __init__(self, dof, geometry, units='units'):
"""Initialises the system component attributes for the acquisition type
"""
self.dimension = dof
self.geometry = geometry
self.units = units
if geometry == AcquisitionGeometry.PARALLEL:
self.ray = DirectionVector(dof)
else:
self.source = PositionVector(dof)
if dof == 2:
self.detector = Detector1D(dof)
self.rotation_axis = PositionVector(dof)
else:
self.detector = Detector2D(dof)
self.rotation_axis = PositionDirectionVector(dof)
def __str__(self):
"""Implements the string representation of the system configuration
"""
raise NotImplementedError
def __eq__(self, other):
"""Implements the equality check of the system configuration
"""
raise NotImplementedError
@staticmethod
def rotation_vec_to_y(vec):
''' returns a rotation matrix that will rotate the projection of vec on the x-y plane to the +y direction [0,1, Z]'''
vec = ComponentDescription.create_unit_vector(vec)
axis_rotation = numpy.eye(len(vec))
if numpy.allclose(vec[:2],[0,1]):
pass
elif numpy.allclose(vec[:2],[0,-1]):
axis_rotation[0][0] = -1
axis_rotation[1][1] = -1
else:
theta = math.atan2(vec[0],vec[1])
axis_rotation[0][0] = axis_rotation[1][1] = math.cos(theta)
axis_rotation[0][1] = -math.sin(theta)
axis_rotation[1][0] = math.sin(theta)
return axis_rotation
@staticmethod
def rotation_vec_to_z(vec):
''' returns a rotation matrix that will align vec with the z-direction [0,0,1]'''
vec = ComponentDescription.create_unit_vector(vec)
if len(vec) == 2:
return numpy.array([[1, 0],[0, 1]])
elif len(vec) == 3:
axis_rotation = numpy.eye(3)
if numpy.allclose(vec,[0,0,1]):
pass
elif numpy.allclose(vec,[0,0,-1]):
axis_rotation = numpy.eye(3)
axis_rotation[1][1] = -1
axis_rotation[2][2] = -1
else:
vx = numpy.array([[0, 0, -vec[0]], [0, 0, -vec[1]], [vec[0], vec[1], 0]])
axis_rotation = numpy.eye(3) + vx + vx.dot(vx) * 1 / (1 + vec[2])
else:
raise ValueError("Vec must have length 3, got {}".format(len(vec)))
return axis_rotation
def update_reference_frame(self):
r'''Transforms the system origin to the rotation_axis position
'''
self.set_origin(self.rotation_axis.position)
def set_origin(self, origin):
r'''Transforms the system origin to the input origin
'''
translation = origin.copy()
if hasattr(self,'source'):
self.source.position -= translation
self.detector.position -= translation
self.rotation_axis.position -= translation
def get_centre_slice(self):
"""Returns the 2D system configuration corresponding to the centre slice
"""
raise NotImplementedError
def calculate_magnification(self):
r'''Calculates the magnification of the system using the source to rotate axis,
and source to detector distance along the direction.
:return: returns [dist_source_center, dist_center_detector, magnification], [0] distance from the source to the rotate axis, [1] distance from the rotate axis to the detector, [2] magnification of the system
:rtype: list
'''
raise NotImplementedError
def system_description(self):
r'''Returns `simple` if the the geometry matches the default definitions with no offsets or rotations,
\nReturns `offset` if the the geometry matches the default definitions with centre-of-rotation or detector offsets
\nReturns `advanced` if the the geometry has rotated or tilted rotation axis or detector, can also have offsets
'''
raise NotImplementedError
def copy(self):
'''returns a copy of SystemConfiguration'''
return copy.deepcopy(self)
class Parallel2D(SystemConfiguration):
r'''This class creates the SystemConfiguration of a parallel beam 2D tomographic system
:param ray_direction: A 2D vector describing the x-ray direction (x,y)
:type ray_direction: list, tuple, ndarray
:param detector_pos: A 2D vector describing the position of the centre of the detector (x,y)
:type detector_pos: list, tuple, ndarray
:param detector_direction_x: A 2D vector describing the direction of the detector_x (x,y)
:type detector_direction_x: list, tuple, ndarray
:param rotation_axis_pos: A 2D vector describing the position of the axis of rotation (x,y)
:type rotation_axis_pos: list, tuple, ndarray
:param units: Label the units of distance used for the configuration
:type units: string
'''
def __init__ (self, ray_direction, detector_pos, detector_direction_x, rotation_axis_pos, units='units'):
"""Constructor method
"""
super(Parallel2D, self).__init__(dof=2, geometry = 'parallel', units=units)
#source
self.ray.direction = ray_direction
#detector
self.detector.position = detector_pos
self.detector.direction_x = detector_direction_x
#rotate axis
self.rotation_axis.position = rotation_axis_pos
def align_reference_frame(self, definition='cil'):
r'''Transforms and rotates the system to backend definitions
'cil' sets the origin to the rotation axis and aligns the y axis with the ray-direction
'tigre' sets the origin to the rotation axis and aligns the y axis with the ray-direction
'''
#in this instance definitions are the same
if definition not in ['cil','tigre']:
raise ValueError("Geometry can be configured for definition = 'cil' or 'tigre' only. Got {}".format(definition))
self.set_origin(self.rotation_axis.position)
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.ray.direction)
self.ray.direction = rotation_matrix.dot(self.ray.direction.reshape(2,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(2,1))
self.detector.direction_x = rotation_matrix.dot(self.detector.direction_x.reshape(2,1))
def system_description(self):
r'''Returns `simple` if the the geometry matches the default definitions with no offsets or rotations,
\nReturns `offset` if the the geometry matches the default definitions with centre-of-rotation or detector offsets
\nReturns `advanced` if the the geometry has rotated or tilted rotation axis or detector, can also have offsets
'''
rays_perpendicular_detector = ComponentDescription.test_parallel(self.ray.direction, self.detector.normal)
#rotation axis position + ray direction hits detector position
if numpy.allclose(self.rotation_axis.position, self.detector.position): #points are equal so on ray path
rotation_axis_centred = True
else:
vec_a = ComponentDescription.create_unit_vector(self.detector.position - self.rotation_axis.position)
rotation_axis_centred = ComponentDescription.test_parallel(self.ray.direction, vec_a)
if not rays_perpendicular_detector:
config = SystemConfiguration.SYSTEM_ADVANCED
elif not rotation_axis_centred:
config = SystemConfiguration.SYSTEM_OFFSET
else:
config = SystemConfiguration.SYSTEM_SIMPLE
return config
def rotation_axis_on_detector(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the world coordinate system
Returns
-------
PositionVector
Position in the 3D system
"""
Pv = self.rotation_axis.position
ratio = (self.detector.position - Pv).dot(self.detector.normal) / self.ray.direction.dot(self.detector.normal)
out = PositionVector(2)
out.position = Pv + self.ray.direction * ratio
return out
def calculate_centre_of_rotation(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the detector coordinate system
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
- Units are the units of distance used to specify the component's positions
Returns
-------
Float
Offset position along the detector x_axis at y=0
Float
Angle between the y_axis and the rotation axis projection, in radians
"""
#convert to the detector coordinate system
dp1 = self.rotation_axis_on_detector().position - self.detector.position
offset = self.detector.direction_x.dot(dp1)
return (offset, 0.0)
def set_centre_of_rotation(self, offset):
""" Configures the geometry to have the requested centre of rotation offset at the detector
"""
offset_current = self.calculate_centre_of_rotation()[0]
offset_new = offset - offset_current
self.rotation_axis.position = self.rotation_axis.position + offset_new * self.detector.direction_x
def __str__(self):
def csv(val):
return numpy.array2string(val, separator=', ')
repres = "2D Parallel-beam tomography\n"
repres += "System configuration:\n"
repres += "\tRay direction: {0}\n".format(csv(self.ray.direction))
repres += "\tRotation axis position: {0}\n".format(csv(self.rotation_axis.position))
repres += "\tDetector position: {0}\n".format(csv(self.detector.position))
repres += "\tDetector direction x: {0}\n".format(csv(self.detector.direction_x))
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if numpy.allclose(self.ray.direction, other.ray.direction) \
and numpy.allclose(self.detector.position, other.detector.position)\
and numpy.allclose(self.detector.direction_x, other.detector.direction_x)\
and numpy.allclose(self.rotation_axis.position, other.rotation_axis.position):
return True
return False
def get_centre_slice(self):
return self
def calculate_magnification(self):
return [None, None, 1.0]
class Parallel3D(SystemConfiguration):
r'''This class creates the SystemConfiguration of a parallel beam 3D tomographic system
:param ray_direction: A 3D vector describing the x-ray direction (x,y,z)
:type ray_direction: list, tuple, ndarray
:param detector_pos: A 3D vector describing the position of the centre of the detector (x,y,z)
:type detector_pos: list, tuple, ndarray
:param detector_direction_x: A 3D vector describing the direction of the detector_x (x,y,z)
:type detector_direction_x: list, tuple, ndarray
:param detector_direction_y: A 3D vector describing the direction of the detector_y (x,y,z)
:type detector_direction_y: list, tuple, ndarray
:param rotation_axis_pos: A 3D vector describing the position of the axis of rotation (x,y,z)
:type rotation_axis_pos: list, tuple, ndarray
:param rotation_axis_direction: A 3D vector describing the direction of the axis of rotation (x,y,z)
:type rotation_axis_direction: list, tuple, ndarray
:param units: Label the units of distance used for the configuration
:type units: string
'''
def __init__ (self, ray_direction, detector_pos, detector_direction_x, detector_direction_y, rotation_axis_pos, rotation_axis_direction, units='units'):
"""Constructor method
"""
super(Parallel3D, self).__init__(dof=3, geometry = 'parallel', units=units)
#source
self.ray.direction = ray_direction
#detector
self.detector.position = detector_pos
self.detector.set_direction(detector_direction_x, detector_direction_y)
#rotate axis
self.rotation_axis.position = rotation_axis_pos
self.rotation_axis.direction = rotation_axis_direction
def align_z(self):
r'''Transforms the system origin to the rotate axis with z direction aligned to the rotate axis direction
'''
self.set_origin(self.rotation_axis.position)
#calculate rotation matrix to align rotation axis direction with z
rotation_matrix = SystemConfiguration.rotation_vec_to_z(self.rotation_axis.direction)
#apply transform
self.rotation_axis.direction = [0,0,1]
self.ray.direction = rotation_matrix.dot(self.ray.direction.reshape(3,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(3,1))
new_x = rotation_matrix.dot(self.detector.direction_x.reshape(3,1))
new_y = rotation_matrix.dot(self.detector.direction_y.reshape(3,1))
self.detector.set_direction(new_x, new_y)
def align_reference_frame(self, definition='cil'):
r'''Transforms and rotates the system to backend definitions
'''
#in this instance definitions are the same
if definition not in ['cil','tigre']:
raise ValueError("Geometry can be configured for definition = 'cil' or 'tigre' only. Got {}".format(definition))
self.align_z()
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.ray.direction)
self.ray.direction = rotation_matrix.dot(self.ray.direction.reshape(3,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(3,1))
new_direction_x = rotation_matrix.dot(self.detector.direction_x.reshape(3,1))
new_direction_y = rotation_matrix.dot(self.detector.direction_y.reshape(3,1))
self.detector.set_direction(new_direction_x, new_direction_y)
def system_description(self):
r'''Returns `simple` if the the geometry matches the default definitions with no offsets or rotations,
\nReturns `offset` if the the geometry matches the default definitions with centre-of-rotation or detector offsets
\nReturns `advanced` if the the geometry has rotated or tilted rotation axis or detector, can also have offsets
'''
'''
simple
- rays perpendicular to detector
- rotation axis parallel to detector y
- rotation axis position + ray direction hits detector with no x offset (y offsets allowed)
offset
- rays perpendicular to detector
- rotation axis parallel to detector y
rolled
- rays perpendicular to detector
- rays perpendicular to rotation axis
advanced
- not rays perpendicular to detector (for parallel just equates to an effective pixel size change?)
or
- not rays perpendicular to rotation axis (tilted, i.e. laminography)
'''
rays_perpendicular_detector = ComponentDescription.test_parallel(self.ray.direction, self.detector.normal)
rays_perpendicular_rotation = ComponentDescription.test_perpendicular(self.ray.direction, self.rotation_axis.direction)
rotation_parallel_detector_y = ComponentDescription.test_parallel(self.rotation_axis.direction, self.detector.direction_y)
#rotation axis to detector is parallel with ray
if numpy.allclose(self.rotation_axis.position, self.detector.position): #points are equal so on ray path
rotation_axis_centred = True
else:
vec_a = ComponentDescription.create_unit_vector(self.detector.position - self.rotation_axis.position )
rotation_axis_centred = ComponentDescription.test_parallel(self.ray.direction, vec_a)
if not rays_perpendicular_detector or\
not rays_perpendicular_rotation or\
not rotation_parallel_detector_y:
config = SystemConfiguration.SYSTEM_ADVANCED
elif not rotation_axis_centred:
config = SystemConfiguration.SYSTEM_OFFSET
else:
config = SystemConfiguration.SYSTEM_SIMPLE
return config
def __str__(self):
def csv(val):
return numpy.array2string(val, separator=', ')
repres = "3D Parallel-beam tomography\n"
repres += "System configuration:\n"
repres += "\tRay direction: {0}\n".format(csv(self.ray.direction))
repres += "\tRotation axis position: {0}\n".format(csv(self.rotation_axis.position))
repres += "\tRotation axis direction: {0}\n".format(csv(self.rotation_axis.direction))
repres += "\tDetector position: {0}\n".format(csv(self.detector.position))
repres += "\tDetector direction x: {0}\n".format(csv(self.detector.direction_x))
repres += "\tDetector direction y: {0}\n".format(csv(self.detector.direction_y))
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if numpy.allclose(self.ray.direction, other.ray.direction) \
and numpy.allclose(self.detector.position, other.detector.position)\
and numpy.allclose(self.detector.direction_x, other.detector.direction_x)\
and numpy.allclose(self.detector.direction_y, other.detector.direction_y)\
and numpy.allclose(self.rotation_axis.position, other.rotation_axis.position)\
and numpy.allclose(self.rotation_axis.direction, other.rotation_axis.direction):
return True
return False
def calculate_magnification(self):
return [None, None, 1.0]
def get_centre_slice(self):
"""Returns the 2D system configuration corresponding to the centre slice
"""
dp1 = self.rotation_axis.direction.dot(self.ray.direction)
dp2 = self.rotation_axis.direction.dot(self.detector.direction_x)
if numpy.isclose(dp1, 0) and numpy.isclose(dp2, 0):
temp = self.copy()
#convert to rotation axis reference frame
temp.align_reference_frame()
ray_direction = temp.ray.direction[0:2]
detector_position = temp.detector.position[0:2]
detector_direction_x = temp.detector.direction_x[0:2]
rotation_axis_position = temp.rotation_axis.position[0:2]
return Parallel2D(ray_direction, detector_position, detector_direction_x, rotation_axis_position)
else:
raise ValueError('Cannot convert geometry to 2D. Requires axis of rotation to be perpendicular to ray direction and the detector direction x.')
def rotation_axis_on_detector(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the world coordinate system
Returns
-------
PositionDirectionVector
Position and direction in the 3D system
"""
#calculate the rotation axis line with the detector
vec_a = self.ray.direction
#calculate the intersection with the detector
Pv = self.rotation_axis.position
ratio = (self.detector.position - Pv).dot(self.detector.normal) / vec_a.dot(self.detector.normal)
point1 = Pv + vec_a * ratio
Pv = self.rotation_axis.position + self.rotation_axis.direction
ratio = (self.detector.position - Pv).dot(self.detector.normal) / vec_a.dot(self.detector.normal)
point2 = Pv + vec_a * ratio
out = PositionDirectionVector(3)
out.position = point1
out.direction = point2 - point1
return out
def calculate_centre_of_rotation(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the detector coordinate system
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
- Units are the units of distance used to specify the component's positions
Returns
-------
Float
Offset position along the detector x_axis at y=0
Float
Angle between the y_axis and the rotation axis projection, in radians
"""
rotate_axis_projection = self.rotation_axis_on_detector()
p1 = rotate_axis_projection.position
p2 = p1 + rotate_axis_projection.direction
#point1 and point2 are on the detector plane. need to return them in the detector coordinate system
dp1 = p1 - self.detector.position
x1 = self.detector.direction_x.dot(dp1)
y1 = self.detector.direction_y.dot(dp1)
dp2 = p2 - self.detector.position
x2 = self.detector.direction_x.dot(dp2)
y2 = self.detector.direction_y.dot(dp2)
#y = m * x + c
#c = y1 - m * x1
#when y is 0
#x=-c/m
#x_y0 = -y1/m + x1
offset_x_y0 = x1 -y1 * (x2 - x1)/(y2-y1)
angle = math.atan2(x2 - x1, y2 - y1)
offset = offset_x_y0
return (offset, angle)
def set_centre_of_rotation(self, offset, angle):
""" Configures the geometry to have the requested centre of rotation offset at the detector
"""
#two points on the detector
x1 = offset
y1 = 0
x2 = offset + math.tan(angle)
y2 = 1
#convert to 3d coordinates in system frame
p1 = self.detector.position + x1 * self.detector.direction_x + y1 * self.detector.direction_y
p2 = self.detector.position + x2 * self.detector.direction_x + y2 * self.detector.direction_y
# find where vec p1 + t * ray dirn intersects plane defined by rotate axis (pos and dir) and det_x direction
vector_pos=p1
vec_dirn=self.ray.direction
plane_pos=self.rotation_axis.position
plane_normal = numpy.cross(self.detector.direction_x, self.rotation_axis.direction)
ratio = (plane_pos - vector_pos).dot(plane_normal) / vec_dirn.dot(plane_normal)
p1_on_plane = vector_pos + vec_dirn * ratio
vector_pos=p2
ratio = (plane_pos - vector_pos).dot(plane_normal) / vec_dirn.dot(plane_normal)
p2_on_plane = vector_pos + vec_dirn * ratio
self.rotation_axis.position = p1_on_plane
self.rotation_axis.direction = p2_on_plane - p1_on_plane
class Cone2D(SystemConfiguration):
r'''This class creates the SystemConfiguration of a cone beam 2D tomographic system
:param source_pos: A 2D vector describing the position of the source (x,y)
:type source_pos: list, tuple, ndarray
:param detector_pos: A 2D vector describing the position of the centre of the detector (x,y)
:type detector_pos: list, tuple, ndarray
:param detector_direction_x: A 2D vector describing the direction of the detector_x (x,y)
:type detector_direction_x: list, tuple, ndarray
:param rotation_axis_pos: A 2D vector describing the position of the axis of rotation (x,y)
:type rotation_axis_pos: list, tuple, ndarray
:param units: Label the units of distance used for the configuration
:type units: string
'''
def __init__ (self, source_pos, detector_pos, detector_direction_x, rotation_axis_pos, units='units'):
"""Constructor method
"""
super(Cone2D, self).__init__(dof=2, geometry = 'cone', units=units)
#source
self.source.position = source_pos
#detector
self.detector.position = detector_pos
self.detector.direction_x = detector_direction_x
#rotate axis
self.rotation_axis.position = rotation_axis_pos
def align_reference_frame(self, definition='cil'):
r'''Transforms and rotates the system to backend definitions
'''
self.set_origin(self.rotation_axis.position)
if definition=='cil':
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.detector.position - self.source.position)
elif definition=='tigre':
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.rotation_axis.position - self.source.position)
else:
raise ValueError("Geometry can be configured for definition = 'cil' or 'tigre' only. Got {}".format(definition))
self.source.position = rotation_matrix.dot(self.source.position.reshape(2,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(2,1))
self.detector.direction_x = rotation_matrix.dot(self.detector.direction_x.reshape(2,1))
def system_description(self):
r'''Returns `simple` if the the geometry matches the default definitions with no offsets or rotations,
\nReturns `offset` if the the geometry matches the default definitions with centre-of-rotation or detector offsets
\nReturns `advanced` if the the geometry has rotated or tilted rotation axis or detector, can also have offsets
'''
vec_src2det = ComponentDescription.create_unit_vector(self.detector.position - self.source.position)
principal_ray_centred = ComponentDescription.test_parallel(vec_src2det, self.detector.normal)
#rotation axis to detector is parallel with centre ray
if numpy.allclose(self.rotation_axis.position, self.detector.position): #points are equal
rotation_axis_centred = True
else:
vec_b = ComponentDescription.create_unit_vector(self.detector.position - self.rotation_axis.position )
rotation_axis_centred = ComponentDescription.test_parallel(vec_src2det, vec_b)
if not principal_ray_centred:
config = SystemConfiguration.SYSTEM_ADVANCED
elif not rotation_axis_centred:
config = SystemConfiguration.SYSTEM_OFFSET
else:
config = SystemConfiguration.SYSTEM_SIMPLE
return config
def __str__(self):
def csv(val):
return numpy.array2string(val, separator=', ')
repres = "2D Cone-beam tomography\n"
repres += "System configuration:\n"
repres += "\tSource position: {0}\n".format(csv(self.source.position))
repres += "\tRotation axis position: {0}\n".format(csv(self.rotation_axis.position))
repres += "\tDetector position: {0}\n".format(csv(self.detector.position))
repres += "\tDetector direction x: {0}\n".format(csv(self.detector.direction_x))
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if numpy.allclose(self.source.position, other.source.position) \
and numpy.allclose(self.detector.position, other.detector.position)\
and numpy.allclose(self.detector.direction_x, other.detector.direction_x)\
and numpy.allclose(self.rotation_axis.position, other.rotation_axis.position):
return True
return False
def get_centre_slice(self):
return self
def calculate_magnification(self):
ab = (self.rotation_axis.position - self.source.position)
dist_source_center = float(numpy.sqrt(ab.dot(ab)))
ab_unit = ab / numpy.sqrt(ab.dot(ab))
n = self.detector.normal
#perpendicular distance between source and detector centre
sd = float((self.detector.position - self.source.position).dot(n))
ratio = float(ab_unit.dot(n))
source_to_detector = sd / ratio
dist_center_detector = source_to_detector - dist_source_center
magnification = (dist_center_detector + dist_source_center) / dist_source_center
return [dist_source_center, dist_center_detector, magnification]
def rotation_axis_on_detector(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the world coordinate system
Returns
-------
PositionVector
Position in the 3D system
"""
#calculate the point the rotation axis intersects with the detector
vec_a = self.rotation_axis.position - self.source.position
Pv = self.rotation_axis.position
ratio = (self.detector.position - Pv).dot(self.detector.normal) / vec_a.dot(self.detector.normal)
out = PositionVector(2)
out.position = Pv + vec_a * ratio
return out
def calculate_centre_of_rotation(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the detector coordinate system
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
- Units are the units of distance used to specify the component's positions
Returns
-------
Float
Offset position along the detector x_axis at y=0
Float
Angle between the y_axis and the rotation axis projection, in radians
"""
#convert to the detector coordinate system
dp1 = self.rotation_axis_on_detector().position - self.detector.position
offset = self.detector.direction_x.dot(dp1)
return (offset, 0.0)
def set_centre_of_rotation(self, offset):
""" Configures the geometry to have the requested centre of rotation offset at the detector
"""
offset_current = self.calculate_centre_of_rotation()[0]
offset_new = offset - offset_current
cofr_shift = offset_new * self.detector.direction_x /self.calculate_magnification()[2]
self.rotation_axis.position =self.rotation_axis.position + cofr_shift
class Cone3D(SystemConfiguration):
r'''This class creates the SystemConfiguration of a cone beam 3D tomographic system
:param source_pos: A 3D vector describing the position of the source (x,y,z)
:type source_pos: list, tuple, ndarray
:param detector_pos: A 3D vector describing the position of the centre of the detector (x,y,z)
:type detector_pos: list, tuple, ndarray
:param detector_direction_x: A 3D vector describing the direction of the detector_x (x,y,z)
:type detector_direction_x: list, tuple, ndarray
:param detector_direction_y: A 3D vector describing the direction of the detector_y (x,y,z)
:type detector_direction_y: list, tuple, ndarray
:param rotation_axis_pos: A 3D vector describing the position of the axis of rotation (x,y,z)
:type rotation_axis_pos: list, tuple, ndarray
:param rotation_axis_direction: A 3D vector describing the direction of the axis of rotation (x,y,z)
:type rotation_axis_direction: list, tuple, ndarray
:param units: Label the units of distance used for the configuration
:type units: string
'''
def __init__ (self, source_pos, detector_pos, detector_direction_x, detector_direction_y, rotation_axis_pos, rotation_axis_direction, units='units'):
"""Constructor method
"""
super(Cone3D, self).__init__(dof=3, geometry = 'cone', units=units)
#source
self.source.position = source_pos
#detector
self.detector.position = detector_pos
self.detector.set_direction(detector_direction_x, detector_direction_y)
#rotate axis
self.rotation_axis.position = rotation_axis_pos
self.rotation_axis.direction = rotation_axis_direction
def align_z(self):
r'''Transforms the system origin to the rotate axis with z direction aligned to the rotate axis direction
'''
self.set_origin(self.rotation_axis.position)
rotation_matrix = SystemConfiguration.rotation_vec_to_z(self.rotation_axis.direction)
#apply transform
self.rotation_axis.direction = [0,0,1]
self.source.position = rotation_matrix.dot(self.source.position.reshape(3,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(3,1))
new_x = rotation_matrix.dot(self.detector.direction_x.reshape(3,1))
new_y = rotation_matrix.dot(self.detector.direction_y.reshape(3,1))
self.detector.set_direction(new_x, new_y)
def align_reference_frame(self, definition='cil'):
r'''Transforms and rotates the system to backend definitions
'''
self.align_z()
if definition=='cil':
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.detector.position - self.source.position)
elif definition=='tigre':
rotation_matrix = SystemConfiguration.rotation_vec_to_y(self.rotation_axis.position - self.source.position)
else:
raise ValueError("Geometry can be configured for definition = 'cil' or 'tigre' only. Got {}".format(definition))
self.source.position = rotation_matrix.dot(self.source.position.reshape(3,1))
self.detector.position = rotation_matrix.dot(self.detector.position.reshape(3,1))
new_direction_x = rotation_matrix.dot(self.detector.direction_x.reshape(3,1))
new_direction_y = rotation_matrix.dot(self.detector.direction_y.reshape(3,1))
self.detector.set_direction(new_direction_x, new_direction_y)
def system_description(self):
r'''Returns `simple` if the the geometry matches the default definitions with no offsets or rotations,
\nReturns `offset` if the the geometry matches the default definitions with centre-of-rotation or detector offsets
\nReturns `advanced` if the the geometry has rotated or tilted rotation axis or detector, can also have offsets
'''
vec_src2det = ComponentDescription.create_unit_vector(self.detector.position - self.source.position)
principal_ray_centred = ComponentDescription.test_parallel(vec_src2det, self.detector.normal)
centre_ray_perpendicular_rotation = ComponentDescription.test_perpendicular(vec_src2det, self.rotation_axis.direction)
rotation_parallel_detector_y = ComponentDescription.test_parallel(self.rotation_axis.direction, self.detector.direction_y)
#rotation axis to detector is parallel with centre ray
if numpy.allclose(self.rotation_axis.position, self.detector.position): #points are equal
rotation_axis_centred = True
else:
vec_b = ComponentDescription.create_unit_vector(self.detector.position - self.rotation_axis.position )
rotation_axis_centred = ComponentDescription.test_parallel(vec_src2det, vec_b)
if not principal_ray_centred or\
not centre_ray_perpendicular_rotation or\
not rotation_parallel_detector_y:
config = SystemConfiguration.SYSTEM_ADVANCED
elif not rotation_axis_centred:
config = SystemConfiguration.SYSTEM_OFFSET
else:
config = SystemConfiguration.SYSTEM_SIMPLE
return config
def get_centre_slice(self):
"""Returns the 2D system configuration corresponding to the centre slice
"""
#requires the rotate axis to be perpendicular to the normal of the detector, and perpendicular to detector_direction_x
dp1 = self.rotation_axis.direction.dot(self.detector.normal)
dp2 = self.rotation_axis.direction.dot(self.detector.direction_x)
if numpy.isclose(dp1, 0) and numpy.isclose(dp2, 0):
temp = self.copy()
temp.align_reference_frame()
source_position = temp.source.position[0:2]
detector_position = temp.detector.position[0:2]
detector_direction_x = temp.detector.direction_x[0:2]
rotation_axis_position = temp.rotation_axis.position[0:2]
return Cone2D(source_position, detector_position, detector_direction_x, rotation_axis_position)
else:
raise ValueError('Cannot convert geometry to 2D. Requires axis of rotation to be perpendicular to the detector.')
def __str__(self):
def csv(val):
return numpy.array2string(val, separator=', ')
repres = "3D Cone-beam tomography\n"
repres += "System configuration:\n"
repres += "\tSource position: {0}\n".format(csv(self.source.position))
repres += "\tRotation axis position: {0}\n".format(csv(self.rotation_axis.position))
repres += "\tRotation axis direction: {0}\n".format(csv(self.rotation_axis.direction))
repres += "\tDetector position: {0}\n".format(csv(self.detector.position))
repres += "\tDetector direction x: {0}\n".format(csv(self.detector.direction_x))
repres += "\tDetector direction y: {0}\n".format(csv(self.detector.direction_y))
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if numpy.allclose(self.source.position, other.source.position) \
and numpy.allclose(self.detector.position, other.detector.position)\
and numpy.allclose(self.detector.direction_x, other.detector.direction_x)\
and numpy.allclose(self.detector.direction_y, other.detector.direction_y)\
and numpy.allclose(self.rotation_axis.position, other.rotation_axis.position)\
and numpy.allclose(self.rotation_axis.direction, other.rotation_axis.direction):
return True
return False
def calculate_magnification(self):
ab = (self.rotation_axis.position - self.source.position)
dist_source_center = float(numpy.sqrt(ab.dot(ab)))
ab_unit = ab / numpy.sqrt(ab.dot(ab))
n = self.detector.normal
#perpendicular distance between source and detector centre
sd = float((self.detector.position - self.source.position).dot(n))
ratio = float(ab_unit.dot(n))
source_to_detector = sd / ratio
dist_center_detector = source_to_detector - dist_source_center
magnification = (dist_center_detector + dist_source_center) / dist_source_center
return [dist_source_center, dist_center_detector, magnification]
def rotation_axis_on_detector(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the world coordinate system
Returns
-------
PositionDirectionVector
Position and direction in the 3D system
"""
#calculate the intersection with the detector, of source to pv
Pv = self.rotation_axis.position
vec_a = Pv - self.source.position
ratio = (self.detector.position - Pv).dot(self.detector.normal) / vec_a.dot(self.detector.normal)
point1 = Pv + vec_a * ratio
#calculate the intersection with the detector, of source to pv
Pv = self.rotation_axis.position + self.rotation_axis.direction
vec_a = Pv - self.source.position
ratio = (self.detector.position - Pv).dot(self.detector.normal) / vec_a.dot(self.detector.normal)
point2 = Pv + vec_a * ratio
out = PositionDirectionVector(3)
out.position = point1
out.direction = point2 - point1
return out
def calculate_centre_of_rotation(self):
"""
Calculates the position, on the detector, of the projection of the rotation axis in the detector coordinate system
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
- Units are the units of distance used to specify the component's positions
Returns
-------
Float
Offset position along the detector x_axis at y=0
Float
Angle between the y_axis and the rotation axis projection, in radians
"""
rotate_axis_projection = self.rotation_axis_on_detector()
p1 = rotate_axis_projection.position
p2 = p1 + rotate_axis_projection.direction
#point1 and point2 are on the detector plane. need to return them in the detector coordinate system
dp1 = p1 - self.detector.position
x1 = self.detector.direction_x.dot(dp1)
y1 = self.detector.direction_y.dot(dp1)
dp2 = p2 - self.detector.position
x2 = self.detector.direction_x.dot(dp2)
y2 = self.detector.direction_y.dot(dp2)
#y = m * x + c
#c = y1 - m * x1
#when y is 0
#x=-c/m
#x_y0 = -y1/m + x1
offset_x_y0 = x1 -y1 * (x2 - x1)/(y2-y1)
angle = math.atan2(x2 - x1, y2 - y1)
offset = offset_x_y0
return (offset, angle)
def set_centre_of_rotation(self, offset, angle):
""" Configures the geometry to have the requested centre of rotation offset at the detector
"""
#two points on the detector
x1 = offset
y1 = 0
x2 = offset + math.tan(angle)
y2 = 1
#convert to 3d coordinates in system frame
p1 = self.detector.position + x1 * self.detector.direction_x + y1 * self.detector.direction_y
p2 = self.detector.position + x2 * self.detector.direction_x + y2 * self.detector.direction_y
# vectors from source define plane
sp1 = p1 - self.source.position
sp2 = p2 - self.source.position
#find vector intersection with a plane defined by rotate axis (pos and dir) and det_x direction
plane_normal = numpy.cross(self.rotation_axis.direction, self.detector.direction_x)
ratio = (self.rotation_axis.position - self.source.position).dot(plane_normal) / sp1.dot(plane_normal)
p1_on_plane = self.source.position + sp1 * ratio
ratio = (self.rotation_axis.position - self.source.position).dot(plane_normal) / sp2.dot(plane_normal)
p2_on_plane = self.source.position + sp2 * ratio
self.rotation_axis.position = p1_on_plane
self.rotation_axis.direction = p2_on_plane - p1_on_plane
class Panel(object):
r'''This is a class describing the panel of the system.
:param num_pixels: num_pixels_h or (num_pixels_h, num_pixels_v) containing the number of pixels of the panel
:type num_pixels: int, list, tuple
:param pixel_size: pixel_size_h or (pixel_size_h, pixel_size_v) containing the size of the pixels of the panel
:type pixel_size: int, lust, tuple
:param origin: the position of pixel 0 (the data origin) of the panel `top-left`, `top-right`, `bottom-left`, `bottom-right`
:type origin: string
'''
@property
def num_pixels(self):
return self._num_pixels
@num_pixels.setter
def num_pixels(self, val):
if isinstance(val,int):
num_pixels_temp = [val, 1]
else:
try:
length_val = len(val)
except:
raise TypeError('num_pixels expected int x or [int x, int y]. Got {}'.format(type(val)))
if length_val == 2:
try:
val0 = int(val[0])
val1 = int(val[1])
except:
raise TypeError('num_pixels expected int x or [int x, int y]. Got {0},{1}'.format(type(val[0]), type(val[1])))
num_pixels_temp = [val0, val1]
else:
raise ValueError('num_pixels expected int x or [int x, int y]. Got {}'.format(val))
if num_pixels_temp[1] > 1 and self._dimension == 2:
raise ValueError('2D acquisitions expects a 1D panel. Expected num_pixels[1] = 1. Got {}'.format(num_pixels_temp[1]))
if num_pixels_temp[0] < 1 or num_pixels_temp[1] < 1:
raise ValueError('num_pixels (x,y) must be >= (1,1). Got {}'.format(num_pixels_temp))
else:
self._num_pixels = numpy.array(num_pixels_temp, dtype=numpy.int16)
@property
def pixel_size(self):
return self._pixel_size
@pixel_size.setter
def pixel_size(self, val):
if val is None:
pixel_size_temp = [1.0,1.0]
else:
try:
length_val = len(val)
except:
try:
temp = float(val)
pixel_size_temp = [temp, temp]
except:
raise TypeError('pixel_size expected float xy or [float x, float y]. Got {}'.format(val))
else:
if length_val == 2:
try:
temp0 = float(val[0])
temp1 = float(val[1])
pixel_size_temp = [temp0, temp1]
except:
raise ValueError('pixel_size expected float xy or [float x, float y]. Got {}'.format(val))
else:
raise ValueError('pixel_size expected float xy or [float x, float y]. Got {}'.format(val))
if pixel_size_temp[0] <= 0 or pixel_size_temp[1] <= 0:
raise ValueError('pixel_size (x,y) at must be > (0.,0.). Got {}'.format(pixel_size_temp))
self._pixel_size = numpy.array(pixel_size_temp)
@property
def origin(self):
return self._origin
@origin.setter
def origin(self, val):
allowed = ['top-left', 'top-right','bottom-left','bottom-right']
if val in allowed:
self._origin=val
else:
raise ValueError('origin expected one of {0}. Got {1}'.format(allowed, val))
def __str__(self):
repres = "Panel configuration:\n"
repres += "\tNumber of pixels: {0}\n".format(self.num_pixels)
repres += "\tPixel size: {0}\n".format(self.pixel_size)
repres += "\tPixel origin: {0}\n".format(self.origin)
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if numpy.array_equal(self.num_pixels, other.num_pixels) \
and numpy.allclose(self.pixel_size, other.pixel_size) \
and self.origin == other.origin:
return True
return False
def __init__ (self, num_pixels, pixel_size, origin, dimension):
"""Constructor method
"""
self._dimension = dimension
self.num_pixels = num_pixels
self.pixel_size = pixel_size
self.origin = origin
class Channels(object):
r'''This is a class describing the channels of the data.
This will be created on initialisation of AcquisitionGeometry.
:param num_channels: The number of channels of data
:type num_channels: int
:param channel_labels: A list of channel labels
:type channel_labels: list, optional
'''
@property
def num_channels(self):
return self._num_channels
@num_channels.setter
def num_channels(self, val):
try:
val = int(val)
except TypeError:
raise ValueError('num_channels expected a positive integer. Got {}'.format(type(val)))
if val > 0:
self._num_channels = val
else:
raise ValueError('num_channels expected a positive integer. Got {}'.format(val))
@property
def channel_labels(self):
return self._channel_labels
@channel_labels.setter
def channel_labels(self, val):
if val is None or len(val) == self._num_channels:
self._channel_labels = val
else:
raise ValueError('labels expected to have length {0}. Got {1}'.format(self._num_channels, len(val)))
def __str__(self):
repres = "Channel configuration:\n"
repres += "\tNumber of channels: {0}\n".format(self.num_channels)
num_print=min(10,self.num_channels)
if hasattr(self, 'channel_labels'):
repres += "\tChannel labels 0-{0}: {1}\n".format(num_print, self.channel_labels[0:num_print])
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.num_channels != other.num_channels:
return False
if hasattr(self,'channel_labels'):
if self.channel_labels != other.channel_labels:
return False
return True
def __init__ (self, num_channels, channel_labels):
"""Constructor method
"""
self.num_channels = num_channels
if channel_labels is not None:
self.channel_labels = channel_labels
class Angles(object):
r'''This is a class describing the angles of the data.
:param angles: The angular positions of the acquisition data
:type angles: list, ndarray
:param initial_angle: The angular offset of the object from the reference frame
:type initial_angle: float, optional
:param angle_unit: The units of the stored angles 'degree' or 'radian'
:type angle_unit: string
'''
@property
def angle_data(self):
return self._angle_data
@angle_data.setter
def angle_data(self, val):
if val is None:
raise ValueError('angle_data expected to be a list of floats')
else:
try:
self.num_positions = len(val)
except TypeError:
self.num_positions = 1
val = [val]
finally:
try:
self._angle_data = numpy.asarray(val, dtype=numpy.float32)
except:
raise ValueError('angle_data expected to be a list of floats')
@property
def initial_angle(self):
return self._initial_angle
@initial_angle.setter
def initial_angle(self, val):
try:
val = float(val)
except:
raise TypeError('initial_angle expected a float. Got {0}'.format(type(val)))
self._initial_angle = val
@property
def angle_unit(self):
return self._angle_unit
@angle_unit.setter
def angle_unit(self,val):
if val != AcquisitionGeometry.DEGREE and val != AcquisitionGeometry.RADIAN:
raise ValueError('angle_unit = {} not recognised please specify \'degree\' or \'radian\''.format(val))
else:
self._angle_unit = val
def __str__(self):
repres = "Acquisition description:\n"
repres += "\tNumber of positions: {0}\n".format(self.num_positions)
num_print=min(20,self.num_positions)
repres += "\tAngles 0-{0} in {1}s:\n{2}\n".format(num_print, self.angle_unit, numpy.array2string(self.angle_data[0:num_print], separator=', '))
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.angle_unit != other.angle_unit:
return False
if self.initial_angle != other.initial_angle:
return False
if not numpy.allclose(self.angle_data, other.angle_data):
return False
return True
def __init__ (self, angles, initial_angle, angle_unit):
"""Constructor method
"""
self.angle_data = angles
self.initial_angle = initial_angle
self.angle_unit = angle_unit
class Configuration(object):
r'''This class holds the description of the system components.
'''
def __init__(self, units_distance='units distance'):
self.system = None #has distances
self.angles = None #has angles
self.panel = None #has distances
self.channels = Channels(1, None)
self.units = units_distance
@property
def configured(self):
if self.system is None:
print("Please configure AcquisitionGeometry using one of the following methods:\
\n\tAcquisitionGeometry.create_Parallel2D()\
\n\tAcquisitionGeometry.create_Cone3D()\
\n\tAcquisitionGeometry.create_Parallel2D()\
\n\tAcquisitionGeometry.create_Cone3D()")
return False
configured = True
if self.angles is None:
print("Please configure angular data using the set_angles() method")
configured = False
if self.panel is None:
print("Please configure the panel using the set_panel() method")
configured = False
return configured
def shift_detector_in_plane(self,
pixel_offset,
direction='horizontal'):
"""
Adjusts the position of the detector in a specified direction within the imaging plane.
Parameters:
-----------
pixel_offset : float
The number of pixels to adjust the detector's position by.
direction : {'horizontal', 'vertical'}, optional
The direction in which to adjust the detector's position. Defaults to 'horizontal'.
Notes:
------
- If `direction` is 'horizontal':
- If the panel's origin is 'left', positive offsets translate the detector to the right.
- If the panel's origin is 'right', positive offsets translate the detector to the left.
- If `direction` is 'vertical':
- If the panel's origin is 'bottom', positive offsets translate the detector upward.
- If the panel's origin is 'top', positive offsets translate the detector downward.
Returns:
--------
None
"""
if direction == 'horizontal':
pixel_size = self.panel.pixel_size[0]
pixel_direction = self.system.detector.direction_x
elif direction == 'vertical':
pixel_size = self.panel.pixel_size[1]
pixel_direction = self.system.detector.direction_y
if 'bottom' in self.panel.origin or 'left' in self.panel.origin:
self.system.detector.position -= pixel_offset * pixel_direction * pixel_size
else:
self.system.detector.position += pixel_offset * pixel_direction * pixel_size
def __str__(self):
repres = ""
if self.configured:
repres += str(self.system)
repres += str(self.panel)
repres += str(self.channels)
repres += str(self.angles)
repres += "Distances in units: {}".format(self.units)
return repres
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.system == other.system\
and self.panel == other.panel\
and self.channels == other.channels\
and self.angles == other.angles:
return True
return False
[docs]class AcquisitionGeometry(object):
"""This class holds the AcquisitionGeometry of the system.
Please initialise the AcquisitionGeometry using the using the static methods:
`AcquisitionGeometry.create_Parallel2D()`
`AcquisitionGeometry.create_Cone2D()`
`AcquisitionGeometry.create_Parallel3D()`
`AcquisitionGeometry.create_Cone3D()`
"""
RANDOM = 'random'
RANDOM_INT = 'random_int'
ANGLE_UNIT = 'angle_unit'
DEGREE = 'degree'
RADIAN = 'radian'
CHANNEL = 'channel'
ANGLE = 'angle'
VERTICAL = 'vertical'
HORIZONTAL = 'horizontal'
PARALLEL = 'parallel'
CONE = 'cone'
DIM2 = '2D'
DIM3 = '3D'
#for backwards compatibility
@property
def geom_type(self):
return self.config.system.geometry
@property
def num_projections(self):
return len(self.angles)
@property
def pixel_num_h(self):
return self.config.panel.num_pixels[0]
@pixel_num_h.setter
def pixel_num_h(self, val):
self.config.panel.num_pixels[0] = val
@property
def pixel_num_v(self):
return self.config.panel.num_pixels[1]
@pixel_num_v.setter
def pixel_num_v(self, val):
self.config.panel.num_pixels[1] = val
@property
def pixel_size_h(self):
return self.config.panel.pixel_size[0]
@pixel_size_h.setter
def pixel_size_h(self, val):
self.config.panel.pixel_size[0] = val
@property
def pixel_size_v(self):
return self.config.panel.pixel_size[1]
@pixel_size_v.setter
def pixel_size_v(self, val):
self.config.panel.pixel_size[1] = val
@property
def channels(self):
return self.config.channels.num_channels
@property
def angles(self):
return self.config.angles.angle_data
@property
def dist_source_center(self):
out = self.config.system.calculate_magnification()
return out[0]
@property
def dist_center_detector(self):
out = self.config.system.calculate_magnification()
return out[1]
@property
def magnification(self):
out = self.config.system.calculate_magnification()
return out[2]
@property
def dimension(self):
return self.config.system.dimension
@property
def shape(self):
shape_dict = {AcquisitionGeometry.CHANNEL: self.config.channels.num_channels,
AcquisitionGeometry.ANGLE: self.config.angles.num_positions,
AcquisitionGeometry.VERTICAL: self.config.panel.num_pixels[1],
AcquisitionGeometry.HORIZONTAL: self.config.panel.num_pixels[0]}
shape = []
for label in self.dimension_labels:
shape.append(shape_dict[label])
return tuple(shape)
@property
def dimension_labels(self):
labels_default = DataOrder.CIL_AG_LABELS
shape_default = [self.config.channels.num_channels,
self.config.angles.num_positions,
self.config.panel.num_pixels[1],
self.config.panel.num_pixels[0]
]
try:
labels = list(self._dimension_labels)
except AttributeError:
labels = labels_default.copy()
#remove from list labels where len == 1
#
for i, x in enumerate(shape_default):
if x == 0 or x==1:
try:
labels.remove(labels_default[i])
except ValueError:
pass #if not in custom list carry on
return tuple(labels)
@dimension_labels.setter
def dimension_labels(self, val):
labels_default = DataOrder.CIL_AG_LABELS
#check input and store. This value is not used directly
if val is not None:
for x in val:
if x not in labels_default:
raise ValueError('Requested axis are not possible. Accepted label names {},\ngot {}'.format(labels_default,val))
self._dimension_labels = tuple(val)
@property
def ndim(self):
return len(self.dimension_labels)
@property
def system_description(self):
return self.config.system.system_description()
@property
def dtype(self):
return self._dtype
@dtype.setter
def dtype(self, val):
self._dtype = val
def __init__(self):
self._dtype = numpy.float32
def get_centre_of_rotation(self, distance_units='default', angle_units='radian'):
"""
Returns the system centre of rotation offset at the detector
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
Parameters
----------
distance_units : string, default='default'
Units of distance used to calculate the return values.
'default' uses the same units the system and panel were specified in.
'pixels' uses pixels sizes in the horizontal and vertical directions as appropriate.
angle_units : string
Units to return the angle in. Can take 'radian' or 'degree'.
Returns
-------
Dictionary
{'offset': (offset, distance_units), 'angle': (angle, angle_units)}
where,
'offset' gives the position along the detector x_axis at y=0
'angle' gives the angle between the y_axis and the projection of the rotation axis on the detector
"""
if hasattr(self.config.system, 'calculate_centre_of_rotation'):
offset_distance, angle_rad = self.config.system.calculate_centre_of_rotation()
else:
raise NotImplementedError
if distance_units == 'default':
offset = offset_distance
offset_units = self.config.units
elif distance_units == 'pixels':
offset = offset_distance/ self.config.panel.pixel_size[0]
offset_units = 'pixels'
if self.dimension == '3D' and self.config.panel.pixel_size[0] != self.config.panel.pixel_size[1]:
#if aspect ratio of pixels isn't 1:1 need to convert angle by new ratio
y_pix = 1 /self.config.panel.pixel_size[1]
x_pix = math.tan(angle_rad)/self.config.panel.pixel_size[0]
angle_rad = math.atan2(x_pix,y_pix)
else:
raise ValueError("`distance_units` is not recognised. Must be 'default' or 'pixels'. Got {}".format(distance_units))
if angle_units == 'radian':
angle = angle_rad
ang_units = 'radian'
elif angle_units == 'degree':
angle = numpy.degrees(angle_rad)
ang_units = 'degree'
else:
raise ValueError("`angle_units` is not recognised. Must be 'radian' or 'degree'. Got {}".format(angle_units))
return {'offset': (offset, offset_units), 'angle': (angle, ang_units)}
def set_centre_of_rotation(self, offset=0.0, distance_units='default', angle=0.0, angle_units='radian'):
"""
Configures the system geometry to have the requested centre of rotation offset at the detector.
Note
----
- Origin is in the centre of the detector
- Axes directions are specified by detector.direction_x, detector.direction_y
Parameters
----------
offset: float, default 0.0
The position of the centre of rotation along the detector x_axis at y=0
distance_units : string, default='default'
Units the offset is specified in. Can be 'default'or 'pixels'.
'default' interprets the input as same units the system and panel were specified in.
'pixels' interprets the input in horizontal pixels.
angle: float, default=0.0
The angle between the detector y_axis and the rotation axis direction on the detector
Notes
-----
If aspect ratio of pixels is not 1:1 ensure the angle is calculated from the x and y values in the correct units.
angle_units : string, default='radian'
Units the angle is specified in. Can take 'radian' or 'degree'.
"""
if not hasattr(self.config.system, 'set_centre_of_rotation'):
raise NotImplementedError()
if angle_units == 'radian':
angle_rad = angle
elif angle_units == 'degree':
angle_rad = numpy.radians(angle)
else:
raise ValueError("`angle_units` is not recognised. Must be 'radian' or 'degree'. Got {}".format(angle_units))
if distance_units =='default':
offset_distance = offset
elif distance_units =='pixels':
offset_distance = offset * self.config.panel.pixel_size[0]
else:
raise ValueError("`distance_units` is not recognised. Must be 'default' or 'pixels'. Got {}".format(distance_units))
if self.dimension == '2D':
self.config.system.set_centre_of_rotation(offset_distance)
else:
self.config.system.set_centre_of_rotation(offset_distance, angle_rad)
def set_centre_of_rotation_by_slice(self, offset1, slice_index1=None, offset2=None, slice_index2=None):
"""
Configures the system geometry to have the requested centre of rotation offset at the detector.
If two slices are passed the rotation axis will be rotated to pass through both points.
Note
----
- Offset is specified in pixels
- Offset can be sub-pixels
- Offset direction is specified by detector.direction_x
Parameters
----------
offset1: float
The offset from the centre of the detector to the projected rotation position at slice_index_1
slice_index1: int, optional
The slice number of offset1
offset2: float, optional
The offset from the centre of the detector to the projected rotation position at slice_index_2
slice_index2: int, optional
The slice number of offset2
"""
if not hasattr(self.config.system, 'set_centre_of_rotation'):
raise NotImplementedError()
if self.dimension == '2D':
if offset2 is not None:
logging.WARNING("Only offset1 is being used")
self.set_centre_of_rotation(offset1)
if offset2 is None or offset1 == offset2:
offset_x_y0 = offset1
angle = 0
else:
if slice_index1 is None or slice_index2 is None or slice_index1 == slice_index2:
raise ValueError("Cannot calculate angle. Please specify `slice_index1` and `slice_index2` to define a rotated axis")
offset_x_y0 = offset1 -slice_index1 * (offset2 - offset1)/(slice_index2-slice_index1)
angle = math.atan2(offset2 - offset1, slice_index2 - slice_index1)
self.set_centre_of_rotation(offset_x_y0, 'pixels', angle, 'radian')
[docs] def set_angles(self, angles, initial_angle=0, angle_unit='degree'):
r'''This method configures the angular information of an AcquisitionGeometry object.
:param angles: The angular positions of the acquisition data
:type angles: list, ndarray
:param initial_angle: The angular offset of the object from the reference frame
:type initial_angle: float, optional
:param angle_unit: The units of the stored angles 'degree' or 'radian'
:type angle_unit: string
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
self.config.angles = Angles(angles, initial_angle, angle_unit)
return self
[docs] def set_panel(self, num_pixels, pixel_size=(1,1), origin='bottom-left'):
r'''This method configures the panel information of an AcquisitionGeometry object.
:param num_pixels: num_pixels_h or (num_pixels_h, num_pixels_v) containing the number of pixels of the panel
:type num_pixels: int, list, tuple
:param pixel_size: pixel_size_h or (pixel_size_h, pixel_size_v) containing the size of the pixels of the panel
:type pixel_size: int, list, tuple, optional
:param origin: the position of pixel 0 (the data origin) of the panel 'top-left', 'top-right', 'bottom-left', 'bottom-right'
:type origin: string, default 'bottom-left'
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
self.config.panel = Panel(num_pixels, pixel_size, origin, self.config.system._dimension)
return self
[docs] def set_channels(self, num_channels=1, channel_labels=None):
r'''This method configures the channel information of an AcquisitionGeometry object.
:param num_channels: The number of channels of data
:type num_channels: int, optional
:param channel_labels: A list of channel labels
:type channel_labels: list, optional
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
self.config.channels = Channels(num_channels, channel_labels)
return self
[docs] def set_labels(self, labels=None):
r'''This method configures the dimension labels of an AcquisitionGeometry object.
:param labels: The order of the dimensions describing the data.\
Expects a list containing at least one of the unique labels: 'channel' 'angle' 'vertical' 'horizontal'
default = ['channel','angle','vertical','horizontal']
:type labels: list, optional
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
self.dimension_labels = labels
return self
[docs] @staticmethod
def create_Parallel2D(ray_direction=[0, 1], detector_position=[0, 0], detector_direction_x=[1, 0], rotation_axis_position=[0, 0], units='units distance'):
r'''This creates the AcquisitionGeometry for a parallel beam 2D tomographic system
:param ray_direction: A 2D vector describing the x-ray direction (x,y)
:type ray_direction: list, tuple, ndarray, optional
:param detector_position: A 2D vector describing the position of the centre of the detector (x,y)
:type detector_position: list, tuple, ndarray, optional
:param detector_direction_x: A 2D vector describing the direction of the detector_x (x,y)
:type detector_direction_x: list, tuple, ndarray
:param rotation_axis_position: A 2D vector describing the position of the axis of rotation (x,y)
:type rotation_axis_position: list, tuple, ndarray, optional
:param units: Label the units of distance used for the configuration, these should be consistent for the geometry and panel
:type units: string
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
AG = AcquisitionGeometry()
AG.config = Configuration(units)
AG.config.system = Parallel2D(ray_direction, detector_position, detector_direction_x, rotation_axis_position, units)
return AG
[docs] @staticmethod
def create_Cone2D(source_position, detector_position, detector_direction_x=[1,0], rotation_axis_position=[0,0], units='units distance'):
r'''This creates the AcquisitionGeometry for a cone beam 2D tomographic system
:param source_position: A 2D vector describing the position of the source (x,y)
:type source_position: list, tuple, ndarray
:param detector_position: A 2D vector describing the position of the centre of the detector (x,y)
:type detector_position: list, tuple, ndarray
:param detector_direction_x: A 2D vector describing the direction of the detector_x (x,y)
:type detector_direction_x: list, tuple, ndarray
:param rotation_axis_position: A 2D vector describing the position of the axis of rotation (x,y)
:type rotation_axis_position: list, tuple, ndarray, optional
:param units: Label the units of distance used for the configuration, these should be consistent for the geometry and panel
:type units: string
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
AG = AcquisitionGeometry()
AG.config = Configuration(units)
AG.config.system = Cone2D(source_position, detector_position, detector_direction_x, rotation_axis_position, units)
return AG
[docs] @staticmethod
def create_Parallel3D(ray_direction=[0,1,0], detector_position=[0,0,0], detector_direction_x=[1,0,0], detector_direction_y=[0,0,1], rotation_axis_position=[0,0,0], rotation_axis_direction=[0,0,1], units='units distance'):
r'''This creates the AcquisitionGeometry for a parallel beam 3D tomographic system
:param ray_direction: A 3D vector describing the x-ray direction (x,y,z)
:type ray_direction: list, tuple, ndarray, optional
:param detector_position: A 3D vector describing the position of the centre of the detector (x,y,z)
:type detector_position: list, tuple, ndarray, optional
:param detector_direction_x: A 3D vector describing the direction of the detector_x (x,y,z)
:type detector_direction_x: list, tuple, ndarray
:param detector_direction_y: A 3D vector describing the direction of the detector_y (x,y,z)
:type detector_direction_y: list, tuple, ndarray
:param rotation_axis_position: A 3D vector describing the position of the axis of rotation (x,y,z)
:type rotation_axis_position: list, tuple, ndarray, optional
:param rotation_axis_direction: A 3D vector describing the direction of the axis of rotation (x,y,z)
:type rotation_axis_direction: list, tuple, ndarray, optional
:param units: Label the units of distance used for the configuration, these should be consistent for the geometry and panel
:type units: string
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
AG = AcquisitionGeometry()
AG.config = Configuration(units)
AG.config.system = Parallel3D(ray_direction, detector_position, detector_direction_x, detector_direction_y, rotation_axis_position, rotation_axis_direction, units)
return AG
[docs] @staticmethod
def create_Cone3D(source_position, detector_position, detector_direction_x=[1,0,0], detector_direction_y=[0,0,1], rotation_axis_position=[0,0,0], rotation_axis_direction=[0,0,1], units='units distance'):
r'''This creates the AcquisitionGeometry for a cone beam 3D tomographic system
:param source_position: A 3D vector describing the position of the source (x,y,z)
:type source_position: list, tuple, ndarray, optional
:param detector_position: A 3D vector describing the position of the centre of the detector (x,y,z)
:type detector_position: list, tuple, ndarray, optional
:param detector_direction_x: A 3D vector describing the direction of the detector_x (x,y,z)
:type detector_direction_x: list, tuple, ndarray
:param detector_direction_y: A 3D vector describing the direction of the detector_y (x,y,z)
:type detector_direction_y: list, tuple, ndarray
:param rotation_axis_position: A 3D vector describing the position of the axis of rotation (x,y,z)
:type rotation_axis_position: list, tuple, ndarray, optional
:param rotation_axis_direction: A 3D vector describing the direction of the axis of rotation (x,y,z)
:type rotation_axis_direction: list, tuple, ndarray, optional
:param units: Label the units of distance used for the configuration, these should be consistent for the geometry and panel
:type units: string
:return: returns a configured AcquisitionGeometry object
:rtype: AcquisitionGeometry
'''
AG = AcquisitionGeometry()
AG.config = Configuration(units)
AG.config.system = Cone3D(source_position, detector_position, detector_direction_x, detector_direction_y, rotation_axis_position, rotation_axis_direction, units)
return AG
def get_order_by_label(self, dimension_labels, default_dimension_labels):
order = []
for i, el in enumerate(default_dimension_labels):
for j, ek in enumerate(dimension_labels):
if el == ek:
order.append(j)
break
return order
def __eq__(self, other):
if isinstance(other, self.__class__) and self.config == other.config :
return True
return False
def clone(self):
'''returns a copy of the AcquisitionGeometry'''
return copy.deepcopy(self)
def copy(self):
'''alias of clone'''
return self.clone()
def get_centre_slice(self):
'''returns a 2D AcquisitionGeometry that corresponds to the centre slice of the input'''
if self.dimension == '2D':
return self
AG_2D = copy.deepcopy(self)
AG_2D.config.system = self.config.system.get_centre_slice()
AG_2D.config.panel.num_pixels[1] = 1
AG_2D.config.panel.pixel_size[1] = abs(self.config.system.detector.direction_y[2]) * self.config.panel.pixel_size[1]
return AG_2D
[docs] def get_ImageGeometry(self, resolution=1.0):
'''returns a default configured ImageGeometry object based on the AcquisitionGeomerty'''
num_voxel_xy = int(numpy.ceil(self.config.panel.num_pixels[0] * resolution))
voxel_size_xy = self.config.panel.pixel_size[0] / (resolution * self.magnification)
if self.dimension == '3D':
num_voxel_z = int(numpy.ceil(self.config.panel.num_pixels[1] * resolution))
voxel_size_z = self.config.panel.pixel_size[1] / (resolution * self.magnification)
else:
num_voxel_z = 0
voxel_size_z = 1
return ImageGeometry(num_voxel_xy, num_voxel_xy, num_voxel_z, voxel_size_xy, voxel_size_xy, voxel_size_z, channels=self.channels)
def __str__ (self):
return str(self.config)
[docs] def get_slice(self, channel=None, angle=None, vertical=None, horizontal=None):
'''
Returns a new AcquisitionGeometry of a single slice of in the requested direction. Will only return reconstructable geometries.
'''
geometry_new = self.copy()
if channel is not None:
geometry_new.config.channels.num_channels = 1
if hasattr(geometry_new.config.channels,'channel_labels'):
geometry_new.config.panel.channel_labels = geometry_new.config.panel.channel_labels[channel]
if angle is not None:
geometry_new.config.angles.angle_data = geometry_new.config.angles.angle_data[angle]
if vertical is not None:
if geometry_new.geom_type == AcquisitionGeometry.PARALLEL or vertical == 'centre' or abs(geometry_new.pixel_num_v/2 - vertical) < 1e-6:
geometry_new = geometry_new.get_centre_slice()
else:
raise ValueError("Can only subset centre slice geometry on cone-beam data. Expected vertical = 'centre'. Got vertical = {0}".format(vertical))
if horizontal is not None:
raise ValueError("Cannot calculate system geometry for a horizontal slice")
return geometry_new
[docs] def allocate(self, value=0, **kwargs):
'''allocates an AcquisitionData according to the size expressed in the instance
:param value: accepts numbers to allocate an uniform array, or a string as 'random' or 'random_int' to create a random array or None.
:type value: number or string, default None allocates empty memory block
:param dtype: numerical type to allocate
:type dtype: numpy type, default numpy.float32
'''
dtype = kwargs.get('dtype', self.dtype)
if kwargs.get('dimension_labels', None) is not None:
raise ValueError("Deprecated: 'dimension_labels' cannot be set with 'allocate()'. Use 'geometry.set_labels()' to modify the geometry before using allocate.")
out = AcquisitionData(geometry=self.copy(),
dtype=dtype,
suppress_warning=True)
if isinstance(value, Number):
# it's created empty, so we make it 0
out.array.fill(value)
else:
if value == AcquisitionGeometry.RANDOM:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
if numpy.iscomplexobj(out.array):
r = numpy.random.random_sample(self.shape) + 1j * numpy.random.random_sample(self.shape)
out.fill(r)
else:
out.fill(numpy.random.random_sample(self.shape))
elif value == AcquisitionGeometry.RANDOM_INT:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
max_value = kwargs.get('max_value', 100)
r = numpy.random.randint(max_value,size=self.shape, dtype=numpy.int32)
out.fill(numpy.asarray(r, dtype=self.dtype))
elif value is None:
pass
else:
raise ValueError('Value {} unknown'.format(value))
return out
[docs]class DataContainer(object):
'''Generic class to hold data
Data is currently held in a numpy arrays'''
@property
def geometry(self):
return None
@geometry.setter
def geometry(self, val):
if val is not None:
raise TypeError("DataContainers cannot hold a geometry, use ImageData or AcquisitionData instead")
@property
def dimension_labels(self):
if self._dimension_labels is None:
default_labels = [0]*self.number_of_dimensions
for i in range(self.number_of_dimensions):
default_labels[i] = 'dimension_{0:02}'.format(i)
return tuple(default_labels)
else:
return self._dimension_labels
@dimension_labels.setter
def dimension_labels(self, val):
if val is None:
self._dimension_labels = None
elif len(list(val))==self.number_of_dimensions:
self._dimension_labels = tuple(val)
else:
raise ValueError("dimension_labels expected a list containing {0} strings got {1}".format(self.number_of_dimensions, val))
@property
def shape(self):
'''Returns the shape of the DataContainer'''
return self.array.shape
@property
def ndim(self):
'''Returns the ndim of the DataContainer'''
return self.array.ndim
@shape.setter
def shape(self, val):
print("Deprecated - shape will be set automatically")
@property
def number_of_dimensions(self):
'''Returns the shape of the of the DataContainer'''
return len(self.array.shape)
@property
def dtype(self):
'''Returns the dtype of the data array.'''
return self.array.dtype
@property
def size(self):
'''Returns the number of elements of the DataContainer'''
return self.array.size
__container_priority__ = 1
def __init__ (self, array, deep_copy=True, dimension_labels=None,
**kwargs):
'''Holds the data'''
if type(array) == numpy.ndarray:
if deep_copy:
self.array = array.copy()
else:
self.array = array
else:
raise TypeError('Array must be NumpyArray, passed {0}'\
.format(type(array)))
#Don't set for derived classes
if type(self) is DataContainer:
self.dimension_labels = dimension_labels
# finally copy the geometry, and force dtype of the geometry of the data = the dype of the data
if 'geometry' in kwargs.keys():
self.geometry = kwargs['geometry']
try:
self.geometry.dtype = self.dtype
except:
pass
def get_dimension_size(self, dimension_label):
if dimension_label in self.dimension_labels:
i = self.dimension_labels.index(dimension_label)
return self.shape[i]
else:
raise ValueError('Unknown dimension {0}. Should be one of {1}'.format(dimension_label,
self.dimension_labels))
def get_dimension_axis(self, dimension_label):
if dimension_label in self.dimension_labels:
return self.dimension_labels.index(dimension_label)
else:
raise ValueError('Unknown dimension {0}. Should be one of {1}'.format(dimension_label,
self.dimension_labels))
[docs] def as_array(self):
'''Returns the pointer to the array.
'''
return self.array
[docs] def get_slice(self,**kw):
'''
Returns a new DataContainer containing a single slice of in the requested direction. \
Pass keyword arguments <dimension label>=index
'''
new_array = None
#get ordered list of current dimensions
dimension_labels_list = list(self.dimension_labels)
#remove axes from array and labels
for key, value in kw.items():
if value is not None:
axis = dimension_labels_list.index(key)
dimension_labels_list.remove(key)
if new_array is None:
new_array = self.as_array().take(indices=value, axis=axis)
else:
new_array = new_array.take(indices=value, axis=axis)
if new_array.ndim > 1:
return DataContainer(new_array, False, dimension_labels_list, suppress_warning=True)
else:
return VectorData(new_array, dimension_labels=dimension_labels_list)
[docs] def reorder(self, order=None):
'''
reorders the data in memory as requested.
:param order: ordered list of labels from self.dimension_labels, or order for engine 'astra' or 'tigre'
:type order: list, sting
'''
if order in DataOrder.ENGINES:
order = DataOrder.get_order_for_engine(order, self.geometry)
try:
if len(order) != len(self.shape):
raise ValueError('The axes list for resorting must have {0} dimensions. Got {1}'.format(len(self.shape), len(order)))
except TypeError as ae:
raise ValueError('The order must be an iterable with __len__ implemented, like a list or a tuple. Got {}'.format(type(order)))
correct = True
for el in order:
correct = correct and el in self.dimension_labels
if not correct:
raise ValueError('The axes list for resorting must contain the dimension_labels {0} got {1}'.format(self.dimension_labels, order))
new_order = [0]*len(self.shape)
dimension_labels_new = [0]*len(self.shape)
for i, axis in enumerate(order):
new_order[i] = self.dimension_labels.index(axis)
dimension_labels_new[i] = axis
self.array = numpy.ascontiguousarray(numpy.transpose(self.array, new_order))
if self.geometry is None:
self.dimension_labels = dimension_labels_new
else:
self.geometry.set_labels(dimension_labels_new)
[docs] def fill(self, array, **dimension):
'''fills the internal data array with the DataContainer, numpy array or number provided
:param array: number, numpy array or DataContainer to copy into the DataContainer
:type array: DataContainer or subclasses, numpy array or number
:param dimension: dictionary, optional
if the passed numpy array points to the same array that is contained in the DataContainer,
it just returns
In case a DataContainer or subclass is passed, there will be a check of the geometry,
if present, and the array will be resorted if the data is not in the appropriate order.
User may pass a named parameter to specify in which axis the fill should happen:
dc.fill(some_data, vertical=1, horizontal_x=32)
will copy the data in some_data into the data container.
'''
if id(array) == id(self.array):
return
if dimension == {}:
if isinstance(array, numpy.ndarray):
if array.shape != self.shape:
raise ValueError('Cannot fill with the provided array.' + \
'Expecting shape {0} got {1}'.format(
self.shape,array.shape))
numpy.copyto(self.array, array)
elif isinstance(array, Number):
self.array.fill(array)
elif issubclass(array.__class__ , DataContainer):
try:
if self.dimension_labels != array.dimension_labels:
raise ValueError('Input array is not in the same order as destination array. Use "array.reorder()"')
except AttributeError:
pass
if self.array.shape == array.shape:
numpy.copyto(self.array, array.array)
else:
raise ValueError('Cannot fill with the provided array.' + \
'Expecting shape {0} got {1}'.format(
self.shape,array.shape))
else:
raise TypeError('Can fill only with number, numpy array or DataContainer and subclasses. Got {}'.format(type(array)))
else:
axis = [':']* self.number_of_dimensions
dimension_labels = list(self.dimension_labels)
for k,v in dimension.items():
i = dimension_labels.index(k)
axis[i] = v
command = 'self.array['
i = 0
for el in axis:
if i > 0:
command += ','
command += str(el)
i+=1
if isinstance(array, numpy.ndarray):
command = command + "] = array[:]"
elif issubclass(array.__class__, DataContainer):
command = command + "] = array.as_array()[:]"
elif isinstance (array, Number):
command = command + "] = array"
else:
raise TypeError('Can fill only with number, numpy array or DataContainer and subclasses. Got {}'.format(type(array)))
exec(command)
def check_dimensions(self, other):
return self.shape == other.shape
## algebra
def __add__(self, other):
return self.add(other)
def __mul__(self, other):
return self.multiply(other)
def __sub__(self, other):
return self.subtract(other)
def __div__(self, other):
return self.divide(other)
def __truediv__(self, other):
return self.divide(other)
def __pow__(self, other):
return self.power(other)
# reverse operand
def __radd__(self, other):
return self + other
# __radd__
def __rsub__(self, other):
return (-1 * self) + other
# __rsub__
def __rmul__(self, other):
return self * other
# __rmul__
def __rdiv__(self, other):
tmp = self.power(-1)
tmp *= other
return tmp
# __rdiv__
def __rtruediv__(self, other):
return self.__rdiv__(other)
def __rpow__(self, other):
if isinstance(other, Number) :
fother = numpy.ones(numpy.shape(self.array)) * other
return type(self)(fother ** self.array ,
dimension_labels=self.dimension_labels,
geometry=self.geometry)
# __rpow__
# in-place arithmetic operators:
# (+=, -=, *=, /= , //=,
# must return self
def __iadd__(self, other):
kw = {'out':self}
return self.add(other, **kw)
def __imul__(self, other):
kw = {'out':self}
return self.multiply(other, **kw)
def __isub__(self, other):
kw = {'out':self}
return self.subtract(other, **kw)
def __idiv__(self, other):
kw = {'out':self}
return self.divide(other, **kw)
def __itruediv__(self, other):
kw = {'out':self}
return self.divide(other, **kw)
def __neg__(self):
'''negation operator'''
return -1 * self
def __str__ (self, representation=False):
repres = ""
repres += "Number of dimensions: {0}\n".format(self.number_of_dimensions)
repres += "Shape: {0}\n".format(self.shape)
repres += "Axis labels: {0}\n".format(self.dimension_labels)
if representation:
repres += "Representation: \n{0}\n".format(self.array)
return repres
[docs] def get_data_axes_order(self,new_order=None):
'''returns the axes label of self as a list
If new_order is None returns the labels of the axes as a sorted-by-key list.
If new_order is a list of length number_of_dimensions, returns a list
with the indices of the axes in new_order with respect to those in
self.dimension_labels: i.e.
>>> self.dimension_labels = {0:'horizontal',1:'vertical'}
>>> new_order = ['vertical','horizontal']
returns [1,0]
'''
if new_order is None:
return self.dimension_labels
else:
if len(new_order) == self.number_of_dimensions:
axes_order = [0]*len(self.shape)
for i, axis in enumerate(new_order):
axes_order[i] = self.dimension_labels.index(axis)
return axes_order
else:
raise ValueError('Expecting {0} axes, got {2}'\
.format(len(self.shape),len(new_order)))
[docs] def clone(self):
'''returns a copy of DataContainer'''
return copy.deepcopy(self)
[docs] def copy(self):
'''alias of clone'''
return self.clone()
## binary operations
def pixel_wise_binary(self, pwop, x2, *args, **kwargs):
out = kwargs.get('out', None)
if out is None:
if isinstance(x2, Number):
out = pwop(self.as_array() , x2 , *args, **kwargs )
elif issubclass(x2.__class__ , DataContainer):
out = pwop(self.as_array() , x2.as_array() , *args, **kwargs )
elif isinstance(x2, numpy.ndarray):
out = pwop(self.as_array() , x2 , *args, **kwargs )
else:
raise TypeError('Expected x2 type as number or DataContainer, got {}'.format(type(x2)))
geom = self.geometry
if geom is not None:
geom = self.geometry.copy()
return type(self)(out,
deep_copy=False,
dimension_labels=self.dimension_labels,
geometry= None if self.geometry is None else self.geometry.copy(),
suppress_warning=True)
elif issubclass(type(out), DataContainer) and issubclass(type(x2), DataContainer):
if self.check_dimensions(out) and self.check_dimensions(x2):
kwargs['out'] = out.as_array()
pwop(self.as_array(), x2.as_array(), *args, **kwargs )
#return type(self)(out.as_array(),
# deep_copy=False,
# dimension_labels=self.dimension_labels,
# geometry=self.geometry)
return out
else:
raise ValueError(message(type(self),"Wrong size for data memory: out {} x2 {} expected {}".format( out.shape,x2.shape ,self.shape)))
elif issubclass(type(out), DataContainer) and \
isinstance(x2, (Number, numpy.ndarray)):
if self.check_dimensions(out):
if isinstance(x2, numpy.ndarray) and\
not (x2.shape == self.shape and x2.dtype == self.dtype):
raise ValueError(message(type(self),
"Wrong size for data memory: out {} x2 {} expected {}"\
.format( out.shape,x2.shape ,self.shape)))
kwargs['out']=out.as_array()
pwop(self.as_array(), x2, *args, **kwargs )
return out
else:
raise ValueError(message(type(self),"Wrong size for data memory: ", out.shape,self.shape))
elif issubclass(type(out), numpy.ndarray):
if self.array.shape == out.shape and self.array.dtype == out.dtype:
kwargs['out'] = out
pwop(self.as_array(), x2, *args, **kwargs)
#return type(self)(out,
# deep_copy=False,
# dimension_labels=self.dimension_labels,
# geometry=self.geometry)
else:
raise ValueError (message(type(self), "incompatible class:" , pwop.__name__, type(out)))
def add(self, other, *args, **kwargs):
if hasattr(other, '__container_priority__') and \
self.__class__.__container_priority__ < other.__class__.__container_priority__:
return other.add(self, *args, **kwargs)
return self.pixel_wise_binary(numpy.add, other, *args, **kwargs)
def subtract(self, other, *args, **kwargs):
if hasattr(other, '__container_priority__') and \
self.__class__.__container_priority__ < other.__class__.__container_priority__:
return other.subtract(self, *args, **kwargs)
return self.pixel_wise_binary(numpy.subtract, other, *args, **kwargs)
def multiply(self, other, *args, **kwargs):
if hasattr(other, '__container_priority__') and \
self.__class__.__container_priority__ < other.__class__.__container_priority__:
return other.multiply(self, *args, **kwargs)
return self.pixel_wise_binary(numpy.multiply, other, *args, **kwargs)
def divide(self, other, *args, **kwargs):
if hasattr(other, '__container_priority__') and \
self.__class__.__container_priority__ < other.__class__.__container_priority__:
return other.divide(self, *args, **kwargs)
return self.pixel_wise_binary(numpy.divide, other, *args, **kwargs)
def power(self, other, *args, **kwargs):
return self.pixel_wise_binary(numpy.power, other, *args, **kwargs)
def maximum(self, x2, *args, **kwargs):
return self.pixel_wise_binary(numpy.maximum, x2, *args, **kwargs)
def minimum(self,x2, out=None, *args, **kwargs):
return self.pixel_wise_binary(numpy.minimum, x2=x2, out=out, *args, **kwargs)
[docs] def sapyb(self, a, y, b, out=None, num_threads=NUM_THREADS):
'''performs a*self + b * y. Can be done in-place
Parameters
----------
a : multiplier for self, can be a number or a numpy array or a DataContainer
y : DataContainer
b : multiplier for y, can be a number or a numpy array or a DataContainer
out : return DataContainer, if None a new DataContainer is returned, default None.
out can be self or y.
num_threads : number of threads to use during the calculation, using the CIL C library
It will try to use the CIL C library and default to numpy operations, in case the C library does not handle the types.
Example
-------
>>> a = 2
>>> b = 3
>>> ig = ImageGeometry(10,11)
>>> x = ig.allocate(1)
>>> y = ig.allocate(2)
>>> out = x.sapyb(a,y,b)
'''
ret_out = False
if out is None:
out = self * 0.
ret_out = True
if out.dtype in [ numpy.float32, numpy.float64 ]:
# handle with C-lib _axpby
try:
self._axpby(a, b, y, out, out.dtype, num_threads)
if ret_out:
return out
return
except RuntimeError as rte:
warnings.warn("sapyb defaulting to Python due to: {}".format(rte))
except TypeError as te:
warnings.warn("sapyb defaulting to Python due to: {}".format(te))
finally:
pass
# cannot be handled by _axpby
ax = self * a
y.multiply(b, out=out)
out.add(ax, out=out)
if ret_out:
return out
def _axpby(self, a, b, y, out, dtype=numpy.float32, num_threads=NUM_THREADS):
'''performs axpby with cilacc C library, can be done in-place.
Does the operation .. math:: a*x+b*y and stores the result in out, where x is self
:param a: scalar
:type a: float
:param b: scalar
:type b: float
:param y: DataContainer
:param out: DataContainer instance to store the result
:param dtype: data type of the DataContainers
:type dtype: numpy type, optional, default numpy.float32
:param num_threads: number of threads to run on
:type num_threads: int, optional, default 1/2 CPU of the system
'''
c_float_p = ctypes.POINTER(ctypes.c_float)
c_double_p = ctypes.POINTER(ctypes.c_double)
#convert a and b to numpy arrays and get the reference to the data (length = 1 or ndx.size)
try:
nda = a.as_array()
except:
nda = numpy.asarray(a)
try:
ndb = b.as_array()
except:
ndb = numpy.asarray(b)
a_vec = 0
if nda.size > 1:
a_vec = 1
b_vec = 0
if ndb.size > 1:
b_vec = 1
# get the reference to the data
ndx = self.as_array()
ndy = y.as_array()
ndout = out.as_array()
if ndout.dtype != dtype:
raise Warning("out array of type {0} does not match requested dtype {1}. Using {0}".format(ndout.dtype, dtype))
dtype = ndout.dtype
if ndx.dtype != dtype:
ndx = ndx.astype(dtype, casting='safe')
if ndy.dtype != dtype:
ndy = ndy.astype(dtype, casting='safe')
if nda.dtype != dtype:
nda = nda.astype(dtype, casting='same_kind')
if ndb.dtype != dtype:
ndb = ndb.astype(dtype, casting='same_kind')
if dtype == numpy.float32:
x_p = ndx.ctypes.data_as(c_float_p)
y_p = ndy.ctypes.data_as(c_float_p)
out_p = ndout.ctypes.data_as(c_float_p)
a_p = nda.ctypes.data_as(c_float_p)
b_p = ndb.ctypes.data_as(c_float_p)
f = cilacc.saxpby
elif dtype == numpy.float64:
x_p = ndx.ctypes.data_as(c_double_p)
y_p = ndy.ctypes.data_as(c_double_p)
out_p = ndout.ctypes.data_as(c_double_p)
a_p = nda.ctypes.data_as(c_double_p)
b_p = ndb.ctypes.data_as(c_double_p)
f = cilacc.daxpby
else:
raise TypeError('Unsupported type {}. Expecting numpy.float32 or numpy.float64'.format(dtype))
#out = numpy.empty_like(a)
# int psaxpby(float * x, float * y, float * out, float a, float b, long size)
cilacc.saxpby.argtypes = [ctypes.POINTER(ctypes.c_float), # pointer to the first array
ctypes.POINTER(ctypes.c_float), # pointer to the second array
ctypes.POINTER(ctypes.c_float), # pointer to the third array
ctypes.POINTER(ctypes.c_float), # pointer to A
ctypes.c_int, # type of type of A selector (int)
ctypes.POINTER(ctypes.c_float), # pointer to B
ctypes.c_int, # type of type of B selector (int)
ctypes.c_longlong, # type of size of first array
ctypes.c_int] # number of threads
cilacc.daxpby.argtypes = [ctypes.POINTER(ctypes.c_double), # pointer to the first array
ctypes.POINTER(ctypes.c_double), # pointer to the second array
ctypes.POINTER(ctypes.c_double), # pointer to the third array
ctypes.POINTER(ctypes.c_double), # type of A (c_double)
ctypes.c_int, # type of type of A selector (int)
ctypes.POINTER(ctypes.c_double), # type of B (c_double)
ctypes.c_int, # type of type of B selector (int)
ctypes.c_longlong, # type of size of first array
ctypes.c_int] # number of threads
if f(x_p, y_p, out_p, a_p, a_vec, b_p, b_vec, ndx.size, num_threads) != 0:
raise RuntimeError('axpby execution failed')
## unary operations
def pixel_wise_unary(self, pwop, *args, **kwargs):
out = kwargs.get('out', None)
if out is None:
out = pwop(self.as_array() , *args, **kwargs )
return type(self)(out,
deep_copy=False,
dimension_labels=self.dimension_labels,
geometry=self.geometry,
suppress_warning=True)
elif issubclass(type(out), DataContainer):
if self.check_dimensions(out):
kwargs['out'] = out.as_array()
pwop(self.as_array(), *args, **kwargs )
else:
raise ValueError(message(type(self),"Wrong size for data memory: ", out.shape,self.shape))
elif issubclass(type(out), numpy.ndarray):
if self.array.shape == out.shape and self.array.dtype == out.dtype:
kwargs['out'] = out
pwop(self.as_array(), *args, **kwargs)
else:
raise ValueError (message(type(self), "incompatible class:" , pwop.__name__, type(out)))
def abs(self, *args, **kwargs):
return self.pixel_wise_unary(numpy.abs, *args, **kwargs)
def sign(self, *args, **kwargs):
return self.pixel_wise_unary(numpy.sign, *args, **kwargs)
def sqrt(self, *args, **kwargs):
return self.pixel_wise_unary(numpy.sqrt, *args, **kwargs)
def conjugate(self, *args, **kwargs):
return self.pixel_wise_unary(numpy.conjugate, *args, **kwargs)
[docs] def exp(self, *args, **kwargs):
'''Applies exp pixel-wise to the DataContainer'''
return self.pixel_wise_unary(numpy.exp, *args, **kwargs)
[docs] def log(self, *args, **kwargs):
'''Applies log pixel-wise to the DataContainer'''
return self.pixel_wise_unary(numpy.log, *args, **kwargs)
## reductions
def sum(self, *args, **kwargs):
return self.as_array().sum(*args, **kwargs)
[docs] def squared_norm(self, **kwargs):
'''return the squared euclidean norm of the DataContainer viewed as a vector'''
#shape = self.shape
#size = reduce(lambda x,y:x*y, shape, 1)
#y = numpy.reshape(self.as_array(), (size, ))
return self.dot(self)
#return self.dot(self)
[docs] def norm(self, **kwargs):
'''return the euclidean norm of the DataContainer viewed as a vector'''
return numpy.sqrt(self.squared_norm(**kwargs))
[docs] def dot(self, other, *args, **kwargs):
'''returns the inner product of 2 DataContainers viewed as vectors. Suitable for real and complex data.
For complex data, the dot method returns a.dot(b.conjugate())
'''
method = kwargs.get('method', 'numpy')
if method not in ['numpy','reduce']:
raise ValueError('dot: specified method not valid. Expecting numpy or reduce got {} '.format(
method))
if self.shape == other.shape:
if method == 'numpy':
return numpy.dot(self.as_array().ravel(), other.as_array().ravel().conjugate())
elif method == 'reduce':
# see https://github.com/vais-ral/CCPi-Framework/pull/273
# notice that Python seems to be smart enough to use
# the appropriate type to hold the result of the reduction
sf = reduce(lambda x,y: x + y[0]*y[1],
zip(self.as_array().ravel(),
other.as_array().ravel().conjugate()),
0)
return sf
else:
raise ValueError('Shapes are not aligned: {} != {}'.format(self.shape, other.shape))
[docs] def min(self, *args, **kwargs):
'''Returns the min pixel value in the DataContainer'''
return numpy.min(self.as_array(), *args, **kwargs)
[docs] def max(self, *args, **kwargs):
'''Returns the max pixel value in the DataContainer'''
return numpy.max(self.as_array(), *args, **kwargs)
[docs] def mean(self, *args, **kwargs):
'''Returns the mean pixel value of the DataContainer'''
if kwargs.get('dtype', None) is None:
kwargs['dtype'] = numpy.float64
return numpy.mean(self.as_array(), *args, **kwargs)
# Logic operators between DataContainers and floats
def __le__(self, other):
'''Returns boolean array of DataContainer less or equal than DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()<=other.as_array()
return self.as_array()<=other
def __lt__(self, other):
'''Returns boolean array of DataContainer less than DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()<other.as_array()
return self.as_array()<other
def __ge__(self, other):
'''Returns boolean array of DataContainer greater or equal than DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()>=other.as_array()
return self.as_array()>=other
def __gt__(self, other):
'''Returns boolean array of DataContainer greater than DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()>other.as_array()
return self.as_array()>other
def __eq__(self, other):
'''Returns boolean array of DataContainer equal to DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()==other.as_array()
return self.as_array()==other
def __ne__(self, other):
'''Returns boolean array of DataContainer negative to DataContainer/float'''
if isinstance(other, DataContainer):
return self.as_array()!=other.as_array()
return self.as_array()!=other
[docs]class ImageData(DataContainer):
'''DataContainer for holding 2D or 3D DataContainer'''
__container_priority__ = 1
@property
def geometry(self):
return self._geometry
@geometry.setter
def geometry(self, val):
self._geometry = val
@property
def dimension_labels(self):
return self.geometry.dimension_labels
@dimension_labels.setter
def dimension_labels(self, val):
if val is not None:
raise ValueError("Unable to set the dimension_labels directly. Use geometry.set_labels() instead")
def __init__(self,
array = None,
deep_copy=False,
geometry=None,
**kwargs):
dtype = kwargs.get('dtype', numpy.float32)
if geometry is None:
raise AttributeError("ImageData requires a geometry")
labels = kwargs.get('dimension_labels', None)
if labels is not None and labels != geometry.dimension_labels:
raise ValueError("Deprecated: 'dimension_labels' cannot be set with 'allocate()'. Use 'geometry.set_labels()' to modify the geometry before using allocate.")
if array is None:
array = numpy.empty(geometry.shape, dtype=dtype)
elif issubclass(type(array) , DataContainer):
array = array.as_array()
elif issubclass(type(array) , numpy.ndarray):
pass
else:
raise TypeError('array must be a CIL type DataContainer or numpy.ndarray got {}'.format(type(array)))
if array.shape != geometry.shape:
raise ValueError('Shape mismatch {} {}'.format(array.shape, geometry.shape))
if array.ndim not in [2,3,4]:
raise ValueError('Number of dimensions are not 2 or 3 or 4 : {0}'.format(array.ndim))
super(ImageData, self).__init__(array, deep_copy, geometry=geometry, **kwargs)
[docs] def get_slice(self,channel=None, vertical=None, horizontal_x=None, horizontal_y=None, force=False):
'''
Returns a new ImageData of a single slice of in the requested direction.
'''
try:
geometry_new = self.geometry.get_slice(channel=channel, vertical=vertical, horizontal_x=horizontal_x, horizontal_y=horizontal_y)
except ValueError:
if force:
geometry_new = None
else:
raise ValueError ("Unable to return slice of requested ImageData. Use 'force=True' to return DataContainer instead.")
#if vertical = 'centre' slice convert to index and subset, this will interpolate 2 rows to get the center slice value
if vertical == 'centre':
dim = self.geometry.dimension_labels.index('vertical')
centre_slice_pos = (self.geometry.shape[dim]-1) / 2.
ind0 = int(numpy.floor(centre_slice_pos))
w2 = centre_slice_pos - ind0
out = DataContainer.get_slice(self, channel=channel, vertical=ind0, horizontal_x=horizontal_x, horizontal_y=horizontal_y)
if w2 > 0:
out2 = DataContainer.get_slice(self, channel=channel, vertical=ind0 + 1, horizontal_x=horizontal_x, horizontal_y=horizontal_y)
out = out * (1 - w2) + out2 * w2
else:
out = DataContainer.get_slice(self, channel=channel, vertical=vertical, horizontal_x=horizontal_x, horizontal_y=horizontal_y)
if len(out.shape) == 1 or geometry_new is None:
return out
else:
return ImageData(out.array, deep_copy=False, geometry=geometry_new, suppress_warning=True)
[docs] def apply_circular_mask(self, radius=0.99, in_place=True):
"""
Apply a circular mask to the horizontal_x and horizontal_y slices. Values outside this mask will be set to zero.
This will most commonly be used to mask edge artefacts from standard CT reconstructions with FBP.
Parameters
----------
radius : float, default 0.99
radius of mask by percentage of size of horizontal_x or horizontal_y, whichever is greater
in_place : boolean, default True
If `True` masks the current data, if `False` returns a new `ImageData` object.
Returns
-------
ImageData
If `in_place = False` returns a new ImageData object with the masked data
"""
ig = self.geometry
# grid
y_range = (ig.voxel_num_y-1)/2
x_range = (ig.voxel_num_x-1)/2
Y, X = numpy.ogrid[-y_range:y_range+1,-x_range:x_range+1]
# use centre from geometry in units distance to account for aspect ratio of pixels
dist_from_center = numpy.sqrt((X*ig.voxel_size_x+ ig.center_x)**2 + (Y*ig.voxel_size_y+ig.center_y)**2)
size_x = ig.voxel_num_x * ig.voxel_size_x
size_y = ig.voxel_num_y * ig.voxel_size_y
if size_x > size_y:
radius_applied =radius * size_x/2
else:
radius_applied =radius * size_y/2
# approximate the voxel as a circle and get the radius
# ie voxel area = 1, circle of area=1 has r = 0.56
r=((ig.voxel_size_x * ig.voxel_size_y )/numpy.pi)**(1/2)
# we have the voxel centre distance to mask. voxels with distance greater than |r| are fully inside or outside.
# values on the border region between -r and r are preserved
mask =(radius_applied-dist_from_center).clip(-r,r)
# rescale to -pi/2->+pi/2
mask *= (0.5*numpy.pi)/r
# the sin of the linear distance gives us an approximation of area of the circle to include in the mask
numpy.sin(mask, out = mask)
# rescale the data 0 - 1
mask = 0.5 + mask * 0.5
# reorder dataset so 'horizontal_y' and 'horizontal_x' are the final dimensions
labels_orig = self.dimension_labels
labels = list(labels_orig)
labels.remove('horizontal_y')
labels.remove('horizontal_x')
labels.append('horizontal_y')
labels.append('horizontal_x')
if in_place == True:
self.reorder(labels)
numpy.multiply(self.array, mask, out=self.array)
self.reorder(labels_orig)
else:
image_data_out = self.copy()
image_data_out.reorder(labels)
numpy.multiply(image_data_out.array, mask, out=image_data_out.array)
image_data_out.reorder(labels_orig)
return image_data_out
[docs]class AcquisitionData(DataContainer, Partitioner):
'''DataContainer for holding 2D or 3D sinogram'''
__container_priority__ = 1
@property
def geometry(self):
return self._geometry
@geometry.setter
def geometry(self, val):
self._geometry = val
@property
def dimension_labels(self):
return self.geometry.dimension_labels
@dimension_labels.setter
def dimension_labels(self, val):
if val is not None:
raise ValueError("Unable to set the dimension_labels directly. Use geometry.set_labels() instead")
def __init__(self,
array = None,
deep_copy=True,
geometry = None,
**kwargs):
dtype = kwargs.get('dtype', numpy.float32)
if geometry is None:
raise AttributeError("AcquisitionData requires a geometry")
labels = kwargs.get('dimension_labels', None)
if labels is not None and labels != geometry.dimension_labels:
raise ValueError("Deprecated: 'dimension_labels' cannot be set with 'allocate()'. Use 'geometry.set_labels()' to modify the geometry before using allocate.")
if array is None:
array = numpy.empty(geometry.shape, dtype=dtype)
elif issubclass(type(array) , DataContainer):
array = array.as_array()
elif issubclass(type(array) , numpy.ndarray):
pass
else:
raise TypeError('array must be a CIL type DataContainer or numpy.ndarray got {}'.format(type(array)))
if array.shape != geometry.shape:
raise ValueError('Shape mismatch got {} expected {}'.format(array.shape, geometry.shape))
super(AcquisitionData, self).__init__(array, deep_copy, geometry=geometry,**kwargs)
[docs] def get_slice(self,channel=None, angle=None, vertical=None, horizontal=None, force=False):
'''
Returns a new dataset of a single slice of in the requested direction. \
'''
try:
geometry_new = self.geometry.get_slice(channel=channel, angle=angle, vertical=vertical, horizontal=horizontal)
except ValueError:
if force:
geometry_new = None
else:
raise ValueError ("Unable to return slice of requested AcquisitionData. Use 'force=True' to return DataContainer instead.")
#get new data
#if vertical = 'centre' slice convert to index and subset, this will interpolate 2 rows to get the center slice value
if vertical == 'centre':
dim = self.geometry.dimension_labels.index('vertical')
centre_slice_pos = (self.geometry.shape[dim]-1) / 2.
ind0 = int(numpy.floor(centre_slice_pos))
w2 = centre_slice_pos - ind0
out = DataContainer.get_slice(self, channel=channel, angle=angle, vertical=ind0, horizontal=horizontal)
if w2 > 0:
out2 = DataContainer.get_slice(self, channel=channel, angle=angle, vertical=ind0 + 1, horizontal=horizontal)
out = out * (1 - w2) + out2 * w2
else:
out = DataContainer.get_slice(self, channel=channel, angle=angle, vertical=vertical, horizontal=horizontal)
if len(out.shape) == 1 or geometry_new is None:
return out
else:
return AcquisitionData(out.array, deep_copy=False, geometry=geometry_new, suppress_warning=True)
[docs]class Processor(object):
'''Defines a generic DataContainer processor
accepts a DataContainer as input
returns a DataContainer
`__setattr__` allows additional attributes to be defined
`store_output` boolian defining whether a copy of the output is stored. Default is False.
If no attributes are modified get_output will return this stored copy bypassing `process`
'''
def __init__(self, **attributes):
if not 'store_output' in attributes.keys():
attributes['store_output'] = False
attributes['output'] = None
attributes['shouldRun'] = True
attributes['input'] = None
for key, value in attributes.items():
self.__dict__[key] = value
def __setattr__(self, name, value):
if name == 'input':
self.set_input(value)
elif name in self.__dict__.keys():
self.__dict__[name] = value
if name == 'shouldRun':
pass
elif name == 'output':
self.__dict__['shouldRun'] = False
else:
self.__dict__['shouldRun'] = True
else:
raise KeyError('Attribute {0} not found'.format(name))
[docs] def get_output(self, out=None):
"""
Runs the configured processor and returns the processed data
Parameters
----------
out : DataContainer, optional
Fills the referenced DataContainer with the processed data and suppresses the return
Returns
-------
DataContainer
The processed data. Suppressed if `out` is passed
"""
if self.output is None or self.shouldRun:
if out is None:
out = self.process()
else:
self.process(out=out)
if self.store_output:
self.output = out.copy()
return out
else:
return self.output.copy()
def set_input_processor(self, processor):
if issubclass(type(processor), DataProcessor):
self.__dict__['input'] = weakref.ref(processor)
else:
raise TypeError("Input type mismatch: got {0} expecting {1}"\
.format(type(processor), DataProcessor))
def process(self, out=None):
raise NotImplementedError('process must be implemented')
def __call__(self, x, out=None):
self.set_input(x)
if out is None:
out = self.get_output()
else:
self.get_output(out=out)
return out
[docs]class DataProcessor(Processor):
'''Basically an alias of Processor Class'''
pass
class DataProcessor23D(DataProcessor):
'''Regularizers DataProcessor
'''
def check_input(self, dataset):
'''Checks number of dimensions input DataContainer
Expected input is 2D or 3D
'''
if dataset.number_of_dimensions == 2 or \
dataset.number_of_dimensions == 3:
return True
else:
raise ValueError("Expected input dimensions is 2 or 3, got {0}"\
.format(dataset.number_of_dimensions))
###### Example of DataProcessors
class AX(DataProcessor):
'''Example DataProcessor
The AXPY routines perform a vector multiplication operation defined as
y := a*x
where:
a is a scalar
x a DataContainer.
'''
def __init__(self):
kwargs = {'scalar':None,
'input':None,
}
#DataProcessor.__init__(self, **kwargs)
super(AX, self).__init__(**kwargs)
def check_input(self, dataset):
return True
def process(self, out=None):
dsi = self.get_input()
a = self.scalar
if out is None:
y = DataContainer( a * dsi.as_array() , True,
dimension_labels=dsi.dimension_labels )
#self.setParameter(output_dataset=y)
return y
else:
out.fill(a * dsi.as_array())
###### Example of DataProcessors
class CastDataContainer(DataProcessor):
'''Example DataProcessor
Cast a DataContainer array to a different type.
y := a*x
where:
a is a scalar
x a DataContainer.
'''
def __init__(self, dtype=None):
kwargs = {'dtype':dtype,
'input':None,
}
#DataProcessor.__init__(self, **kwargs)
super(CastDataContainer, self).__init__(**kwargs)
def check_input(self, dataset):
return True
def process(self, out=None):
dsi = self.get_input()
dtype = self.dtype
if out is None:
y = numpy.asarray(dsi.as_array(), dtype=dtype)
return type(dsi)(numpy.asarray(dsi.as_array(), dtype=dtype),
dimension_labels=dsi.dimension_labels )
else:
out.fill(numpy.asarray(dsi.as_array(), dtype=dtype))
class PixelByPixelDataProcessor(DataProcessor):
'''Example DataProcessor
This processor applies a python function to each pixel of the DataContainer
f is a python function
x a DataSet.
'''
def __init__(self):
kwargs = {'pyfunc':None,
'input':None,
}
#DataProcessor.__init__(self, **kwargs)
super(PixelByPixelDataProcessor, self).__init__(**kwargs)
def check_input(self, dataset):
return True
def process(self, out=None):
pyfunc = self.pyfunc
dsi = self.get_input()
eval_func = numpy.frompyfunc(pyfunc,1,1)
y = DataContainer( eval_func( dsi.as_array() ) , True,
dimension_labels=dsi.dimension_labels )
return y
[docs]class VectorData(DataContainer):
'''DataContainer to contain 1D array'''
@property
def geometry(self):
return self._geometry
@geometry.setter
def geometry(self, val):
self._geometry = val
@property
def dimension_labels(self):
if hasattr(self,'geometry'):
return self.geometry.dimension_labels
else:
return self._dimension_labels
@dimension_labels.setter
def dimension_labels(self, val):
if hasattr(self,'geometry'):
self.geometry.dimension_labels = val
self._dimension_labels = val
def __init__(self, array=None, **kwargs):
self.geometry = kwargs.get('geometry', None)
dtype = kwargs.get('dtype', numpy.float32)
if self.geometry is None:
if array is None:
raise ValueError('Please specify either a geometry or an array')
else:
if len(array.shape) > 1:
raise ValueError('Incompatible size: expected 1D got {}'.format(array.shape))
out = array
self.geometry = VectorGeometry(array.shape[0], **kwargs)
self.length = self.geometry.length
else:
self.length = self.geometry.length
if array is None:
out = numpy.zeros((self.length,), dtype=dtype)
else:
if self.length == array.shape[0]:
out = array
else:
raise ValueError('Incompatible size: expecting {} got {}'.format((self.length,), array.shape))
deep_copy = True
# need to pass the geometry, othewise None
super(VectorData, self).__init__(out, deep_copy, self.geometry.dimension_labels, geometry = self.geometry)
class VectorGeometry(object):
'''Geometry describing VectorData to contain 1D array'''
RANDOM = 'random'
RANDOM_INT = 'random_int'
@property
def dtype(self):
return self._dtype
@dtype.setter
def dtype(self, val):
self._dtype = val
def __init__(self,
length, **kwargs):
self.length = int(length)
self.shape = (length, )
self.dtype = kwargs.get('dtype', numpy.float32)
self.dimension_labels = kwargs.get('dimension_labels', None)
def clone(self):
'''returns a copy of VectorGeometry'''
return copy.deepcopy(self)
def copy(self):
'''alias of clone'''
return self.clone()
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.length == other.length \
and self.shape == other.shape \
and self.dimension_labels == other.dimension_labels:
return True
return False
def __str__ (self):
repres = ""
repres += "Length: {0}\n".format(self.length)
repres += "Shape: {0}\n".format(self.shape)
repres += "Dimension_labels: {0}\n".format(self.dimension_labels)
return repres
def allocate(self, value=0, **kwargs):
'''allocates an VectorData according to the size expressed in the instance
:param value: accepts numbers to allocate an uniform array, or a string as 'random' or 'random_int' to create a random array or None.
:type value: number or string, default None allocates empty memory block
:param dtype: numerical type to allocate
:type dtype: numpy type, default numpy.float32
:param seed: seed for the random number generator
:type seed: int, default None
:param max_value: max value of the random int array
:type max_value: int, default 100'''
dtype = kwargs.get('dtype', self.dtype)
# self.dtype = kwargs.get('dtype', numpy.float32)
out = VectorData(geometry=self.copy(), dtype=dtype)
if isinstance(value, Number):
if value != 0:
out += value
else:
if value == VectorGeometry.RANDOM:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
out.fill(numpy.random.random_sample(self.shape))
elif value == VectorGeometry.RANDOM_INT:
seed = kwargs.get('seed', None)
if seed is not None:
numpy.random.seed(seed)
max_value = kwargs.get('max_value', 100)
r = numpy.random.randint(max_value,size=self.shape, dtype=numpy.int32)
out.fill(numpy.asarray(r, dtype=self.dtype))
elif value is None:
pass
else:
raise ValueError('Value {} unknown'.format(value))
return out
[docs]class DataOrder():
ENGINES = ['astra','tigre','cil']
ASTRA_IG_LABELS = [ImageGeometry.CHANNEL, ImageGeometry.VERTICAL, ImageGeometry.HORIZONTAL_Y, ImageGeometry.HORIZONTAL_X]
TIGRE_IG_LABELS = [ImageGeometry.CHANNEL, ImageGeometry.VERTICAL, ImageGeometry.HORIZONTAL_Y, ImageGeometry.HORIZONTAL_X]
ASTRA_AG_LABELS = [AcquisitionGeometry.CHANNEL, AcquisitionGeometry.VERTICAL, AcquisitionGeometry.ANGLE, AcquisitionGeometry.HORIZONTAL]
TIGRE_AG_LABELS = [AcquisitionGeometry.CHANNEL, AcquisitionGeometry.ANGLE, AcquisitionGeometry.VERTICAL, AcquisitionGeometry.HORIZONTAL]
CIL_IG_LABELS = [ImageGeometry.CHANNEL, ImageGeometry.VERTICAL, ImageGeometry.HORIZONTAL_Y, ImageGeometry.HORIZONTAL_X]
CIL_AG_LABELS = [AcquisitionGeometry.CHANNEL, AcquisitionGeometry.ANGLE, AcquisitionGeometry.VERTICAL, AcquisitionGeometry.HORIZONTAL]
TOMOPHANTOM_IG_LABELS = [ImageGeometry.CHANNEL, ImageGeometry.VERTICAL, ImageGeometry.HORIZONTAL_Y, ImageGeometry.HORIZONTAL_X]
@staticmethod
def get_order_for_engine(engine, geometry):
if engine == 'astra':
if isinstance(geometry, AcquisitionGeometry):
dim_order = DataOrder.ASTRA_AG_LABELS
else:
dim_order = DataOrder.ASTRA_IG_LABELS
elif engine == 'tigre':
if isinstance(geometry, AcquisitionGeometry):
dim_order = DataOrder.TIGRE_AG_LABELS
else:
dim_order = DataOrder.TIGRE_IG_LABELS
elif engine == 'cil':
if isinstance(geometry, AcquisitionGeometry):
dim_order = DataOrder.CIL_AG_LABELS
else:
dim_order = DataOrder.CIL_IG_LABELS
else:
raise ValueError("Unknown engine expected one of {0} got {1}".format(DataOrder.ENGINES, engine))
dimensions = []
for label in dim_order:
if label in geometry.dimension_labels:
dimensions.append(label)
return dimensions
@staticmethod
def check_order_for_engine(engine, geometry):
order_requested = DataOrder.get_order_for_engine(engine, geometry)
if order_requested == list(geometry.dimension_labels):
return True
else:
raise ValueError("Expected dimension_label order {0}, got {1}.\nTry using `data.reorder('{2}')` to permute for {2}"
.format(order_requested, list(geometry.dimension_labels), engine))