Source code for cil.processors.PaganinProcessor

#  Copyright 2024 United Kingdom Research and Innovation
#  Copyright 2024 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at:
# https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt

from cil.framework import Processor, AcquisitionData
from cil.framework.labels import AcquisitionDimension

import numpy as np
from scipy.fft import fft2
from scipy.fft import ifft2
from scipy.fft import ifftshift
from scipy import constants
from tqdm import tqdm
import logging

log = logging.getLogger(__name__)

[docs] class PaganinProcessor(Processor): r""" Processor to retrieve quantitative information from phase contrast images using the Paganin phase retrieval algorithm described in [1] Parameters ---------- delta: float (optional) Real part of the deviation of the material refractive index from 1, where refractive index :math:`n = (1 - \delta) + i \beta` energy- dependent refractive index information for x-ray wavelengths can be found at [2], default is 1 beta: float (optional) Complex part of the material refractive index, where refractive index :math:`n = (1 - \delta) + i \beta` energy-dependent refractive index information for x-ray wavelengths can be found at [2], default is 1e-2 energy: float (optional) Energy of the incident photon, default is 40000 energy_units: string (optional) Energy units, default is 'eV' full_retrieval : bool, optional If True, perform the full phase retrieval and return the thickness. If False, return a filtered image, default is True filter_type: string (optional) The form of the Paganin filter to use, either 'paganin_method' (default) or 'generalised_paganin_method' as described in [3] pad: int (optional) Number of pixels to pad the image in Fourier space to reduce aliasing, default is 0 return_units: string (optional) The distance units to return the sample thickness in, must be one of 'm', 'cm', 'mm' or 'um'. Only applies if full_retrieval=True (default is'cm') Returns ------- AcquisitionData AcquisitionData corrected for phase effects, retrieved sample thickness or (if :code:`full_retrieval=False`) filtered data Example ------- >>> processor = PaganinProcessor(delta=5, beta=0.05, energy=18000) >>> processor.set_input(data) >>> thickness = processor.get_output() Example ------- >>> processor = PaganinProcessor(delta=1,beta=10e2, full_retrieval=False) >>> processor.set_input(data) >>> filtered_image = processor.get_output() Example ------- >>> processor = PaganinProcessor() >>> processor.set_input(data) >>> thickness = processor.get_output(override_filter={'alpha':10}) >>> phase_retrieved_image = thickness*processor.mu Notes ----- This processor will work most efficiently using the cil data order with `data.reorder('cil')` Notes ----- This processor uses the phase retrieval algorithm described by Paganin et al. [1] to retrieve the sample thickness .. math:: T(x,y) = - \frac{1}{\mu}\ln\left (\mathcal{F}^{-1}\left (\frac{\mathcal{F}\left ( M^2I_{norm}(x, y,z = \Delta) \right )}{1 + \alpha\left ( k_x^2 + k_y^2 \right )} \right )\right ), where - :math:`T`, is the sample thickness, - :math:`\mu = \frac{4\pi\beta}{\lambda}` is the material linear attenuation coefficient where :math:`\beta` is the complex part of the material refractive index and :math:`\lambda=\frac{hc}{E}` is the probe wavelength, - :math:`M` is the magnification at the detector, - :math:`I_{norm}` is the input image which is expected to be the normalised transmission data, - :math:`\Delta` is the propagation distance, - :math:`\alpha = \frac{\Delta\delta}{\mu}` is a parameter determining the strength of the filter to be applied in Fourier space where :math:`\delta` is the real part of the deviation of the material refractive index from 1 - :math:`k_x, k_y = \left ( \frac{2\pi p}{N_xW}, \frac{2\pi q}{N_yW} \right )` where :math:`p` and :math:`q` are co-ordinates in a Fourier mesh in the range :math:`-N_x/2` to :math:`N_x/2` for an image with size :math:`N_x, N_y` and pixel size :math:`W`. A generalised form of the Paganin phase retrieval method can be called using :code:`filter_type='generalised_paganin_method'`, which uses the form of the algorithm described in [2] .. math:: T(x,y) = -\frac{1}{\mu}\ln\left (\mathcal{F}^{-1}\left (\frac{ \mathcal{F}\left ( M^2I_{norm}(x, y,z = \Delta) \right )}{1 - \frac{2 \alpha}{W^2}\left ( \cos(Wk_x) + \cos(Wk_y) -2 \right )} \right ) \right ) The phase retrieval is valid under the following assumptions - used with paraxial propagation-induced phase contrast images which can be assumed to be single-material locally - using intensity data which has been flat field corrected - and under the assumption that the Fresnel number :math:`F_N = W^2/(\lambda\Delta) >> 1` To apply a filter to images using the Paganin method, call :code:`full_retrieval=False`. In this case the pre-scaling and conversion to absorption is not applied so the requirement to supply flat field corrected intensity data is relaxed, .. math:: I_{filt} = \mathcal{F}^{-1}\left (\frac{\mathcal{F}\left ( I(x, y,z = \Delta) \right )} {1 - \alpha\left ( k_x^2 + k_y^2 \right )} \right ) References --------- - [1] https://doi.org/10.1046/j.1365-2818.2002.01010.x - [2] https://henke.lbl.gov/optical_constants/getdb2.html - [3] https://iopscience.iop.org/article/10.1088/2040-8986/abbab9 With thanks to colleagues at DTU for help with the initial implementation of the phase retrieval algorithm """ def __init__(self, delta=1, beta=1e-2, energy=40000, energy_units='eV', full_retrieval=True, filter_type='paganin_method', pad=0, return_units='cm'): kwargs = { 'energy' : energy, 'wavelength' : self._energy_to_wavelength(energy, energy_units, return_units), 'delta': delta, 'beta': beta, '_delta_user' : delta, '_beta_user' : beta, 'filter_Nx' : None, 'filter_Ny' : None, 'filter_type' : filter_type, 'mu' : None, 'alpha' : None, 'pixel_size' : None, 'propagation_distance' : None, 'magnification' : None, 'filter' : None, 'full_retrieval' : full_retrieval, 'pad' : pad, 'override_geometry' : None, 'override_filter' : None, 'return_units' : return_units } super(PaganinProcessor, self).__init__(**kwargs) def check_input(self, data): if not isinstance(data, (AcquisitionData)): raise TypeError('Processor only supports AcquisitionData') if data.dtype!=np.float32: raise TypeError('Processor only support dtype=float32') return True def process(self, out=None): data = self.get_input() cil_order = tuple(AcquisitionDimension.get_order_for_engine('cil',data.geometry)) if data.dimension_labels != cil_order: log.warning(msg="This processor will work most efficiently using\ \nCIL data order, consider using `data.reorder('cil')`") # set the geometry parameters to use from data.geometry unless the # geometry is overridden with an override_geometry self._set_geometry(data.geometry, self.override_geometry) if out is None: out = data.geometry.allocate(None) # make slice indices to get the projection slice_proj = [slice(None)]*len(data.shape) angle_axis = data.get_dimension_axis('angle') slice_proj[angle_axis] = 0 if data.geometry.channels>1: channel_axis = data.get_dimension_axis('channel') slice_proj[channel_axis] = 0 else: channel_axis = None data_proj = data.as_array()[tuple(slice_proj)] # create an empty axis if the data is 2D if len(data.shape) == 2: data.array = np.expand_dims(data.array, len(data.shape)) slice_proj.append(slice(None)) data_proj = data.as_array()[tuple(slice_proj)] elif len(data_proj.shape) == 2: pass else: raise(ValueError('Data must be 2D or 3D per channel')) if len(out.shape) == 2: out.array = np.expand_dims(out.array, len(out.shape)) # create a filter based on the shape of the data filter_shape = np.shape(data_proj) self.filter_Nx = filter_shape[0]+self.pad*2 self.filter_Ny = filter_shape[1]+self.pad*2 self._create_filter(self.override_filter) # pre-calculate the scaling factor scaling_factor = -(1/self.mu) # allocate padded buffer padded_buffer = np.zeros(tuple(x+self.pad*2 for x in data_proj.shape), dtype=data.dtype) # make slice indices to unpad the data if self.pad>0: slice_pad = tuple([slice(self.pad,-self.pad)] *len(padded_buffer.shape)) else: slice_pad = tuple([slice(None)]*len(padded_buffer.shape)) # loop over the channels mag2 = self.magnification**2 for j in range(data.geometry.channels): if channel_axis is not None: slice_proj[channel_axis] = j # loop over the projections for i in tqdm(range(len(data.geometry.angles))): slice_proj[angle_axis] = i padded_buffer.fill(0) padded_buffer[slice_pad] = data.array[(tuple(slice_proj))] if self.full_retrieval==True: # apply the filter in fourier space, apply log and scale # by magnification padded_buffer*=mag2 fI = fft2(padded_buffer) iffI = ifft2(fI*self.filter).real np.log(iffI, out=padded_buffer) # apply scaling factor np.multiply(scaling_factor, padded_buffer, out=padded_buffer) else: # apply the filter in fourier space fI = fft2(padded_buffer) padded_buffer[:] = ifft2(fI*self.filter).real if data.geometry.channels>1: out.fill(padded_buffer[slice_pad], angle = i, channel=j) else: out.fill(padded_buffer[slice_pad], angle = i) data.array = np.squeeze(data.array) out.array = np.squeeze(out.array) return out
[docs] def set_input(self, dataset): """ Set the input data to the processor Parameters ---------- dataset : AcquisitionData The input AcquisitionData """ return super().set_input(dataset)
[docs] def get_output(self, out=None, override_geometry=None, override_filter=None): r''' Function to get output from the PaganinProcessor Parameters ---------- out : DataContainer, optional Fills the referenced DataContainer with the processed data override_geometry: dict, optional Geometry parameters to use in the phase retrieval if you want to over-ride values found in `data.geometry`. Specify parameters as a dictionary :code:`{'parameter':value}` where parameter is :code:`'magnification', 'propagation_distance'` or :code:`'pixel_size'` and value is the new value to use. Specify distance parameters in the same units as :code:`return_units` (default is cm). override_filter: dict, optional Over-ride the filter parameters to use in the phase retrieval. Specify parameters as :code:`{'parameter':value}` where parameter is :code:`'delta', 'beta'` or :code:`'alpha'` and value is the new value to use. Returns ------- AcquisitionData AcquisitionData corrected for phase effects, retrieved sample thickness or (if :code:`full_retrieval=False`) filtered data Example ------- >>> processor = PaganinProcessor(delta=5, beta=0.05, energy=18000) >>> processor.set_input(data) >>> thickness = processor.get_output() Example ------- >>> processor = PaganinProcessor(delta=1,beta=10e2, full_retrieval=False) >>> processor.set_input(data) >>> filtered_image = processor.get_output() Example ------- >>> processor = PaganinProcessor() >>> processor.set_input(data) >>> thickness = processor.get_output(override_filter={'alpha':10}) >>> phase_retrieved_image = thickness*processor.mu Notes ----- If :code:`'alpha'` is specified in override_filter the new value will be used and delta will be ignored but beta will still be used to calculate :math:`\mu = \frac{4\pi\beta}{\lambda}` which is used for scaling the thickness, therefore it is only recommended to specify alpha when also using :code:`get_output(full_retrieval=False)`, or re-scaling the result by :math:`\mu` e.g. :code:`thickness*processor.mu` If :code:`alpha` is not specified, it will be calculated :math:`\frac{\Delta\delta\lambda}{4\pi\beta}` ''' self.override_geometry = override_geometry self.override_filter = override_filter return super().get_output(out)
def __call__(self, x, out=None, override_geometry=None, override_filter=None): self.set_input(x) if out is None: out = self.get_output(override_geometry=override_geometry, override_filter=override_filter) else: self.get_output(out=out, override_geometry=override_geometry, override_filter=override_filter) return out def _set_geometry(self, geometry, override_geometry=None): ''' Function to set the geometry parameters for the processor. Values are from the data geometry unless the geometry is overridden with an override_geometry dictionary. ''' parameters = ['magnification', 'propagation_distance', 'pixel_size'] # specify parameter names as defined in geometry geometry_parameters = ['magnification', 'dist_center_detector', ('pixel_size_h', 'pixel_size_v')] # specify if parameter requires unit conversion convert_units = [False, True, True] if override_geometry is None: override_geometry = {} # get and check parameters from over-ride geometry dictionary for parameter in override_geometry.keys(): if parameter not in parameters: raise ValueError('Parameter {} not recognised, expected one of\ {}.'.format(parameter, parameters)) elif (override_geometry[parameter] is None) \ | (override_geometry[parameter] == 0): raise ValueError("Parameter {} cannot be {}, please update \ data.geometry.{} or over-ride with \ processor.get_output(override_geometry= \ {{ '{}' : value }} )"\ .format(parameter, str(getattr(self, parameter)), geometry_parameters[i], parameter)) else: self.__setattr__(parameter, override_geometry[parameter]) # get and check parameters from geometry if they are not in the # over-ride geometry dictionary for i, parameter in enumerate(parameters): if parameter not in override_geometry: if type(geometry_parameters[i])==tuple: param1 = getattr(geometry, geometry_parameters[i][0]) param2 = getattr(geometry, geometry_parameters[i][1]) if (param1 - param2) / (param1 + param2) >= 1e-5: raise ValueError("Parameter {} is not homogeneous up \ to 1e-5: got {} and {}, please update\ geometry using data.geometry.{} and \ data.geometry.{} or over-ride with \ processor.get_output(\ override_geometry={{ '{}' : value }})" .format(parameter, str(param1), str(param2), geometry_parameters[i][0], geometry_parameters[i][1], parameter)) else: param1 = getattr(geometry, geometry_parameters[i]) if (param1 is None) | (param1 == 0): raise ValueError("Parameter {} cannot be {}, please update\ data.geometry.{} or over-ride with \ processor.get_output(override_geometry\ ={{ '{}' : value }} )" .format(parameter, str(param1), str(geometry_parameters[i]), parameter)) else: if convert_units[i]: param1 = self._convert_units(param1, 'distance', geometry.config.units, self.return_units) self.__setattr__(parameter, param1) def _create_filter(self, override_filter=None): ''' Function to create the Paganin filter, either using the paganin [1] or generalised paganin [2] method The filter is created on a mesh in Fourier space kx, ky [1] https://doi.org/10.1046/j.1365-2818.2002.01010.x [2] https://iopscience.iop.org/article/10.1088/2040-8986/abbab9 ''' if override_filter is None: override_filter = {} # update any parameter which has been over-ridden with override_filter if ('alpha' in override_filter) & ('delta' in override_filter): log.warning(msg="Because you specified alpha, it will not be \ calculated and therefore delta will be ignored") if ('delta' in override_filter): self.delta = override_filter['delta'] else: self.delta = self._delta_user if ('beta' in override_filter): self.beta = override_filter['beta'] else: self.beta = self._beta_user self._calculate_mu() if ('alpha' in override_filter): self.alpha = override_filter['alpha'] else: self._calculate_alpha() # create the Fourier mesh kx,ky = np.meshgrid( np.arange(-self.filter_Nx/2, self.filter_Nx/2, 1, dtype=np.float64) * (2*np.pi)/(self.filter_Nx*self.pixel_size), np.arange(-self.filter_Ny/2, self.filter_Ny/2, 1, dtype=np.float64) * (2*np.pi)/(self.filter_Ny*self.pixel_size), sparse=False, indexing='ij' ) # create the filter using either paganin or generalised paganin method if self.filter_type == 'paganin_method': self.filter = ifftshift(1/(1. + self.alpha*(kx**2 + ky**2))) elif self.filter_type == 'generalised_paganin_method': self.filter = ifftshift(1/(1. - (2*self.alpha/self.pixel_size**2) *(np.cos(self.pixel_size*kx) + np.cos(self.pixel_size*ky) -2))) else: raise ValueError("filter_type not recognised: got {0} expected one\ of 'paganin_method' or \ 'generalised_paganin_method'" .format(self.filter_type)) def _calculate_mu(self): ''' Function to calculate the linear attenutation coefficient mu ''' self.mu = 4.0*np.pi*self.beta/self.wavelength def _calculate_alpha(self): ''' Function to calculate alpha, a constant defining the Paganin filter strength ''' self.alpha = self.propagation_distance*self.delta/self.mu def _energy_to_wavelength(self, energy, energy_units, return_units): ''' Function to convert photon energy in eV to wavelength in return_units Parameters ---------- energy: float Photon energy energy_units Energy units return_units Distance units in which to return the wavelength Returns ------- float Photon wavelength in return_units ''' top = self._convert_units(constants.h*constants.speed_of_light, 'distance', 'm', return_units) bottom = self._convert_units(energy, 'energy', energy_units, 'J') return top/bottom def _convert_units(self, value, unit_type, input_unit, output_unit): unit_types = ['distance','energy','angle'] if unit_type == unit_types[0]: unit_list = ['m','cm','mm','um'] unit_multipliers = [1.0, 1e-2, 1e-3, 1e-6] elif unit_type == unit_types[1]: unit_list = ['meV', 'eV', 'keV', 'MeV', 'J'] unit_multipliers = [1e-3, 1, 1e3, 1e6, 1/constants.eV] elif unit_type == unit_types[2]: unit_list = ['deg', 'rad'] unit_multipliers = [1, np.rad2deg(1)] else: raise ValueError("Unit type '{}' not recognised, must be one of {}" .format(unit_type, unit_types)) for x in [input_unit, output_unit]: if x not in unit_list: raise ValueError("Unit '{}' not recognised, must be one of {}.\ \nGeometry units can be updated using geometry.config.units" .format(x, unit_list)) return value*unit_multipliers[unit_list.index(input_unit)]\ /unit_multipliers[unit_list.index(output_unit)]