Source code for pyimcom.diagnostics.starcube_nonoise

"""
Code to generate a catalog of injected stars and their properties in the coadded image.

Functions
---------
gen_starcube_nonoise
    Extracts the noiseless star cube and the moments.

"""

import json
import re
import sys
import warnings
from os.path import exists

import galsim
import healpy
import numpy as np
from astropy import wcs
from astropy.io import fits

from ..compress.compressutils import ReadFile
from ..config import Config
from .outimage_utils.helper import HDU_to_bels


[docs] def gen_starcube_nonoise(infile_fcn, outstem, nblockmax=100): """ Extracts the noiseless star cube and the moments. Arguments --------- infile_fcn : function A function that returns the input file path given block (ix,iy). outstem : str Output file stem. nblockmax : int, optional Maximum number of blocks on each axis of a mosaic. Returns ------- output : dict The return parameters dictionary contains two strings: key 'STARCAT' points to the file name for the star catalog, and key 'FIDHIST' points to the file name for the fidelity histogram. """ output = {"STARCAT": None} bd = 40 # padding size bd2 = 8 # fidelity extraction size # if needed, shrink the padding size try: configStruct = Config(infile_fcn(0, 0), inmode="block") n2_ = configStruct.n2 print("# n2 =", n2_) if n2_ < bd: bd = n2_ except (FileNotFoundError, ValueError): pass ncol = 22 # number of columns in star catalog pos = np.zeros((1, ncol)) # prototype object (will be stripped at the end) image = np.zeros((1, bd * 2 - 1, bd * 2 - 1), dtype=np.float32) outfile_g = outstem + "_StarCat_galsim.fits" fhist = np.zeros((81,), dtype=np.uint32) is_first = True for ibx in range(nblockmax): for iby in range(nblockmax): try: infile = infile_fcn(ibx, iby) except (FileNotFoundError, ValueError): continue # extract information from the header of the first file if is_first: with ReadFile(infile) as f: n = np.shape(f[0].data)[-1] # size of output images config = "" for g in f["CONFIG"].data["text"].tolist(): config += g + " " configStruct = json.loads(config) blocksize = ( int(configStruct["OUTSIZE"][0]) * int(configStruct["OUTSIZE"][1]) * float(configStruct["OUTSIZE"][2]) / 3600.0 * np.pi / 180 ) # radians rs = 1.5 * blocksize / np.sqrt(2.0) # search radius n2 = int(configStruct["OUTSIZE"][1]) # will be used for coverage outscale = float(configStruct["OUTSIZE"][2]) # in arcsec force_scale = 0.40 / outscale # in output pixels # padding region around the edge bdpad = int(configStruct["OUTSIZE"][1]) * int(configStruct["PAD"]) # figure out which layer we want layers = [""] + configStruct["EXTRAINPUT"] print("#", layers) for i in range(len(layers))[::-1]: m = re.match(r"^gsstar(\d+)$", layers[i]) if m: use_slice = i res = int(m.group(1)) print( "# using layer", use_slice, "resolution", res, "output pix =", outscale, "arcsec n=", n, ) print("# rs=", rs) is_first = False # don't do this again if not exists(infile): continue # get WCS and data with ReadFile(infile, layers=[use_slice]) as f: mywcs = wcs.WCS(f[0].header) map = f[0].data[0, use_slice, :, :] wt = np.sum(np.where(f["INWEIGHT"].data[0, :, :, :] > 0.01, 1, 0), axis=0) fmap = ( f["FIDELITY"].data[0, :, :].astype(np.float32) * HDU_to_bels(f["FIDELITY"]) / 0.1 ) # convert to dB fmap = np.floor(fmap).astype(np.int16) # and round to integer for fy in range(81): fhist[fy] += np.count_nonzero(fmap[bdpad:-bdpad, bdpad:-bdpad] == fy) # extract HEALPix pixels with the stars ra_cent, dec_cent = mywcs.all_pix2world( [(n - 1) / 2], [(n - 1) / 2], [0.0], [0.0], 0, ra_dec_order=True ) ra_cent = ra_cent[0] dec_cent = dec_cent[0] vec = healpy.ang2vec(ra_cent, dec_cent, lonlat=True) qp = healpy.query_disc(2**res, vec, rs, nest=False) ra_hpix, dec_hpix = healpy.pix2ang(2**res, qp, nest=False, lonlat=True) npix = len(ra_hpix) x, y, z1, z2 = mywcs.all_world2pix(ra_hpix, dec_hpix, np.zeros((npix,)), np.zeros((npix,)), 0) xi = np.rint(x).astype(np.int16) yi = np.rint(y).astype(np.int16) grp = np.where( np.logical_and( np.logical_and(xi >= bdpad, xi < n - bdpad), np.logical_and(yi >= bdpad, yi < n - bdpad) ) ) ra_hpix = ra_hpix[grp] dec_hpix = dec_hpix[grp] x = x[grp] y = y[grp] npix = len(x) newpos = np.zeros((npix, ncol)) xi = np.rint(x).astype(np.int16) yi = np.rint(y).astype(np.int16) # position information dx = x - xi dy = y - yi newpos[:, 0] = ra_hpix newpos[:, 1] = dec_hpix newpos[:, 2] = ibx newpos[:, 3] = iby newpos[:, 4] = x newpos[:, 5] = y newpos[:, 6] = xi newpos[:, 7] = yi newpos[:, 8] = dx newpos[:, 9] = dy newimage = np.zeros((npix, bd * 2 - 1, bd * 2 - 1)) print(ibx, iby, infile, npix) for k in range(npix): try: newimage[k, :, :] = map[yi[k] + 1 - bd : yi[k] + bd, xi[k] + 1 - bd : xi[k] + bd] except ValueError: # we end up here if there is an overflow warnings.warn("Postage stamp overflow, padding with zeros.") newimage[k, :, :] = np.pad(map, bd)[ yi[k] + 1 : yi[k] + 2 * bd, xi[k] + 1 : xi[k] + 2 * bd ] # PSF shape try: moms = galsim.Image(newimage[k, :, :]).FindAdaptiveMom() except (RuntimeError, galsim.errors.GalSimHSMError): continue newpos[k, 10] = moms.moments_amp newpos[k, 11] = moms.moments_centroid.x - bd - dx[k] newpos[k, 12] = moms.moments_centroid.y - bd - dy[k] newpos[k, 13] = moms.moments_sigma newpos[k, 14] = moms.observed_shape.g1 newpos[k, 15] = moms.observed_shape.g2 # higher moments x_, y_ = np.meshgrid( np.array(range(1, bd * 2)) - moms.moments_centroid.x, np.array(range(1, bd * 2)) - moms.moments_centroid.y, ) e1 = moms.observed_shape.e1 e2 = moms.observed_shape.e2 Mxx = moms.moments_sigma**2 * (1 + e1) / np.sqrt(1 - e1**2 - e2**2) Myy = moms.moments_sigma**2 * (1 - e1) / np.sqrt(1 - e1**2 - e2**2) Mxy = moms.moments_sigma**2 * e2 / np.sqrt(1 - e1**2 - e2**2) D = Mxx * Myy - Mxy**2 zeta = D * (Mxx + Myy + 2 * np.sqrt(D)) u_ = ((Myy + np.sqrt(D)) * x_ - Mxy * y_) / zeta**0.5 v_ = ((Mxx + np.sqrt(D)) * y_ - Mxy * x_) / zeta**0.5 wti = newimage[k, :, :] * np.exp(-0.5 * (u_**2 + v_**2)) newpos[k, 16] = np.sum(wti * (u_**4 - v_**4)) / np.sum(wti) newpos[k, 17] = 2 * np.sum(wti * (u_**3 * v_ + u_ * v_**3)) / np.sum(wti) # moments with forced scale length wti2 = newimage[k, :, :] * np.exp(-0.5 * (x_**2 + y_**2) / force_scale**2) newpos[k, 18] = np.sum(wti2 * (x_**2 - y_**2)) / np.sum(wti2) / force_scale**2 newpos[k, 19] = np.sum(wti2 * (2 * x_ * y_)) / np.sum(wti2) / force_scale**2 # fidelity newpos[k, 20] = np.mean(fmap[yi[k] + 1 - bd2 : yi[k] + bd2, xi[k] + 1 - bd2 : xi[k] + bd2]) # coverage newpos[k, 21] = wt[yi[k] // n2, xi[k] // n2] # flush sys.stdout.flush() # end loop over simulated stars pos = np.concatenate((pos, newpos), axis=0) image = np.concatenate((image, newimage.astype(np.float32)), axis=0) # end block loop # strip fictitious first object pos = pos[1:, :] image = image[1:, :, :] fits.HDUList([fits.PrimaryHDU(image)]).writeto(outfile_g, overwrite=True) ofile = outstem + "_StarCat.txt" np.savetxt(ofile, pos, header=f" {np.median(pos[:,13]):14.8E}") output["STARCAT"] = ofile # fidelity histogram ofile = outstem + "_fidHist.txt" with open(ofile, "w") as f: for fy in range(20, 81): f.write(f"{fy:2d} {fhist[fy]/np.sum(fhist):8.6f} {np.sum(fhist[:fy+1])/np.sum(fhist):8.6f}") output["FIDHIST"] = ofile return output