Source code for cil.processors.TransmissionAbsorptionConverter

#  Copyright 2021 United Kingdom Research and Innovation
#  Copyright 2021 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt

from cil.framework import DataProcessor, AcquisitionData, ImageData, DataContainer
import logging
import numpy
import numba
from cil.utilities import multiprocessing as cil_mp

log = logging.getLogger(__name__)

[docs] class TransmissionAbsorptionConverter(DataProcessor): r'''Processor to convert from transmission measurements to absorption based on the Beer-Lambert law, by: - dividing by white_level, - clipping data to min_intensity (any values below min intensity are set to min_intensity) - taking the logarithm and multiplying by minus 1. If the data contains zero or negative values, inf or NaN values will be present in the output. Parameters: ----------- min_intensity: float, default=0.0 Clips data below this value to ensure log is taken of positive numbers only. white_level: float, default=1.0 A float defining incidence intensity in the Beer-Lambert law. accelerated: bool, default=True Specify whether to use multi-threading using numba. Returns: -------- AcquisitionData, ImageData or DataContainer depending on input data type, return is suppressed if 'out' is passed Notes: ------ If it's unknown whether the data contains zero or negative values, it's recommended to set min_intensity to a small positive value. ''' def __init__(self, min_intensity = 0.0, white_level = 1.0, accelerated = True, ): kwargs = {'min_intensity': min_intensity, 'white_level': white_level, '_accelerated': accelerated} super(TransmissionAbsorptionConverter, self).__init__(**kwargs) def check_input(self, data): if not (issubclass(type(data), DataContainer)): raise TypeError('Processor supports only following data types:\n' + ' - ImageData\n - AcquisitionData\n' + ' - DataContainer') if self.min_intensity <= 0: log.info(f"\n Current min_intensity = {self.min_intensity}: output may contain NaN or inf. Ensure your data only contains positive values or set min_intensity to a small positive value.") return True def process(self, out=None): data = self.get_input() if out is None: out = data.geometry.allocate(None) arr_in = data.as_array() arr_out = out.as_array() # we choose an arbitrary chunk size of 6400, which is a multiple of 32, to allow for efficient threading chunk_size = 6400 num_chunks = data.size // chunk_size if (self._accelerated): remainder = data.size % chunk_size num_threads_original = numba.get_num_threads() numba.set_num_threads(cil_mp.NUM_THREADS) numba_loop(arr_in, num_chunks, chunk_size, remainder, 1/self.white_level, self.min_intensity, arr_out) numba.set_num_threads(num_threads_original) else: if self.white_level != 1: numpy.multiply(arr_in, 1/self.white_level, out=arr_out) arr_in = arr_out if self.min_intensity > 0: numpy.clip(arr_in, self.min_intensity, None, out=arr_out) arr_in = arr_out numpy.log(arr_in, out=arr_out) numpy.negative(arr_out,out=arr_out) out.fill(arr_out) return out
@numba.njit(parallel=True) def numba_loop(arr_in, num_chunks, chunk_size, remainder, multiplier, min_intensity, arr_out): in_flat = arr_in.ravel() out_flat = arr_out.ravel() for i in numba.prange(num_chunks): start = i * chunk_size end = start + chunk_size out_flat[start:end] = numpy.multiply(in_flat[start:end], multiplier) out_flat[start:end] = numpy.clip(out_flat[start:end], min_intensity, None) out_flat[start:end] = -numpy.log(out_flat[start:end]) if remainder > 0: start = num_chunks * chunk_size end = start + remainder out_flat[start:end] = numpy.multiply(in_flat[start:end], multiplier) out_flat[start:end] = numpy.clip(out_flat[start:end], min_intensity, None) out_flat[start:end] = -numpy.log(out_flat[start:end])