# Copyright 2020 United Kingdom Research and Innovation
# Copyright 2020 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
# CIL Developers, listed at: https://github.com/TomographicImaging/CIL/blob/master/NOTICE.txt
# Claire Delplancke (University of Bath)
from cil.optimisation.algorithms import Algorithm
from cil.optimisation.operators import BlockOperator
import numpy as np
import logging
from cil.optimisation.utilities import Sampler
from numbers import Number
import warnings
from cil.framework import BlockDataContainer
log = logging.getLogger(__name__)
class SPDHG(Algorithm):
r'''Stochastic Primal Dual Hybrid Gradient (SPDHG) solves separable optimisation problems of the type:
    .. math:: \min_{x} f(Kx) + g(x) = \min_{x} \sum_{i=1}^{n} f_i(K_i x) + g(x)
where :math:`f_i` and the regulariser :math:`g` need to be proper, convex and lower semi-continuous.
Parameters
----------
    f : BlockFunction
        Each component must be a convex function whose convex conjugate has a "simple" proximal method
g : Function
A convex function with a "simple" proximal
operator : BlockOperator
BlockOperator must contain Linear Operators
    tau : positive float, optional
        Step size parameter for the primal problem. If `None`, it is computed by the algorithm; see the note for details.
    sigma : list of positive float, optional
        List of step size parameters for the dual problem. If `None`, they are computed by the algorithm; see the note for details.
initial : DataContainer, optional
Initial point for the SPDHG algorithm. The default value is a zero DataContainer in the range of the `operator`.
gamma : float, optional
Parameter controlling the trade-off between the primal and dual step sizes
    sampler : `cil.optimisation.utilities.Sampler`, optional
        A `Sampler` controlling the selection of the next index for the SPDHG update. If `None`, a sampler will be created for uniform random sampling with replacement. See notes.
    prob_weights : list of floats, optional
        Consider the sampler being called a large number of times: this argument holds the expected fraction of calls that return each index, normalised to sum to 1. It should be a list of floats of length `num_indices`. Do not pass this argument if the provided sampler already has a `prob_weights` attribute, as the sampler's attribute takes precedence over this parameter. If neither this argument nor a sampler with `prob_weights` is passed, it defaults to `[1/len(operator)]*len(operator)`.
Note
-----
    The `sampler` can be an instance of the `cil.optimisation.utilities.Sampler` class or a custom class with the `__next__(self)` method implemented, which outputs an integer index from {0, 1, ..., len(operator)-1}.
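    For example, a minimal custom sampler only needs `__next__`. This is a sketch: the class name `SequentialCycler` is illustrative and not part of CIL, and `F`, `G` and `A_partitioned` are as in the example below.

    >>> class SequentialCycler:
    ...     def __init__(self, num_indices):
    ...         self.num_indices = num_indices
    ...         self._current = -1
    ...     def __next__(self):
    ...         # cycle deterministically through 0, 1, ..., num_indices - 1
    ...         self._current = (self._current + 1) % self.num_indices
    ...         return self._current
    >>> spdhg = SPDHG(f=F, g=G, operator=A_partitioned, sampler=SequentialCycler(10))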
Note
-----
"Random sampling with replacement" will select the next index with equal probability from `1 - len(operator)`.
Example
-------
    >>> data = dataexample.SIMULATED_PARALLEL_BEAM_DATA.get()
    >>> data.reorder('astra')
    >>> data = data.get_slice(vertical='centre')
    >>> ig = data.geometry.get_ImageGeometry()
    >>> data_partitioned = data.partition(num_batches=10, mode='staggered')
    >>> A_partitioned = ProjectionOperator(ig, data_partitioned.geometry, device="cpu")
    >>> F = BlockFunction(*[L2NormSquared(b=data_partitioned[i])
    ...                     for i in range(10)])
    >>> alpha = 0.025
    >>> G = alpha * TotalVariation()
    >>> spdhg = SPDHG(f=F, g=G, operator=A_partitioned,
    ...               sampler=Sampler.sequential(len(A_partitioned)),
    ...               initial=A_partitioned.domain_geometry().allocate(1),
    ...               update_objective_interval=10)
    >>> spdhg.run(100)
Example
-------
    For further examples of usage, see the [CIL demos](https://github.com/vais-ral/CIL-Demos/blob/master/Tomography/Simulated/Single%20Channel/PDHG_vs_SPDHG.py).
Note
-----
    When setting `sigma` and `tau`, there are 4 possible cases considered by the set-up function; a usage sketch follows this list. In all cases the probabilities :math:`p_i` are set by a default or user-defined sampler:

    - Case 1: If neither `sigma` nor `tau` is provided, then `sigma` is set using the formula:

    .. math:: \sigma_i = \frac{0.99}{\|K_i\|}

    and `tau` is set as per case 2

    - Case 2: If `sigma` is provided but not `tau`, then `tau` is calculated using the formula:

    .. math:: \tau = 0.99 \min_i \left( \frac{p_i}{\sigma_i \|K_i\|^2} \right)

    - Case 3: If `tau` is provided but not `sigma`, then `sigma` is calculated using the formula:

    .. math:: \sigma_i = \frac{0.99 p_i}{\tau \|K_i\|^2}

    - Case 4: Both `sigma` and `tau` are provided.
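    For instance, passing only `sigma` triggers case 2 (a sketch; `F`, `G` and `A_partitioned` are as in the example above):

    >>> spdhg = SPDHG(f=F, g=G, operator=A_partitioned, sigma=[1.0]*10)
    >>> spdhg.tau  # computed by the algorithm as per case 2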
Note
----
    Convergence is guaranteed provided that [2, eq. (12)]:

    .. math:: \|\sigma_i^{1/2} K_i \tau^{1/2}\|^2 < p_i \text{ for all } i
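    For scalar step sizes, this criterion can be checked after set-up via :meth:`check_convergence`:

    >>> spdhg.check_convergence()  # True if the criterion is satisfied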
References
----------
[1]"Stochastic primal-dual hybrid gradient algorithm with arbitrary
sampling and imaging applications",
    Chambolle, Antonin, Matthias J. Ehrhardt, Peter Richtárik, and Carola-Bibiane Schönlieb,
SIAM Journal on Optimization 28, no. 4 (2018): 2783-2808. https://doi.org/10.1137/17M1134834
[2]"Faster PET reconstruction with non-smooth priors by randomization and preconditioning",
Matthias J Ehrhardt, Pawel Markiewicz and Carola-Bibiane Schönlieb,
Physics in Medicine & Biology, Volume 64, Number 22, 2019. https://doi.org/10.1088/1361-6560/ab3d07
'''
def __init__(self, f=None, g=None, operator=None, tau=None, sigma=None,
initial=None, sampler=None, prob_weights=None, **kwargs):
update_objective_interval = kwargs.pop('update_objective_interval', 1)
super(SPDHG, self).__init__(
update_objective_interval=update_objective_interval)
self.set_up(f=f, g=g, operator=operator, sigma=sigma, tau=tau,
initial=initial, sampler=sampler, prob_weights=prob_weights, **kwargs)
def set_up(self, f, g, operator, sigma=None, tau=None,
initial=None, sampler=None, prob_weights=None, **deprecated_kwargs):
'''set-up of the algorithm
'''
log.info("%s setting up", self.__class__.__name__)
# algorithmic parameters
self.f = f
self.g = g
self.operator = operator
if not isinstance(operator, BlockOperator):
raise TypeError("operator should be a BlockOperator")
self._ndual_subsets = len(self.operator)
self._prob_weights = getattr(sampler, 'prob_weights', prob_weights)
self._deprecated_set_prob(deprecated_kwargs, sampler)
if self._prob_weights is None:
self._prob_weights = [1/self._ndual_subsets]*self._ndual_subsets
if prob_weights is not None and self._prob_weights != prob_weights:
            raise ValueError('You passed a `prob_weights` argument and a sampler with a different `prob_weights` attribute; please remove the `prob_weights` argument.')
if sampler is None:
self._sampler = Sampler.random_with_replacement(
len(operator), prob=self._prob_weights)
else:
self._sampler = sampler
        # Set the norms of the operators
self._deprecated_set_norms(deprecated_kwargs)
self._norms = operator.get_norms_as_list()
        # Check for other kwargs
if deprecated_kwargs:
raise ValueError("Additional keyword arguments passed but not used: {}".format(deprecated_kwargs))
self.set_step_sizes(sigma=sigma, tau=tau)
# initialize primal variable
if initial is None:
self.x = self.operator.domain_geometry().allocate(0)
else:
self.x = initial.copy()
self._x_tmp = self.operator.domain_geometry().allocate(0)
# initialize dual variable to 0
self._y_old = operator.range_geometry().allocate(0)
if not isinstance(self._y_old, BlockDataContainer): #This can be removed once #1863 is fixed
            self._y_old = BlockDataContainer(self._y_old)
# initialize variable z corresponding to back-projected dual variable
self._z = operator.domain_geometry().allocate(0)
self._zbar = operator.domain_geometry().allocate(0)
# relaxation parameter
self._theta = 1
self.configured = True
        log.info("%s configured", self.__class__.__name__)
def _deprecated_set_prob(self, deprecated_kwargs, sampler):
"""
Handle deprecated keyword arguments for backward compatibility.
Parameters
----------
deprecated_kwargs : dict
Dictionary of keyword arguments.
sampler : Sampler
Sampler class for selecting the next index for the SPDHG update.
Notes
-----
This method is called by the set_up method.
"""
prob = deprecated_kwargs.pop('prob', None)
if prob is not None:
if (self._prob_weights is None) and (sampler is None):
                warnings.warn('`prob` is deprecated and will be replaced with a sampler class and `prob_weights`. To randomly sample with replacement use `sampler=Sampler.random_with_replacement(number_of_subsets, prob=prob)`. To pass probabilities to the calculation of `sigma` and `tau` please use `prob_weights`.', DeprecationWarning, stacklevel=2)
self._prob_weights = prob
else:
                raise ValueError(
                    '`prob` is deprecated and will be replaced with a sampler class and `prob_weights`. You passed a `prob` argument, and either a `prob_weights` argument or a sampler. Please remove the `prob` argument.')
def _deprecated_set_norms(self, deprecated_kwargs):
"""
Handle deprecated keyword arguments for backward compatibility.
Parameters
----------
deprecated_kwargs : dict
Dictionary of keyword arguments.
Notes
-----
This method is called by the set_up method.
"""
norms = deprecated_kwargs.pop('norms', None)
if norms is not None:
self.operator.set_norms(norms)
warnings.warn(
' `norms` is being deprecated, use instead the `BlockOperator` function `set_norms`', DeprecationWarning, stacklevel=2)
@property
def sigma(self):
return self._sigma
@property
def tau(self):
return self._tau
def set_step_sizes_from_ratio(self, gamma=1.0, rho=0.99):
r""" Sets gamma, the step-size ratio for the SPDHG algorithm. Currently gamma takes a scalar value.
The step sizes `sigma` and `tau` are set using the equations:
.. math:: \sigma_i= \frac{\gamma\rho }{\|K_i\|^2}
.. math:: \tau = \rho\min_i([ \frac{p_i }{\sigma_i \|K_i\|^2})
Parameters
----------
gamma : Positive float
parameter controlling the trade-off between the primal and dual step sizes
rho : Positive float
parameter controlling the size of the product :math:`\sigma\tau`
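        For example (a sketch; assumes a configured `spdhg` instance):

        >>> spdhg.set_step_sizes_from_ratio(gamma=2.0, rho=0.98)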
"""
if isinstance(gamma, Number):
if gamma <= 0:
raise ValueError(
"The step-sizes of SPDHG are positive, gamma should also be positive")
else:
raise ValueError(
"We currently only support scalar values of gamma")
if isinstance(rho, Number):
if rho <= 0:
raise ValueError(
"The step-sizes of SPDHG are positive, rho should also be positive")
else:
raise ValueError(
"We currently only support scalar values of gamma")
self._sigma = [gamma * rho / ni for ni in self._norms]
values = [rho*pi / (si * ni**2) for pi, ni,
si in zip(self._prob_weights, self._norms, self._sigma)]
self._tau = min([value for value in values if value > 1e-8])
def set_step_sizes(self, sigma=None, tau=None):
r""" Sets sigma and tau step-sizes for the SPDHG algorithm after the initial set-up. The step sizes can be either scalar or array-objects.
When setting `sigma` and `tau`, there are 4 possible cases considered by setup function:
- Case 1: If neither `sigma` or `tau` are provided then `sigma` is set using the formula:
.. math:: \sigma_i= \frac{0.99}{\|K_i\|^2}
and `tau` is set as per case 2
- Case 2: If `sigma` is provided but not `tau` then `tau` is calculated using the formula
.. math:: \tau = 0.99\min_i( \frac{p_i}{ (\sigma_i \|K_i\|^2) })
- Case 3: If `tau` is provided but not `sigma` then `sigma` is calculated using the formula
.. math:: \sigma_i= \frac{0.99 p_i}{\tau\|K_i\|^2}
- Case 4: Both `sigma` and `tau` are provided.
Parameters
----------
        sigma : list of positive float, optional
            List of step size parameters for the dual problem; if `None`, computed as in the cases above
        tau : positive float, optional
            Step size parameter for the primal problem; if `None`, computed as in the cases above
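        For example (a sketch; passing `sigma` only, so `tau` is computed as in case 2):

        >>> spdhg.set_step_sizes(sigma=[0.5]*len(spdhg.operator), tau=None)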
"""
gamma = 1.
rho = .99
        if sigma is not None:
            if len(sigma) != self._ndual_subsets:
                raise ValueError(
                    "Please pass a list of floats to sigma with the same number of entries as the number of operators")
            if not all(isinstance(x, Number) and x > 0 for x in sigma):
                raise ValueError(
                    "Each entry of sigma is expected to be a positive number.")
            self._sigma = sigma
elif tau is None:
self._sigma = [gamma * rho / ni for ni in self._norms]
else:
self._sigma = [
rho*pi / (tau*ni**2) for ni, pi in zip(self._norms, self._prob_weights)]
if tau is None:
values = [rho*pi / (si * ni**2) for pi, ni,
si in zip(self._prob_weights, self._norms, self._sigma)]
self._tau = min([value for value in values if value > 1e-8])
else:
            if not (isinstance(tau, Number) and tau > 0):
raise ValueError(
"The step-sizes of SPDHG must be positive, passed tau = {}".format(tau))
self._tau = tau
[docs]
def check_convergence(self):
""" Checks whether convergence criterion for SPDHG is satisfied with the current scalar values of tau and sigma
Returns
-------
Boolean
True if convergence criterion is satisfied. False if not satisfied or convergence is unknown.
Note
-----
        The convergence criterion can currently only be checked for scalar values of `tau` and `sigma`.
Note
----
This checks the convergence criterion. Numerical errors may mean some sigma and tau values that satisfy the convergence criterion may not converge.
Alternatively, step sizes outside the convergence criterion may still allow (fast) convergence.
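        For example (assuming a configured `spdhg` instance with scalar step sizes):

        >>> spdhg.check_convergence()  # True if the criterion is satisfied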
"""
        for i in range(self._ndual_subsets):
            if isinstance(self._tau, Number) and isinstance(self._sigma[i], Number):
                if self._sigma[i] * self._tau * self._norms[i]**2 > self._prob_weights[i]:
                    return False
            else:
                raise ValueError(
                    'Convergence criterion can currently only be checked for scalar values of tau and sigma[i].')
        return True
def update(self):
""" Runs one iteration of SPDHG
"""
        # Gradient descent step for the primal variable:
        # x = prox_{tau g}(x - tau * zbar)
        # sapyb computes tau * zbar - x; the sign flip below yields x - tau * zbar
        self._zbar.sapyb(self._tau, self.x, -1., out=self._x_tmp)
        self._x_tmp *= -1
        self.g.proximal(self._x_tmp, self._tau, out=self.x)
# Choose subset
i = next(self._sampler)
# Gradient ascent for the dual variable
# y_k = y_old[i] + sigma[i] * K[i] x
try:
y_k = self.operator[i].direct(self.x)
        except IndexError:
            raise IndexError(
                'The sampler returned an index out of range. Please ensure your sampler samples from {0, 1, ..., len(operator)-1} only.')
y_k.sapyb(self._sigma[i], self._y_old[i], 1., out=y_k)
y_k = self.f[i].proximal_conjugate(y_k, self._sigma[i])
# Back-project
# x_tmp = K[i]^*(y_k - y_old[i])
y_k.subtract(self._y_old[i], out=self._y_old[i])
self.operator[i].adjoint(self._y_old[i], out=self._x_tmp)
# Update backprojected dual variable and extrapolate
# zbar = z + (1 + theta/p[i]) x_tmp
# z = z + x_tmp
self._z.add(self._x_tmp, out=self._z)
# zbar = z + (theta/p[i]) * x_tmp
self._z.sapyb(1., self._x_tmp, self._theta /
self._prob_weights[i], out=self._zbar)
# save previous iteration
self._save_previous_iteration(i, y_k)
def update_objective(self):
# p1 = self.f(self.operator.direct(self.x)) + self.g(self.x)
p1 = 0.
for i, op in enumerate(self.operator.operators):
p1 += self.f[i](op.direct(self.x))
p1 += self.g(self.x)
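        # dual objective: d(y) = -F^*(y) - G^*(-K^T y)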
d1 = - self.f.convex_conjugate(self._y_old)
tmp = self.operator.adjoint(self._y_old)
tmp *= -1
d1 -= self.g.convex_conjugate(tmp)
self.loss.append([p1, d1, p1-d1])
@property
def objective(self):
'''The saved primal objectives.
Returns
-------
list
The saved primal objectives from `update_objective`. The number of saved values depends on the `update_objective_interval` kwarg.
'''
return [x[0] for x in self.loss]
@property
def dual_objective(self):
'''The saved dual objectives.
Returns
-------
list
The saved dual objectives from `update_objective`. The number of saved values depends on the `update_objective_interval` kwarg.
'''
return [x[1] for x in self.loss]
@property
def primal_dual_gap(self):
'''The saved primal-dual gap.
Returns
-------
list
The saved primal dual gap from `update_objective`. The number of saved values depends on the `update_objective_interval` kwarg.
'''
return [x[2] for x in self.loss]
def _save_previous_iteration(self, index, y_current):
''' Internal function used to save the previous iteration
'''
self._y_old[index].fill(y_current)