"""
Report section for noise diagnostics.
Classes
-------
NoiseReport
Noise report section.
Notes
-----
This incorporates functionality that was previously in noisespecs.py.
It also includes the updated functionality from the Laliotis et al. analysis (August 2024).
This version is trying to implement some things to reduce imcom-related correlations, including:
#. only FFTing the interior postage stamp region (throwing out padded regions)
#. convolving noise images with a window function before FFTing
We have changed the clipping to use the full unique region ``[bdpad:L+bdpad,bdpad:L+bdpad]`` in each image.
"""
import json
import os
import re
import subprocess
import sys
from collections import namedtuple
from os.path import exists
import matplotlib
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from scipy import ndimage
from scipy.fft import fft2 as scipy_fft2
from skimage.filters import window
from ..compress.compressutils import ReadFile
from ..config import Settings
from .context_figure import ReportFigContext
from .report import ReportSection
[docs]
RomanFilters = ["W146", "F184", "H158", "J129", "Y106", "Z087", "R062", "PRSM", "DARK", "GRSM", "K213"]
[docs]
AreaArray = [22085, 4840, 7340, 7111, 7006, 6635, 9011, 0, 0, 0, 4654]
# in cm^2; 0 for prism/dark/grism as these are not imaging elements
# Set up a named tuple for the results that will contain relevant information
[docs]
class NoiseReport(ReportSection):
"""
The noise section of the report.
Inherits from pyimcom.diagnostics.report.ReportSection. Overrides build.
Attributes
----------
nblock : int
Number of blocks in the (sub)mosaic used.
psfiles : list of str
List of power spectrum file names.
outslab : list of int
Indices of the layers used for the power spectrum computation.
orignames : list of str
Names of the layers used for the power spectrum computation.
L : int
Side length of the unique region in a block, in output pixels.
noiselayers : dict
A dictionary with values that are integers corresponding to the input layer
corresponding to the type of noise indicated in the key. (Keys should be strings.)
NLK : list of str
The keys of `noiselayers`, sorted in the same order as EXTRAINPUT in the configuration.
"""
[docs]
def build(self, nblockmax=100, m_ab=23.9, bin_flag=1, alpha=0.9, tarfiles=True):
"""
Builds the noise section of the report.
Parameters
----------
nblockmax : int, optional
Maximum size of mosaic to build.
m_ab : float, optional
Scaling magnitude (not currently used).
bin_flag : int, optional
Whether to bin? (1 = bin 8x8, 0 = do not bin)
alpha : float, optional
Tukey window width for noise power spectrum.
tarfiles : bool, optional
Generate a tarball of the data files?
Returns
-------
None
"""
# pulled the reference magnitude back up here
# also the binning flag (1 = bin 8x8, 0 = do not bin)
self.nblock = min(nblockmax, self.cfg.nblock)
# will keep appending so all the power spectrum files with all information in their names is in here
self.psfiles = []
# added to block output file
self.suffix = ""
self.tex += "\\section{Injected noise layers section}\n\n"
# example output slabs for white, 1/f, lab, and simulated noise
self.outslab = [None, None, None, None]
# there are several sets of files to build here
self.build_noisespec(m_ab, bin_flag, alpha)
self.average_spectra(bin_flag)
# make one example figure, the 2D power spectrum
self.gen_overview_fig()
# add variances
filter = Settings.RomanFilters[self.cfg.use_filter][0]
for k in range(len(self.orignames)):
with fits.open(self.datastem + "_" + filter + self.suffix + "_ps_avg.fits") as f:
self.data += (
f"LAYER{k:02d}"
+ " "
+ f"{self.orignames[k]:24s}"
+ f" {np.average(f[0].data[k, :, :]) / self.s_out**2:11.5E}\n"
)
# tarball the files if requested
if tarfiles:
tarfile_head, tarfile_tail = os.path.split(self.datastem + "_blockps" + self.suffix + ".tar")
lf = []
for f in self.psfiles:
pdir, pname = os.path.split(f)
lf += [pname]
pwd = os.getcwd()
os.chdir(pdir)
proc = subprocess.run(["tar", "--create", "--file=" + tarfile_tail] + lf, capture_output=True)
print("tar output -->\n", proc.stdout)
print("errors -->\n", proc.stderr)
for f in lf:
os.remove(f)
os.chdir(pwd)
# --- noisespec --- #
[docs]
def build_noisespec(self, m_ab, bin_flag, alpha):
"""
Computes the noise power spectrum.
Parameters
----------
m_ab : float
Reference star brightness (not used)
bin_flag : int, optional
Whether to bin? (1 = bin 8x8, 0 = do not bin)
Note that binning is disabled if the input image is too small.
alpha : float, optional
Tukey window width for noise power spectrum.
Returns
-------
str
Status string ('Completed').
"""
# pars = [] # list of parameters, replaces global construction when wrapped
# Set useful constants for te lab noise data
tfr = 3.08 # sec
gain = 1.458 # electrons/DN
# ABstd = 3.631e-20 # erg/cm^2
h_erg = 6.62607015e-27 # erg/Hz
h_jy = h_erg * 1e29 # microJy*cm^2*s
s_in = Settings.pixscale_native * 648000.0 / np.pi # arcsec # updated to refer back to Settings
B0 = 0.38 # e/px/s, background estimate
t_exp = 139.8 # s
area = AreaArray[self.cfg.use_filter]
filter = Settings.RomanFilters[self.cfg.use_filter][0]
# extra background to add for lab noise
B1 = 0.0
if filter == "K":
B1 = 4.65
whitenoisekey = None # avoids error if there isn't a "whitenoise" layer
# which blocks?
is_first = True
for iby in range(self.nblock):
for ibx in range(self.nblock):
blockid = f"{filter:s}_{ibx:02d}_{iby:02d}"
if alpha > 0:
win = True
blockid += "_alpha_" + str(alpha)
else:
win = False
if bin_flag == 0:
blockid += "_nobin"
# loop over blocks
infile = self.infile(ibx, iby)
if not exists(infile):
return None
# the first time
if is_first:
is_first = False
with ReadFile(infile, layers=[]) as f:
# n = np.shape(f[0].data)[-1] # size of output images
config = ""
for g in f["CONFIG"].data["text"].tolist():
config += g + " "
configStruct = json.loads(config)
configdata = f["CONFIG"].data
# blocksize = (
# int(configStruct["OUTSIZE"][0])
# * int(configStruct["OUTSIZE"][1])
# * float(configStruct["OUTSIZE"][2])
# / 3600.0
# * np.pi
# / 180
# ) # block size in radians
L = self.L = int(configStruct["OUTSIZE"][0]) * int(
configStruct["OUTSIZE"][1]
) # side length in px
# snap to nearest multiple of 2 or 16
if L >= 32:
L = (L // 16) * 16
else:
L = (L // 2) * 2
bin_flag = 0
self.s_out = s_out = float(configStruct["OUTSIZE"][2]) # in arcsec
# size of padding region around each edge (in px)
bdpad = int(configStruct["OUTSIZE"][1]) * int(configStruct["PAD"])
# figure out which noise layers are there
layers = [""] + configStruct["EXTRAINPUT"]
print("# Layers:", layers)
noiselayers = {}
for i in range(len(layers)):
m = re.match(r"^whitenoise(\d+)$", layers[i])
if m:
noiselayers[str(m[0])] = i
whitenoisekey = str(m[0])
m = re.match(r"^1fnoise(\d+)$", layers[i])
if m:
noiselayers[str(m[0])] = i
m = re.match(r"^labnoise$", layers[i])
if m:
noiselayers[str(m[0])] = i
m = re.match(r"^noise,(\S+)$", layers[i])
if m:
noiselayers[str(m[0])] = i
print("# Noise Layers (format is layer:use_slice): ", noiselayers)
NLK = list(noiselayers.keys()) # save for shorthand -- note this is in insertion order!
print("# Running file: " + infile, "whitenoisekey =", whitenoisekey)
# read layers and get mean coverage
pad = int(configStruct["PAD"])
with ReadFile(infile, layers=sorted([noiselayers[q] for q in NLK])) as f:
infile_data = np.copy(
f[0].data[0, :, bdpad : L + bdpad, bdpad : L + bdpad].astype(np.float32)
)
n = np.shape(f["INWEIGHT"].data)[-1]
mean_coverage = np.mean(
np.sum(np.where(f["INWEIGHT"].data[0, :, :, :] > 0, 1, 0), axis=0)[
pad : n - pad, pad : n - pad
]
)
if bin_flag == 0:
ps2d_all = np.zeros((L, L, len(noiselayers)))
ps1d_all = np.zeros((L // 2, 4, len(noiselayers)))
elif bin_flag == 1:
ps2d_all = np.zeros((L // 8, L // 8, len(noiselayers)))
ps1d_all = np.zeros((L // 16, 4, len(noiselayers)))
else:
raise Exception("Error: bin flag must be 0 (no binning) or 1 (8x8 binning)")
for i_layer, noiselayer in enumerate(NLK):
use_slice = noiselayers[noiselayer]
indata = infile_data[use_slice, :, :]
# Number of radial bins is side length div. into 8 from binning and then (floor) div. by 2
nradbins = L // 16
# Note that with the new clipping this is L again, the Aug. 2024 clipping needed (L-bdpad)
if bin_flag == 0:
nradbins *= 8
norm = (
(L / s_out) ** 2
) # normalization factor to get power spectrum in units of [image units]^2 * arcsec^2
# special normalization for the lab data
m = re.search(r"lab", noiselayer)
if m:
norm_LN = (
(s_in**2) * area * tfr / (h_jy * gain)
) # factor to convert LN from flux DN/fr/s to intensity microJy/arcsec^2
if filter == "K" and whitenoisekey is not None:
wndata = np.copy(infile_data[noiselayers[whitenoisekey]])
wndata *= np.sqrt((B1 - B0) / t_exp) * tfr / gain # convert WN to DN/fr
indata += wndata # add to lab noise
indata = indata / norm_LN
powerspectrum = NoiseReport.get_powerspectra(
indata,
L,
norm,
nradbins,
use_slice=use_slice,
bin_flag=bin_flag,
win=win,
alpha=alpha,
)
# norm and nradbins now required arguments; use_slice is needed to get the correct format
# in the output file
ps2d_all[:, :, i_layer] = powerspectrum.ps_2d
ps1d_all[:, 0, i_layer] = powerspectrum.k
ps1d_all[:, 1, i_layer] = powerspectrum.ps_image
ps1d_all[:, 2, i_layer] = powerspectrum.ps_image_err
ps1d_all[:, 3, i_layer] = powerspectrum.noiselayer
# Reshape things for fits files
ps2d_all = np.transpose(ps2d_all, (2, 0, 1))
# print("# TRANSPOSED ps2d shape:", np.shape(ps2d_all))
# reshape P1D's:
ps1d_all = np.transpose(ps1d_all, (2, 0, 1)).reshape(-1, np.shape(ps1d_all)[1])
# print("# TRANSPOSED ps1d shape:", np.shape(ps1d_all))
# Save power spectra data in a fits file
# Two HDUs: Primary contains 2D spectrum, second is a table with 1D spectrum and MC values
hdu_ps2d = fits.PrimaryHDU(ps2d_all)
hdr = hdu_ps2d.header
hdr["INSTEM"] = infile[:-11] # updated from original script
hdr["MEANCOVG"] = mean_coverage
hdr["LAYERKEY"] = str(noiselayers)
hdr["NLAYERS"] = (len(noiselayers), "Number of layers with noise")
key_layer2 = [""] * (1 + len(configStruct["EXTRAINPUT"]))
for d in NLK:
d_ = noiselayers[d]
key_layer2[d_] = d
key_layer = []
self.orignames = []
for k in range(len(key_layer2)):
if key_layer2[k] != "":
key_layer.append(key_layer2[k])
self.orignames.append(configStruct["EXTRAINPUT"][k - 1])
del key_layer2
for il in range(len(NLK)):
key_ = f"LAYER{il:02d}"
hdr[key_] = (key_layer[il], f"Noise layer {il:d} in intermediate file")
if key_layer[il][:10] == "whitenoise":
self.outslab[0] = il
if key_layer[il][:7] == "1fnoise":
self.outslab[1] = il
if key_layer[il][:8] == "labnoise":
self.outslab[2] = il
if key_layer[il][:6] == "noise," and "b" in key_layer[il]:
self.outslab[3] = il
if len(NLK) >= 1:
del key_
hdr["AREAUNIT"] = "arcsec**2"
col1 = fits.Column(name="Wavenumber", format="E", array=ps1d_all[:, 0])
col2 = fits.Column(name="Power", format="E", array=ps1d_all[:, 1])
col3 = fits.Column(name="Error", format="E", array=ps1d_all[:, 2])
col4 = fits.Column(name="NoiseLayerID", format="I", array=ps1d_all[:, 3])
p1d_cols = fits.ColDefs([col1, col2, col3, col4])
hdu_ps1d = fits.BinTableHDU.from_columns(p1d_cols, name="P1D_TABLE")
hdu_config = fits.BinTableHDU(data=configdata, name="CONFIG")
hdul = fits.HDUList([hdu_ps2d, hdu_config, hdu_ps1d])
self.psfiles.append(self.datastem + "_" + blockid + "_ps.fits")
hdul.writeto(self.psfiles[-1], overwrite=True)
print("# Results saved to ", self.datastem, "_", blockid, "_ps.fits")
self.suffix = blockid[7:]
self.noiselayers = noiselayers # save this for reference later
self.NLK = NLK
return "Completed"
## Utility functions below here ##
@staticmethod
[docs]
def measure_power_spectrum(noiseframe, L, norm=1.0, bin=True, win=True, alpha=0.9):
"""
Measure the 2D power spectrum of image.
Parameters
----------
noiseframe : np.array
The 2D input image to measure the power spectrum of.
In this case, a noise frame from the simulations
L : int
The length of the FFT (must be a multiple of 8).
norm : float, optional
The normalization to use (power spectrum is |FFT|^2/norm).
bin : bool, optional
Whether to bin the 2D spectrum.
Default=True, bins spectrum into L/8 x L/8 image.
Potential extra rows are cut off.
win : bool, optional
Whether to convolve the noise frame with a Tukey window function.
alpha : float, optional
Tukey window parameter.
Returns
-------
np.array
The 2D power spectrum of the image.
"""
# get the window function and its normalization
if win:
w = window(("tukey", alpha), (np.shape(noiseframe)))
norm = norm * np.average(w**2)
noiseframe = noiseframe * w
fft = np.fft.fftshift(scipy_fft2(noiseframe, workers=4))
ps = ((np.abs(fft)) ** 2) / norm
if bin:
# print('# 2D spectrum is 8x8 binned\n')
binned_ps = np.average(np.reshape(ps, (L // 8, 8, L // 8, 8)), axis=(1, 3))
# print('# Binned PS has shape ', np.shape(binned_ps))
return binned_ps
else:
return ps
@staticmethod
[docs]
def _get_wavenumbers(window_length, num_radial_bins):
"""
Calculate wavenumbers for the input image.
Parameters
----------
window_length : int
The length of one axis of the image.
num_radial_bins: int
Number of radial bins the image should be averaged into.
Returns
-------
kmean : np.array
1D array of the wavenumbers for the image
"""
k = np.fft.fftshift(np.fft.fftfreq(window_length))
kx, ky = np.meshgrid(k, k)
k = np.sqrt(kx**2 + ky**2)
k, kmean, kerr = NoiseReport.azimuthal_average(k, num_radial_bins)
return kmean
@staticmethod
[docs]
def azimuthal_average(image, num_radial_bins):
"""
Compute radial profile of image.
Parameters
----------
image : np.array
Input image, 2D.
num_radial_bins : int
Number of radial bins in profile.
Returns
-------
r : np.array
Value of radius at each point
radial_mean : np.array
Mean intensity within each annulus. Main result
radial_err : np.array
Standard error on the mean: sigma / sqrt(N).
"""
ny, nx = image.shape
yy, xx = np.mgrid[:ny, :nx]
center = np.array(image.shape) / 2
r = np.hypot(xx - center[1], yy - center[0])
rbin = (num_radial_bins * r / r.max()).astype(int)
radial_mean = ndimage.mean(image, labels=rbin, index=np.arange(1, rbin.max() + 1))
radial_stddev = ndimage.standard_deviation(image, labels=rbin, index=np.arange(1, rbin.max() + 1))
npix = ndimage.sum(np.ones_like(image), labels=rbin, index=np.arange(1, rbin.max() + 1))
radial_err = radial_stddev / np.sqrt(npix)
return r, radial_mean, radial_err
@staticmethod
[docs]
def get_powerspectra(noiseframe, L, norm, num_radial_bins, use_slice=-1, bin_flag=1, win=True, alpha=0.9):
"""
Calculate the azimuthally-averaged 1D power spectrum of the image.
Parameters
----------
noiseframe: np.array
The 2D input image to be averaged over.
L : int
Length of FFT (must be a multiple of 8).
norm : float
Normalization of |FFT|^2->power spectrum.
num_radial_bins : int
Number of bins, should match bin number in get_wavenumbers
use_slice : int, optional
Noise slice number used.
bin_flag : int, optional
Binning? (1=yes, 0=no).
win : bool, optional
Whether to convolve the noise frame with a Tukey window function.
alpha : float, optional
Tukey window parameter.
Returns
-------
results : collection.namedtuple
Power spectrum results.
"""
noise = noiseframe.copy()
if bin_flag == 0:
ps_2d = NoiseReport.measure_power_spectrum(noise, L, norm=norm, bin=False, win=win, alpha=alpha)
else:
ps_2d = NoiseReport.measure_power_spectrum(noise, L, norm=norm, bin=True, win=win, alpha=alpha)
ps_r, ps_1d, ps_image_err = NoiseReport.azimuthal_average(ps_2d, num_radial_bins)
wavenumbers = NoiseReport._get_wavenumbers(noise.shape[0], num_radial_bins)
npix = np.prod(noiseframe.shape)
comment = [use_slice] * num_radial_bins
# consolidate results
results = PspecResults(
ps_image=ps_1d,
ps_image_err=ps_image_err,
npix=npix,
k=wavenumbers,
ps_2d=ps_2d,
noiselayer=comment,
)
return results
# --- average_spectra --- #
[docs]
def average_spectra(self, bin_flag):
"""
Averages together all the power spectra in one band.
Parameters
----------
bin_flag
Whether to bin? (1 = bin 8x8, 0 = do not bin)
Returns
-------
None
"""
for iblock in range(self.nblock**2):
ibx = iblock % self.nblock
iby = iblock // self.nblock
infile = self.psfiles[iblock]
print(ibx, iby, infile)
sys.stdout.flush()
# extract information from the header of the first file
if iblock == 0:
with fits.open(infile) as f:
n = np.shape(f["PRIMARY"])[0]
ll = (f["P1D_TABLE"].data).shape[0]
total_2D = np.zeros(np.shape(np.transpose(f["PRIMARY"].data, (1, 2, 0))))
total_1D = np.zeros((ll, 4))
# header = np.copy(f["PRIMARY"].header)
if not exists(infile):
continue
with fits.open(infile) as f:
indata_2D = np.copy(np.transpose(f["PRIMARY"].data, (1, 2, 0))).astype(np.float32)
indata_1D = np.copy(f["P1D_TABLE"].data)
for k in range(0, n):
total_2D[:, :, k] += indata_2D[:, :, k]
for k in range(0, ll):
for m in range(0, 4):
total_1D[k, m] += indata_1D[k][m]
for k in range(0, n):
total_2D[:, :, k] = total_2D[:, :, k] / self.nblock**2
total_1D = total_1D / self.nblock**2
hdu1 = fits.PrimaryHDU(np.transpose(total_2D, (2, 0, 1)))
hdr = hdu1.header
with fits.open(self.psfiles[0]) as g:
copykey = ["INSTEM", "LAYERKEY", "NLAYERS"]
for il in range(len(self.noiselayers)):
copykey.append(f"LAYER{il:02d}")
copykey.append("AREAUNIT")
for key in copykey:
hdr[key] = g[0].header[key]
col1 = fits.Column(name="Wavenumber", format="E", array=total_1D[:, 0])
col2 = fits.Column(name="Power", format="E", array=total_1D[:, 1])
col3 = fits.Column(name="Error", format="E", array=total_1D[:, 2])
col4 = fits.Column(name="NoiseLayerID", format="I", array=total_1D[:, 3])
p1d_cols = fits.ColDefs([col1, col2, col3, col4])
hdu_ps1d = fits.BinTableHDU.from_columns(p1d_cols, name="P1D_TABLE")
hdul = fits.HDUList([hdu1, hdu_ps1d])
filter = Settings.RomanFilters[self.cfg.use_filter][0]
outfile = self.datastem + "_" + filter + self.suffix + "_ps_avg.fits"
hdul.writeto(outfile, overwrite=True)
print("# Average power spectrum saved to " + outfile)
# --- figures --- #
[docs]
def gen_overview_fig(self):
"""
Makes a simple overview figure.
Returns
-------
str
File name of the figure written.
"""
with ReportFigContext(matplotlib, plt):
filter = Settings.RomanFilters[self.cfg.use_filter][0]
print(self.outslab)
matplotlib.rcParams.update({"font.size": 10})
F = plt.figure(figsize=(9, 5.5))
ntypes = ["white", "1/f", "lab", "simulated"]
vmax = [0.01, 0.3, 0.05, 5e-5]
pos = ["Upper left", "Upper right", "Lower left", "Lower right"]
um = 0.5 / self.s_out
unit_ = ["arcsec$^2$", "arcsec$^2$", r"$\mu$Jy$^2$/arcsec$^2$", r"(DN/s)$^2$ arcsec$^2$"]
for k in range(4):
if self.outslab[k] is not None:
S = F.add_subplot(2, 2, k + 1)
S.set_title("Power spectrum: " + ntypes[k] + " noise\n" + unit_[k], usetex=True)
S.set_xlabel("u [cycles/arcsec]")
S.set_ylabel("v [cycles/arcsec]")
with fits.open(self.datastem + "_" + filter + self.suffix + "_ps_avg.fits") as f:
im = S.imshow(
f[0].data[self.outslab[k], :, :],
cmap="gnuplot",
aspect=1,
interpolation="nearest",
origin="lower",
extent=(-um, um, -um, um),
norm=colors.LogNorm(vmin=vmax[k] / 300.0, vmax=vmax[k] * 1.0000001, clip=True),
)
F.colorbar(im, location="right")
outfile = self.datastem + "_" + filter + self.suffix + "_3panel.pdf"
if hasattr(F, "set_layout_engine"):
F.set_layout_engine("tight")
else:
F.set_tight_layout(True)
F.savefig(outfile)
plt.close(F)
# the caption
self.tex += "\\begin{figure}\n"
self.tex += (
"\\includegraphics[width=6.5in]{"
+ self.datastem_from_dir
+ "_"
+ filter
+ self.suffix
+ "_3panel.pdf}\n"
)
self.tex += "\\caption{\\label{fig:noise3panel}The 2D power spectra of the noise realizations.\n"
for k in range(4):
self.tex += r" {\em " + pos[k] + " panel} (" + ntypes[k] + " noise): "
if self.outslab[k] is not None:
self.tex += (
f"layer {self.noiselayers[self.NLK[self.outslab[k]]]:d} "
f"(PyIMCOM) $\\rightarrow$ {self.outslab[k]:d} (PS table), name="
)
self.tex += "{\\tt " + self.orignames[self.outslab[k]] + "}."
else:
self.tex += "not run."
self.tex += " \n"
self.tex += "}\n\\end{figure}\n\n"
self.tex += "The noise power spectra are shown in Fig.~\\ref{fig:noise3panel}.\n"
return outfile