"""
Utilities to generate additional layers and handle masks.
Classes
-------
GalSimInject
Utilities to inject objects using GalSim.
GridInject
Utilities to inject stars using furry-parakeet C routine.
CplxNoise
Utilities to generate 1/f noise.
Mask
Utilities for permanent and cosmic ray masks.
Functions
---------
_get_sca_imagefile
Returns path to required SCA image file.
check_if_idsca_exists
Determines whether an observation (id,sca) pair exists.
get_all_data
Makes a 3D array of the image data.
"""
import re
import sys
import time
import warnings
from os.path import exists
import asdf
import galsim
import healpy
import numpy as np
import scipy.linalg
from astropy.io import fits
from filelock import FileLock, Timeout
from scipy.ndimage import convolve
from .config import Settings as Stn
from .config import fpaCoords
try:
from furry_parakeet.pyimcom_croutines import iD5512C
except ImportError:
try:
from pyimcom_croutines import iD5512C
except ImportError:
from .routine import iD5512C
from .wcsutil import PyIMCOM_WCS, local_partial_pixel_derivatives2
[docs]
class GalSimInject:
"""
Utilities to inject objects on a HEALPix grid using GalSim into an SCA image.
Methods
-------
galsim_star_grid
Makes a grid of stars (staticmethod).
subgen
Helper function to generate a subsequence of a random number set (staticmethod).
subgen_multirow
Helper function to generate multiple subsequences of a random number set (staticmethod).
genobj
Generates parameters for random objects on a grid (staticmethod).
galsim_extobj_grid
Makes a grid of galaxies (staticmethod).
Notes
-----
The random number tools are designed to consistently generate the same objects when called with different
SCAs (so a different set of HEALPix pixels is chosen).
Based on fluffy-garbanzo/inject_galsim_obj.py (fluffy-garbanzo is now deprecated).
"""
@staticmethod
[docs]
def galsim_star_grid(res, mywcs, inpsf, idsca, obsdata, sca_nside, inpsf_oversamp, extraargs=None):
"""
Draws a grid of stars on an SCA image.
Parameters
----------
res : int
HEALPix resolution (nside = 2**res).
mywcs : astropy.wcs.WCS or pyimcom.wcsutil.PyIMCOM_WCS
The WCS object for the SCA.
inpsf : function
A function that takes a position and returns a PSF (normally an InImage.get_psf_pos method).
idsca : (int,int)
Tuple of observation ID and SCA number (SCA in 1..18)
obsdata : astropy.io.fits.FITS_rec
Observation table (needed for some data formats).
sca_nside : int
Side length of the SCA (4088 for Roman).
inpsf_oversamp: float or int
Oversampling factor of the PSF image (relative to native).
extraargs : dict or None, optional
If a dictionary, then contains more parameters to pass to some of the functions..
Returns
-------
np.array
An array of shape (nside,nside) containing the image of all the objects drawn.
Notes
-----
The following parameters are allowed as keys in `extraargs`:
* angleTransient : bool
If True, then includes a transient source that is on or off
depending on the roll angle (maps to time of year).
Odd pixels are on for PA=0, even for PA=180.
* FieldDependentModulation : float
If provided, then changes the apparent flux of the drawn stars depending on where
they are in the field. The flux varies from 1 at the center of the focal plane up
to 1 + FieldDependentModulation at the corners.
"""
# extract the extraargs
#
angleTransient = False
FieldDependentModulation = False
if isinstance(extraargs, dict):
# angleTransient: transient depending on roll angle?
if "angleTransient" in extraargs:
angleTransient = extraargs["angleTransient"]
if angleTransient:
# need to know whether the image points 'up' or 'down'
ra1, dec1 = mywcs.all_pix2world(0.5 * (fpaCoords.nside - 1), fpaCoords.nside - 1.0, 0)
ra2, dec2 = mywcs.all_pix2world(0.5 * (fpaCoords.nside - 1), 0.0, 0)
s = 0
if dec2 > dec1:
s = 1
if idsca[1] % 3 == 0:
s = 1 - s # top row of SCAs is flipped
print(".. idsca", idsca, "ddec", dec1 - dec2, "direction", s, "# 0 for PA 0, 1 for PA 180")
# FieldDependentModulation: change intensity depending on distance from field center?
if "FieldDependentModulation" in extraargs:
FieldDependentModulation = True
FieldDependentModulationAmplitude = extraargs["FieldDependentModulation"]
ra_cent, dec_cent = mywcs.all_pix2world((sca_nside - 1) / 2, (sca_nside - 1) / 2, 0)
search_radius = (sca_nside * 0.11) / 3600 * (np.pi / 180.0) * np.sqrt(2)
vec = healpy.ang2vec(ra_cent, dec_cent, lonlat=True)
qp = healpy.query_disc(2**res, vec, search_radius, nest=True)
ra_hpix, dec_hpix = healpy.pix2ang(2**res, qp, nest=True, lonlat=True)
# convert to SCA coordinates
x_sca, y_sca = mywcs.all_world2pix(ra_hpix, dec_hpix, 0)
d = 16
msk_sca = (x_sca >= -d) & (x_sca <= 4087 + d) & (y_sca >= -d) & (y_sca <= 4087 + d)
x_sca = x_sca[msk_sca]
y_sca = y_sca[msk_sca]
my_ra = ra_hpix[msk_sca]
my_dec = dec_hpix[msk_sca]
num_obj = len(x_sca)
n_in_stamp = 280
pad = n_in_stamp + 2 * (d + 1)
sca_image = galsim.ImageF(sca_nside + pad, sca_nside + pad, scale=0.11)
firsttime = True
for n in range(num_obj):
# if angle transient mode is on, check if we really need this object
if angleTransient and (qp[n] + s) % 2 == 1:
continue
psf = inpsf((my_ra[n], my_dec[n])) # now with PSF variation
psf_image = galsim.Image(psf, scale=0.11 / inpsf_oversamp)
# interp_psf = galsim.InterpolatedImage(psf_image, x_interpolant='lanczos32')
# the first time, get the preferred stepk and maxk
if firsttime:
interp_psf_test = galsim.InterpolatedImage(psf_image, x_interpolant="lanczos32")
stepk = interp_psf_test.stepk
maxk = interp_psf_test.maxk
del interp_psf_test
firsttime = False
psf_image.setCenter(0, 0)
interp_psf = galsim._InterpolatedImage(
psf_image, x_interpolant=galsim.interpolant.Lanczos(32), force_stepk=stepk, force_maxk=maxk
)
xy = galsim.PositionD(x_sca[n], y_sca[n])
xyI = xy.round()
draw_offset = (xy - xyI) + galsim.PositionD(0.5, 0.5)
b = galsim.BoundsI(
xmin=xyI.x - n_in_stamp // 2 + pad // 2 + 1,
ymin=xyI.y - n_in_stamp // 2 + pad // 2 + 1,
xmax=xyI.x + n_in_stamp // 2 + pad // 2,
ymax=xyI.y + n_in_stamp // 2 + pad // 2,
)
sub_image = sca_image[b]
if FieldDependentModulation:
xfpa, yfpa = fpaCoords.pix2fpa(idsca[1], x_sca[n], y_sca[n])
st_model = galsim.DeltaFunction(
flux=1.0 + FieldDependentModulationAmplitude * (xfpa**2 + yfpa**2) / fpaCoords.Rfpa**2
)
else:
st_model = galsim.DeltaFunction(flux=1.0)
source = galsim.Convolve([interp_psf, st_model])
source.drawImage(sub_image, offset=draw_offset, add_to_image=True, method="no_pixel")
return sca_image.array[pad // 2 : -pad // 2, pad // 2 : -pad // 2]
@staticmethod
[docs]
def _advance(rngX, delta):
"""
Advances the random number generator.
Using ``GalSimInject._advance(rngX, delta)`` should be equivalent to
``rngX.advance(delta)``, but this is designed to work with a 32-bit limit
on advance (so needed on some platforms).
Parameters
----------
rngX : np.random.BitGenerator
The random number generator; must support the advance method.
delta : int
The number of steps to advance. Must be >=0.
Returns
-------
None
"""
while delta >= 2**30:
rngX.advance(2**30)
delta = delta - 2**30
if delta > 0:
rngX.advance(int(np.int32(delta))) # conversion to avoid overflow
@staticmethod
[docs]
def subgen(rngX, lenpix, subpix):
"""
Returns a subset of the next large block of random numbers.
This is designed to work even when the number of random numbers requested (`lenpix`)
is too large for memory. It assumes no repeated entries in `subpix` (but these don't
have to be sorted).
Parameters
----------
rngX : np.random.BitGenerator
The random number generator; must support the advance method.
lenpix : int
Number of random numbers to draw.
subpix : np.array
1D array of integers indicating which entries to return.
Returns
-------
np.array
The array of ``R[subpix[0]] .. R[subpix[-1]]``, where ``R[0] .. R[lenpix-1]``
would be the full-length sequence of random numbers.
See Also
--------
subgen_multirow : Generate multiple rows.
"""
N = np.size(subpix)
# skip if empty
if N == 0:
GalSimInject._advance(rngX, lenpix)
return np.zeros(0)
out_temp = np.zeros(N)
k = np.argsort(subpix)
subpix_sort = subpix[k]
nskip = subpix_sort - 1
nskip[1:] -= subpix_sort[:-1]
nskip[0] += 1
for i in range(N):
GalSimInject._advance(rngX, nskip[i])
out_temp[i] = np.random.Generator(rngX).uniform()
GalSimInject._advance(rngX, lenpix - subpix_sort[-1] - 1)
out = np.zeros(N)
for i in range(N):
out[k[i]] = out_temp[i]
return out
@staticmethod
[docs]
def subgen_multirow(rngX, lenpix, subpix, P):
"""
Returns multiple subsets of the next large block of random numbers.
This is designed to work even when the number of random numbers requested in each row (`lenpix`)
is too large for memory. It assumes no repeated entries in `subpix` (but these don't
have to be sorted).
Parameters
----------
rngX : np.random.BitGenerator
The random number generator; must support the advance method.
lenpix : int
Number of random numbers to draw.
subpix : np.array
1D array of integers indicating which entries to return.
P : int
Number of repititions to generate.
Returns
-------
np.array
Shape (`P`, len(`subpix`)). Each row is an array ``R[subpix[0]] .. R[subpix[-1]]``,
where ``R[0] .. R[lenpix-1]`` would be a full-length sequence of random numbers.
See Also
--------
subgen : Generate a single row.
"""
out = np.zeros((P, np.size(subpix)))
for j in range(P):
out[j, :] = GalSimInject.subgen(rngX, lenpix, subpix)
return out
@staticmethod
[docs]
def genobj(lenpix, subpix, galstring, seed, morph_extraargs={}):
"""
Generates parameters for a set of random galaxies to draw on specific pixels.
Parameters
----------
lenpix : int
Number of pixels (should be 12 * 4**nside for HEALPix applications).
subpix : np.array
Pixel indices that we want to draw. Array of integers, shape (nobj,).
Does not need to be sorted, but repititions are not allowed.
galstring : str
String specifying the type.
seed : int
Random number generator seed.
morph_extraargs : dict
Dictionary of galaxy morphology: sersic index, effective radius, and intrinsic shape
Returns
-------
dict
A dictionary containing a bunch of arrays of galaxy information.
Notes
-----
The possible `galstring` values, and the keyword/value returned, are as follows:
* ``'exp1'`` : exponential profile, random shear (up to 0.5), log distribution radius
in 0.125 .. 0.5 arcsec.
Keyword/values:
* ``'sersic'`` : Sersic profile arrays.
* ``'n'`` : Sersic index; float or array of float, shape (nobj,).
* ``'r'`` : Effective radius in arcsec; float or array of float, shape (nobj,).
* ``'t__r'`` : Truncation in effective radii; float or array of float, shape (nobj,).
* ``'g'`` : Ellipticity (g-convention); array of float, shape (2,nobj).
"""
# nobj = np.size(subpix)
rngX = np.random.PCG64(seed=seed)
# now consider each type of object
if galstring == "exp1":
data = GalSimInject.subgen_multirow(rngX, lenpix, subpix, 3)
g1 = 0.5 * np.sqrt(data[1, :]) * np.cos(2 * np.pi * data[2, :])
g2 = 0.5 * np.sqrt(data[1, :]) * np.sin(2 * np.pi * data[2, :])
mydict = {"sersic": {"n": 1.0, "r": 0.5 / 4 ** data[0, :], "t__r": 8.0}, "g": np.stack((g1, g2))}
## in case user wants to set galaxy morphology
for arg in morph_extraargs:
if arg == "n":
mydict["sersic"]["n"] = morph_extraargs["n"]
if arg == "hlr":
mydict["sersic"]["r"] = morph_extraargs["hlr"]
if arg == "shape":
g1, g2 = morph_extraargs["shape"][0], morph_extraargs["shape"][1]
mydict["g"] = np.stack((g1, g2))
else:
mydict = {}
return mydict
@staticmethod
[docs]
def _value(obj, n):
"""Helper function to get obj if it is a scalar or obj[n] if it is an array."""
try:
out = obj[n]
except (TypeError, IndexError):
out = obj
return out
@staticmethod
[docs]
def galsim_extobj_grid(res, mywcs, inpsf, sca_nside, inpsf_oversamp, extraargs=[]):
"""
Draws a grid of galaxies on an SCA image.
Parameters
----------
res : int
HEALPix resolution (nside = 2**res).
mywcs : astropy.wcs.WCS or pyimcom.wcsutil.PyIMCOM_WCS
The WCS object for the SCA.
inpsf : function
A function that takes a position and returns a PSF (normally an InImage.get_psf_pos method).
sca_nside : int
Side length of the SCA (4088 for Roman).
inpsf_oversamp : int or float
PSF oversampling factor.
extraargs : list of str, optional
List of extra arguments to pass for drawing galaxies.
An example would be ``extraargs=['seed=12345', 'rot=90', 'shear=0.2:0.1']``.
Returns
-------
np.array
An array of shape (nside,nside) containing the image of all the objects drawn.
Notes
-----
The extra arguments that can be given include:
* "seed=n" (n : int) : use n as the random number generator seed
* "rot=theta" (theta : float) : rotate the galaxies by theta degrees prior to any shear
* "shear=g1:g2" (g1, g2 : float) : shear the galaxies by (g1,g2), conserving area
* "n=n" (n : int) : Sersic index
* "hlr=hlr" (hlr : float) : Effective radius in arcsec
* "shape=g1:g2" (g1, g2 : float) : intrinsi galaxy shape by (g1,g2), conserving area
"""
# default parameters
seed = 4096
rot = None
shear = None
# unpack extraargs
morph_extraargs = {} # dictionary to store galaxy morphology if specified by user
for arg in extraargs:
m = re.search(r"^seed=(\d+)$", arg, re.IGNORECASE)
if m:
seed = int(m.group(1))
m = re.search(r"^rot=(\S+)$", arg, re.IGNORECASE)
if m:
rot = float(m.group(1))
m = re.search(r"^shear=([^ \:]+)\:([^ \:]+)$", arg, re.IGNORECASE)
if m:
shear = [float(m.group(1)), float(m.group(2))]
## added extra arguments for input user to provide specific galaxy morphology
m = re.search(r"^n=(\S+)$", arg, re.IGNORECASE)
if m:
morph_extraargs["n"] = float(m.group(1)) # sersic index
m = re.search(r"^hlr=(\S+)$", arg, re.IGNORECASE)
if m:
morph_extraargs["hlr"] = float(m.group(1)) # half-light radius
m = re.search(r"^shape=([^ \:]+)\:([^ \:]+)$", arg, re.IGNORECASE)
if m:
# intrinsic galaxy shape (g1,g2)
morph_extraargs["shape"] = [float(m.group(1)), float(m.group(2))]
# print('rng seed =', seed, ' transform: rot=', rot, 'shear=', shear)
refscale = 0.11 # reference pixel size in arcsec
ra_cent, dec_cent = mywcs.all_pix2world((sca_nside - 1) / 2, (sca_nside - 1) / 2, 0)
search_radius = (sca_nside * 0.11) / 3600 * (np.pi / 180.0) * np.sqrt(2)
vec = healpy.ang2vec(ra_cent, dec_cent, lonlat=True)
qp = healpy.query_disc(2**res, vec, search_radius, nest=True)
ra_hpix, dec_hpix = healpy.pix2ang(2**res, qp, nest=True, lonlat=True)
# convert to SCA coordinates
x_sca, y_sca = mywcs.all_world2pix(ra_hpix, dec_hpix, 0)
d = 128
msk_sca = (x_sca >= -d) & (x_sca <= 4087 + d) & (y_sca >= -d) & (y_sca <= 4087 + d)
x_sca = x_sca[msk_sca]
y_sca = y_sca[msk_sca]
ipix = qp[msk_sca] # pixel index of the objects within the SCA
my_ra = ra_hpix[msk_sca]
my_dec = dec_hpix[msk_sca]
num_obj = len(x_sca)
# generate object parameters
galstring = "exp1"
galtype = GalSimInject.genobj(12 * 4**res, ipix, galstring, seed, morph_extraargs)
n_in_stamp = 280
pad = n_in_stamp + 2 * (d + 1)
sca_image = galsim.ImageF(sca_nside + pad, sca_nside + pad, scale=refscale)
t0 = time.time()
for n in range(num_obj):
psf = inpsf((my_ra[n], my_dec[n])) # now with PSF variation
psf_image = galsim.Image(psf, scale=0.11 / inpsf_oversamp)
# interp_psf = galsim.InterpolatedImage(psf_image, x_interpolant='lanczos32')
# the first time, get the preferred stepk and maxk
if n == 0:
interp_psf_test = galsim.InterpolatedImage(psf_image, x_interpolant="lanczos32")
stepk = interp_psf_test.stepk
maxk = interp_psf_test.maxk
del interp_psf_test
psf_image.setCenter(0, 0)
interp_psf = galsim._InterpolatedImage(
psf_image, x_interpolant=galsim.interpolant.Lanczos(32), force_stepk=stepk, force_maxk=maxk
)
# Jacobian
Jac = local_partial_pixel_derivatives2(mywcs, x_sca[n], y_sca[n])
# convert to reference pixel size
Jac /= refscale / 3600.0
# now we have d(X,Y)|_{zweibein: X=E,Y=N} / d(X,Y)|_{pixel coords}
xy = galsim.PositionD(x_sca[n], y_sca[n])
xyI = xy.round()
draw_offset = (xy - xyI) + galsim.PositionD(0.5, 0.5)
b = galsim.BoundsI(
xmin=xyI.x - n_in_stamp // 2 + pad // 2 + 1,
ymin=xyI.y - n_in_stamp // 2 + pad // 2 + 1,
xmax=xyI.x + n_in_stamp // 2 + pad // 2,
ymax=xyI.y + n_in_stamp // 2 + pad // 2,
)
sub_image = sca_image[b]
st_model_round = galsim.DeltaFunction(flux=1.0)
# now consider the possible non-point profiles
#
if "sersic" in galtype:
st_model_round = galsim.Sersic(
GalSimInject._value(galtype["sersic"]["n"], n),
half_light_radius=GalSimInject._value(galtype["sersic"]["r"], n),
flux=1.0,
trunc=galtype["sersic"]["t__r"] * GalSimInject._value(galtype["sersic"]["r"], n),
)
#
# now transform the round object if desired
if "g" in galtype:
g1 = GalSimInject._value(galtype["g"][0], n)
g2 = GalSimInject._value(galtype["g"][1], n)
jshear = np.asarray([[1 + g1, g2], [g2, 1 - g1]]) / np.sqrt(1.0 - g1**2 - g2**2)
st_model_undist = galsim.Transformation(
st_model_round, jac=jshear, offset=(0.0, 0.0), flux_ratio=1
)
else:
st_model_undist = st_model_round
# rotate, if desired
if rot is not None:
theta = rot * np.pi / 180.0 # convert to radians
jrot = np.asarray([[np.cos(theta), -np.sin(theta)], [np.sin(theta), np.cos(theta)]])
st_model_undist = galsim.Transformation(
st_model_undist, jac=jrot, offset=(0.0, 0.0), flux_ratio=1
)
# applied shear, if desired
if shear is not None:
jsh = scipy.linalg.expm(np.asarray([[shear[0], shear[1]], [shear[1], -shear[0]]]))
st_model_undist = galsim.Transformation(
st_model_undist, jac=jsh, offset=(0.0, 0.0), flux_ratio=1
)
# and convert to image coordinates
st_model = galsim.Transformation(
st_model_undist,
jac=np.linalg.inv(Jac),
offset=(0.0, 0.0),
flux_ratio=np.abs(np.linalg.det(Jac)),
)
source = galsim.Convolve([interp_psf, st_model])
source.drawImage(sub_image, offset=draw_offset, add_to_image=True, method="no_pixel")
t1 = time.time() - t0
print(f"n={n}, tot={t1}")
sys.stdout.flush()
return sca_image.array[pad // 2 : -pad // 2, pad // 2 : -pad // 2]
[docs]
class GridInject:
"""
Utilities to inject stars using furry-parakeet C routine.
Based on fluffy-garbanzo/grid_inject.py.
Methods
-------
make_sph_grid
Get HEALPix pixels that are in a circular region (staticmethod).
generate_star_grid
Make a table of star parameters that are in a circular region (staticmethod).
make_image_from_grid
Make an image with a grid of stars from a WCS (staticmethod).
"""
@staticmethod
[docs]
def make_sph_grid(res, ra, dec, radius):
"""
Get Healpix pixels at resolution res that are within the given radius of (ra, dec).
Parameters
----------
res : int
HEALPix resolution (nside == 2**res).
ra : float
Right ascension of patch center in radians.
dec : float
Declination of patch center in radians.
radius : float
Radius of patch in radians.
Returns
-------
dict
The output contains a dictionary of:
* 'npix' : int, number of pixels used
* 'ipix' : np.array of int, pixel indices found
* 'rapix' : np.array of float, right ascension of pixels, in radians
* 'decpix' : np.array of float, declination of pixels, in radians
"""
# get healpix nside
nside = 2**res
# get bounding range of pixels
# extended radius, overlap by 2 rings so there is no clipping later
radext = radius + 3 / nside
dmin = max(dec - radext, -np.pi / 2.0)
dmax = min(dec + radext, np.pi / 2.0)
pmin = healpy.pixelfunc.ang2pix(nside, np.pi / 2.0 - dmax, ra, nest=False, lonlat=False)
pmax = healpy.pixelfunc.ang2pix(nside, np.pi / 2.0 - dmin, ra, nest=False, lonlat=False)
# and now the pixel values and their positions
pvec = np.asarray(range(pmin, pmax + 1)).astype(np.int64)
theta, phi = healpy.pixelfunc.pix2ang(nside, pvec, nest=False, lonlat=False)
thetac = np.pi / 2.0 - theta
mu = np.sin(thetac) * np.sin(dec) + np.cos(thetac) * np.cos(dec) * np.cos(ra - phi)
good = np.where(mu >= np.cos(radius))
ipix = pvec[good]
rapix = phi[good]
decpix = thetac[good]
npix = np.size(ipix)
return {"res": res, "nside": nside, "npix": npix, "ipix": ipix, "rapix": rapix, "decpix": decpix}
@staticmethod
[docs]
def generate_star_grid(res, myWCS, scapar={"nside": 4088, "pix_arcsec": 0.11}):
"""
Makes a grid of positions to inject simulated sources into an SCA image.
Parameters
----------
res : int
HEALPix resolution (nside == 2**res)
myWCS : astropy.wcs.WCS or pyimcom.wcsutil.PyIMCOM_WCS
World coordinate system for the SCA
scapar : dict, optional
Should have keyword/values:
* 'sca_nside' : int, side length of SCA
* 'pix_arcsec' : float, reference pixel scale in arcsec
Returns
-------
np.array of int
array of HEALPix pixel indices
np.array of float
x positions on the SCA
np.array of float
y positions on the SCA
np.array of float
right ascensions of the pixels drawn
np.array of float
declinations of the pixels drawn
"""
# SCA side length in radians
degree = np.pi / 180
sidelength = scapar["nside"] * scapar["pix_arcsec"] / 3600 * degree
radius = sidelength
# and center position
cpos_local = (scapar["nside"] - 1) / 2
cpos_world = myWCS.all_pix2world([[cpos_local, cpos_local]], 0)[0]
ra_ctr = cpos_world[0] * degree
dec_ctr = cpos_world[1] * degree
# stars
stargrid = GridInject.make_sph_grid(res, ra_ctr, dec_ctr, radius)
# and positions in the SCA image
px, py = myWCS.all_world2pix(stargrid["rapix"] / degree, stargrid["decpix"] / degree, 0)
return (stargrid["ipix"], px, py, stargrid["rapix"] / degree, stargrid["decpix"] / degree)
@staticmethod
[docs]
def make_image_from_grid(res, inpsf, idsca, obsdata, mywcs, nside_sca, inpsf_oversamp):
"""
Make an SCA image with this grid of stars with a PSF from a specified file with unit flux.
Parameters
----------
res : int
HEALPix resolution (nside == 2**res)
inpsf : function
Should be an InImage.get_psf_pos method
idsca : (int,int)
Observation ID, SCA pair (SCA in 1 .. 18)
obsdata : astropy.io.fits.FITS_rec
Observation table (needed for some data formats).
mywcs : astropy.wcs.WCS or pyimcom.wcsutil.PyIMCOM_WCS
World coordinate system for the SCA
nside_sca : int
Side length of the SCA (in pixels)
inpsf_oversamp: float or int
Oversampling factor of the PSF image (relative to native).
Returns
-------
np.array
The image of the SCA with the stars drawn. Shape (`nside_sca`, `nside_sca`).
"""
thisimage = np.zeros((nside_sca, nside_sca))
ipix, xsca, ysca, rapix, decpix = GridInject.generate_star_grid(res, mywcs)
p = 6 # padding for interpolation (n/2+1 for nxn interpolation kernel)
d = 64 # region to draw
for istar in range(len(ipix)):
thispsf = inpsf((rapix[istar], decpix[istar])) # now with PSF variation
this_xmax = min(nside_sca, int(xsca[istar]) + d)
this_xmin = max(0, int(xsca[istar]) - d)
this_ymax = min(nside_sca, int(ysca[istar]) + d)
this_ymin = max(0, int(ysca[istar]) - d)
pnx = this_xmax - this_xmin
pny = this_ymax - this_ymin
if pnx < 1 or pny < 1:
continue
# draw at this location
inX = np.zeros((pny, pnx))
inY = np.zeros((pny, pnx))
inX[:, :] = (np.array(range(this_xmin, this_xmax)) - xsca[istar])[None, :]
inY[:, :] = (np.array(range(this_ymin, this_ymax)) - ysca[istar])[:, None]
interp_array = np.zeros((1, pny * pnx))
(ny, nx) = np.shape(thispsf)
iD5512C(
np.pad(thispsf, p).reshape((1, ny + 2 * p, nx + 2 * p)),
inpsf_oversamp * inX.flatten() + (nx - 1) / 2.0 + p,
inpsf_oversamp * inY.flatten() + (ny - 1) / 2.0 + p,
interp_array,
)
thisimage[this_ymin:this_ymax, this_xmin:this_xmax] = (
thisimage[this_ymin:this_ymax, this_xmin:this_xmax]
+ interp_array.reshape((pny, pnx)) * inpsf_oversamp**2
)
return thisimage
[docs]
class CplxNoise:
"""
Utilities to generate 1/f noise.
Based on fluffy-garbanzo/inject_complex_noise.py.
Methods
-------
noise_1f_frame
1/f noise generator (staticmethod)
"""
@staticmethod
[docs]
def noise_1f_frame(seed):
"""
A simple 1/f noise generator, independent in each output channel.
Parameters
----------
seed : int
The random number generator seed.
Returns
-------
np.array
An image of the 1/f noise.
"""
this_array = np.zeros((4096, 4096))
rng = np.random.default_rng(seed)
len_ = 8192 * 128
# get frequencies and amplitude ~ sqrt{power}
freq = np.linspace(0, 1 - 1.0 / len_, len_)
freq[len_ // 2 :] -= 1.0
amp = (1.0e-99 + np.abs(freq * len_)) ** (-0.5)
amp[0] = 0.0
for ch in range(32):
# get array
ftsignal = np.zeros((len_,), dtype=np.complex128)
ftsignal[:] = rng.normal(loc=0.0, scale=1.0, size=(len_,))
ftsignal[:] += 1j * rng.normal(loc=0.0, scale=1.0, size=(len_,))
ftsignal *= amp
block = np.fft.fft(ftsignal).real[: len_ // 2] / np.sqrt(2.0)
block -= np.mean(block)
xmin = ch * 128
xmax = xmin + 128
# mapping into the image depends on L->R or R->L read order
if ch % 2 == 0:
this_array[:, xmin:xmax] = block.reshape((4096, 128))
else:
this_array[:, xmin:xmax] = block.reshape((4096, 128))[:, ::-1]
return this_array[4:4092, 4:4092].astype(np.float32)
[docs]
class Mask:
"""
Utilities for permanent and cosmic ray masks.
Methods
-------
randmask
Simulated a random mask (staticmethod)
load_permanent_mask
Loads a permanent mask from a block file (staticmethod)
load_mask_from_maskfile
Loads a permanent mask from a file (staticmethod)
load_cr_mask
Simulate a cosmic ray mask (staticmethod)
"""
@staticmethod
[docs]
def randmask(idsca, pcut, hitinfo=None):
"""
Makes a psuedorandom mask that randomly removes groups of pixels (intended for CR simulation).
Parameters
----------
idsca : (int, int)
Observation ID, SCA pair.
pcut : float
Probability that any given pixel is hit.
hitinfo : dict or None, optional
For future expansion, right now always use None.
Returns
-------
np.array of bool
An image of True for good pixels and False for CR-affected pixels.
"""
seed = 100000000 + idsca[0]
rng = np.random.default_rng(seed)
pad = 10
g = rng.uniform(size=(18, 2 * pad + Stn.sca_nside, 2 * pad + Stn.sca_nside))[idsca[1] - 1, :, :]
crhits = np.where(g < pcut, 1.0, 0.0) # hit mask
# different ways of making a mask
if hitinfo is None:
return np.where(
convolve(crhits, np.ones((3, 3)), mode="constant")[pad:-pad, pad:-pad] < 0.5, True, False
)
@staticmethod
[docs]
def load_permanent_mask(block):
"""
Builds a permanent mask from a FITS file referenced in the configuration.
Parameters
----------
block : pyimcom.coadd.Block
The block that we are coadding.
Returns
-------
np.array of bool
An image of True for good pixels and False for bad pixels.
"""
if block.cfg.permanent_mask is None:
print("No permanent mask")
permanent_mask = None
else:
# permanent mask is 'True' if the pixel should be used.
# 'GOODVAL' keyword in the input FITS file allows us to indicate
# whether an unflagged pixel is all 0's (GOODVAL=0) or 1's (GOODVAL!=0 or missing)
with fits.open(block.cfg.permanent_mask) as f:
if f[0].header.get("GOODVAL") == 0:
permanent_mask = np.where(f[0].data == 0, True, False)
else:
permanent_mask = np.where(f[0].data, True, False)
nonzero_count = np.count_nonzero(permanent_mask)
print(
"Permanent mask loaded --> ",
nonzero_count,
"good pixels",
nonzero_count / (18 * 4088**2) * 100,
"%",
)
return permanent_mask
@staticmethod
[docs]
def load_mask_from_maskfile(cfg, obsdata, idsca):
"""
Loads a mask from a cached mask file.
Parameters
----------
cfg : pyimcom.config.Config
The configuration file.
obsdata : astropy.io.fits.FITS_rec
Observation table (needed for some data formats).
idsca : (int,int)
Tuple of observation ID and SCA number (SCA in 1..18)
Returns
-------
np.array of bool
An image of True for good pixels and False for bad pixels.
"""
without_maskfiles = ["dc2_sim", "anlsim"] # old formats without input masks
if cfg.informat not in without_maskfiles:
filename = _get_sca_imagefile(
cfg.inpath, idsca, obsdata, cfg.informat, extraargs={"type": "mask"}
)
if filename[-5:] == ".fits":
with fits.open(filename) as f:
return f["MASK"].data == 0
if filename[-5:] == ".asdf":
with asdf.open(filename) as f:
if "mask" not in f:
# revert to old file
warnings.warn("No mask in ASDF file, looking for FITS mask.")
with fits.open(filename[:-5] + "_mask.fits") as ff:
return ff["MASK"].data == 0
return f["mask"] == 0
raise ValueError("Unrecognized file extension for mask file.")
# default return all good
return np.ones((Stn.sca_nside, Stn.sca_nside), dtype=bool)
@staticmethod
[docs]
def load_cr_mask(inimage):
"""
Generates a cosmic ray mask for an image (including lab noise CRs, if labnoise is used).
Parameters
----------
inimage : pyimcom.coadd.InImage
An input image structure.
Returns
-------
np.array of bool
An image of True for good pixels and False for CR-affected pixels.
"""
config = inimage.blk.cfg # shortcut
if config.cr_mask_rate > 0:
cr_mask = Mask.randmask(inimage.idsca, config.cr_mask_rate)
print("Cosmic ray mask: good pix --> ", np.count_nonzero(cr_mask), "/", 4088**2)
try:
idx = config.extrainput.index("labnoise")
except (KeyError, ValueError):
pass
else:
cr_mask = np.logical_and(cr_mask, np.abs(inimage.indata[idx]) < config.labnoisethreshold)
print("Lab noise threshold: good pix --> ", np.count_nonzero(cr_mask), "/", 4088**2)
else:
cr_mask = None
return cr_mask
[docs]
def _get_sca_imagefile(path, idsca, obsdata, format_, extraargs=None):
"""
Main function to get input file names.
Parameters
----------
path : str
Directory for the files.
idsca : (int, int)
Observation ID, SCA pair.
obsdata : astropy.io.fits.FITS_rec
Observation table (needed for some data formats).
format_ : str
Input file format type identifier from the configuration.
extraargs : dict or None, optional
Additional arguments that can be provided for some types of files.
Default is None.
Returns
-------
str or None
The file name (returns None if unrecognized format).
Notes
-----
The valid formats are currently:
* dc2_imsim
* anlsim
* L2_2506
If `sca` is -1, then returns a format string (i.e., with '{:d}' or similar
instead of the number itself).
"""
# all formats currently have no leading 0 on the SCA number
scastr = f"{idsca[1]:d}"
if idsca[1] == -1:
scastr = "{:d}"
# get the filter
filter = obsdata if isinstance(obsdata, str) else Stn.RomanFilters[obsdata["filter"][idsca[0]]]
# for Level 2 summer 2025 format
if format_ == "L2_2506":
out = path + f"/sim_L2_{filter:s}_{idsca[0]:d}_{scastr:s}.asdf"
# insert sim layers here
if extraargs is not None and "type" in extraargs:
if extraargs["type"] == "mask":
out = path + f"/sim_L2_{filter:s}_{idsca[0]:d}_{scastr:s}.asdf" # now in same file
if extraargs["type"] == "labnoise":
out = path + f"/labnoise/slope_{idsca[0]:d}_{scastr:s}.fits"
if extraargs["type"] == "truth":
out = path + f"/truth/Roman_WAS_truth_{filter:s}_{idsca[0]:d}_{scastr:s}.fits"
if extraargs["type"] == "noise":
out = path + f"/sim_L2_{filter:s}_{idsca[0]:d}_{scastr:s}_noise.asdf"
return out
# for the ANL sims
if format_ == "anlsim":
out = path + f"/simple/Roman_WAS_simple_model_{filter:s}_{idsca[0]:d}_{scastr:s}.fits"
# insert ANL sim layers here
if extraargs is not None and "type" in extraargs and extraargs["type"] == "labnoise":
out = path + f"/labnoise/slope_{idsca[0]:d}_{scastr:s}.fits"
return out
# right now this is the only other type defined
if format_ != "dc2_imsim":
return None
out = path + f"/simple/dc2_{filter:s}_{idsca[0]:d}_{scastr:s}.fits"
if extraargs is not None and "type" in extraargs:
if extraargs["type"] == "truth":
out = path + f"/truth/dc2_{filter:s}_{idsca[0]:d}_{scastr:s}.fits"
elif extraargs["type"] == "labnoise":
out = path + f"/labnoise/slope_{idsca[0]:d}_{scastr:s}.fits"
elif extraargs["type"] == "skyerr":
out = path + f"/simple/dc2_{filter:s}_{idsca[0]:d}_{scastr:s}.fits"
return out
[docs]
def check_if_idsca_exists(cfg, obsdata, idsca):
"""
Determines whether an observation (id,sca) pair exists.
Parameters
----------
cfg : pyimcom.config.Config
Configuration information.
obsdata : astropy.io.fits.FITS_rec
Observation table (needed for some data formats).
idsca : (int, int)
Observation ID, SCA pair.
Returns:
exists_ : bool
Whether the file exists.
fname : str
The file name.
"""
fname = _get_sca_imagefile(cfg.inpath, idsca, obsdata, cfg.informat)
exists_ = exists(fname)
return exists_, fname
[docs]
def get_all_data(inimage):
"""
Makes a 3D array of all the layers of an input image.
The indexing of the return array is [input layer (e.g., 0=sci or sim), y, x].
Parameters
----------
inimage : pyimcom.coadd.InImage
An input image structure.
Returns
-------
np.array
The input image; shape = (nlayer, nside_sca, nside_sca)
"""
# read arguments from InImage attributes
n_inframe = inimage.blk.cfg.n_inframe # number of input frames
idsca = inimage.idsca # which observation to use (tuple (obsid, sca) (sca in 1..18))
obsdata = inimage.blk.obsdata # observation data table (information needed for some formats)
path = inimage.blk.cfg.inpath # directory for the files
format_ = inimage.blk.cfg.informat # string describing type of file name
inwcs = inimage.inwcs # input WCS of *this* observation
inpsf = inimage.get_psf_pos # now this is a **function**
# input PSF information (to be passed to GalSimInject or GridInject routines if we draw objects)
inpsf_oversamp = inimage.blk.cfg.inpsf_oversamp
extrainput = (
inimage.blk.cfg.extrainput
) # make multiple maps (list of types, first should be None, rest strings)
# does the user want us to save the input layer cube?
InLayerCache = False
if bool(inimage.blk.cfg.inlayercache):
InLayerCache = True
inlayer_filepath = inimage.blk.cfg.inlayercache + f"_{idsca[0]:08d}_{idsca[1]:02d}.fits"
inlayer_lockpath = inlayer_filepath + ".lock"
lock = FileLock(inlayer_lockpath)
# try to load the data, if not available keep going
if InLayerCache:
try:
with lock.acquire(timeout=30):
if exists(inlayer_filepath):
print("loading input layer <<", inlayer_filepath)
with fits.open(inlayer_filepath) as f:
inimage.indata = f[0].data.astype(np.float32)
sys.stdout.flush()
return
except Timeout:
pass
# not available yet, we have to build it. start by making the input data array
inimage.indata = np.zeros((n_inframe, Stn.sca_nside, Stn.sca_nside), dtype=np.float32)
# now fill in each slice in *this* observation
# (missing files are blank)
filename = _get_sca_imagefile(path, idsca, obsdata, format_)
if exists(filename):
# these input formats both use the SCI data frame and have the sky embedded in the header
if format_ in ["dc2_imsim", "anlsim"]:
with fits.open(filename) as f:
inimage.indata[0, :, :] = f["SCI"].data - float(f["SCI"].header["SKY_MEAN"])
if format_ in ["L2_2506"]:
with asdf.open(filename) as f:
inimage.indata[0, :, :] = f["roman"]["data"]
#
# now for the extra inputs
# if n_inframe == 1: return <-- for saving purposes, don't want to exit here
for i in range(1, n_inframe):
sys.stdout.flush()
# truth image (no noise)
if (
extrainput[i].casefold() == "truth".casefold()
or extrainput[i][:6].casefold() == "truth,".casefold()
):
rescale = 1.0
m = re.search(r"^truth,(.+)$", extrainput[i], re.IGNORECASE)
if m:
rescale = float(m.group(1))
filename = _get_sca_imagefile(path, idsca, obsdata, format_, extraargs={"type": "truth"})
if exists(filename):
if filename[-5:] == ".fits":
with fits.open(filename) as f:
inimage.indata[i, :, :] = f[0].data
if format_ == "L2_2506":
# FITS truth files have to be flipped
if idsca[1] % 3 == 0:
inimage.indata[i, :, :] = inimage.indata[i, :, ::-1]
else:
inimage.indata[i, :, :] = inimage.indata[i, ::-1, :]
if filename[-5:] == ".asdf":
with fits.open(filename) as f:
inimage.indata[i, :, :] = f["roman"]["data"]
# scaling
inimage.indata[i, :, :] *= rescale
# white noise frames (generated from RNG, not file)
m = re.search(r"^whitenoise(\d+)$", extrainput[i], re.IGNORECASE)
if m:
q = int(m.group(1))
seed = 1000000 * (18 * q + idsca[1]) + idsca[0]
print(f"noise rng: frame_q={q:d}, seed={seed:d}")
rng = np.random.default_rng(seed)
inimage.indata[i, :, :] = rng.normal(loc=0.0, scale=1.0, size=(Stn.sca_nside, Stn.sca_nside))
del rng
# 1/f noise frames (generated from RNG, not file)
m = re.search(r"^1fnoise(\d+)$", extrainput[i], re.IGNORECASE)
if m:
q = int(m.group(1))
seed = 1000000 * (18 * q + idsca[1]) + idsca[0]
print(f"noise rng: frame_q={q:d}, seed={seed:d}", "--> 1/f")
inimage.indata[i, :, :] = CplxNoise.noise_1f_frame(seed)
# lab noise frames (if applicable)
if extrainput[i].casefold() == "labnoise".casefold():
filename = _get_sca_imagefile(path, idsca, obsdata, format_, extraargs={"type": "labnoise"})
print("lab noise: searching for " + filename)
if exists(filename):
with fits.open(filename) as f:
try:
inimage.indata[i, :, :] = f[0].data
except (KeyError, ValueError, RuntimeError):
inimage.indata[i, :, :] = f[0].data[4:4092, 4:4092]
print(
" -> pulled out 4088x4088 subregion: 10th & 90th percentiles = "
f"{np.percentile(f[0].data[4:4092, 4:4092], 10):6.3f} "
f"{np.percentile(f[0].data[4:4092, 4:4092], 90):6.3f}"
)
if format_ == "L2_2506":
# FITS labnoise files have to be flipped
if idsca[1] % 3 == 0:
inimage.indata[i, :, :] = inimage.indata[i, :, ::-1]
else:
inimage.indata[i, :, :] = inimage.indata[i, ::-1, :]
else:
print("Warning: labnoise file not found, skipping ...")
# skyerr (not supported in L2_2506)
if extrainput[i].casefold() == "skyerr".casefold():
filename = _get_sca_imagefile(path, idsca, obsdata, format_, extraargs={"type": "skyerr"})
if exists(filename):
with fits.open(filename) as f:
inimage.indata[i, :, :] = f["ERR"].data - float(f["SCI"].header["SKY_MEAN"])
# C routine star grid
m = re.search(r"^cstar(\d+)$", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
print("making grid using C routines: ", res, idsca)
inimage.indata[i, :, :] = GridInject.make_image_from_grid(
res, inpsf, idsca, obsdata, inwcs, Stn.sca_nside, inpsf_oversamp
)
# noisy star grid
# nstar<resolution>,<star_intensity>,<background>,<seed_index>
m = re.search(r"^nstar(\d+),", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
extargs = extrainput[i].split(",")[1:]
tot_int = float(extargs[0])
bg = float(extargs[1])
q = int(extargs[2])
seed = 1000000 * (18 * q + idsca[1]) + idsca[0]
print(f"noise rng: frame_q={q:d}, seed={seed:d}")
rng = np.random.default_rng(seed)
print(
"making noisy stars: ",
res,
idsca,
" total brightness =",
tot_int,
"background =",
bg,
"seed index =",
q,
)
brightness = GridInject.make_image_from_grid(
res, inpsf, idsca, obsdata, inwcs, Stn.sca_nside, inpsf_oversamp
)
print("min/max brightness =", np.amin(brightness), np.amax(brightness))
sys.stdout.flush()
inimage.indata[i, :, :] = rng.poisson(lam=brightness * tot_int + bg) - bg
del rng
# galsim star grid
m = re.search(r"^gsstar(\d+)$", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
print("making grid using GalSim: ", res, idsca)
inimage.indata[i, :, :] = GalSimInject.galsim_star_grid(
res, inwcs, inpsf, idsca, obsdata, Stn.sca_nside, inpsf_oversamp
)
# galsim angle-based transient star grid
# (a test of what happens when a point source is present in one pass but not the other)
m = re.search(r"^gstrstar(\d+)$", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
print("making grid using GalSim: ", res, idsca)
inimage.indata[i, :, :] = GalSimInject.galsim_star_grid(
res,
inwcs,
inpsf,
idsca,
obsdata,
Stn.sca_nside,
inpsf_oversamp,
extraargs={"angleTransient": True},
)
# galsim field-dependent amplitude star grid
# (a test of what a source does if its SED causes it to be brighter (+) or fainter (-) as one moves
# away from zero field angle)
m = re.search(r"^gsfdstar(\d+),(.+)+$", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
amp = float(m.group(2))
print("making grid using GalSim: ", res, idsca)
inimage.indata[i, :, :] = GalSimInject.galsim_star_grid(
res,
inwcs,
inpsf,
idsca,
obsdata,
Stn.sca_nside,
inpsf_oversamp,
extraargs={"FieldDependentModulation": amp},
)
# galsim extobj grid
m = re.search(r"^gsext(\d+)", extrainput[i], re.IGNORECASE)
if m:
res = int(m.group(1))
extargs = extrainput[i].split(",")[1:]
print("making grid using GalSim: ", res, idsca, "extended object type:", extargs)
inimage.indata[i, :, :] = GalSimInject.galsim_extobj_grid(
res, inwcs, inpsf, Stn.sca_nside, inpsf_oversamp, extraargs=extargs
)
# noise realizations saved from romanimpreprocess
# supported in L2_2506 and later
m = re.search(r"^noise,(\S+)", extrainput[i], re.IGNORECASE)
if m:
noiselabel = str(m.group(1))
filename = _get_sca_imagefile(path, idsca, obsdata, format_, extraargs={"type": "noise"})
print("extracting noise layer " + noiselabel + " from " + filename)
if exists(filename):
with asdf.open(filename) as f:
# figure out which noise slice to use
noise_realizations = f["config"]["NOISE"]["LAYER"]
jn_use = -1
for jn in range(len(noise_realizations)):
if noise_realizations[jn] == noiselabel:
if jn_use >= 0:
warnings.warn(
"label "
+ noiselabel
+ " repeated in "
+ filename
+ ": using first instance"
)
break
jn_use = jn
# now extract that slice
if jn_use >= 0:
inimage.indata[i, :, :] = f["noise"][jn_use, :, :]
else:
warnings.warn(
"WARNING: cannot find slice " + noiselabel + " in " + filename + ": continuing"
)
else:
warnings.warn("WARNING: cannot find noise file: " + filename + ": continuing")
sys.stdout.flush()
# this is the end of building the data cube
# save it if specified in the configuration file
if InLayerCache:
try:
with lock.acquire(timeout=1):
if not exists(inlayer_filepath):
print("saving input layer >>", inlayer_filepath)
pr = fits.PrimaryHDU(inimage.indata)
# the sciwcs HDU is simply here to save a 2D image with the science WCS --- it doesn't
# have science data in it need relax=True to write SIP coefficients
#
# if a gwcs is used, saves that in an ancillary ASDF file and writes 'GWCS' in the FITS
# header
is_astropy_wcs = True
inwcs = inimage.inwcs
if isinstance(inwcs, PyIMCOM_WCS):
if inwcs.constructortype == "GWCS":
is_astropy_wcs = False
sciwcs = fits.ImageHDU()
sciwcs.header["WCSTYPE"] = "GWCS"
fits.HDUList([pr, sciwcs]).writeto(inlayer_filepath)
af = asdf.AsdfFile()
af["wcs"] = inwcs.obj
af.write_to(inlayer_filepath[:-5] + "_wcs.asdf")
else:
inwcs = inwcs.obj
if is_astropy_wcs:
sciwcs = fits.ImageHDU(
np.zeros((Stn.sca_nside, Stn.sca_nside), dtype=np.uint8),
header=inwcs.to_header(relax=True),
name="SCIWCS",
)
sciwcs["WCSTYPE"] = "FITS"
fits.HDUList([pr, sciwcs]).writeto(inlayer_filepath)
except Timeout:
pass