""" Module implementing alignment estimators on ndarrays
"""
import numpy as np
import scipy
from scipy.spatial.distance import cdist
from scipy import linalg
from scipy.sparse import diags
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.linear_assignment_ import linear_assignment
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.linear_model import RidgeCV
from joblib import Parallel, delayed
def scaled_procrustes(X, Y, scaling=False, primal=None):
"""Compute a mixing matrix R and a scaling sc such that Frobenius norm
||sc RX - Y||^2 is minimized and R is an orthogonal matrix.
Parameters
----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data
scaling: bool
If scaling is true, computes a floating scaling parameter sc such that:
||sc * RX - Y||^2 is minimized and
- R is an orthogonal matrix
- sc is a scalar
If scaling is false sc is set to 1
primal: bool or None, optional,
Whether the SVD is done on the YX^T (primal) or Y^TX (dual)
if None primal is used iff n_features <= n_timeframes
Returns
----------
R: (n_features, n_features) nd array
transformation matrix
sc: int
scaling parameter
"""
X = X.astype(np.float64, copy=False)
Y = Y.astype(np.float64, copy=False)
if np.linalg.norm(X) == 0 or np.linalg.norm(Y) == 0:
return diags(np.ones(X.shape[1])).tocsr(), 1
if primal is None:
primal = X.shape[0] >= X.shape[1]
if primal:
A = Y.T.dot(X)
if A.shape[0] == A.shape[1]:
A += + 1.e-18 * np.eye(A.shape[0])
U, s, V = linalg.svd(A, full_matrices=0)
R = U.dot(V)
else: # "dual" mode
Uy, sy, Vy = linalg.svd(Y, full_matrices=0)
Ux, sx, Vx = linalg.svd(X, full_matrices=0)
A = np.diag(sy).dot(Uy.T).dot(Ux).dot(np.diag(sx))
U, s, V = linalg.svd(A)
R = Vy.T.dot(U).dot(V).dot(Vx)
if scaling:
sc = s.sum() / (np.linalg.norm(X) ** 2)
else:
sc = 1
return R.T, sc
def optimal_permutation(X, Y):
"""Compute the optmal permutation matrix of X toward Y
Parameters
----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data
Returns
----------
permutation : (n_features, n_features) nd array
transformation matrix
"""
dist = pairwise_distances(X.T, Y.T)
u = linear_assignment(dist)
permutation = scipy.sparse.csr_matrix(
(np.ones(X.shape[1]), (u[:, 0], u[:, 1]))).T
return permutation
def _projection(x, y):
"""Compute scalar d minimizing ||dx-y||
Parameters
----------
x: (n_features) nd array
source vector
y: (n_features) nd array
target vector
Returns
--------
d: int
scaling factor
"""
return np.dot(x, y) / np.linalg.norm(x)**2
def _voxelwise_signal_projection(X, Y, n_jobs=1, parallel_backend='threading'):
"""Compute D, list of scalar d_i minimizing :
||d_i * x_i - y_i|| for every x_i, y_i in X, Y
Parameters
----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data
Returns
--------
D: list of ints
List of optimal scaling factors
"""
return Parallel(n_jobs, parallel_backend)(delayed(_projection)(
voxel_source, voxel_target)
for voxel_source, voxel_target in zip(X, Y))
class Alignment(BaseEstimator, TransformerMixin):
def __init__(self):
pass
def fit(self, X, Y):
pass
def transform(self, X):
pass
[docs]class Identity(Alignment):
"""Compute no alignment, used as baseline for benchmarks : RX = X.
"""
[docs]class DiagonalAlignment(Alignment):
'''Compute the voxelwise projection factor between X and Y.
Parameters
----------
n_jobs: integer, optional (default = 1)
The number of CPUs to use to do the computation. -1 means
'all CPUs', -2 'all CPUs but one', and so on.
parallel_backend: str, ParallelBackendBase instance, None (default: 'threading')
Specify the parallelization backend implementation. For more
informations see joblib.Parallel documentation
Attributes
-----------
R : scipy.sparse.diags
Scaling matrix containing the optimal shrinking factor for every voxel
'''
[docs] def __init__(self, n_jobs=1, parallel_backend='threading'):
self.n_jobs = n_jobs
self.parallel_backend = parallel_backend
[docs] def fit(self, X, Y):
'''
Parameters
--------------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data'''
shrinkage_coefficients = _voxelwise_signal_projection(
X.T, Y.T, self.n_jobs, self.parallel_backend)
self.R = diags(shrinkage_coefficients)
return
[docs]class ScaledOrthogonalAlignment(Alignment):
"""Compute a orthogonal mixing matrix R and a scaling sc such that Frobenius norm \
||sc RX - Y||^2 is minimized.
Parameters
-----------
scaling : boolean, optional
Determines whether a scaling parameter is applied to improve transform.
Attributes
-----------
R : ndarray (n_features, n_features)
Optimal orthogonal transform
"""
[docs] def __init__(self, scaling=True):
self.scaling = scaling
self.scale = 1
[docs] def fit(self, X, Y):
""" Fit orthogonal R s.t. ||sc XR - Y||^2
Parameters
-----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data
"""
R, sc = scaled_procrustes(X, Y, scaling=self.scaling)
self.scale = sc
self.R = sc * R
return self
[docs]class RidgeAlignment(Alignment):
""" Compute a scikit-estimator R using a mixing matrix M s.t Frobenius \
norm || XM - Y ||^2 + alpha * ||M||^2 is minimized with cross-validation
Parameters
----------
R : scikit-estimator from sklearn.linear_model.RidgeCV
with methods fit, predict
alpha : numpy array of shape [n_alphas]
Array of alpha values to try. Regularization strength; \
must be a positive float. Regularization improves the conditioning \
of the problem and reduces the variance of the estimates. \
Larger values specify stronger regularization. Alpha corresponds to \
``C^-1`` in other models such as LogisticRegression or LinearSVC.
cv : int, cross-validation generator or an iterable, optional
Determines the cross-validation splitting strategy.\
Possible inputs for cv are:
-None, to use the efficient Leave-One-Out cross-validation
- integer, to specify the number of folds.
- An object to be used as a cross-validation generator.
- An iterable yielding train/test splits.
"""
[docs] def __init__(self, alphas=[0.1, 1.0, 10.0, 100, 1000], cv=4):
self.alphas = [alpha for alpha in alphas]
self.cv = cv
[docs] def fit(self, X, Y):
""" Fit R s.t. || XR - Y ||^2 + alpha ||R||^2 is minimized with cv
Parameters
-----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data
"""
self.R = RidgeCV(alphas=self.alphas, fit_intercept=True,
normalize=False, scoring=None, cv=self.cv)
self.R.fit(X, Y)
return self
[docs]class Hungarian(Alignment):
'''Compute the optimal permutation matrix of X toward Y
Attributes
----------
R : scipy.sparse.csr_matrix
Mixing matrix containing the optimal permutation
'''
[docs] def fit(self, X, Y):
'''
Parameters
-----------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data'''
self.R = optimal_permutation(X, Y).T
return self
def _import_ot():
'''Import the optional dependency ot (POT module) if installed or give
back a clear error message to the user if not installed
'''
try:
import ot
except ImportError:
from fmralign.version import REQUIRED_MODULE_METADATA
for module, metadata in REQUIRED_MODULE_METADATA:
if module == 'POT':
POT_min_version = metadata['min_version']
raise ImportError("To use optimal transport solver, POT module(v > {}) \
is necessary but not installed by default with fmralign. To install \
it run 'pip install POT'".format(POT_min_version))
else:
return ot
[docs]class OptimalTransportAlignment(Alignment):
'''Compute the optimal coupling between X and Y with entropic regularization.
Parameters
----------
solver : str (optional)
solver from POT called to find optimal coupling 'sinkhorn', \
'greenkhorn', 'sinkhorn_stabilized','sinkhorn_epsilon_scaling', 'exact' \
see POT/ot/bregman on Github for source code of solvers
metric : str(optional)
metric used to create transport cost matrix, \
see full list in scipy.spatial.distance.cdist doc
reg : int (optional)
level of entropic regularization
Attributes
----------
R : scipy.sparse.csr_matrix
Mixing matrix containing the optimal permutation
'''
[docs] def __init__(self, solver='sinkhorn_epsilon_scaling',
metric='euclidean', reg=1):
self.ot = _import_ot()
self.solver = solver
self.metric = metric
self.reg = reg
[docs] def fit(self, X, Y):
'''Parameters
--------------
X: (n_samples, n_features) nd array
source data
Y: (n_samples, n_features) nd array
target data'''
n = len(X.T)
a = np.ones(n) * 1 / n
b = np.ones(n) * 1 / n
M = cdist(X.T, Y.T, metric=self.metric)
if self.solver == 'exact':
self.R = self.ot.lp.emd(a, b, M) * n
else:
self.R = self.ot.sinkhorn(
a, b, M, self.reg, method=self.solver) * n
return self