"""
Deconvolution-shear-reconvolution-resampling tools.
Functions
---------
InterpMatrix
Constructs an interpolation matrix.
MultiInterp
Performs the interpolation.
test
A test function for InterpMatrix.
"""
import time
import numpy as np
import scipy
[docs]
def InterpMatrix(Rsearch, samp, x_out, y_out, Cov, epsilon=1.0e-7, stest=1, verbose=False):
"""
Constructs a reconvolution + interpolation matrix.
Parameters
----------
Rsearch : float
Search radius (from corners), in gridded pixels.
samp : float
Sampling rate of input image (samples per FWHM).
x_out : np.array of float
Fractional pixel positions in x (0 to 1, inclusive), shape (Npts,).
y_out : np.array of float
Fractional pixel positions in y (0 to 1, inclusive), shape (Npts,).
Cov : np.array of float
Covariance matrix of extra smoothing. length 3, array-like [Cxx, Cxy, Cyy].
epsilon : float, optional
Regularization parameter to prevent singular correlations.
stest : int, optional
Computes diagnostics for every stest-th point instead of every point (default is every point).
Saves time.
verbose : bool, optional
Print timing information?
Returns
-------
posx : np.array of int
x positions of input pixels, shape (NN,)
posy : np.array of int
y positions of input pixels, shape (NN,)
T : np.array of float
Interpolation/smoothing matrix, shape (Npts, NN).
U : np.array of float
Fractional squared leakage, shape (Npts,).
Sigma : np.array of float
Noise amplification = sum_i T_{ai}^2, shape (Npts,).
Notes
-----
This function actually has the same algorithm as IMCOM embedded in it. But the "system
matrix" A is the same in all cases so it is much faster than IMCOM.
"""
t0 = time.time()
# extract parameters
R = np.sqrt(
np.ceil(Rsearch**2) + 0.01
) # the 0.01 guarantees no 'edge cases' where a search depends on <= vs <
N = int(np.ceil(R) + 1) * 2
sigma = samp / np.sqrt(8 * np.log(2))
# Npts = np.size(x_out)
Cxx = float(Cov[0])
Cxy = float(Cov[1])
Cyy = float(Cov[2])
# build mesh of input points
posx1D = np.linspace(-(N // 2) + 1, N // 2, N)
posy1D = np.linspace(-(N // 2) + 1, N // 2, N)
posx, posy = np.meshgrid(posx1D, posy1D)
posx = posx.flatten()
posy = posy.flatten()
g = np.nonzero((np.abs(posx - 0.5) - 0.5) ** 2 + (np.abs(posy - 0.5) - 0.5) ** 2 <= R**2)[
0
] # select points in search radius
posx = posx[g]
posy = posy[g]
NN = np.size(posx)
# system matrix
sige = np.sqrt(1.0 / 2.0)
A = np.identity(NN)
for i in range(1, NN):
for j in range(i):
A[j, i] = A[i, j] = np.exp(-((posx[i] - posx[j]) ** 2) / 4.0 / sigma**2) * np.exp(
-((posy[i] - posy[j]) ** 2) / 4.0 / sigma**2
)
# additional terms for regularization
Ad = A + epsilon * np.identity(NN)
for i in range(1, NN):
for j in range(i):
Ad[j, i] = Ad[i, j] = A[i, j] + epsilon * np.exp(
-((posx[i] - posx[j]) ** 2) / 4.0 / sige**2
) * np.exp(-((posy[i] - posy[j]) ** 2) / 4.0 / sige**2)
# get overlap matrices
detCT = (2 * sigma**2 + Cxx) * (2 * sigma**2 + Cyy) - Cxy**2
ratio_sqrtdet = np.sqrt((sigma**2 + Cxx) * (sigma**2 + Cyy) - Cxy**2) / sigma**2
iCTxx = (2 * sigma**2 + Cyy) / detCT
iCTxy = -Cxy / detCT
iCTyy = (2 * sigma**2 + Cxx) / detCT
# this is the simple algorithm, but we can do better
# dx = posx[:,None]-x_out[None,:]
# dy = posy[:,None]-y_out[None,:]
# b = np.exp(-.5*(iCTxx*dx**2 + 2*iCTxy*dx*dy + iCTyy*dy**2)) * 2*sigma**2/np.sqrt(detCT)
#
# instead, we note that
# .5*(iCTxx*dx**2 + 2*iCTxy*dx*dy + iCTyy*dy**2) = du**2 + dv**2
# where we complete the square:
# dv**2 = iCTyy/2.*(dy+iCTxy/iCTyy*dx)**2
# du**2 = (iCTxx-iCTxy**2/iCTyy)/2.*dx
a_ = np.sqrt((iCTxx - iCTxy**2 / iCTyy) / 2.0)
c_ = np.sqrt(iCTyy / 2.0)
m_ = iCTxy / iCTyy
posu = a_ * posx
u_out = a_ * x_out
posv = c_ * (posy + m_ * posx)
v_out = c_ * (y_out + m_ * x_out)
du = posu[:, None] - u_out[None, :]
dv = posv[:, None] - v_out[None, :]
b = (2 * sigma**2 / np.sqrt(detCT)) * np.exp(-(du**2 + dv**2))
# regularization
bp = np.copy(b)
detCT_ = (2 * sige**2 + Cxx) * (2 * sige**2 + Cyy) - Cxy**2
iCTxx_ = (2 * sige**2 + Cyy) / detCT_
iCTxy_ = -Cxy / detCT_
iCTyy_ = (2 * sige**2 + Cxx) / detCT_
a_ = np.sqrt((iCTxx_ - iCTxy_**2 / iCTyy_) / 2.0)
c_ = np.sqrt(iCTyy_ / 2.0)
m_ = iCTxy_ / iCTyy_
posu = a_ * posx
u_out = a_ * x_out
posv = c_ * (posy + m_ * posx)
v_out = c_ * (y_out + m_ * x_out)
du = posu[:, None] - u_out[None, :]
dv = posv[:, None] - v_out[None, :]
bp += (epsilon * 2 * sige**2 / np.sqrt(detCT_)) * np.exp(-(du**2 + dv**2))
# the interpolation matrix is built from each of the corners, and then interpolated.
# this ensures continuity of the weights across cell boundaries
TT = np.zeros_like(b)
xcorner = [0.0, 1.0, 0.0, 1.0]
ycorner = [0.0, 0.0, 1.0, 1.0]
weights = [
(1 - x_out) * (1 - y_out),
x_out * (1 - y_out),
(1 - x_out) * y_out,
x_out * y_out,
] # list of arrays
for icorner in range(4):
# get list of pixels needed for each corner
g = np.nonzero((posx - xcorner[icorner]) ** 2 + (posy - ycorner[icorner]) ** 2 <= R**2)[0]
# Cholesky decomposition
if icorner == 0:
Ad_ = Ad[g, :][:, g]
cs = scipy.linalg.cho_factor(Ad_)
# # don't need to do this again, since it's always the same matrix
# get T-transpose, since this is faster to generate
# note overwrite_b is safe since b[g,:] is advanced indexing and always returns a copy
TT[g, :] += scipy.linalg.cho_solve(cs, bp[g, :], check_finite=False) * weights[icorner][None, :]
# TT[g,:] += np.linalg.solve(Ad[g,:][:,g],b[g,:]) * weights[icorner][None,:]
# <-- this method was slower
# normalize, transpose back
T = TT.T / np.sum(TT, axis=0)[:, None]
# want U[i] = 1./ratio_sqrtdet + np.dot(A@T[i,:]-2*b[:,i],T[i,:])
U = 1.0 / ratio_sqrtdet + np.sum((T[::stest, :] @ A - 2 * b[:, ::stest].T) * T[::stest, :], axis=1)
# Notes on this expression:
# (T@A-2b.T)[i,j] = T[i,k]A[k,j]-2b[j,i]
# then the sum is sum_j (T@A-2b.T)[i,j] * T[i,j] = sum_j T[i,k]A[k,j]T[i,j] -2 sum_j b[j,i]T[i,j]
if verbose:
print("InterpMatrix time =", time.time() - t0)
return (
np.round(posx).astype(np.int16),
np.round(posy).astype(np.int16),
T,
U,
np.sum(T[::stest, :] ** 2, axis=1),
)
[docs]
def MultiInterp(
in_array,
in_mask,
out_size,
out_origin,
out_transform,
Rsearch,
samp,
Cov,
epsilon=1.0e-7,
stest=1,
blocksize=393216,
):
"""
Interpolates from an input array to a regularly spaced output array, including some additional smoothing..
Parameters
----------
in_array : np.array of float
Array to interpolate from (may be 3D, with multiple layers).
in_mask : np.array of bool
Boolean mask for input array (True = masked; False = good).
out_size : (int, int)
Output array size, format: (ny,nx).
out_origin : np.array
Length 2 vector of origin for mapping input-->output coordinates.
out_transform : np.array
Shape (2,2) matrix of Jacobian for mapping input-->output coordinates.
Rsearch : float
Search radius (from corners) in pixels in `in_array`.
samp : float
Sampling rate of input image (samples per FWHM).
Cov : np.array
Covariance matrix of extra smoothing. length 3, flattened array-like [Cxx, Cxy, Cyy].
epsilon : float, optional
Regularization parameter to prevent singular correlations.
stest : int, optional
Computes diagnostics for every stest-th point instead of every point (default is every point).
Saves time.
blocksize : int, optional
Number of points to compute at once (larger is slightly faster but uses more memory).
Returns
-------
out_array : np.array of float
2D or 3D. Same number of layers as `in_array`.
out_mask : np.array of bool
Boolean mask for output array (True = masked; False = good).
Umax : float
Maximum leakage from the interpolation step.
Smax : float
Maximum noise metric from the interpolation step.
Notes
-----
The mapping between input and output coordinates is, using `out_origin` and `out_transform`, ::
x_in = out_transform[0][0]*x_out + out_transform[0][1]*y_out + out_origin[0]
y_in = out_transform[1][0]*x_out + out_transform[1][1]*y_out + out_origin[1]
Both are 0-offset, C/Python style.
"""
# get dimensions
nlayer = 1
is3D = False
if len(np.shape(in_array)) == 3:
nlayer = np.shape(in_array)[0]
is3D = True
ny_in = np.shape(in_array)[-2]
nx_in = np.shape(in_array)[-1]
# outputs
ny = out_size[0]
nx = out_size[1]
# build output array
out_array = np.zeros((nlayer, ny * nx), dtype=in_array.dtype)
out_mask = np.ones((ny * nx,), dtype=bool) # default to masked
Umax = 0.0
Smax = 0.0
# build output map in units of the block size
istart = 0
while istart < ny * nx:
ngroup = blocksize
if ngroup + istart > ny * nx:
ngroup = ny * nx - istart
# get the pixel positions
pixnum = np.arange(istart, istart + ngroup, dtype=np.int32)
y_out = (pixnum // nx).astype(np.float64)
x_out = (pixnum % nx).astype(np.float64)
x_in = out_transform[0][0] * x_out + out_transform[0][1] * y_out + out_origin[0]
y_in = out_transform[1][0] * x_out + out_transform[1][1] * y_out + out_origin[1]
del pixnum
del x_out
del y_out
# get fractional parts
x_in_int = np.floor(x_in).astype(np.int32)
y_in_int = np.floor(y_in).astype(np.int32)
x_in_frac = x_in - x_in_int
y_in_frac = y_in - y_in_int
# get interpolation weights (T_) and the associated offsets in x_in_offset and y_in_offset
x_in_offset, y_in_offset, T_, U_, S_ = InterpMatrix(
Rsearch, samp, x_in_frac, y_in_frac, Cov, stest=stest
)
bb = max(
-np.amin(x_in_offset), np.amax(x_in_offset - 1), -np.amin(y_in_offset), np.amax(y_in_offset - 1)
)
if 2 * bb >= min(nx_in, ny_in):
break # this will return all zeros and mask everything
Umax = max(Umax, np.amax(U_))
Smax = max(Smax, np.amax(S_))
del U_, S_
# two layers to output mask.
# (1) are the pixels we need in the input array?
# (2) are they valid?
# we'll answer question (1) here
out_mask_sub = np.logical_or.reduce(
[x_in_int < bb, x_in_int + 1 + bb >= nx_in, y_in_int < bb, y_in_int + 1 + bb >= ny_in]
)
# move these pixels to avoid reading off the edge of the array --- they are masked so this is OK
x_in_int[out_mask_sub] = bb
y_in_int[out_mask_sub] = bb
for k in range(np.size(x_in_offset)):
out_mask_sub |= in_mask[y_in_int + y_in_offset[k], x_in_int + x_in_offset[k]]
if is3D:
for j in range(nlayer):
out_array[j, istart : istart + ngroup] += (
T_[:, k] * in_array[j, y_in_int + y_in_offset[k], x_in_int + x_in_offset[k]]
)
else:
out_array[0, istart : istart + ngroup] += (
T_[:, k] * in_array[y_in_int + y_in_offset[k], x_in_int + x_in_offset[k]]
)
out_mask[istart : istart + ngroup] = out_mask_sub
istart += blocksize # move to next block
# set masked values to zero
for j in range(nlayer):
out_array[j][out_mask] = 0.0
if is3D: # I think it is clearer this way. # noqa: SIM108
out_array = out_array.reshape((nlayer, ny, nx))
else:
out_array = out_array.reshape((ny, nx))
out_mask = out_mask.reshape((ny, nx))
return out_array, out_mask, Umax, Smax