Source code for cil.io.NEXUSDataReader

#  Copyright 2019 United Kingdom Research and Innovation
#  Copyright 2019 The University of Manchester
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt
# Kyle Pidgeon (UKRI-STFC)

import numpy as np
import os
from cil.framework import AcquisitionData, AcquisitionGeometry, ImageData, ImageGeometry

h5pyAvailable = True
try:
    import h5py
except:
    h5pyAvailable = False


[docs] class NEXUSDataReader(object): """ Create a reader for NeXus files. Parameters ---------- file_name: str the full path to the NeXus file to read. """ def __init__(self, file_name=None): self.file_name = file_name if self.file_name is not None: self.set_up(file_name = self.file_name)
[docs] def set_up(self, file_name = None): """ Initialise reader. Parameters ---------- file_name : str Full path to NeXus file """ self.file_name = os.path.abspath(file_name) # check that h5py library is installed if (h5pyAvailable == False): raise Exception('h5py is not available, cannot load NEXUS files.') if self.file_name == None: raise Exception('Path to nexus file is required.') # check if nexus file exists if not(os.path.isfile(self.file_name)): raise Exception('File\n {}\n does not exist.'.format(self.file_name)) self._geometry = None
def read_dimension_labels(self, attrs): dimension_labels = [None] * 4 for k,v in attrs.items(): if k in ['dim0', 'dim1', 'dim2' , 'dim3']: dimension_labels[int(k[3:])] = v # remove Nones dimension_labels = [i for i in dimension_labels if i] if len(dimension_labels) == 0: dimension_labels = None return dimension_labels
[docs] def get_geometry(self): """ Parse NEXUS file and return acquisition or reconstructed volume parameters, depending on file type. Returns ------- AcquisitionGeometry or ImageGeometry Acquisition or reconstructed volume parameters. Exact type depends on file content. """ with h5py.File(self.file_name,'r') as dfile: if np.bytes_(dfile.attrs['creator']) != np.bytes_('NEXUSDataWriter.py'): raise Exception('We can parse only files created by NEXUSDataWriter.py') ds_data = dfile['entry1/tomo_entry/data/data'] if ds_data.attrs['data_type'] == 'ImageData': self._geometry = ImageGeometry(voxel_num_x = int(ds_data.attrs['voxel_num_x']), voxel_num_y = int(ds_data.attrs['voxel_num_y']), voxel_num_z = int(ds_data.attrs['voxel_num_z']), voxel_size_x = ds_data.attrs['voxel_size_x'], voxel_size_y = ds_data.attrs['voxel_size_y'], voxel_size_z = ds_data.attrs['voxel_size_z'], center_x = ds_data.attrs['center_x'], center_y = ds_data.attrs['center_y'], center_z = ds_data.attrs['center_z'], channels = ds_data.attrs['channels']) if ds_data.attrs.__contains__('channel_spacing') == True: self._geometry.channel_spacing = ds_data.attrs['channel_spacing'] # read the dimension_labels from dim{} dimension_labels = self.read_dimension_labels(ds_data.attrs) else: # AcquisitionData if ds_data.attrs.__contains__('dist_source_center') or dfile['entry1/tomo_entry'].__contains__('config/source/position'): geom_type = 'cone' else: geom_type = 'parallel' if ds_data.attrs.__contains__('num_pixels_v'): num_pixels_v = ds_data.attrs.get('num_pixels_v') elif ds_data.attrs.__contains__('pixel_num_v'): num_pixels_v = ds_data.attrs.get('pixel_num_v') else: num_pixels_v = 1 if num_pixels_v > 1: dim = 3 else: dim = 2 if self.is_old_file_version(): num_pixels_h = ds_data.attrs.get('pixel_num_h', 1) num_channels = ds_data.attrs['channels'] ds_angles = dfile['entry1/tomo_entry/data/rotation_angle'] if geom_type == 'cone' and dim == 3: self._geometry = AcquisitionGeometry.create_Cone3D(source_position=[0, -ds_data.attrs['dist_source_center'], 0], detector_position=[0, ds_data.attrs['dist_center_detector'],0]) elif geom_type == 'cone' and dim == 2: self._geometry = AcquisitionGeometry.create_Cone2D(source_position=[0, -ds_data.attrs['dist_source_center']], detector_position=[0, ds_data.attrs['dist_center_detector']]) elif geom_type == 'parallel' and dim == 3: self._geometry = AcquisitionGeometry.create_Parallel3D() elif geom_type == 'parallel' and dim == 2: self._geometry = AcquisitionGeometry.create_Parallel2D() else: num_pixels_h = ds_data.attrs.get('num_pixels_h', 1) num_channels = ds_data.attrs['num_channels'] ds_angles = dfile['entry1/tomo_entry/config/angles'] rotation_axis_position = list(dfile['entry1/tomo_entry/config/rotation_axis/position']) detector_position = list(dfile['entry1/tomo_entry/config/detector/position']) ds_detector = dfile['entry1/tomo_entry/config/detector'] if ds_detector.__contains__('direction_x'): detector_direction_x = list(dfile['entry1/tomo_entry/config/detector/direction_x']) else: detector_direction_x = list(dfile['entry1/tomo_entry/config/detector/direction_row']) if ds_detector.__contains__('direction_y'): detector_direction_y = list(dfile['entry1/tomo_entry/config/detector/direction_y']) elif ds_detector.__contains__('direction_col'): detector_direction_y = list(dfile['entry1/tomo_entry/config/detector/direction_col']) ds_rotate = dfile['entry1/tomo_entry/config/rotation_axis'] if ds_rotate.__contains__('direction'): rotation_axis_direction = list(dfile['entry1/tomo_entry/config/rotation_axis/direction']) if geom_type == 'cone': source_position = list(dfile['entry1/tomo_entry/config/source/position']) if dim == 2: self._geometry = AcquisitionGeometry.create_Cone2D(source_position, detector_position, detector_direction_x, rotation_axis_position) else: self._geometry = AcquisitionGeometry.create_Cone3D(source_position,\ detector_position, detector_direction_x, detector_direction_y,\ rotation_axis_position, rotation_axis_direction) else: ray_direction = list(dfile['entry1/tomo_entry/config/ray/direction']) if dim == 2: self._geometry = AcquisitionGeometry.create_Parallel2D(ray_direction, detector_position, detector_direction_x, rotation_axis_position) else: self._geometry = AcquisitionGeometry.create_Parallel3D(ray_direction,\ detector_position, detector_direction_x, detector_direction_y,\ rotation_axis_position, rotation_axis_direction) # for all Aquisition data #set angles angles = list(ds_angles) angle_unit = ds_angles.attrs.get('angle_unit','degree') initial_angle = ds_angles.attrs.get('initial_angle',0) self._geometry.set_angles(angles, initial_angle=initial_angle, angle_unit=angle_unit) #set panel pixel_size_v = ds_data.attrs.get('pixel_size_v', ds_data.attrs['pixel_size_h']) origin = ds_data.attrs.get('panel_origin','bottom-left') self._geometry.set_panel((num_pixels_h, num_pixels_v),\ pixel_size=(ds_data.attrs['pixel_size_h'], pixel_size_v),\ origin=origin) # set channels self._geometry.set_channels(num_channels) dimension_labels = [] dimension_labels = self.read_dimension_labels(ds_data.attrs) #set labels self._geometry.set_labels(dimension_labels) return self._geometry
[docs] def get_data_scale(self): """ Parse NEXUS file and return the scale factor applied to compress the dataset. Returns ------- scale : float The scale factor applied to compress the dataset """ with h5py.File(self.file_name,'r') as dfile: ds_data = dfile['entry1/tomo_entry/data/data'] try: scale = ds_data.attrs['scale'] except: scale = 1.0 return scale
[docs] def get_data_offset(self): """ Parse NEXUS file and return the offset factor applied to compress the dataset. Returns ------- offset : float The offset factor applied to compress the dataset """ with h5py.File(self.file_name,'r') as dfile: ds_data = dfile['entry1/tomo_entry/data/data'] try: offset = ds_data.attrs['offset'] except: offset = 0.0 return offset
def __read_as(self, dtype=np.float32): """ Parse NEXUS file and return raw file content. Parameters ---------- dtype : data-type The data type used for storing the parsed data. Returns ------- output : ImageData or AcquisitionData The parsed raw data. Exact type depends on file content. """ if self._geometry is None: self.get_geometry() #allocate data container as requested type output = self._geometry.allocate(None, dtype=dtype) with h5py.File(self.file_name,'r') as dfile: ds_data = dfile['entry1/tomo_entry/data/data'] ds_data.read_direct(output.array) return output
[docs] def read_as_original(self): """ Returns the compressed data from the file. Returns ------- output : ImageData or AcquisitionData The raw, compressed data. Exact type depends on file content. """ with h5py.File(self.file_name,'r') as dfile: ds_data = dfile['entry1/tomo_entry/data/data'] dtype = ds_data.dtype return self.__read_as(dtype)
[docs] def read(self): """ Returns the uncompressed data as numpy.float32. Returns ------- output : ImageData or AcquisitionData The uncompressed data. Exact type depends on file content. """ output = self.__read_as(np.float32) scale = self.get_data_scale() offset = self.get_data_offset() if offset != 0: output -= offset if scale != 1: output /= scale return output
[docs] def load_data(self): """ Alias of `read`. See Also -------- read """ return self.read()
def is_old_file_version(self): #return ds_data.attrs.__contains__('geom_type') with h5py.File(self.file_name,'r') as dfile: if np.bytes_(dfile.attrs['creator']) != np.bytes_('NEXUSDataWriter.py'): raise Exception('We can parse only files created by NEXUSDataWriter.py') ds_data = dfile['entry1/tomo_entry/data/data'] return 'geom_type' in ds_data.attrs.keys()
# return True