import functools
import warnings
import numpy as np
from hottbox.utils.generation.basic import residual_tensor
from hottbox.core.structures import Tensor, TensorCPD
from hottbox.core.operations import khatri_rao, hadamard, sampled_khatri_rao
from .base import Decomposition, svd
# TODO: Need to add option of sorting vectors in the factor matrices and making them sign invariant
class BaseCPD(Decomposition):
def __init__(self, init, max_iter, epsilon, tol, random_state, verbose):
super(BaseCPD, self).__init__()
self.init = init
self.max_iter = max_iter
self.epsilon = epsilon
self.tol = tol
self.random_state = random_state
self.verbose = verbose
def copy(self):
""" Copy of the Decomposition as a new object """
new_object = super(BaseCPD, self).copy()
return new_object
@property
def name(self):
""" Name of the decomposition
Returns
-------
decomposition_name : str
"""
decomposition_name = super(BaseCPD, self).name
return decomposition_name
def decompose(self, tensor, rank, keep_meta):
raise NotImplementedError('Not implemented in base (BaseCPD) class')
@property
def converged(self):
""" Checks convergence
Returns
-------
is_converged : bool
"""
try: # This insures that the cost has been computed at least twice without checking number of iterations
is_converged = abs(self.cost[-2] - self.cost[-1]) <= self.tol
except IndexError:
is_converged = False
return is_converged
def _init_fmat(self, tensor, rank):
""" Initialisation of factor matrices
Parameters
----------
tensor : Tensor
Multidimensional data to be decomposed
rank : tuple
Should be of shape (R,1), where R is the desired tensor rank. It should be passed as tuple for consistency.
Returns
-------
fmat : list[np.ndarray]
List of factor matrices
"""
t_rank = rank[0]
fmat = [np.array([])] * tensor.order
# Check if all dimensions are greater then kryskal rank
dim_check = (np.array(tensor.shape) >= t_rank).sum() == tensor.order
if dim_check:
if self.init == 'svd':
for mode in range(tensor.order):
# TODO: don't really like this implementation
k = tensor.unfold(mode, inplace=False).data
fmat[mode], _, _ = svd(k, t_rank)
elif self.init == 'random':
fmat = [np.random.randn(mode_size, t_rank) for mode_size in tensor.shape]
else:
raise NotImplementedError('The given initialization is not available')
else:
fmat = [np.random.randn(mode_size, t_rank) for mode_size in tensor.shape]
if self.verbose and self.init != 'random':
warnings.warn(
"Specified rank value is greater then one of the dimensions of a tensor ({} > {}).\n"
"Factor matrices have been initialized randomly.".format(t_rank, tensor.shape), RuntimeWarning
)
return fmat
def plot(self):
raise NotImplementedError('Not implemented in base (BaseCPD) class')
[docs]class CPD(BaseCPD):
""" Canonical Polyadic Decomposition.
Computed via alternating least squares (ALS)
Parameters
----------
init : str
Type of factor matrix initialisation. Available options are `svd` and `random`
max_iter : int
Maximum number of iteration
epsilon : float
Threshold for the relative error of approximation.
tol : float
Threshold for convergence of factor matrices
random_state : int
verbose : bool
If True, enable verbose output
Attributes
----------
cost : list
A list of relative approximation errors at each iteration of the algorithm.
"""
def __init__(self, init='svd', max_iter=50, epsilon=10e-3, tol=10e-5,
random_state=None, verbose=False) -> None:
super(CPD, self).__init__(init=init,
max_iter=max_iter,
epsilon=epsilon,
tol=tol,
random_state=random_state,
verbose=verbose)
self.cost = []
[docs] def copy(self):
""" Copy of the CPD algorithm as a new object """
new_object = super(CPD, self).copy()
new_object.cost = []
return new_object
@property
def name(self):
""" Name of the decomposition
Returns
-------
decomposition_name : str
"""
decomposition_name = super(CPD, self).name
return decomposition_name
[docs] def decompose(self, tensor, rank, keep_meta=0, kr_reverse=False, factor_mat=None):
""" Performs CPD-ALS on the ``tensor`` with respect to the specified ``rank``
Parameters
----------
tensor : Tensor
Multi-dimensional data to be decomposed
rank : tuple
Desired Kruskal rank for the given ``tensor``. Should contain only one value.
If it is greater then any of dimensions then random initialisation is used
keep_meta : int
Keep meta information about modes of the given ``tensor``.
0 - the output will have default values for the meta data
1 - keep only mode names
2 - keep mode names and indices
kr_reverse : bool
factor_mat : list(np.ndarray)
Initial list of factor matrices.
Specifying this option will ignore ``init``.
Returns
-------
tensor_cpd : TensorCPD
CP representation of the ``tensor``
Notes
-----
khatri-rao product should be of matrices in reversed order. But this will duplicate original data (e.g. images)
Probably this has something to do with data ordering in Python and how it relates to kr product
"""
if not isinstance(tensor, Tensor):
raise TypeError("Parameter `tensor` should be an object of `Tensor` class!")
if not isinstance(rank, tuple):
raise TypeError("Parameter `rank` should be passed as a tuple!")
if len(rank) != 1:
raise ValueError("Parameter `rank` should be tuple with only one value!")
if factor_mat is None:
fmat = self._init_fmat(tensor, rank)
else:
if not isinstance(factor_mat, list):
raise TypeError("Parameter `factor_mat` should be a list object")
if not all(isinstance(m, np.ndarray) for m in factor_mat):
raise TypeError("Parameter `factor_mat` should be a list object of np.ndarray objects")
# Dimensionality checks
if len(factor_mat) != tensor.order:
raise ValueError("Parameter `factor_mat` should be of the same length as the tensor order")
if not all(m.shape == (mode, rank[0]) for m, mode in zip(factor_mat, tensor.shape)):
raise ValueError("Parameter `factor_mat` should have the shape [mode_n x r]. Incorrect shapes!")
fmat = factor_mat.copy()
self.cost = [] # Reset cost every time when method decompose is called
tensor_cpd = None
core_values = np.repeat(np.array([1]), rank)
norm = tensor.frob_norm
for n_iter in range(self.max_iter):
# Update factor matrices
for mode in range(tensor.order):
kr_result = khatri_rao(fmat, skip_matrix=mode, reverse=kr_reverse)
hadamard_result = hadamard([np.dot(mat.T, mat) for i, mat in enumerate(fmat) if i != mode])
# Do consecutive multiplication of np.ndarray
update = functools.reduce(np.dot, [tensor.unfold(mode, inplace=False).data,
kr_result,
np.linalg.pinv(hadamard_result)])
fmat[mode] = update
# Update cost
tensor_cpd = TensorCPD(fmat=fmat, core_values=core_values)
residual = residual_tensor(tensor, tensor_cpd)
self.cost.append(abs(residual.frob_norm / norm))
if self.verbose:
print('Iter {}: relative error of approximation = {}'.format(n_iter, self.cost[-1]))
# Check termination conditions
if self.cost[-1] <= self.epsilon:
if self.verbose:
print('Relative error of approximation has reached the acceptable level: {}'.format(self.cost[-1]))
break
if self.converged:
if self.verbose:
print('Converged in {} iteration(s)'.format(len(self.cost)))
break
if self.verbose and not self.converged and self.cost[-1] > self.epsilon:
print('Maximum number of iterations ({}) has been reached. '
'Variation = {}'.format(self.max_iter, abs(self.cost[-2] - self.cost[-1])))
if keep_meta == 1:
mode_names = {i: mode.name for i, mode in enumerate(tensor.modes)}
tensor_cpd.set_mode_names(mode_names=mode_names)
elif keep_meta == 2:
tensor_cpd.copy_modes(tensor)
else:
pass
return tensor_cpd
@property
def converged(self):
""" Checks convergence of the CPD-ALS algorithm.
Returns
-------
bool
"""
is_converged = super(CPD, self).converged
return is_converged
def _init_fmat(self, tensor, rank):
fmat = super(CPD, self)._init_fmat(tensor=tensor,
rank=rank)
return fmat
def plot(self):
print('At the moment, `plot()` is not implemented for the {}'.format(self.name))
# TODO: Fix efficiency issues with this
[docs]class RandomisedCPD(BaseCPD):
""" Randomised Canonical Polyadic Decomposition.
Computed via sampled alternating least squares (ALS)
Parameters
----------
init : str
Type of factor matrix initialisation. Available options are `svd` and `random`
max_iter : int
Maximum number of iteration
epsilon : float
Threshold for the relative error of approximation.
tol : float
Threshold for convergence of factor matrices
random_state : int
verbose : bool
If True, enable verbose output
Attributes
----------
cost : list
A list of relative approximation errors at each iteration of the algorithm.
References
----------
- Battaglino, C., Ballard, G., & Kolda, T. G. (2018). A Practical Randomized CP Tensor
Decomposition. SIAM Journal on Matrix Analysis and Applications, 39(2), 876–901.
http://doi.org/10.1137/17m1112303
"""
def __init__(self, init='svd', sample_size=None, max_iter=50, epsilon=10e-3, tol=10e-5,
random_state=None, verbose=False) -> None:
super(RandomisedCPD, self).__init__(init=init,
max_iter=max_iter,
epsilon=epsilon,
tol=tol,
random_state=random_state,
verbose=verbose)
self.cost = []
self.sample_size = sample_size
[docs] def copy(self):
""" Copy of the CPD algorithm as a new object """
new_object = super(RandomisedCPD, self).copy()
new_object.cost = []
return new_object
@property
def name(self):
""" Name of the decomposition
Returns
-------
decomposition_name : str
"""
decomposition_name = super(RandomisedCPD, self).name
return decomposition_name
[docs] def decompose(self, tensor, rank, keep_meta=0, kr_reverse=False):
""" Performs CPD-ALS on the ``tensor`` with respect to the specified ``rank``
Parameters
----------
tensor : Tensor
Multi-dimensional data to be decomposed
rank : tuple
Desired Kruskal rank for the given ``tensor``. Should contain only one value.
If it is greater then any of dimensions then random initialisation is used
keep_meta : int
Keep meta information about modes of the given ``tensor``.
0 - the output will have default values for the meta data
1 - keep only mode names
2 - keep mode names and indices
kr_reverse : bool
Returns
-------
tensor_cpd : TensorCPD
CP representation of the ``tensor``
Notes
-----
khatri-rao product should be of matrices in reversed order. But this will duplicate original data (e.g. images)
Probably this has something to do with data ordering in Python and how it relates to kr product
"""
if not isinstance(tensor, Tensor):
raise TypeError("Parameter `tensor` should be an object of `Tensor` class!")
if not isinstance(rank, tuple):
raise TypeError("Parameter `rank` should be passed as a tuple!")
if len(rank) != 1:
raise ValueError("Parameter `rank` should be tuple with only one value!")
self.cost = [] # Reset cost every time when method decompose is called
tensor_cpd = None
fmat = self._init_fmat(tensor, rank)
core_values = np.repeat(np.array([1]), rank)
norm = tensor.frob_norm
lm = np.arange(tensor.order).tolist()
for n_iter in range(self.max_iter):
# Update factor matrices
for mode in lm:
kr_result, idxlist = sampled_khatri_rao(fmat, sample_size=self.sample_size, skip_matrix=mode)
lmodes = lm[:mode] + lm[mode+1:]
xs = np.array([tensor.access(m, lmodes) for m in np.array(idxlist).T.tolist()])
# Solve kr_result^-1 * xs
pos_def = np.dot(kr_result.T, kr_result)
corr_term = np.dot(kr_result.T, xs)
min_result = np.linalg.solve(pos_def, corr_term)
fmat[mode] = min_result.T
# Update cost
tensor_cpd = TensorCPD(fmat=fmat, core_values=core_values)
residual = residual_tensor(tensor, tensor_cpd)
self.cost.append(abs(residual.frob_norm / norm))
if self.verbose:
print('Iter {}: relative error of approximation = {}'.format(n_iter, self.cost[-1]))
# Check termination conditions
if self.cost[-1] <= self.epsilon:
if self.verbose:
print('Relative error of approximation has reached the acceptable level: {}'.format(self.cost[-1]))
break
if self.converged:
if self.verbose:
print('Converged in {} iteration(s)'.format(len(self.cost)))
break
if self.verbose and not self.converged and self.cost[-1] > self.epsilon:
print('Maximum number of iterations ({}) has been reached. '
'Variation = {}'.format(self.max_iter, abs(self.cost[-2] - self.cost[-1])))
if keep_meta == 1:
mode_names = {i: mode.name for i, mode in enumerate(tensor.modes)}
tensor_cpd.set_mode_names(mode_names=mode_names)
elif keep_meta == 2:
tensor_cpd.copy_modes(tensor)
else:
pass
return tensor_cpd
@property
def converged(self):
""" Checks convergence of the Randomised CPD-ALS algorithm.
Returns
-------
bool
"""
is_converged = super(RandomisedCPD, self).converged
return is_converged
def _init_fmat(self, tensor, rank):
fmat = super(RandomisedCPD, self)._init_fmat(tensor=tensor,
rank=rank)
return fmat
def plot(self):
print('At the moment, `plot()` is not implemented for the {}'.format(self.name))
[docs]class Parafac2(BaseCPD):
""" Computes PARAFAC2 for ``tensors`` of order three with respect to a specified ``rank``.
Computed via alternating least squares (ALS)
Parameters
----------
max_iter : int
Maximum number of iteration
epsilon : float
Threshold for the relative error of approximation.
tol : float
Threshold for convergence of factor matrices
random_state : int
verbose : bool
If True, enable verbose output
Attributes
----------
cost : list
A list of relative approximation errors at each iteration of the algorithm.
References
----------
- Kiers, H., ten Berge, J. and Bro, R. (1999). PARAFAC2 - Part I.
A direct fitting algorithm for the PARAFAC2 model. Journal of Chemometrics,
13(3-4), pp.275-294.
"""
# TODO: change init use requiring a change in TensorCPD
def __init__(self, max_iter=50, epsilon=10e-3, tol=10e-5,
random_state=None, verbose=False) -> None:
super(Parafac2, self).__init__(init='random',
max_iter=max_iter,
epsilon=epsilon,
tol=tol,
random_state=random_state,
verbose=verbose)
self.cost = []
[docs] def copy(self):
""" Copy of the CPD algorithm as a new object """
new_object = super(Parafac2, self).copy()
new_object.cost = []
return new_object
@property
def name(self):
""" Name of the decomposition
Returns
-------
decomposition_name : str
"""
decomposition_name = super(Parafac2, self).name
return decomposition_name
# TODO: Parameters differ to base class decomposed - fix
[docs] def decompose(self, tenl, rank):
""" Performs Direct fitting using ALS on a list of tensors of order 2 with respect to the specified ``rank``.
Parameters
----------
tenl : List(np.ndarray)
List of np.ndarray of dimension 2 to be decomposed
rank : tuple
Desired Kruskal rank for the given ``tensor``. Should contain only one value.
If it is greater then any of dimensions then random initialisation is used
Returns
-------
fmat_u, fmat_s, fmat_v, reconstructed : Tuple(np.ndarray)
fmat_u,fmat_s,fmat_v are PARAFAC2 representation of list of tensors
reconstructed is the reconstruction of the original tensor directly using fmat_u, fmat_s, fmat_v
Notes
-----
khatri-rao product should be of matrices in reversed order. But this will duplicate original data (e.g. images)
Probably this has something to do with data ordering in Python and how it relates to kr product
"""
if not isinstance(tenl, list):
raise TypeError("Parameter `tenl` should be a list of np.ndarray objects!")
if not all(isinstance(m, np.ndarray) for m in tenl):
raise TypeError("Parameter `tenl` should be a list of np.ndarray objects!")
if not isinstance(rank, tuple):
raise TypeError("Parameter `rank` should be passed as a tuple!")
if len(rank) != 1:
raise ValueError("Parameter `rank` should be tuple with only one value!")
self.cost = [] # Reset cost every time when method decompose is called
sz = np.array([t.shape for t in tenl])
_m = list(sz[:, 1])
if _m[1:] != _m[:-1]:
raise ValueError("Tensors must be of shape I[k] x J")
num_t = len(sz)
mode_b = _m[0]
# Initialisations
cpd = CPD(max_iter=1)
fmat_h, fmat_v, fmat_s, fmat_u = self._init_fmat(rank, sz)
cpd_fmat = None
for n_iter in range(self.max_iter):
for k in range(num_t):
p, _, q = svd(fmat_h.dot(fmat_s[:, :, k]).dot(fmat_v.T).dot(tenl[k].T), rank=rank[0])
fmat_u[k] = q.T.dot(p.T)
y = np.zeros((rank[0], mode_b, num_t))
for k in range(num_t):
y[:, :, k] = fmat_u[k].T.dot(tenl[k])
fmat = [fmat_h, fmat_v, cpd_fmat]
if n_iter == 0:
fmat = None
decomposed_cpd = cpd.decompose(Tensor(y), rank, factor_mat=fmat)
fmat_h, fmat_v, cpd_fmat = decomposed_cpd.fmat
cpd_fmat = cpd_fmat.dot(np.diag(decomposed_cpd._core_values))
for k in range(num_t):
fmat_s[:, :, k] = np.diag(cpd_fmat[k, :])
reconstructed = [(fmat_u[k].dot(fmat_h).dot(fmat_s[:, :, k])).dot(fmat_v.T) for k in range(num_t)]
err = np.sum([np.sum((tenl[k] - reconstructed[k]) ** 2)
for k in range(num_t)])
self.cost.append(err)
if self.verbose:
print('Iter {}: relative error of approximation = {}'.format(n_iter, self.cost[-1]))
# Check termination conditions
if self.cost[-1] <= self.epsilon:
if self.verbose:
print('Relative error of approximation has reached the acceptable level: {}'.format(self.cost[-1]))
break
if self.converged:
if self.verbose:
print('Converged in {} iteration(s)'.format(len(self.cost)))
break
if self.verbose and not self.converged and self.cost[-1] > self.epsilon:
print('Maximum number of iterations ({}) has been reached. '
'Variation = {}'.format(self.max_iter, abs(self.cost[-2] - self.cost[-1])))
# TODO: possibly make another structure
return fmat_u, fmat_s, fmat_v, reconstructed
@property
def converged(self):
""" Checks convergence of the CPD-ALS algorithm.
Returns
-------
bool
"""
try: # This insures that the cost has been computed at least twice without checking number of iterations
is_converged = abs(self.cost[-2] - self.cost[-1]) <= self.tol*self.cost[-2]
except IndexError:
is_converged = False
return is_converged
def _init_fmat(self, rank, modes):
""" Initialisation of matrices used in Parafac2
Parameters
----------
rank : tuple
Should be of shape (R,1), where R is the desired tensor rank. It should be passed as tuple for consistency.
modes : tuple
np.ndarray of the shapes of matrices
Returns
-------
(fmat_h,fmat_v,fmat_s,fmat_u) : Tuple[np.ndarray]
Factor matrices used in Parafac2
"""
mode_sz = len(modes)
s_mode = modes[0, 1]
modes = modes[:, 0]
fmat_h = np.identity(rank[0])
fmat_v = np.random.randn(s_mode, rank[0])
fmat_s = np.random.randn(rank[0], rank[0], mode_sz)
for k in range(mode_sz):
fmat_s[:, :, k] = np.identity(rank[0])
fmat_u = np.array([np.random.randn(modes[i], rank[0]) for i in range(mode_sz)])
if (np.array(modes) < rank[0]).sum() != 0:
warnings.warn(
"Specified rank value is greater then one of the dimensions of a tensor ({} > {}).\n"
"Factor matrices have been initialized randomly.".format(rank, modes), RuntimeWarning
)
return fmat_h, fmat_v, fmat_s, fmat_u
def plot(self):
print('At the moment, `plot()` is not implemented for the {}'.format(self.name))